DEV Community

Alexander
Alexander

Posted on • Edited on

Tutorial: Efficient Canister Networking With ic-event-hub

This tutorial is dedicated to Rust smart-contract (canister) development on Internet Computer (Dfinity) platform. Completing it, you’ll know how to use ic-event-hub library APIs in order to perform efficient cross-canister integrations.

Before digging into this tutorial it is recommended to learn the basics of smart-contract development for the Internet Computer. Here are some good starting points: official website, dedicated to canister development on the IC; IC developer forum, where one could find an answer to almost any technical question.

Complete code of this tutorial is here.

Motivation

Smart-contracts are not cheap. Running a simple "hello world" smart-contract is at least 10 times more expensive on the IC than running an similar "hello world" service on the AWS. And there is actually a very simple reason why this will always be like that - infrastructure costs.

When you run a "hello world" service on a single EC2 AWS replica you waste only a little of the resource (memory, processing power and bandwidth). All of this happens on a single replica (it doesn't matter if the replica is virtual or not).

When you run the same exact service on the IC, you're using the same amount of the resource, but (!) your program is processed by the whole subnet (7 replicas by now). Moreover it is not only processed simultaneously on all these replicas, but on top of that each message to your program goes through the consensus protocol (which under the hood hides much more additional replica interactions).

This means that it is just wrong to compare computation costs between the IC and AWS without taking additional infrastructure costs into an account. In order to make such a comparison, you have to imagine your program running not on a single EC2 AWS instance, but on seven EC2 instances with the help of some kind of consensus protocol (e.g. Kafka) instead. This way your AWS setup will (roughly) provide the same set of security guarantees and is eligible for comparison with the IC.

Smart-contract developers on the IC understand that "time is money" like nobody else. The more efficient code you write, the more money you'll save. Developers now are basically financially motivated to build faster programs, otherwise they just won't be able to sustain their services. Code optimizations and frequent code refactoring are now make much more sense from the business point of view - they cut the losses.

Efficient code -> less money lost -> more money in your pocket. Simple as that.

ic-event-hub helps with cycles saving a lot, by providing a special event-batching functionality. Basically it lets your canister accumulate messages you want to send to other canisters and then send them all together in a single batch.

How does it help with saving cycles? Let's look at this table from the Dfinity's website. We're particularly interested in these two rows:

  • Xnet Call - 260,000 cycles
  • Xnet Byte Transmission - 1,000 cycles

It means that it is much cheaper to send one 1000 bytes message, then to send a thousand of 1 byte messages, because for each message sent to another canister there is a fee taken.

Sending one 1000 bytes message would cost you (260,000 + (1,000 * 1,000)) * 1 = 1260000 cycles, while sending a thousand of 1 byte messages would cost you (260,000 + 1,000) * 1000 = 261000000 cycles, 200 times more expensive.

So, if you find yourself in a situation when your canister sends lots of small (under 100KB) messages to other canisters, it is better for you to find a tool that will optimize this process so you could save cycles. ic-event-hub library is exactly that tool.

In this tutorial, we'll go through an example of such a situation and learn how to use ic-event-hub in order to fix it.

Task definition

Imagine the following task: we want data supplied by users to be available on multiple canisters.

Naive solution is to make users to send the same piece of data to multiple canisters. This won't work, because there is no easy way to fight against malicious users.

It means that we're left with the only option - to make users send data to one canister and to make this canister to re-transmit this data to all other canisters. The only problem with this solution is that it is expensive - for each data piece sent by user we send N (where N is canisters count) more messages to propagate this piece of data to other canisters. Let's use ic-event-hub to solve this problem.

Implementation

Let's start with the file system layout:

project/
    canisters/
        emitter/     // a canister that will accept user data
            actor.rs
            build.sh
            can.did
            cargo.toml
        listener/   // a canister that will receive re-transmited user-data
            actor.rs
            build.sh
            can.did
            cargo.toml
    e2e-test/       // typescript-based test suite to check our logic
        src/
            example.spec.ts
            deploy.ts
        dfx.json
        package.json
        tsconfig.json
Enter fullscreen mode Exit fullscreen mode

As you can see, we have two canisters: an emitter and a listener, as well as a typescript project with tests. In this tutorial, instead of using terminal and dfx, we'll use typescript for deployment and testing.

Emitter canister

This canister will receive data from users and re-transmit it to the listener canister in form of events.

Dependencies

# project/emitter/cargo.toml

[package]
name = "emitter"
version = "0.1.0"
edition = "2018"

[lib]
crate-type = ["cdylib"]
path = "actor.rs"

[dependencies]
ic-cdk = "0.4.0"
ic-cdk-macros = "0.4.0"
candid = "0.7.12"
serde = "1.0.126"
ic-event-hub = "0.3.0"
ic-event-hub-macros = "0.3.0"
Enter fullscreen mode Exit fullscreen mode

Buildscript

# project/emitter/build.sh

#!/usr/bin/env bash

# this part makes it possible to execute this script from another directory
SCRIPT=$(readlink -f "$0")
SCRIPTPATH=$(dirname "$SCRIPT")
cd "$SCRIPTPATH" || exit

cargo build --target wasm32-unknown-unknown --release --package emitter && \
 ic-cdk-optimizer ./target/wasm32-unknown-unknown/release/emitter.wasm -o ./target/wasm32-unknown-unknown/release/emitter-opt.wasm
Enter fullscreen mode Exit fullscreen mode

Canister logic

To make it more representative, let's make our canisters count how many pieces of data they process and how many inter-canister messages does it take.

Emitter canister is the one that receives data pieces and re-transmits them. One data piece - one request. Let's define its state and the init() function:

// project/emitter/actor.rs

#[derive(Default)]
pub struct RequestCounter {
    pub counter: u64,
}

static mut STATE: Option<RequestCounter> = None;

pub fn get_state() -> &'static mut RequestCounter {
    unsafe { STATE.as_mut().unwrap() }
}

