DEV Community

Renuka Patil

Kafka Producer and Consumer Example in .NET 6 with ASP.NET Core

In this blog, we’ll dive into Kafka, a distributed streaming platform, and learn how to create a Producer and Consumer in .NET 6 using ASP.NET Core. We’ll cover Kafka’s core concepts, provide detailed explanations for each code snippet, and build a functional application that sends and receives messages.


What is Kafka?
Kafka is a high-throughput, distributed messaging system designed to handle real-time data streams. It has three key components:

  1. Producer: Sends data (messages) to Kafka topics.
  2. Consumer: Reads data (messages) from Kafka topics.
  3. Broker: A Kafka server that stores and manages incoming messages. Kafka typically runs in a cluster with multiple brokers.

Kafka organizes data into topics, which are like categories for storing messages. Messages are immutable once written, and ordering is guaranteed within each partition of a topic.


Core Kafka Concepts

  1. Topic: Logical channel to which messages are sent.
  2. Partition: Each topic is divided into partitions, which can be spread across brokers and read in parallel. This is what lets a topic scale.
  3. Offset: Unique identifier for messages within a partition.
  4. Broker: Kafka server managing topics and partitions.
  5. Producer: Sends data to Kafka topics.
  6. Consumer: Reads data from Kafka topics and processes it.
  7. Group: Consumers are organized into groups to share load and ensure each message is processed by one consumer within the group.
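
To make the topic, partition, and offset ideas above more concrete, here is a toy in-memory sketch. It is not the Kafka client: `ToyTopic` and its key-to-partition rule are hypothetical names and simplifications invented for illustration, and a real client uses a different hash function. It only shows that a message lands in one partition and that its offset is its position in that partition.

```csharp
using System;
using System.Collections.Generic;

var topic = new ToyTopic(partitionCount: 3);
var first = topic.Append("apple", "picked");
var second = topic.Append("apple", "washed");

// Same key, same partition; the offset grows by one per appended message.
Console.WriteLine($"first:  partition {first.Partition}, offset {first.Offset}");
Console.WriteLine($"second: partition {second.Partition}, offset {second.Offset}");
Console.WriteLine(topic.Read(second.Partition, second.Offset));

// Toy model (assumption, not Kafka itself): a topic is an array of
// append-only lists, and a message's index in its list is its offset.
public class ToyTopic
{
    private readonly List<string>[] _partitions;

    public ToyTopic(int partitionCount)
    {
        _partitions = new List<string>[partitionCount];
        for (var i = 0; i < partitionCount; i++)
            _partitions[i] = new List<string>();
    }

    // Simplified rule for illustration: sum of character codes modulo
    // the partition count, so equal keys always map to the same partition.
    public int PartitionFor(string key)
    {
        var sum = 0;
        foreach (var c in key) sum += c;
        return sum % _partitions.Length;
    }

    // Appends the value and returns where it landed.
    public (int Partition, int Offset) Append(string key, string value)
    {
        var partition = PartitionFor(key);
        _partitions[partition].Add(value);
        return (partition, _partitions[partition].Count - 1);
    }

    public string Read(int partition, int offset) => _partitions[partition][offset];
}
```

Because messages are only ever appended, an offset never changes once assigned, which is what lets a consumer resume from a remembered position.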

Prerequisites

  1. .NET 6 SDK: Download and install from the official .NET website.
  2. Kafka Installed: Follow the Kafka installation guide or use Docker to set up Kafka.

Getting Started with Kafka in .NET 6

Step 1: Set up Kafka

1. Start ZooKeeper (Kafka’s dependency):

zookeeper-server-start.bat ..\..\config\zookeeper.properties

2. Start Kafka:

kafka-server-start.bat ..\..\config\server.properties

3. Create a Kafka topic for this example:

kafka-topics.bat --create --topic fruit --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
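
If you would rather create the topic from code, the Confluent.Kafka package installed in Step 3 also ships an admin client. The following is a minimal sketch rather than part of the original walkthrough; it assumes the broker is reachable at localhost:9092 and mirrors the partition and replication values of the command above.

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };
using var admin = new AdminClientBuilder(config).Build();

try
{
    // Same settings as the CLI command: one partition, replication factor 1.
    await admin.CreateTopicsAsync(new[]
    {
        new TopicSpecification { Name = "fruit", NumPartitions = 1, ReplicationFactor = 1 }
    });
    Console.WriteLine("Topic 'fruit' created.");
}
catch (CreateTopicsException e)
{
    // Thrown, for example, when the topic already exists.
    Console.WriteLine($"Could not create topic '{e.Results[0].Topic}': {e.Results[0].Error.Reason}");
}
```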


Step 2: Create a .NET 6 Project

Run the following command to create an ASP.NET Core Web API project:

dotnet new webapi -n KafkaProducerConsumer
cd KafkaProducerConsumer


Step 3: Install Kafka Library
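Kafka communication in .NET is enabled by the Confluent.Kafka library. Install it via NuGet, along with Swashbuckle.AspNetCore for the Swagger UI (the .NET 6 webapi template typically references Swashbuckle already, so the second command may be redundant):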

dotnet add package Confluent.Kafka
dotnet add package Swashbuckle.AspNetCore


Kafka in .NET 6: Step-by-Step Implementation

We will build two services:

  1. Kafka Producer Service: Sends messages to Kafka topics.
  2. Kafka Consumer Service: Continuously listens to and processes messages from Kafka topics.

Step 4: Configure Kafka Settings

Add Kafka configurations in appsettings.json to simplify access throughout the application:

{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "fruit",
    "GroupId": "my-consumer-group"
  }
}


Explanation:

  • BootstrapServers: Address of the Kafka broker.
  • Topic: The topic where messages will be sent or received.
  • GroupId: Identifies the consumer group for message processing.
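
The services in the following steps hard-code the broker address, so here is one possible way to read the settings described above instead. This is a sketch under assumptions: `KafkaSettings` is a hypothetical class that is not part of the original project, and its defaults are used for any key missing from the "Kafka" section.

```csharp
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// Bind the "Kafka" section of appsettings.json; fall back to defaults if it is absent.
var kafkaSettings = builder.Configuration.GetSection("Kafka").Get<KafkaSettings>()
                    ?? new KafkaSettings();

// Register the settings so services can receive them through constructor injection.
builder.Services.AddSingleton(kafkaSettings);

Console.WriteLine($"Broker: {kafkaSettings.BootstrapServers}, topic: {kafkaSettings.Topic}, group: {kafkaSettings.GroupId}");

// Hypothetical settings class; property names mirror the configuration keys.
public class KafkaSettings
{
    public string BootstrapServers { get; set; } = "localhost:9092";
    public string Topic { get; set; } = "fruit";
    public string GroupId { get; set; } = "my-consumer-group";
}
```

A service could then accept `KafkaSettings` in its constructor and use `BootstrapServers` in place of the literal string.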

Step 5: Create the Kafka Producer Service

Create a folder named Services and add KafkaProducerService.cs.

using Confluent.Kafka;

namespace KafkaExample.Services;

public interface IKafkaProducerService
{
    Task SendMessageAsync(string topic, string message);
}

public class KafkaProducerService : IKafkaProducerService
{
    private readonly IProducer<Null, string> _producer;

    // Constructor to initialize Kafka producer with configuration
    public KafkaProducerService()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092" // Kafka server details (ensure this is correct)
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    // Method to send message to Kafka topic
    public async Task SendMessageAsync(string topic, string message)
    {
        try
        {
            // Send message to the specified Kafka topic
            await _producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
            Console.WriteLine($"Message '{message}' sent to topic '{topic}'.");
        }
        catch (Exception ex)
        {
            // Log any errors encountered while sending message
            Console.WriteLine($"Error sending message to Kafka: {ex.Message}");
            throw;
        }
    }
}

Explanation:

  • ProducerConfig: Configures the producer, specifying the Kafka broker.
  • ProduceAsync: Sends a message to Kafka asynchronously.
  • Null: Key is set to null since our example doesn’t use keyed messages.
  • _producer: The Kafka producer instance sends messages to the specified topic.
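
For comparison, here is a rough sketch of what a keyed message could look like, since the service above uses Null keys. It is not part of the original project. It assumes the same local broker and the fruit topic, and the key "apple" is only an illustrative value. With the default partitioner, messages that share a key are normally routed to the same partition, which preserves their relative order.

```csharp
using System;
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

// string key type instead of Null, so each message can carry a key.
using var producer = new ProducerBuilder<string, string>(config).Build();

try
{
    var result = await producer.ProduceAsync(
        "fruit",
        new Message<string, string> { Key = "apple", Value = "An apple was picked" });

    // The delivery result reports the partition and offset assigned by the broker.
    Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
}
catch (ProduceException<string, string> ex)
{
    Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
}
```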

Step 6: Create the Kafka Consumer Service

Add KafkaConsumerService.cs to the Services folder.

using Confluent.Kafka;
using System;
using System.Threading.Tasks;

namespace KafkaExample.Services
{
    public class KafkaConsumerService
    {
        private readonly IConsumer<Null, string> _consumer;

        public KafkaConsumerService()
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "my-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            _consumer = new ConsumerBuilder<Null, string>(config).Build();
        }

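        public void ConsumeMessages(string topic)
        {
            _consumer.Subscribe(topic);

            try
            {
                // Blocks the calling thread and keeps polling until an error occurs
                while (true)
                {
                    var consumeResult = _consumer.Consume();
                    Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error consuming message: {e.Error.Reason}");
            }
            finally
            {
                // Leave the consumer group cleanly before returning
                _consumer.Close();
            }
        }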
    }
}


Explanation:

  • ConsumerConfig: Configures the consumer to connect to the broker and specify the consumer group.
  • Subscribe: Subscribes the consumer to a topic.
  • Consume: Blocks until the next message is available, then returns it.
  • AutoOffsetReset.Earliest: Ensures the consumer starts reading messages from the beginning of the topic if no offsets exist.
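
Because Consume blocks its calling thread, one common way to run a consumer inside an ASP.NET Core application is a hosted background service. The sketch below is an assumption about how that could look, not code from the original project: `KafkaConsumerWorker` is a hypothetical name, and the broker address, group, and fruit topic are copied from the examples in this post.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical worker that consumes in the background until the host stops.
public class KafkaConsumerWorker : BackgroundService
{
    private const string Topic = "fruit"; // topic created in Step 1

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() blocks, so run the loop off the startup path.
        return Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);
    }

    private static void ConsumeLoop(CancellationToken stoppingToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "my-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        // Ignore skips key deserialization, so keyed and unkeyed messages both work.
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe(Topic);

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(stoppingToken);
                    Console.WriteLine($"Consumed '{result.Message.Value}' at {result.TopicPartitionOffset}");
                }
                catch (ConsumeException e)
                {
                    // Log and keep polling instead of ending the loop on one bad message.
                    Console.WriteLine($"Error consuming message: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // The host is shutting down; fall through to Close().
        }
        finally
        {
            consumer.Close(); // leave the consumer group cleanly
        }
    }
}
```

Such a worker would be registered in Program.cs with `builder.Services.AddHostedService<KafkaConsumerWorker>();` so that it starts and stops with the application.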

Step 7: Register Services in Program.cs

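Register the Kafka producer service in Program.cs. Note that this listing does not register or start KafkaConsumerService, so ConsumeMessages must be invoked separately for the consumer to run.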

using KafkaExample.Services;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = WebApplication.CreateBuilder(args);

// Register Kafka producer service with Dependency Injection
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();

// Add controllers (required for API endpoints)
builder.Services.AddControllers();

// Add Swagger for API documentation
builder.Services.AddEndpointsApiExplorer(); // For Swagger UI
builder.Services.AddSwaggerGen(); // For Swagger UI

var app = builder.Build();

// Configure Swagger in development environment
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Use HTTPS Redirection
app.UseHttpsRedirection();

// Configure the HTTP request pipeline to use controllers
app.MapControllers();

app.Run();


Step 8: Create an API Endpoint for Sending Messages

Add a KafkaController.cs in the Controllers folder to handle message requests.

using KafkaExample.Services;
using Microsoft.AspNetCore.Mvc;

namespace KafkaProducerConsumer.Controllers
{
    [ApiController]
    [Route("api/[controller]")]
    public class KafkaController : ControllerBase
    {
        private readonly IKafkaProducerService _producerService;

        public KafkaController(IKafkaProducerService producerService)
        {
            _producerService = producerService;
        }

        [HttpPost("send")]
        public async Task<IActionResult> SendMessage([FromQuery] string topic, [FromQuery] string message)
        {
            if (string.IsNullOrEmpty(topic) || string.IsNullOrEmpty(message))
            {
                return BadRequest("Both 'topic' and 'message' query parameters are required.");
            }

            await _producerService.SendMessageAsync(topic, message);
            return Ok($"Message '{message}' sent successfully to topic '{topic}'.");
        }
    }
}



Explanation:

This endpoint accepts the topic and message as query parameters, returns 400 Bad Request if either is missing, and otherwise passes the message to the Kafka producer.


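Your Solution Explorer should now look like this: a Controllers folder containing KafkaController.cs, a Services folder containing KafkaProducerService.cs and KafkaConsumerService.cs, plus Program.cs and appsettings.json at the project root.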


Step 9: Run and Test

1. Start the Kafka and ZooKeeper servers.

2. Run the .NET application:

dotnet run


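3. Use a REST client like Postman to send a message. The port (5292 here) comes from the project's launch settings, so use whatever port dotnet run reports: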

POST
http://localhost:5292/api/kafka/send?topic=fruit&message=apple
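
The same request can also be sent from code. This is a small sketch, assuming the application is listening on the port shown above; `BuildSendUri` is a hypothetical helper introduced here to escape the query values.

```csharp
using System;
using System.Net.Http;

// Hypothetical helper: builds the request URL and escapes the query values.
static Uri BuildSendUri(string baseUrl, string topic, string message) =>
    new Uri($"{baseUrl}/api/kafka/send?topic={Uri.EscapeDataString(topic)}&message={Uri.EscapeDataString(message)}");

using var client = new HttpClient();
var uri = BuildSendUri("http://localhost:5292", "fruit", "apple");

// The endpoint reads its input from the query string, so no request body is needed.
using var response = await client.PostAsync(uri, content: null);
Console.WriteLine($"{(int)response.StatusCode}: {await response.Content.ReadAsStringAsync()}");
```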



Kafka Workflow Recap

  1. The producer sends the message to the fruit topic.
  2. The Kafka broker receives and stores the message.
  3. The consumer reads the message from the topic and processes it.

This demonstrates the basic producer-consumer pattern in Kafka, integrated with a .NET 6 application.


The source code is available on GitHub.

Conclusion

In this blog, we explored Kafka concepts and implemented a real-time producer-consumer application in .NET 6. Kafka is highly scalable, fault-tolerant, and suitable for distributed systems. This example can be extended to include advanced features like message keying, batch processing, error handling, and monitoring.

Happy coding!
