Stop Serving Stale Recommendations: Go Real-Time!
Personalized product recommendations are no longer a "nice-to-have" for e-commerce sites – they're essential. Customers expect tailored suggestions, and they expect them now. Traditional, batch-processed recommendation systems simply can't keep up. They rely on periodic updates, often leaving customers with outdated suggestions that don't reflect their current interests. Imagine browsing for running shoes and being shown recommendations for winter coats based on data from yesterday! That's a missed opportunity (and a frustrated customer).
This post will guide you through building a real-time recommendation engine using RisingWave, Kafka, and Redis. This system reacts immediately to user actions, delivering relevant suggestions that improve the shopping experience and drive conversions.
The Architecture: A Real-Time Recommendation Pipeline
Here’s how the system comes together:
Data Flow Overview
- Data Ingestion:
  - User activity (e.g., page views, clicks, searches, add-to-cart events) streams into a Kafka topic in real time.
  - Product catalog data (e.g., IDs, names, categories, prices) is stored in a database like PostgreSQL, with updates captured via Change Data Capture (CDC) and streamed directly to RisingWave through its built-in connector. For simplicity, if the catalog changes infrequently, you can also store it directly in RisingWave.
  - Optionally, a purchase stream can fuel collaborative filtering.
- Stream Processing (RisingWave):
  RisingWave ingests these streams by defining sources linked to Kafka topics. SQL-based materialized views handle the recommendation logic, updating continuously as new data arrives.
- Recommendation Storage (Redis):
  Precomputed recommendations from RisingWave’s materialized views are stored in Redis, an in-memory cache optimized for rapid retrieval.
  Note: For low-traffic setups or prototypes, you could query RisingWave directly, but for now Redis is preferred in production for its faster reads.
- Recommendation Serving:
  The e-commerce app fetches recommendations from Redis using a user ID, ensuring a fast, seamless experience.
Sample Data (JSON):
- User Activity (Kafka):
// Page View Event
{
  "event_type": "page_view",
  "user_id": 123,
  "product_id": "product_abc",
  "timestamp": "2024-07-27T10:00:00Z"
}
// Search Event
{
  "event_type": "search",
  "user_id": 456,
  "query": "running shoes",
  "timestamp": "2024-07-27T10:03:00Z"
}
- Product Catalog (Kafka - via CDC):
{
  "product_id": "product_abc",
  "name": "Awesome Running Shoes",
  "category": "shoes/running",
  "price": 99.99,
  "description": "...",
  "image_url": "..."
}
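As a rough sketch of the producer side, the snippet below builds events shaped like the sample user activity events above and serializes them to UTF-8 JSON bytes, the kind of payload a Kafka producer would typically send. The helper names (`make_page_view`, `make_search`, `encode_event`) are hypothetical and not part of any library; wiring the bytes into an actual Kafka client is left out.

```python
import json
from datetime import datetime, timezone


def _iso_utc(ts: datetime) -> str:
    """Format a timezone-aware datetime like the samples, e.g. 2024-07-27T10:00:00Z."""
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def make_page_view(user_id: int, product_id: str, ts: datetime) -> dict:
    """Build a page_view event with the same fields as the sample event."""
    return {
        "event_type": "page_view",
        "user_id": user_id,
        "product_id": product_id,
        "timestamp": _iso_utc(ts),
    }


def make_search(user_id: int, query: str, ts: datetime) -> dict:
    """Build a search event; it carries a query instead of a product_id."""
    return {
        "event_type": "search",
        "user_id": user_id,
        "query": query,
        "timestamp": _iso_utc(ts),
    }


def encode_event(event: dict) -> bytes:
    """Serialize an event to compact UTF-8 JSON bytes."""
    return json.dumps(event, separators=(",", ":")).encode("utf-8")


event = make_page_view(123, "product_abc", datetime(2024, 7, 27, 10, 0, tzinfo=timezone.utc))
payload = encode_event(event)
print(payload.decode("utf-8"))
```

Keeping event construction separate from the Kafka client makes the schema easy to unit-test before any broker is involved.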
Building the Pipeline with RisingWave
Let's build the core components of our recommendation engine using RisingWave's SQL interface.
Prerequisites
First, please ensure these systems are up and running.
- A RisingWave cluster. For details about how to start a cluster, see RisingWave quickstart.
- A Kafka instance that stores the live user activities stream in a topic.
- A Postgres instance that stores the product_catalog table and has enabled Change Data Capture. For detailed configurations, see Ingest data from PostgreSQL CDC.
- A Redis instance for caching recommendations.
Step 1: Define Sources
Connect to Kafka topics with these source definitions:
-- User Activity Stream
CREATE SOURCE user_activity_stream (
    event_type VARCHAR,
    user_id INT,
    product_id VARCHAR,
    timestamp TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'user_activity',
    properties.bootstrap.server = 'kafka-broker1:9092,kafka-broker2:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
-- Product Catalog Stream
CREATE SOURCE product_catalog (
    product_id VARCHAR,
    name VARCHAR,
    category VARCHAR,
    price DOUBLE PRECISION,
    description VARCHAR,
    image_url VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'product_catalog',
    properties.bootstrap.server = 'kafka-broker1:9092,kafka-broker2:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Materialized Views for Trending Products
Track the top 10 most-viewed products in hourly windows:
CREATE MATERIALIZED VIEW trending_products AS
WITH windowed_views AS (
    SELECT
        window_start,
        product_id,
        COUNT(*) AS view_count
    FROM
        TUMBLE(user_activity_stream, timestamp, INTERVAL '1 hour')
    WHERE event_type = 'page_view'
    GROUP BY window_start, product_id
),
ranked_products AS (
    SELECT
        window_start,
        product_id,
        view_count,
        RANK() OVER (PARTITION BY window_start ORDER BY view_count DESC) AS rank
    FROM windowed_views
)
SELECT
    window_start,
    product_id,
    view_count
FROM ranked_products
WHERE rank <= 10; -- Top 10 trending products
This view:
- Groups page views into 1-hour windows.
- Ranks products by view count per window.
- Updates incrementally as new events stream in.
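To make the window-and-rank logic concrete, here is a small in-memory Python sketch of the result the view describes. It is only an illustration under simplifying assumptions: it recomputes everything from a list of events rather than updating incrementally as RisingWave does, and the function names (`window_start`, `trending`) are hypothetical.

```python
from collections import Counter, defaultdict
from datetime import datetime


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 1-hour tumbling window."""
    return ts.replace(minute=0, second=0, microsecond=0)


def trending(events, top_n=10):
    """Return {window_start: [(product_id, view_count), ...]} for rank <= top_n."""
    counts = defaultdict(Counter)
    for e in events:
        # Only page views count, mirroring WHERE event_type = 'page_view'
        if e["event_type"] == "page_view":
            counts[window_start(e["timestamp"])][e["product_id"]] += 1

    result = {}
    for start, counter in counts.items():
        ordered = sorted(counter.items(), key=lambda kv: (-kv[1], kv[0]))
        kept = []
        for product_id, n in ordered:
            # RANK() semantics: rank = 1 + number of products with strictly more views,
            # so products tied at the cutoff are all kept
            rank = 1 + sum(1 for _, m in ordered if m > n)
            if rank <= top_n:
                kept.append((product_id, n))
        result[start] = kept
    return result


events = [
    {"event_type": "page_view", "product_id": "a", "timestamp": datetime(2024, 7, 27, 10, 5)},
    {"event_type": "page_view", "product_id": "a", "timestamp": datetime(2024, 7, 27, 10, 20)},
    {"event_type": "page_view", "product_id": "b", "timestamp": datetime(2024, 7, 27, 10, 30)},
    {"event_type": "search", "query": "running shoes", "timestamp": datetime(2024, 7, 27, 10, 40)},
    {"event_type": "page_view", "product_id": "b", "timestamp": datetime(2024, 7, 27, 11, 1)},
]
top = trending(events, top_n=1)
print(top)
```

Because RANK() lets ties share a rank, a window can return more than `top_n` rows when several products have the same view count at the cutoff.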
Step 3: Materialized Views for User-Specific Recommendations
Generate personalized suggestions based on recent category views:
CREATE MATERIALIZED VIEW user_recommendations AS
WITH recent_user_activity AS (
    SELECT user_id, product_id, timestamp
    FROM user_activity_stream
    WHERE event_type = 'page_view'
      AND timestamp > NOW() - INTERVAL '24 hours'
),
user_category_views AS (
    SELECT
        r.user_id,
        p.category,
        COUNT(*) AS category_views
    FROM recent_user_activity r
    JOIN product_catalog p ON r.product_id = p.product_id
    GROUP BY r.user_id, p.category
),
ranked_categories AS (
    SELECT
        user_id,
        category,
        category_views,
        RANK() OVER (PARTITION BY user_id ORDER BY category_views DESC) AS rank
    FROM user_category_views
),
recommendations AS (
    SELECT
        rc.user_id,
        p.product_id AS recommended_product_id
    FROM ranked_categories rc
    JOIN product_catalog p ON rc.category = p.category
    WHERE rc.rank <= 3
      AND p.product_id NOT IN (
          SELECT product_id FROM recent_user_activity WHERE user_id = rc.user_id
      )
)
SELECT
    user_id,
    array_agg(recommended_product_id) AS recommended_products
FROM recommendations
GROUP BY user_id;
This view:
- Analyzes the last 24 hours of activity.
- Ranks a user’s top 3 categories by views.
- Recommends unviewed products from those categories.
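The same three steps can be sketched in plain Python to show what the view is expected to return. This is an illustrative, batch-style approximation, not how RisingWave evaluates the query; the `recommend` function and its parameters are hypothetical, and the output is sorted only to make it deterministic (array_agg gives no ordering guarantee here).

```python
from collections import Counter, defaultdict
from datetime import datetime, timedelta


def recommend(views, catalog, now, top_categories=3, lookback=timedelta(hours=24)):
    """views: iterable of (user_id, product_id, timestamp); catalog: {product_id: category}."""
    cutoff = now - lookback
    seen = defaultdict(set)                # user_id -> products viewed in the lookback window
    category_views = defaultdict(Counter)  # user_id -> category -> view count
    for user_id, product_id, ts in views:
        if ts > cutoff:
            seen[user_id].add(product_id)
            # Inner join with the catalog: views of unknown products add no category count
            if product_id in catalog:
                category_views[user_id][catalog[product_id]] += 1

    by_category = defaultdict(list)
    for product_id, category in catalog.items():
        by_category[category].append(product_id)

    result = {}
    for user_id, counter in category_views.items():
        recs = []
        for category, n in counter.items():
            # RANK() semantics: ties share a rank
            rank = 1 + sum(1 for m in counter.values() if m > n)
            if rank <= top_categories:
                recs.extend(p for p in by_category[category] if p not in seen[user_id])
        if recs:  # users with nothing left to recommend produce no row
            result[user_id] = sorted(recs)
    return result


catalog = {"s1": "shoes", "s2": "shoes", "c1": "coats", "h1": "hats"}
now = datetime(2024, 7, 27, 12, 0)
views = [
    (123, "s1", now - timedelta(hours=1)),
    (123, "s1", now - timedelta(hours=2)),
    (123, "c1", now - timedelta(hours=30)),  # older than 24 hours, ignored
]
recs = recommend(views, catalog, now, top_categories=1)
print(recs)
```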
Step 4: Sink to Redis
Export recommendations to Redis:
-- Sink for user-specific recommendations
CREATE SINK user_recommendations_sink
FROM user_recommendations WITH (
    connector = 'redis',
    primary_key = 'user_id',
    redis.url = 'redis://127.0.0.1:6379/'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'
);

-- Sink for trending products
CREATE SINK trending_products_sink
FROM trending_products WITH (
    connector = 'redis',
    primary_key = 'window_start,product_id',
    redis.url = 'redis://127.0.0.1:6379/'
) FORMAT PLAIN ENCODE TEMPLATE (
    force_append_only = 'true',
    key_format = 'trending:{window_start}',
    value_format = '{product_id}:{view_count}'
);
Redis Structure:
- User Recommendations: user_id → {"user_id": 123, "recommended_products": ["prod1", "prod2"]}
- Trending Products: trending:2024-03-21T10:00:00 → product_abc:42
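On the application side, a trending value in the `product_id:view_count` shape shown above has to be split back into its parts. A minimal sketch, assuming the value always ends in a colon followed by a non-negative integer count; `parse_trending_value` is a hypothetical helper name.

```python
def parse_trending_value(value: str) -> tuple[str, int]:
    """Split 'product_id:view_count' into (product_id, view_count)."""
    # rpartition splits on the last colon, so product IDs containing ':' still parse
    product_id, _, count = value.rpartition(":")
    if not product_id or not count.isdigit():
        raise ValueError(f"unexpected trending value: {value!r}")
    return product_id, int(count)


print(parse_trending_value("product_abc:42"))
```

Splitting on the last colon rather than the first is a small design choice that keeps the parser tolerant of IDs such as `shop:product_abc`.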
Serving Recommendations (Application-Side)
Fetch recommendations from Redis using a simple Python script:
import json

import redis

# Connect to Redis
redis_client = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)


def get_user_recommendations(user_id):
    data = redis_client.get(str(user_id))
    return json.loads(data)['recommended_products'] if data else []


def get_trending_products(window_start):
    # Each trending key holds a plain string ("product_id:view_count"), so read it with GET
    key = f"trending:{window_start}"
    return redis_client.get(key)


# Example usage
print(get_user_recommendations(123))
print(get_trending_products('2025-02-25T10:00:00'))
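New or inactive users may have no personalized entry in Redis yet. One possible way to handle that, sketched here as an assumption rather than part of the pipeline above, is to top up the personalized list with trending products. The function name `recommend_with_fallback` and its callable parameters are hypothetical; the lookups are passed in as functions so the logic can be exercised without a Redis server.

```python
from typing import Callable, Iterable, List


def recommend_with_fallback(
    user_id: int,
    fetch_personal: Callable[[int], Iterable[str]],
    fetch_trending: Callable[[], Iterable[str]],
    limit: int = 5,
) -> List[str]:
    """Return up to `limit` product IDs, preferring personalized picks."""
    picks = list(fetch_personal(user_id))[:limit]
    if len(picks) < limit:
        # Fill the remaining slots with trending products not already chosen
        for product_id in fetch_trending():
            if product_id not in picks:
                picks.append(product_id)
            if len(picks) == limit:
                break
    return picks


# Stand-in lookups; in the real application these would read from Redis
personal = {123: ["prod1"]}
result_known = recommend_with_fallback(
    123, lambda uid: personal.get(uid, []), lambda: ["prod1", "prod7", "prod9"], limit=3
)
result_new = recommend_with_fallback(
    999, lambda uid: personal.get(uid, []), lambda: ["prod1", "prod7", "prod9"], limit=2
)
print(result_known, result_new)
```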
Direct RisingWave Querying (for low-traffic scenarios):
As mentioned earlier, for low-traffic scenarios or during initial prototyping, you could query the materialized views directly. Here's how that would look:
from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import pandas as pd

# Connect to RisingWave using the official SDK
rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host='localhost',
        port=4566,  # Default RisingWave port
        user='root',
        password='root',
        database='dev'
    )
)


def get_recommendations_direct(user_id: int) -> list:
    """Retrieves recommendations directly from RisingWave."""
    # int() ensures only a number is interpolated into the SQL text
    query = f"""
        SELECT recommended_products
        FROM user_recommendations
        WHERE user_id = {int(user_id)}
    """
    # Using fetch to get results as a DataFrame
    result: pd.DataFrame = rw.fetch(
        query,
        format=OutputFormat.DATAFRAME
    )
    if not result.empty:
        return result['recommended_products'].iloc[0]
    return []


# Example usage
def example_usage():
    user_recs = get_recommendations_direct(123)
    print(f"Recommendations for user 123 (direct): {user_recs}")
    # You can also get results as raw tuples if preferred
    raw_results = rw.fetch(
        "SELECT * FROM trending_products LIMIT 5",
        format=OutputFormat.RAW
    )
    print(f"Trending products (raw): {raw_results}")

# Note: While direct querying is simpler, using Redis as a cache layer
# is recommended for production environments with frequent requests.
Benefits and Scalability of This Architecture
This setup delivers:
- Real-Time Updates: Instant reaction to user actions.
- Low Latency: Redis serves precomputed recommendations quickly; for lighter workloads, RisingWave can serve the materialized views directly.
- Scalability: Both RisingWave and Redis scale horizontally with traffic.
- Ease of Use: SQL simplifies development.
- Reliability: Distributed design enhances fault tolerance.
Taking it Further: Future Enhancements
This blog post provides a solid foundation. Here are some ways you could enhance the system:
- Implement A/B testing for recommendation strategies.
- Integrate machine learning models for deeper insights.
- Add segmented trends tailored to user demographics.
- Enrich data with feature stores for advanced personalization.
Get Started Today!
RisingWave makes building a real-time recommendation system surprisingly straightforward. By combining the power of streaming data processing with the speed of Redis, you can create a dynamic and engaging e-commerce experience that keeps your customers coming back for more. Dive into the RisingWave documentation to begin!