tonybui1812

Redis Streams - model data as a log or a stream of events

Redis Streams is a data type that models data as an append-only log of events. It suits event-driven architectures, message queuing, and real-time data processing. Here's an example that uses Redis Streams to implement a simple message queue with the callback-based redis client for Node.js:

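// Assumes the callback-based node_redis API (e.g. npm install redis@3);
// v4 and later return promises natively and use different method names.
const redis = require('redis');
const { promisify } = require('util');

const redisClient = redis.createClient();
// A blocking XREADGROUP stalls every command queued behind it on the same
// connection, so the consumer reads on its own connection.
const consumerClient = redisClient.duplicate();

const xaddAsync = promisify(redisClient.xadd).bind(redisClient);
const xgroupAsync = promisify(redisClient.xgroup).bind(redisClient);
const xackAsync = promisify(redisClient.xack).bind(redisClient);
const xreadgroupAsync = promisify(consumerClient.xreadgroup).bind(consumerClient);

const STREAM_NAME = 'message_stream';
const CONSUMER_GROUP_NAME = 'message_consumers';

// Create the consumer group, and the stream itself via MKSTREAM, if missing
async function createStreamIfNotExists() {
  try {
    await xgroupAsync('CREATE', STREAM_NAME, CONSUMER_GROUP_NAME, '$', 'MKSTREAM');
  } catch (err) {
    // Ignore the error Redis returns when the consumer group already exists
    if (!err.message.includes('BUSYGROUP')) {
      throw err;
    }
  }
}

// Produce a message to the stream
async function produceMessage(message) {
  // '*' asks Redis to generate the entry ID
  const messageId = await xaddAsync(STREAM_NAME, '*', 'message', message);
  console.log(`Produced message with ID: ${messageId}`);
}

// Consume messages from the stream
async function consumeMessages() {
  const consumerName = 'consumer-1';

  while (true) {
    try {
      // '>' requests only entries never delivered to this group;
      // BLOCK 0 waits indefinitely for new entries
      const messages = await xreadgroupAsync(
        'GROUP',
        CONSUMER_GROUP_NAME,
        consumerName,
        'BLOCK',
        0,
        'COUNT',
        10,
        'STREAMS',
        STREAM_NAME,
        '>',
      );

      for (const [, entries] of messages) {
        for (const [messageId, fields] of entries) {
          // fields is a flat array of alternating names and values,
          // e.g. ['message', 'Message 1']
          console.log(`Received message with ID ${messageId}: ${fields[1]}`);
          // Process the message here

          // Acknowledge the message: this removes it from the group's pending
          // entries list, but the entry itself stays in the stream
          await xackAsync(STREAM_NAME, CONSUMER_GROUP_NAME, messageId);
        }
      }
    } catch (err) {
      console.error('Error consuming messages:', err);
      // Pause briefly so a persistent error does not spin the loop
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

(async () => {
  await createStreamIfNotExists();

  // Start consuming messages (not awaited: the loop runs until the process stops)
  consumeMessages();

  // Produce some example messages
  for (let i = 1; i <= 10; i++) {
    await produceMessage(`Message ${i}`);
    await new Promise((resolve) => setTimeout(resolve, 1000)); // Delay between messages
  }
})();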
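
The nested loops in consumeMessages walk the raw reply, in which each entry's fields arrive as a flat array of alternating names and values. As a rough sketch, a small helper could turn that reply into plain objects before processing. The function name parseStreamReply and the sample reply are hypothetical, and the shape assumed here is the nested-array form that the callback-based client passes through from Redis.

```javascript
// Hypothetical helper: converts a raw XREADGROUP/XREAD reply of the form
// [[streamName, [[id, [field1, value1, ...]], ...]], ...] into flat objects.
function parseStreamReply(reply) {
  const entries = [];
  // A blocking read that times out yields a null reply
  if (!reply) return entries;

  for (const [stream, rawEntries] of reply) {
    for (const [id, flatFields] of rawEntries) {
      const fields = {};
      const flat = flatFields || []; // guard against entries with no field data
      // Fields arrive as alternating name/value items
      for (let i = 0; i < flat.length; i += 2) {
        fields[flat[i]] = flat[i + 1];
      }
      entries.push({ stream, id, fields });
    }
  }
  return entries;
}

// Sample reply written by hand for illustration, not captured from a server
const sampleReply = [
  ['message_stream', [
    ['1700000000000-0', ['message', 'Message 1']],
    ['1700000000001-0', ['message', 'Message 2']],
  ]],
];

const parsed = parseStreamReply(sampleReply);
console.log(parsed[0].id, parsed[0].fields.message);
```

With a helper like this, the consumer loop could read entry.fields.message by name instead of relying on the position of the value in the array.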

In this example:

  • We create a Redis stream named message_stream and a consumer group named message_consumers.

  • The produceMessage function appends messages to the stream with XADD, and the consumeMessages function reads them through the consumer group with XREADGROUP.

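  • Messages are added to the stream with unique IDs, and consumers in the group read and acknowledge (ack) them. Acknowledging a message marks it as processed and removes it from the group's pending entries list; the entry itself stays in the stream until it is trimmed or deleted (for example with XTRIM or XDEL).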

  • The consumer script runs continuously, waiting for new messages to arrive in the stream. It processes each message and acknowledges it.
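
To make the bookkeeping behind reading and acknowledging more concrete, here is a toy in-memory model of one stream with one consumer group. It is only a sketch: the ToyStream class and its method names are invented for illustration and say nothing about how Redis is implemented. It shows that delivering an entry puts it on a pending list, and that acknowledging it clears the pending list without deleting the entry from the log.

```javascript
// Toy in-memory model of one stream with one consumer group.
// It only mimics the bookkeeping; it is not how Redis is implemented.
class ToyStream {
  constructor() {
    this.entries = [];         // the append-only log
    this.lastDelivered = -1;   // index of the last entry handed to the group
    this.pending = new Map();  // entry id -> consumer name (delivered, not yet acked)
    this.nextSeq = 0;
  }

  // Like XADD: append an entry and return its id
  add(fields) {
    const id = `0-${++this.nextSeq}`;
    this.entries.push({ id, fields });
    return id;
  }

  // Like XREADGROUP with '>': deliver entries never delivered to this group
  readNew(consumer, count) {
    const start = this.lastDelivered + 1;
    const batch = this.entries.slice(start, start + count);
    for (const entry of batch) this.pending.set(entry.id, consumer);
    this.lastDelivered += batch.length;
    return batch;
  }

  // Like XACK: drop the id from the pending list; returns how many were removed
  ack(id) {
    return this.pending.delete(id) ? 1 : 0;
  }
}

const stream = new ToyStream();
stream.add({ message: 'Message 1' });
stream.add({ message: 'Message 2' });

const batch = stream.readNew('consumer-1', 10);
console.log(stream.pending.size);   // 2 entries delivered but not yet acknowledged

for (const entry of batch) stream.ack(entry.id);
console.log(stream.pending.size);   // 0 after acknowledging both
console.log(stream.entries.length); // still 2: acknowledging does not delete entries
```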

Redis Streams is a versatile feature that can be used for more complex use cases beyond simple message queues, including real-time event processing and log aggregation. You can customize and extend this example to fit your specific requirements.
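
One possible extension: because entries stay in a stream after they are acknowledged, a long-running queue or log keeps growing unless it is trimmed. XADD accepts a MAXLEN option, and the ~ modifier lets Redis trim approximately rather than exactly. The sketch below only builds the argument list for such a call; buildCappedXaddArgs is a hypothetical helper and the limit of 1000 is an arbitrary example value.

```javascript
// Hypothetical helper: builds the argument list for an XADD call that also
// trims the stream to roughly `maxLen` entries (MAXLEN with the ~ modifier).
function buildCappedXaddArgs(streamName, maxLen, fields) {
  const args = [streamName, 'MAXLEN', '~', maxLen, '*'];
  for (const [name, value] of Object.entries(fields)) {
    args.push(name, String(value));
  }
  return args;
}

const args = buildCappedXaddArgs('message_stream', 1000, { message: 'Message 1' });
console.log(args.join(' '));
// With the promisified client from the example, this could be sent as:
//   await xaddAsync(...args);
```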
