DEV Community

César Fabián CHÁVEZ LINARES

Building Real-Time Data Pipelines with Debezium and Kafka: A Practical Guide

Introduction

Keeping data synchronized across different systems is a recurring engineering problem. Change Data Capture (CDC) addresses it by tracking changes in your database and propagating them to other systems in near real time. In this guide, we'll build a practical CDC pipeline using Debezium and Apache Kafka.

What We'll Build

We'll create a simple e-commerce scenario where order updates in a PostgreSQL database are automatically synchronized with an Elasticsearch instance for real-time search capabilities. This setup demonstrates a common real-world use case for CDC.
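
To make the idea concrete, here is a minimal sketch of what the target side of such a pipeline does with each change event. It assumes the usual Debezium operation codes (c for create, u for update, d for delete, r for snapshot read). The ChangeEvent class and the in-memory map standing in for Elasticsearch are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a change event for a single row.
final class ChangeEvent {
    final String op;                 // "c" = create, "u" = update, "d" = delete, "r" = snapshot read
    final Integer id;                // primary key of the affected row
    final Map<String, Object> after; // row state after the change (null for deletes)

    ChangeEvent(String op, Integer id, Map<String, Object> after) {
        this.op = op;
        this.id = id;
        this.after = after;
    }
}

// In-memory stand-in for the search index that a sink would keep in sync.
final class OrderIndexSketch {
    private final Map<Integer, Map<String, Object>> documents = new HashMap<>();

    // Apply one event: upsert on create/update/read, remove on delete.
    void apply(ChangeEvent event) {
        switch (event.op) {
            case "c":
            case "u":
            case "r":
                documents.put(event.id, new HashMap<>(event.after));
                break;
            case "d":
                documents.remove(event.id);
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + event.op);
        }
    }

    Map<String, Object> get(Integer id) {
        return documents.get(id);
    }

    int size() {
        return documents.size();
    }

    public static void main(String[] args) {
        OrderIndexSketch index = new OrderIndexSketch();
        index.apply(new ChangeEvent("c", 1, Map.of("status", "NEW")));
        index.apply(new ChangeEvent("u", 1, Map.of("status", "PROCESSING")));
        System.out.println(index.get(1));
        index.apply(new ChangeEvent("d", 1, null));
        System.out.println(index.size());
    }
}
```

In the real pipeline this logic lives in the sink connector; the sketch only shows why create, update, and delete events are enough to keep a copy consistent.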

Prerequisites

  • Docker and Docker Compose
  • Java 11 or higher
  • Maven
  • Git
  • PostgreSQL client (psql)
  • curl (for testing)

Architecture Overview

Our architecture consists of several components:

  1. PostgreSQL database (source)
  2. Debezium connector
  3. Apache Kafka
  4. Kafka Connect
  5. Elasticsearch (target)
  6. Simple Spring Boot application for testing
graph LR
    A[PostgreSQL] -->|Debezium| B[Kafka Connect]
    B -->|Events| C[Kafka]
    C -->|Sink Connector| D[Elasticsearch]
    E[Spring Boot App] -->|Writes| A
    D -->|Search| E

Implementation Steps

1. Setting Up the Environment

First, let's create our project structure:

mkdir cdc-demo
cd cdc-demo
git init

Create a docker-compose.yml file:

version: '3'
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=inventory
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - ./postgres/init:/docker-entrypoint-initdb.d

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  connect:
    image: debezium/connect:2.1
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      BOOTSTRAP_SERVERS: kafka:29092
    depends_on:
      - kafka
      - postgres

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m

2. Creating the Database Schema

Create postgres/init/init.sql:

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    order_date TIMESTAMP NOT NULL,
    status VARCHAR(50) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL
);

ALTER TABLE orders REPLICA IDENTITY FULL;
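
Setting REPLICA IDENTITY FULL makes PostgreSQL log the complete previous row for updates and deletes, so a change event can carry a full "before" image next to the "after" image. One thing a consumer can do with that is work out which columns actually changed. The following is a minimal sketch of that idea; the RowDiffSketch class and its method are hypothetical, and the rows are modeled as plain maps rather than real Debezium payloads.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

final class RowDiffSketch {
    // Returns column -> new value for every column whose value differs
    // between the "before" and "after" row images.
    static Map<String, Object> changedColumns(Map<String, Object> before,
                                              Map<String, Object> after) {
        Map<String, Object> changed = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            if (!Objects.equals(before.get(entry.getKey()), entry.getValue())) {
                changed.put(entry.getKey(), entry.getValue());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("id", 1);
        before.put("status", "NEW");
        before.put("total_amount", "99.99");

        // Copy the old row and change a single column.
        Map<String, Object> after = new LinkedHashMap<>(before);
        after.put("status", "PROCESSING");

        System.out.println(changedColumns(before, after));
    }
}
```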

3. Configuring Debezium

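The debezium/connect image bundles the Debezium source connectors but no Elasticsearch sink connector, so a sink has to be installed and registered separately to complete the path into Elasticsearch. After starting the containers (docker-compose up -d, shown in step 5), register the Debezium source connector through the Kafka Connect REST API: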

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "inventory",
        "topic.prefix": "dbserver1",
        "table.include.list": "public.orders",
        "plugin.name": "pgoutput"
    }
}'
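
After registering the connector, it can be worth confirming that it actually started. Below is a minimal sketch that queries the Kafka Connect REST status endpoint (GET /connectors/{name}/status) with the HTTP client built into Java 11. The class and helper names are hypothetical, and the failure check is a deliberately naive pattern match rather than real JSON parsing.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Pattern;

final class ConnectorStatusSketch {
    // Matches a "state" field with value FAILED anywhere in the status document.
    private static final Pattern FAILED = Pattern.compile("\"state\"\\s*:\\s*\"FAILED\"");

    // Build a GET request for the status of one connector.
    static HttpRequest statusRequest(String connectUrl, String connectorName) {
        return HttpRequest.newBuilder(
                URI.create(connectUrl + "/connectors/" + connectorName + "/status"))
            .GET()
            .build();
    }

    // Naive check: true if the connector or any of its tasks reports FAILED.
    static boolean hasFailedState(String statusJson) {
        return FAILED.matcher(statusJson).find();
    }

    public static void main(String[] args) throws Exception {
        // Assumes Kafka Connect is reachable on localhost:8083, as in docker-compose.yml.
        HttpRequest request = statusRequest("http://localhost:8083", "inventory-connector");
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
        System.out.println(hasFailedState(response.body())
            ? "Connector needs attention"
            : "No failed state reported");
    }
}
```

For anything beyond a quick check, a JSON library is a better fit than pattern matching on the response body.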

4. Spring Boot Application

Create a new Spring Boot project with the following dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

Create the Order entity:

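import java.math.BigDecimal;
import java.time.LocalDateTime;
import javax.persistence.*; // use jakarta.persistence.* on Spring Boot 3 and later

// JPA mapping for the orders table created in init.sql
@Entity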
@Table(name = "orders")
public class Order {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long customerId;
    private LocalDateTime orderDate;
    private String status;
    private BigDecimal totalAmount;

    // Getters, setters, and constructors
}

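Create a REST controller for testing. It relies on an OrderRepository, which is not shown here and is assumed to be a Spring Data interface extending JpaRepository<Order, Long>: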

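import java.time.LocalDateTime;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ResponseStatusException;

// Minimal write API; every insert or update here produces a CDC event
@RestController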
@RequestMapping("/api/orders")
public class OrderController {
    private final OrderRepository orderRepository;

    public OrderController(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    @PostMapping
    public Order createOrder(@RequestBody Order order) {
        order.setOrderDate(LocalDateTime.now());
        return orderRepository.save(order);
    }

    @PutMapping("/{id}")
    public Order updateOrder(@PathVariable Long id, @RequestBody Order order) {
        return orderRepository.findById(id)
            .map(existingOrder -> {
                existingOrder.setStatus(order.getStatus());
                existingOrder.setTotalAmount(order.getTotalAmount());
                return orderRepository.save(existingOrder);
            })
            .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
    }
}

5. Testing the Pipeline

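  1. Start all containers (skip this if they are already running from step 3), and make sure the Spring Boot application is up on port 8080: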
docker-compose up -d
  2. Create a test order:
curl -X POST http://localhost:8080/api/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": 1,
    "status": "NEW",
    "totalAmount": 99.99
  }'
  3. Update the order:
curl -X PUT http://localhost:8080/api/orders/1 \
  -H "Content-Type: application/json" \
  -d '{
    "status": "PROCESSING",
    "totalAmount": 99.99
  }'
  4. Check Kafka topics to verify the CDC events:
docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic dbserver1.public.orders \
  --from-beginning
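
The topic name in the consumer command is not arbitrary. For PostgreSQL tables, Debezium names the topic after the connector's logical name (the topic prefix), the schema, and the table, joined by dots. The helper below is a hypothetical one-liner that only spells out that convention for this guide's values.

```java
final class TopicNameSketch {
    // Compose a topic name as <topic prefix>.<schema>.<table>.
    static String topicFor(String topicPrefix, String schema, String table) {
        return topicPrefix + "." + schema + "." + table;
    }

    public static void main(String[] args) {
        // Values taken from the connector configuration in step 3.
        System.out.println(topicFor("dbserver1", "public", "orders"));
    }
}
```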

Common Challenges and Solutions

  1. Data Consistency

    • Use transaction logs for accurate change capture
    • Implement idempotent consumers
    • Handle out-of-order events
  2. Performance Optimization

    • Batch updates when possible
    • Monitor Kafka partition lag
    • Tune PostgreSQL replication slots
  3. Error Handling

    • Implement dead letter queues
    • Set up proper monitoring and alerting
    • Create retry mechanisms
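
As an illustration of the idempotent-consumer and out-of-order points above, here is a minimal sketch that remembers the last applied position per order and ignores anything that is not newer. It assumes every event carries a monotonically increasing position, such as the log sequence number found in the source block of Debezium's PostgreSQL events. The class itself and its in-memory maps are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class IdempotentApplierSketch {
    // Highest position already applied, per order id.
    private final Map<Integer, Long> lastAppliedLsn = new HashMap<>();
    // Current state derived from the events, per order id.
    private final Map<Integer, String> statusByOrderId = new HashMap<>();

    // Applies the update only if it is newer than what was already applied
    // for this order. Returns true when state changed, false when the event
    // was a duplicate or arrived out of order.
    boolean apply(int orderId, long lsn, String status) {
        Long last = lastAppliedLsn.get(orderId);
        if (last != null && lsn <= last) {
            return false;
        }
        lastAppliedLsn.put(orderId, lsn);
        statusByOrderId.put(orderId, status);
        return true;
    }

    String statusOf(int orderId) {
        return statusByOrderId.get(orderId);
    }

    public static void main(String[] args) {
        IdempotentApplierSketch applier = new IdempotentApplierSketch();
        System.out.println(applier.apply(1, 100L, "NEW"));        // first event for order 1
        System.out.println(applier.apply(1, 200L, "PROCESSING")); // newer event
        System.out.println(applier.apply(1, 200L, "PROCESSING")); // redelivered duplicate
        System.out.println(applier.apply(1, 100L, "NEW"));        // stale, out-of-order event
        System.out.println(applier.statusOf(1));
    }
}
```

A production consumer would persist the last applied position together with the data it protects, so that a restart does not lose it.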

Best Practices

  1. Schema Evolution

    • Use Avro for schema management
    • Plan for backward/forward compatibility
    • Test schema changes thoroughly
  2. Monitoring

    • Track replication lag
    • Monitor Kafka consumer group offsets
    • Set up alerts for connector failures
  3. Security

    • Use SSL/TLS for communication
    • Implement proper authentication
    • Follow least privilege principle
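
For the replication-lag item above, one common measure is the distance in bytes between the server's current WAL position and the position the replication slot has confirmed. PostgreSQL prints these positions as LSNs in the form high/low, both hexadecimal, for example from pg_current_wal_lsn() and the confirmed_flush_lsn column of pg_replication_slots. The sketch below shows the arithmetic under that assumption; the class name is hypothetical, and inside the database pg_wal_lsn_diff() does the same job.

```java
final class ReplicationLagSketch {
    // Parse the textual LSN format "high/low" (two hexadecimal numbers)
    // into a single 64-bit byte position.
    static long parseLsn(String lsn) {
        String[] parts = lsn.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        long high = Long.parseLong(parts[0], 16);
        long low = Long.parseLong(parts[1], 16);
        return (high << 32) | low;
    }

    // Bytes of WAL written by the server but not yet confirmed by the slot.
    static long lagBytes(String currentWalLsn, String confirmedFlushLsn) {
        return parseLsn(currentWalLsn) - parseLsn(confirmedFlushLsn);
    }

    public static void main(String[] args) {
        // Example values only; real ones come from the queries named above.
        System.out.println(lagBytes("0/200", "0/100"));
    }
}
```

A lag value that keeps growing usually means the connector has stopped consuming, which also causes the slot to retain WAL on disk.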

Conclusion

CDC with Debezium and Kafka provides a robust solution for real-time data synchronization. This setup can be extended to handle more complex scenarios like:

  • Multi-region deployment
  • Multiple target systems
  • Complex transformation pipelines
  • High availability requirements
