DEV Community

Jan Schulte for Outshift By Cisco

Posted on • Edited on

How to Build a Simple Kafka Producer/Consumer Application in Rust

Thought about using Kafka? This blog post will walk you through the
steps necessary to bootstrap your Rust producer and consumer application
with a batteries-included local dev setup featuring VSCode Dev
Containers, Kafka, and Zookeeper.

In this tutorial, we will build a simple Rust application consisting of
a producer and consumer working with search data from Hacker News.

Step 0: Prerequisites

  • Rust 1.66 or later
  • VSCode
  • Docker

Step 1: Setup a new Rust Project

We will leverage Cargo's workspace
feature

in this example. A workspace contains more than one application or
library but can be compiled from the top-level directory.

Let's get started. Let's create our project directory that will contain
all source code.

$ mkdir kafka_hn_processing
$ cd kafka_hn_processing
Enter fullscreen mode Exit fullscreen mode

Once we're in the new directory, let's set up our workspace:

$ cat > Cargo.toml
[workspace]
members = [
    "producer"
]
Enter fullscreen mode Exit fullscreen mode

For now, this Cargo.toml only references a producer application which
we will create in the next step.

$ cargo new producer
$ cd producer
Enter fullscreen mode Exit fullscreen mode

To make this application work, we need to install a few dependencies. We
use cargo add for this (Note: cargo add requires at least Rust
1.62). cargo add downloads the dependency and adds it to the project's
Cargo.toml.

$ cargo add reqwest --features=json -p producer 
$ cargo add tokio -p producer --features=full
$ cargo add serde --features=derive -p producer
$ cargo add serde_json  -p producer
$ cargo add urlencoding -p producer
Enter fullscreen mode Exit fullscreen mode

We're installing:

  • reqwest - To perform HTTP Requests
  • tokio - For async support
  • serde - Serialization/Deserialization
  • serde_json To serialize and deserialize JSON
  • urlencoding To url-encode parameters

Step 2: Setup a Dev Container

Before we start writing code, we need to set up our Docker environment.
In the project's root directory, create a docker-compose.yml:

---
 version: '3.8'
 services:
   rust-log-processing:
     image: mcr.microsoft.com/devcontainers/rust:0-1-bullseye
     volumes:
       - ..:/workspaces:cached
     cap_add:
       - SYS_PTRACE
     security_opt:
       - seccomp:unconfined
     command: /bin/sh -c "while sleep 1000; do :; done"
   zookeeper:
     image: confluentinc/cp-zookeeper:7.3.0
     container_name: zookeeper
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
       ZOOKEEPER_TICK_TIME: 2000
   broker:
     image: confluentinc/cp-kafka:7.3.0
     container_name: broker
     ports:
     # To learn about configuring Kafka for access across networks see
     # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
       - "9092:9092"
     depends_on:
       - zookeeper
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Enter fullscreen mode Exit fullscreen mode

(Find a detailed explanation of this docker-compose.yml file
here.)

We won't run Rust code directly on the local machine but in a Docker
container instead. VSCode provides us with the
devcontainer
feature, which allows us to run our workspace code in a Docker
container.

To get started with a Dev Container, we need a .devcontainer folder
for our VSCode settings. Create .devcontainer on the root level.
Folder structure:

$ mkdir .devcontainer
$ ls -l
 drwxr-xr-x     - user  7 Feb 13:45 .devcontainer
 drwxr-xr-x     - user  7 Feb 13:51 .git
 .rw-r--r--     8 user  7 Feb 10:52 .gitignore
 .rw-r--r--  9.6k user  7 Feb 13:53 Cargo.lock
 .rw-r--r--   199 user  7 Feb 13:53 Cargo.toml
 .rw-r--r--  1.2k user  7 Feb 13:46 docker-compose.yml
 drwxr-xr-x     - user  7 Feb 10:52 src
 drwxr-xr-x@    - user  7 Feb 10:52 target
Enter fullscreen mode Exit fullscreen mode

Then, inside .devcontainer/, create the
.devcontainer/devcontainer.json config file:

// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/rust
{
  "name": "Rust",
  "service": "rust-log-processing",
  "dockerComposeFile": "../docker-compose.yml",
  "features": {
    "ghcr.io/devcontainers/features/rust:1": {}
  },
  "workspaceFolder": "/workspaces/${localWorkspaceFolderBasename}",
  "shutdownAction": "stopCompose"
}
Enter fullscreen mode Exit fullscreen mode

Our configuration is more advanced because we also leverage Docker
Compose. By default, VSCode runs everything in a single container. Since
we need Kafka to test our code, we let it know to take our
docker-compose.yml into account.

With these files in place, we're ready to start writing code. Open the
project in VSCode:

$ code .
Enter fullscreen mode Exit fullscreen mode

Important: Make sure you open the project In Container. VSCode
usually prompts you for this, but if not, click the little icon in the
bottom left corner and select Reopen in Container in the command
menu.

Step 3: Add HN Search Code

To kick everything off, we add some code to allow us to communicate with
the Hacker News API. We're choosing Hacker News because its API does not
require authentication and offers a vast amount of data we can process.

We will cover this code sparingly and add it to a separate module.
Create a new file producer/src/hn.rs:

use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct HackerNewsResponse {
    pub hits: Vec<HNSearchResult>,
}

#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct HNSearchResult {
    pub author: String,
    #[serde(alias = "objectID")]
    pub id: String,
    pub title: String,
    url: Option<String>,
    pub story_text: Option<String>,
    #[serde(alias = "_tags")]
    pub tags: Option<Vec<String>>,
    pub points: u32,
}

pub async fn fetch_hn_stories(search_term: String, search_result_limit: u32) -> Result<HackerNewsResponse, reqwest::Error> {
    let url_encoded_search_term = urlencoding::encode(&search_term);
    let url_str= format!("https://hn.algolia.com/api/v1/search_by_date?query={}&tags=story&hitsPerPage={}", url_encoded_search_term, search_result_limit);
    let client = reqwest::Client::new();
    let request = client.get(url_str)
            .build().unwrap();
    let json_response = client.execute(request)
            .await?
            .json::<HackerNewsResponse>()
            .await?;

    Ok(json_response)
}
Enter fullscreen mode Exit fullscreen mode

We'll use this code to fetch stories via the Search API from Hacker News
to generate content for Kafka. From our main function, we'll call
fetch_hn_stories with a search term and a limit, indicating how many
results we want at maximum.

Step 4: Add Producer Code

In main.rs, add a new function:

 use kafka::producer::{Producer, Record, RequiredAcks};
 use std::time::Duration;
 use crate::hn::HNSearchResult;

 mod hn;

 fn send_to_kafka(host: &str, topic: &str, payload: Vec<HNSearchResult>) {
     let mut producer = Producer::from_hosts(vec![host.to_owned()])
         .with_ack_timeout(Duration::from_secs(1))
         .with_required_acks(RequiredAcks::One)
         .create()
         .unwrap();

     for search_result in payload {
         let buffer = serde_json::to_string(&search_result).unwrap();

         producer
             .send(&Record::from_value(topic, buffer.as_bytes()))
             .unwrap();
     }
 }

fn main() {
   //...
}
Enter fullscreen mode Exit fullscreen mode

send_to_kafka contains the minimum setup needed to talk to a Kafka
broker. We configure a timeout (.with_ack_timeout) and how many
Ack's we need at least to move on (.with_required_acks). Since our
Dev Setup only uses a single broker, we're setting this to 1 (this
setting might differ in production depending on your use case and the
number of available brokers).

Within topics, Kafka stores payloads as bytes. Therefore we need to
serialize it to a byte array (buffer.as_bytes).

With our send_to_kafka function in place, let's call in main():

//producer/src/main.rs
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let stories = hn::fetch_hn_stories("Ruby".into(), 100).await?;
     println!("Fetched {} stories", stories.hits.len());
     send_to_kafka("broker:9092", "hnstories", stories.hits);
     Ok(())
 }
Enter fullscreen mode Exit fullscreen mode

Also notice how we added #[tokio::main] and
async fn main() -> Result<(), Box<dyn std::error::Error>>. This
addition is necessary to allow async code to run.