#[init]
fn init() {
    unsafe {
        STATE = Some(RequestCounter::default());
    }
}
Enter fullscreen mode Exit fullscreen mode

As you can see, it's pretty trivial. We only store requests count inside the state. Let's then define everything we need for ic-event-hub in order to work:

// project/emitter/actor.rs

#[derive(Event)]
pub struct MirrorEvent {
    pub data: Vec<u8>,
}

implement_event_emitter!(1_000_000_000 * 20, 500 * 1024);
Enter fullscreen mode Exit fullscreen mode

We have a simple event MirrorEvent that contains some abstract data supplied by a user - nothing interesting. But let's have a closer look at invoked macros.

implement_event_emitter macro initializes ic-event-hub's state (enabling us to call functions like emit() and send_events()) with some arguments which define event-batching behavior. The first one is the maximum batch forming time. The second one is the maximum batch size in bytes.

These arguments will work the following way:

  1. For each event listener there is a separate "forming batch" queue.
  2. Once an event is emitted, this queue is checked.
  3. If the size of this queue, after adding the new event to it, exceeds the max batch size (the second argument) this queue is transformed into a batch and is pushed into special "ready-to-send" queue. The new "forming batch" queue is created and the event is placed there instead.
  4. If the size of this queue, after adding the new event to it, is less than max batch size, then this new event is just gets added to the queue.
  5. Once each heartbeat each such "forming batch" queue is checked for it's creation timestamp.
  6. If it was created more than maximum batch forming time ago, this queue is transformed into a batch and is pushed into "ready-to-send" queue.

Yes, it sounds complicated, but it is simple in practice. In our case we did set these arguments like:

  • maximum batch forming time = 20 seconds;
  • maximum batch size = 500 KB.

This means, that if our canister is under heavy load (emits hundreds of events per second), ic-event-hub guarantees that each batch will never exceed 500 KB in size. This is a useful restriction, because message size is limited in the IC (currently 2MB per message).

On the other hand, if our canister is idle most of the time (emits only a couple of events per hour), ic-event-hub guarantees that these events will reach their destination in no more than 20 seconds.

There are two more macro invocations we have to do here:

// project/emitter/actor.rs

implement_subscribe!();
implement_unsubscribe!();
Enter fullscreen mode Exit fullscreen mode

These macros create two more update functions in your canister: subscribe() and unsubscribe().

