RisingWave Labs

Building a Real-Time Recommendation Engine with RisingWave, Kafka, and Redis

Stop Serving Stale Recommendations: Go Real-Time!

Personalized product recommendations are no longer a "nice-to-have" for e-commerce sites – they're essential. Customers expect tailored suggestions, and they expect them now. Traditional, batch-processed recommendation systems simply can't keep up. They rely on periodic updates, often leaving customers with outdated suggestions that don't reflect their current interests. Imagine browsing for running shoes and being shown recommendations for winter coats based on data from yesterday! That's a missed opportunity (and a frustrated customer).

This post will guide you through building a real-time recommendation engine using RisingWave, Kafka, and Redis. This system reacts immediately to user actions, delivering relevant suggestions that improve the shopping experience and drive conversions.

The Architecture: A Real-Time Recommendation Pipeline

Here’s how the system comes together:

Data Flow Overview

  1. Data Ingestion:
    • User activity (e.g., page views, clicks, searches, add-to-cart events) streams into a Kafka topic in real time.
    • Product catalog data (e.g., IDs, names, categories, prices) is stored in a database like PostgreSQL, with updates captured via Change Data Capture (CDC) and streamed directly to RisingWave through its built-in connector. If the catalog changes rarely, you can instead store it directly in a RisingWave table.
    • Optionally, a purchase stream can fuel collaborative filtering.
  2. Stream Processing (RisingWave):

    RisingWave ingests these streams by defining sources linked to Kafka topics. SQL-based materialized views handle the recommendation logic, updating continuously as new data arrives.

  3. Recommendation Storage (Redis):

    Precomputed recommendations from RisingWave’s materialized views are stored in Redis, an in-memory cache optimized for rapid retrieval.

    Note: For low-traffic setups or prototypes, you could query RisingWave directly, but for now Redis is preferred in production because lookups from an in-memory cache are faster.

  4. Recommendation Serving:

    The e-commerce app fetches recommendations from Redis using a user ID, ensuring a fast, seamless experience.

Sample Data (JSON):

  • User Activity (Kafka):

    
    // Page View Event
    {
      "event_type": "page_view",
      "user_id": 123,
      "product_id": "product_abc",
      "timestamp": "2024-07-27T10:00:00Z"
    }
    
    // Search Event
    {
        "event_type": "search",
        "user_id": 456,
        "query": "running shoes",
        "timestamp": "2024-07-27T10:03:00Z"
    }
    
  • Product Catalog (Kafka - via CDC):

    {
      "product_id": "product_abc",
      "name": "Awesome Running Shoes",
      "category": "shoes/running",
      "price": 99.99,
      "description": "...",
      "image_url": "..."
    }
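
To make the event shape concrete, here is a minimal Python sketch of how a producer might build and serialize a page view event before publishing it to the Kafka topic. The helper name make_page_view_event is hypothetical, and the publishing call itself is left out because it depends on which Kafka client you use.

```python
import json
from datetime import datetime, timezone


def make_page_view_event(user_id: int, product_id: str, when: datetime) -> bytes:
    """Serialize a page view event to the JSON shape shown above (hypothetical helper)."""
    event = {
        "event_type": "page_view",
        "user_id": user_id,
        "product_id": product_id,
        # ISO 8601 in UTC with a trailing "Z", matching the sample events.
        "timestamp": when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    return json.dumps(event).encode("utf-8")


payload = make_page_view_event(
    123, "product_abc", datetime(2024, 7, 27, 10, 0, tzinfo=timezone.utc)
)
print(payload.decode("utf-8"))
```

The resulting bytes can be passed as the message value to whichever Kafka producer you already use.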
    

Building the Pipeline with RisingWave

Let's build the core components of our recommendation engine using RisingWave's SQL interface.

Prerequisites

First, please ensure these systems are up and running.

  • A RisingWave cluster. For details about how to start a cluster, see RisingWave quickstart.
  • A Kafka instance that stores the live user activity stream in a topic.
  • A Postgres instance that stores the product_catalog table and has Change Data Capture enabled. For detailed configurations, see Ingest data from PostgreSQL CDC.
  • A Redis instance for caching recommendations.

Step 1: Define Sources

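Connect to the Kafka topics with the source definitions below. In this walkthrough the product catalog changes are also read from a Kafka topic, as in the sample data above, rather than through the direct PostgreSQL CDC connector: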

-- User Activity Stream
CREATE SOURCE user_activity_stream (
    event_type VARCHAR,
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'user_activity',
    properties.bootstrap.server = 'kafka-broker1:9092,kafka-broker2:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

-- Product Catalog Stream
CREATE SOURCE product_catalog (
    product_id VARCHAR,
    name VARCHAR,
    category VARCHAR,
    price DOUBLE PRECISION,
    description VARCHAR,
    image_url VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'product_catalog',
    properties.bootstrap.server = 'kafka-broker1:9092,kafka-broker2:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Materialized Views for Trending Products

Track the top 10 most-viewed products in hourly windows:

CREATE MATERIALIZED VIEW trending_products AS
WITH windowed_views AS (
    SELECT
        window_start,
        product_id,
        COUNT(*) AS view_count
    FROM
        TUMBLE(user_activity_stream, timestamp, INTERVAL '1 hour')
    WHERE event_type = 'page_view'
    GROUP BY window_start, product_id
),
ranked_products AS (
    SELECT
        window_start,
        product_id,
        view_count,
        RANK() OVER (PARTITION BY window_start ORDER BY view_count DESC) AS rank
    FROM windowed_views
)
SELECT
    window_start,
    product_id,
    view_count
FROM ranked_products
WHERE rank <= 10; -- Top 10 trending products

This view:

  • Groups page views into 1-hour windows.
  • Ranks products by view count per window.
  • Updates incrementally as new events stream in.
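
For intuition about what this view computes, the same logic can be sketched as a plain Python batch function. This is only an illustration: the function name top_products_per_hour and the sample events are made up here, and unlike the materialized view it recomputes everything from scratch instead of updating incrementally.

```python
from collections import Counter, defaultdict
from datetime import datetime


def top_products_per_hour(events, n=10):
    """Count page views per (hour window, product) and keep the n most viewed per window."""
    counts = defaultdict(Counter)
    for event in events:
        if event["event_type"] != "page_view":
            continue
        ts = datetime.fromisoformat(event["timestamp"])
        # Truncate to the start of the hour: the tumbling window this event falls into.
        window_start = ts.replace(minute=0, second=0, microsecond=0)
        counts[window_start][event["product_id"]] += 1
    # most_common(n) returns at most n rows; SQL RANK() would also keep ties at the cutoff.
    return {window: counter.most_common(n) for window, counter in counts.items()}


sample_events = [
    {"event_type": "page_view", "product_id": "product_abc", "timestamp": "2024-07-27T10:05:00"},
    {"event_type": "page_view", "product_id": "product_abc", "timestamp": "2024-07-27T10:40:00"},
    {"event_type": "page_view", "product_id": "product_xyz", "timestamp": "2024-07-27T10:59:59"},
    {"event_type": "search", "query": "running shoes", "timestamp": "2024-07-27T10:03:00"},
    {"event_type": "page_view", "product_id": "product_xyz", "timestamp": "2024-07-27T11:00:00"},
]
trending = top_products_per_hour(sample_events)
for window, products in sorted(trending.items()):
    print(window.isoformat(), products)
```

Each event lands in exactly one window, which is what distinguishes a tumbling window from a sliding one.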

Step 3: Materialized Views for User-Specific Recommendations

Generate personalized suggestions based on recent category views:

CREATE MATERIALIZED VIEW user_recommendations AS
WITH recent_user_activity AS (
    SELECT user_id, product_id, timestamp
    FROM user_activity_stream
    WHERE event_type = 'page_view'
      AND timestamp > NOW() - INTERVAL '24 hours'
),
user_category_views AS (
    SELECT
        r.user_id,
        p.category,
        COUNT(*) AS category_views
    FROM recent_user_activity r
    JOIN product_catalog p ON r.product_id = p.product_id
    GROUP BY r.user_id, p.category
),
ranked_categories AS (
    SELECT
        user_id,
        category,
        category_views,
        RANK() OVER (PARTITION BY user_id ORDER BY category_views DESC) AS rank
    FROM user_category_views
),
recommendations AS (
    SELECT
        rc.user_id,
        p.product_id AS recommended_product_id
    FROM ranked_categories rc
    JOIN product_catalog p ON rc.category = p.category
    WHERE rc.rank <= 3
    AND p.product_id NOT IN (
        SELECT product_id FROM recent_user_activity WHERE user_id = rc.user_id
    )
)
SELECT
    user_id,
    array_agg(recommended_product_id) AS recommended_products
FROM recommendations
GROUP BY user_id;

This view:

  • Analyzes the last 24 hours of activity.
  • Ranks a user’s top 3 categories by views.
  • Recommends unviewed products from those categories.
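
The three steps above can be sketched for a single user in plain Python. This is a simplified illustration, not the view itself: the function name recommend and the tiny catalog are invented for the example, and the 24-hour filter is assumed to have been applied to the input already.

```python
from collections import Counter


def recommend(viewed_product_ids, catalog, top_categories=3):
    """Suggest unviewed products from the user's most viewed categories.

    viewed_product_ids: product IDs the user viewed recently (repeats allowed).
    catalog: mapping of product_id -> category.
    """
    # Count views per category, skipping products missing from the catalog (like an inner join).
    category_views = Counter(
        catalog[pid] for pid in viewed_product_ids if pid in catalog
    )
    # most_common keeps at most top_categories entries; SQL RANK() would keep all tied ones.
    favourite = {category for category, _ in category_views.most_common(top_categories)}
    already_seen = set(viewed_product_ids)
    return sorted(
        pid for pid, category in catalog.items()
        if category in favourite and pid not in already_seen
    )


catalog = {
    "shoe_1": "shoes/running",
    "shoe_2": "shoes/running",
    "shoe_3": "shoes/running",
    "coat_1": "coats/winter",
}
recs = recommend(["shoe_1", "shoe_1", "shoe_2"], catalog)
print(recs)
```

Here the user has only viewed running shoes, so the single unviewed running shoe is suggested and the winter coat is not.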

Step 4: Sink to Redis

Export recommendations to Redis:

-- Sink for user-specific recommendations
CREATE SINK user_recommendations_sink
FROM user_recommendations WITH (
    connector = 'redis',
    primary_key = 'user_id',
    redis.url = 'redis://127.0.0.1:6379/'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);

-- Sink for trending products
CREATE SINK trending_products_sink
FROM trending_products WITH (
    connector = 'redis',
    primary_key = 'window_start,product_id',
    redis.url = 'redis://127.0.0.1:6379/'
) FORMAT PLAIN ENCODE TEMPLATE (
    force_append_only = 'true',
    key_format = 'trending:{window_start}',
    value_format = '{product_id}:{view_count}'
);
  • Redis Structure:
    • User Recommendations: user_id → {"user_id": 123, "recommended_products": ["prod1", "prod2"]}
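    • Trending Products: trending:2024-03-21T10:00:00 → product_abc:42. Because key_format contains only window_start, products in the same window share one key and overwrite each other; including {product_id} in key_format would keep one entry per product.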
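
On the application side, a trending entry written with this template has to be split back into its fields. The sketch below assumes exactly the key and value formats shown above; the helper name parse_trending_entry is hypothetical.

```python
def parse_trending_entry(key: str, value: str) -> dict:
    """Split a 'trending:{window_start}' key and a '{product_id}:{view_count}' value."""
    # Split on the first colon only, because the timestamp itself contains colons.
    prefix, _, window_start = key.partition(":")
    if prefix != "trending" or not window_start:
        raise ValueError(f"unexpected key: {key!r}")
    # Split on the last colon so a product ID containing ':' still parses.
    product_id, _, view_count = value.rpartition(":")
    return {
        "window_start": window_start,
        "product_id": product_id,
        "view_count": int(view_count),
    }


entry = parse_trending_entry("trending:2024-03-21T10:00:00", "product_abc:42")
print(entry)
```

If you control the sink, encoding the value as JSON instead of a template would avoid this kind of hand-written parsing.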

Serving Recommendations (Application-Side)

Fetch recommendations from Redis using a simple Python script:

import redis
import json

# Connect to Redis
redis_client = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

def get_user_recommendations(user_id):
    data = redis_client.get(str(user_id))
    return json.loads(data)['recommended_products'] if data else []

def get_trending_products(window_start):
    # The sink writes plain string values, so read them with GET rather than HGETALL
    key = f"trending:{window_start}"
    return redis_client.get(key)

# Example usage
print(get_user_recommendations(123))
print(get_trending_products('2025-02-25T10:00:00'))
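
A new user may have no personalized entry yet. One possible way to handle that, sketched below, is to fall back to the trending entry for the current window. This is an assumption layered on top of the post, not part of the original pipeline: the function recommendations_with_fallback and the FakeRedis stand-in are hypothetical, and the stand-in exists only so the example runs without a Redis server.

```python
import json


def recommendations_with_fallback(client, user_id, window_start):
    """Return personalized product IDs, or the trending product for the window if none exist.

    `client` is any object with a Redis-style get(key) method returning str or None.
    """
    data = client.get(str(user_id))
    if data:
        products = json.loads(data).get("recommended_products") or []
        if products:
            return products
    trending = client.get(f"trending:{window_start}")
    if trending:
        # Template value is '{product_id}:{view_count}'; keep only the product ID.
        return [trending.rpartition(":")[0]]
    return []


class FakeRedis:
    """In-memory stand-in for a Redis client (illustration only)."""

    def __init__(self, data):
        self._data = data

    def get(self, key):
        return self._data.get(key)


fake = FakeRedis({
    "123": json.dumps({"user_id": 123, "recommended_products": ["prod1", "prod2"]}),
    "trending:2025-02-25T10:00:00": "product_abc:42",
})
known_user = recommendations_with_fallback(fake, 123, "2025-02-25T10:00:00")
new_user = recommendations_with_fallback(fake, 999, "2025-02-25T10:00:00")
print(known_user, new_user)
```

In a real application you would pass a redis.Redis client created with decode_responses=True in place of FakeRedis.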


Direct RisingWave Querying (for low-traffic scenarios):

As mentioned earlier, for low-traffic scenarios or during initial prototyping, you could query the materialized views directly. Here's how that would look:

from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd

# Connect to RisingWave using the official SDK
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host='localhost',
        port=4566,  # Default RisingWave port
        user='root',
        password='root',
        database='dev'
    )
)

def get_recommendations_direct(user_id: int) -> list:
    """Retrieves recommendations directly from RisingWave."""
    # Coerce to int so the interpolated value cannot carry arbitrary SQL
    query = f"""
        SELECT recommended_products
        FROM user_recommendations
        WHERE user_id = {int(user_id)}
    """

    # Using fetch to get results as a DataFrame
    result: pd.DataFrame = rw.fetch(
        query,
        format=OutputFormat.DATAFRAME
    )

    if not result.empty:
        return result['recommended_products'].iloc[0]
    return []

# Example usage
def example_usage():
    user_recs = get_recommendations_direct(123)
    print(f"Recommendations for user 123 (direct): {user_recs}")

    # You can also get results as raw tuples if preferred
    raw_results = rw.fetch(
        "SELECT * FROM trending_products LIMIT 5",
        format=OutputFormat.RAW
    )
    print(f"Trending products (raw): {raw_results}")

# Note: While direct querying is simpler, using Redis as a cache layer
# is recommended for production environments with frequent requests.

Benefits and Scalability of This Architecture

This setup delivers:

  • Real-Time Updates: Instant reaction to user actions.
  • Low Latency: Redis serves cached recommendations quickly; for low-traffic cases, RisingWave can answer queries against its materialized views directly.
  • Scalability: Both RisingWave and Redis scale horizontally with traffic.
  • Ease of Use: SQL simplifies development.
  • Reliability: Distributed design enhances fault tolerance.

Taking it Further: Future Enhancements

This blog post provides a solid foundation. Here are some ways you could enhance the system:

  • Implement A/B testing for recommendation strategies.
  • Integrate machine learning models for deeper insights.
  • Add segmented trends tailored to user demographics.
  • Enrich data with feature stores for advanced personalization.

Get Started Today!

RisingWave makes building a real-time recommendation system surprisingly straightforward. By combining the power of streaming data processing with the speed of Redis, you can create a dynamic and engaging e-commerce experience that keeps your customers coming back for more. Dive into the RisingWave documentation to begin!