We fetch a hundred stories with an arbitrary search term (such as
"Ruby") and then send it to Kafka.

Step 5: Create Kafka Topic

Before we can run our code, we need to create a Kafka topic. By default,
a Kafka installation ships with command-line utilities to help with such
maintenance tasks. In a terminal on your local machine, run the
following command:

$ docker exec broker \
   kafka-topics --bootstrap-server broker:9092 \
                --create \
                --topic hnstories
   Created topic hnstories.
Enter fullscreen mode Exit fullscreen mode

Step 6: The Consumer

With the producer in place, let's create the consumer, reading Hacker
News search results from the topic. Before we run cargo new, open
Cargo.toml in the project root directory to add a new project:

[workspace]

members = [
    "producer",
    "consumer"
]
Enter fullscreen mode Exit fullscreen mode

Add "consumer" to the member list. Save and close the file. In the dev
container, run the following command to create a new project:

$ cargo new consumer 
  Created binary (application) `consumer` package
Enter fullscreen mode Exit fullscreen mode

Add the following dependencies for the consumer:

$ cargo add serde --features=derive -p consumer
$ cargo add serde_json -p consumer
$ cargo add kafka -p consumer
Enter fullscreen mode Exit fullscreen mode

In main.rs, add the following code:

use kafka::consumer::{Consumer, FetchOffset};

fn main() {
    let mut consumer =
       Consumer::from_hosts(vec!("broker:9092".to_owned()))
          .with_topic("hnstories".to_owned())
          .with_fallback_offset(FetchOffset::Earliest)
          .create()
          .unwrap();

    loop {
      for ms in consumer.poll().unwrap().iter() {
        for m in ms.messages() {
          let str = String::from_utf8_lossy(m.value);
          println!("{:?}",str);
        }
        let _ = consumer.consume_messageset(ms);
      }
      consumer.commit_consumed().unwrap();
    }
}
Enter fullscreen mode Exit fullscreen mode

Let's go through this code step by step.

First of all, we create a new consumer:

let mut consumer =
   Consumer::from_hosts(vec!("broker:9092".to_owned()))
      .with_topic("hnstories".to_owned())
      .with_fallback_offset(FetchOffset::Earliest)
      .create()
      .unwrap();
Enter fullscreen mode Exit fullscreen mode

We connect to a single broker (broker:9092). broker in this case, is
the domain name managed by Docker compose. We're listening to a single
topic (hnstories) and configuring a fallback offset.

The fallback offset allows the consumer to start reading messages from
the beginning of the topic. If we omit this configuration, it will not
consider previous events once up and running.

It's time to run the code. In VSCode, open the terminal to run commands
within the context of the dev container.

Open two terminal sessions within VSCode. Then, run the code:

$ cargo run -p producer
Enter fullscreen mode Exit fullscreen mode

And in the second session, run:

$ cargo run -p consumer
Enter fullscreen mode Exit fullscreen mode

This command starts the consumer application. The consumer polls the
topic for content and prints out messages whenever it receives new ones.

Final Thoughts

We only need a few steps to start building applications with Kafka. To
get us kicked off, we rely on a Docker Compose configuration that stands
up a single instance of Kafka and Zookeeper. With that out of the way,
we need a producer and consumer. A producer writes new data on a
topic while the consumer reads it.

To keep everything self-contained, we use VSCode's Dev Containers.

Once you are ready to move your application into production, make sure to check out Calisti.
Calisti allows you to stand up a production-ready Kafka cluster in your Kubernetes cluster in minutes. Additionally, it supports you in operating Kafka, even if you're still working on becoming a Kafka
expert.

Find the source code here.

Top comments (1)

Collapse
 
viskazz profile image
Viskazz • Edited

Hi Jan! Thanks for great write up!

But I have one question. Every time I'm start consumer its receive all messages sended by producer (already received last time). The producer sended only once and was shut down.
I was thinking that consumer.commit_consumed().unwrap(); must tell to broker to commit offsets, but seems it not happened. Did you expect this behavior? Or I miss some broker settings?