You don't have to list them in your .did file.
These update functions let listener canisters to subscribe to events. We'll see how it's done a little later.
By the way. You can supply a guard-function into these macros, like implement_subscribe!(guard = "guard_fn_name"). This will enable you to deny any unauthorized subscription, if you want such a functionality.

Last, but not least, we want to enable event sending. ic-event-hub is designed to sent events during heartbeats, so here is how we should do it:

// project/emitter/actor.rs

#[heartbeat]
pub fn tick() {
    send_events();
}
Enter fullscreen mode Exit fullscreen mode

All that's left to do is to write an update function that would receive users' data and emit it in form of events:

// project/emitter/actor.rs

#[update]
fn mirror(data: Vec<u8>) {
    get_state().counter += 1;

    emit(MirrorEvent { data });
}

#[query]
fn get_requests_count() -> u64 {
    get_state().counter
}
Enter fullscreen mode Exit fullscreen mode

As you might notice there is also a counter increment for each update function call as well as a query function that returns current counter's value.

That's it for the emitter canister. The complete project/emitter/actor.rs file should look like this:

// project/emitter/actor.rs

// ------------- MAIN LOGIC -------------------

#[update]
fn mirror(data: Vec<u8>) {
    get_state().counter += 1;

    emit(MirrorEvent { data });
}

#[query]
fn get_requests_count() -> u64 {
    get_state().counter
}

// ------------------ EVENT HUB ------------------

#[derive(Event)]
pub struct MirrorEvent {
    pub data: Vec<u8>,
}

implement_event_emitter!(1_000_000_000 * 20, 500 * 1024);
implement_subscribe!();
implement_unsubscribe!();

#[heartbeat]
pub fn tick() {
    send_events();
}

// ------------------ STATE ----------------------

#[derive(Default)]
pub struct RequestCounter {
    pub counter: u64,
}

static mut STATE: Option<RequestCounter> = None;

pub fn get_state() -> &'static mut RequestCounter {
    unsafe { STATE.as_mut().unwrap() }
}

#[init]
fn init() {
    unsafe {
        STATE = Some(RequestCounter::default());
    }
}
Enter fullscreen mode Exit fullscreen mode

Candid interface

// project/emitter/can.did

service : {
    "mirror" : (blob) -> ();
    "get_requests_count" : () -> (nat64) query;
}
Enter fullscreen mode Exit fullscreen mode

Listener canister

This canister will receive re-transmitted data from the emitter canister in form of event batches.

Dependencies

# project/listener/cargo.toml

[package]
name = "listener"
version = "0.1.0"
edition = "2018"

[lib]
crate-type = ["cdylib"]
path = "actor.rs"

[dependencies]
ic-cdk = "0.4.0"
ic-cdk-macros = "0.4.0"
candid = "0.7.12"
serde = "1.0.126"
ic-event-hub = "0.3.0"
ic-event-hub-macros = "0.3.0"
Enter fullscreen mode Exit fullscreen mode

Buildscript

# project/listener/build.sh

#!/usr/bin/env bash

SCRIPT=$(readlink -f "$0")
SCRIPTPATH=$(dirname "$SCRIPT")
cd "$SCRIPTPATH" || exit

cargo build --target wasm32-unknown-unknown --release --package listener && \
 ic-cdk-optimizer ./target/wasm32-unknown-unknown/release/listener.wasm -o ./target/wasm32-unknown-unknown/release/listener-opt.wasm
Enter fullscreen mode Exit fullscreen mode

Canister logic

Let's start with the state of this canister. It have to store the principal of the emitter canister and we want to count how many events and batches this canister would receive:

// project/listener/actor.rs

pub struct RequestCounterMirror {
    pub emitter_canister_id: Principal,
    pub events_received: u64,
    pub batches_received: u64,
}

static mut STATE: Option<RequestCounterMirror> = None;

pub fn get_state() -> &'static mut RequestCounterMirror {
    unsafe { STATE.as_mut().unwrap() }
}

#[init]
fn init(emitter_canister_id: Principal) {
    unsafe {
        STATE = Some(RequestCounterMirror {
            emitter_canister_id,
            events_received: 0,
            batches_received: 0,
        });
    }
}
Enter fullscreen mode Exit fullscreen mode

The code is pretty straightforward. Please notice that we pass the principal of the emitter canister into init() function.

The listener canister needs to define the type of events it will receive (you can import it from a shared library or just redefine in place):

// project/listener/actor.rs

#[derive(Event, Debug)]
pub struct MirrorEvent {
    pub data: Vec<u8>,
}
Enter fullscreen mode Exit fullscreen mode

In order to start receiving events from the emitter canister, listener canister has to subscribe() to them. This is done the following way:

// project/listener/actor.rs

#[update]
async fn start_listening() {
    get_state()
        .emitter_canister_id
        .subscribe(SubscribeRequest {
            callbacks: vec![CallbackInfo {
                filter: EventFilter::empty(),
                method_name: String::from("events_callback"),
            }],
        })
        .await
        .ok()
        .unwrap();
}
Enter fullscreen mode Exit fullscreen mode

In the perfect world we would make this subscribe() call from the init() function, but we can't. There is a workaround for this, but we won't use it to keep it simple here.

We're using IEventEmitter trait here in order to make a subscribe() call by simply dotting emitter canister's principal. Come take a look at this trait if you want to make your code cleaner, while calling remote canisters.

Basically, by making this call, the listener canister tells the emitter canister "Hey there! Would you kindly send all events you emit to the 'events_callback' method of mine?".

Let's define this events_callback function. It is important to say that this function should follow some rules to work properly:

  • it should be an update function;
  • it should be named exactly like you've registered it sending a subscribe() request;
  • it should have only a single argument of type Vec<Event>;
  • it should have no return value (you can return something, but it will be ignored anyway). It is also important to note that this function will be executed once for each received event batch:
// project/listener/actor.rs

#[update]
fn events_callback(events: Vec<Event>) {
    get_state().batches_received += 1;

    for event in events {
        if event.get_name().as_str() == "MirrorEvent" {
            let ev: MirrorEvent = MirrorEvent::from_event(event);
            print(format!("Got event: {:?}", ev).as_str());

            get_state().events_received += 1;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

This function is pretty simple. For each event in a batch, we're checking for an event's name and if it matches "MirrorEvent" (the name of our event structure), we're decoding a MirrorEvent structure from the generic event that we've received and print this event.
During this function we're also updating the counters.

All that's left to do is to define query functions to be able to read counter values:

// project/listener/actor.rs

#[query]
fn get_events_received() -> u64 {
    get_state().events_received
}

#[query]
fn get_batches_received() -> u64 {
    get_state().batches_received
}
Enter fullscreen mode Exit fullscreen mode

The complete project/listener/actor.rs file should look like this:

// project/listener/actor.rs


// ------------- MAIN LOGIC -------------------

#[query]
fn get_events_received() -> u64 {
    get_state().events_received
}

#[query]
fn get_batches_received() -> u64 {
    get_state().batches_received
}

// ----------------- EVENT HUB ----------------------

#[derive(Event, Debug)]
pub struct MirrorEvent {
    pub data: Vec<u8>,
}

#[update]
async fn start_listening() {
    get_state()
        .emitter_canister_id
        .subscribe(SubscribeRequest {
            callbacks: vec![CallbackInfo {
                filter: EventFilter::empty(),
                method_name: String::from("events_callback"),
            }],
        })
        .await
        .ok()
        .unwrap();
}

#[update]
fn events_callback(events: Vec<Event>) {
    get_state().batches_received += 1;

    for event in events {
        if event.get_name().as_str() == "MirrorEvent" {
            let ev: MirrorEvent = MirrorEvent::from_event(event);
            print(format!("Got event: {:?}", ev).as_str());

            get_state().events_received += 1;
        }
    }
}

// ------------------ STATE ----------------------

pub struct RequestCounterMirror {
    pub emitter_canister_id: Principal,
    pub events_received: u64,
    pub batches_received: u64,
}

static mut STATE: Option<RequestCounterMirror> = None;

pub fn get_state() -> &'static mut RequestCounterMirror {
    unsafe { STATE.as_mut().unwrap() }
}

#[init]
fn init(emitter_canister_id: Principal) {
    unsafe {
        STATE = Some(RequestCounterMirror {
            emitter_canister_id,
            events_received: 0,
            batches_received: 0,
        });
    }
}
Enter fullscreen mode Exit fullscreen mode

Candid interface

// project/listener/can.did

service : (principal) -> {
    "start_listening" : () -> ();
    "get_events_received" : () -> (nat64) query;
    "get_batches_received" : () -> (nat64) query;
}
Enter fullscreen mode Exit fullscreen mode

Tests

Now let's write a typescript test in order to check if everything works as expected.

Dependencies

// e2e-test/package.json

{
  "name": "e2e-tests",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "scripts": {
    "start": "dfx start --clean",
    "build": "dfx build --check",
    "test": "ts-mocha --paths --timeout 1000000 src/**/*.spec.ts"
  },
  "devDependencies": {
    "@dfinity/agent": "^0.9.2",
    "@dfinity/candid": "^0.9.2",
    "@dfinity/identity": "^0.9.2",
    "@dfinity/principal": "^0.9.2",
    "@types/chai": "^4.2.21",
    "@types/mocha": "^9.0.0",
    "@types/node": "^16.4.3",
    "@types/node-fetch": "^2.5.12",
    "chai": "^4.3.4",
    "mocha": "^9.0.3",
    "node-fetch": "^2.6.1",
    "ts-mocha": "^8.0.0",
    "ts-node": "^10.1.0",
    "tsconfig-paths": "^3.10.1",
    "typescript": "^4.3.5"
  }
}
Enter fullscreen mode Exit fullscreen mode

Typescript config

// e2e-test/tsconfig.json

{
  "compilerOptions": {
    "target": "ESNext",
    "module": "commonjs",
    "outDir": "./dist",
    "strict": true,
    "moduleResolution": "node",
    "baseUrl": "./",
    "paths": {
      "dfx/*": [
        ".dfx/local/canisters/*.did.js"
      ],
      "dfx-type/*": [
        ".dfx/local/canisters/*.did.d.ts"
      ]
    },
    "allowJs": true,
    "allowSyntheticDefaultImports": true,
    "esModuleInterop": true,
    "preserveSymlinks": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true
  }
}
Enter fullscreen mode Exit fullscreen mode

Notice how the paths section is arranged.

DFX config

// e2e-test/dfx.json

{
  "canisters": {
    "emitter": {
      "build": "../canisters/emitter/build.sh",
      "candid": "../canisters/emitter/can.did",
      "wasm": "../canisters/emitter/target/wasm32-unknown-unknown/release/emitter-opt.wasm",
      "type": "custom"
    },
    "listener": {
      "build": "../canisters/listener/build.sh",
      "candid": "../canisters/listener/can.did",
      "wasm": "../canisters/listener/target/wasm32-unknown-unknown/release/listener-opt.wasm",
      "type": "custom"
    }
  },
  "dfx": "0.9.2",
  "networks": {
    "local": {
      "bind": "127.0.0.1:8000",
      "type": "ephemeral"
    }
  },
  "version": 1
}
Enter fullscreen mode Exit fullscreen mode

Canister deployment with typescript

So, we want to have a function that would deploy our canisters, instantiate actor objects with the correct IDLs and return them back for us to use. In order to implement this function, we first need to implement another one, that will create a new canister, install the provided wasm-module on that canister and return an actor object with the provided interface. Let's script that function:

// e2e-test/src/deploy.ts

export async function deployCanister<T>(name: string, arg: number[], agent: HttpAgent): Promise<{ actor: T, canisterId: Principal }> {
    const managementCanister = getManagementCanister({agent});
    const {canister_id} = await managementCanister.provisional_create_canister_with_cycles({amount: [], settings: []});
    const wasm = fs.readFileSync(`.dfx/local/canisters/${name}/${name}.wasm`);
    const {idlFactory} = await import(`dfx/${name}/${name}`)

    await managementCanister.install_code({
        canister_id,
        mode: {[CanisterInstallMode.Install]: null},
        wasm_module: [...wasm],
        arg
    });

    console.log(`Canister ${name} ${canister_id} deployed`);

    return {
        actor: Actor.createActor(idlFactory, {
            agent,
            canisterId: canister_id
        }),
        canisterId: canister_id
    };
}
Enter fullscreen mode Exit fullscreen mode

Let's now define a data type we want to receive back, when deployment is over. We want to receive back at least a HttpAgent and an of actor object for each of both: the emitter canister and the listener canister:

// e2e-test/src/deploy.ts

import {_SERVICE as IEmitterService} from 'dfx-type/emitter/emitter';
import {_SERVICE as IListenerService} from 'dfx-type/listener/listener';

export interface ISetup {
    agent: HttpAgent;
    emitterService: IEmitterService;
    listenerService: IListenerService;
}
Enter fullscreen mode Exit fullscreen mode

Notice how clean our imports look, thanks to tsconfig-paths package.

Now we can finally write that function that will do all the work for us:

// e2e-test/src/deploy.ts

import fetch from "node-fetch";

export async function setup(identity: Identity): Promise<ISetup> {
    const agent = new HttpAgent({
        host: 'http://localhost:8000/',
        // @ts-ignore
        fetch,
        identity,
    });
    await agent.fetchRootKey();

    const {
        actor: emitterService,
        canisterId: emitterCanisterId
    } = await deployCanister<IEmitterService>('emitter', [], agent);

    const {
        actor: listenerService
    } = await deployCanister<IListenerService>('listener', [...IDL.encode([IDL.Principal], [emitterCanisterId])], agent);

    return {
        agent,
        emitterService,
        listenerService,
    };
}
Enter fullscreen mode Exit fullscreen mode

Now we can use this function inside tests any possible way we like. One could imagine calling this function from beforeEach callback, to have a clean canister state before each test.

Testing

We're almost there. Now we only left to implement the test-case itself:

// e2e-test/src/example.spec.ts

describe('event batching', () => {
    let s: ISetup;

    before(async () => {
        s = await setup(Ed25519KeyIdentity.generate());
    });

    it("flow works fine", async () => {
        // this listener should catch all events
        await s.listenerService.start_listening();

        // checking before
        const emitterRequestsBefore = await s.emitterService.get_requests_count();
        assert.equal(emitterRequestsBefore, 0n, "Emitter state should be clean before everything");

        const listenerEventsBefore = await s.listenerService.get_events_received();
        const listenerBatchesBefore = await s.listenerService.get_batches_received();
        assert.equal(listenerEventsBefore, 0n, "Listener events state should be clean before everything");
        assert.equal(listenerBatchesBefore, 0n, "Listener batches state should be clean before everything");

        // sending 10 events each of 100 bytes of data
        for (let i = 0; i < 10; i++) {
            await s.emitterService.mirror(Array(100).fill(1));
        }

        // it should send all events in one batch
        const emitterRequestsAfter = await s.emitterService.get_requests_count();
        assert.equal(emitterRequestsAfter, 10n, "Emitter requests count should equal 10");

        // waiting for at least 10 seconds
        await delay(10_000);

        const listenerEventsAfter = await s.listenerService.get_events_received();
        assert.equal(listenerEventsAfter, 10n, "Listener events count should be equal to 10");

        const listenerBatchesAfter = await s.listenerService.get_batches_received();
        assert.equal(listenerBatchesAfter, 1n, "Listener batches count should be equal to 1");
    });
});
Enter fullscreen mode Exit fullscreen mode

The test-case is pretty simple. First we check if the state of both canisters is clean. Then we call emitter.mirror() method 10 times. Then we wait for some time and check if:

  • the emitter received exactly 10 requests;
  • the listener received exactly 10 events in a single batch.

By the way, delay() function, that just asynchronously waits for some provided amount of time, is defined like this:

function delay(ms: number) {
    return new Promise(resolve => setTimeout(resolve, ms));
}
Enter fullscreen mode Exit fullscreen mode

Trying it out

Now let's run our test-case. Start a local IC replica:

yarn start
Enter fullscreen mode Exit fullscreen mode

Then (in a separate terminal window) build the project:

yarn build
Enter fullscreen mode Exit fullscreen mode

Then run tests:

yarn test
Enter fullscreen mode Exit fullscreen mode

This last command should deploy a new pair of canisters and run our test-case against them.

Afterword

In this tutorial we used ic-event-hub library in order to set an efficient communication channel between two canisters. It's important to mention, that this pattern could be easily applied to more complex scenarios with tens (or even hundreds) of interacting canisters.

Complete code of this tutorial can be found here.

Thanks for reading!

Top comments (0)