Build a Real-time Recommendation Engine
Deploy a recommendation pipeline that streams user events through Kafka, computes features in PostgreSQL, and serves recommendations from Valkey with sub-millisecond latency.
Architecture
Step 1: Deploy the Pipeline
curl -u $USER:$PASS -X POST https://api.foundrydb.com/pipelines \
-H "Content-Type: application/json" \
-d '{
"template_id": "recommendation-engine",
"name": "my-recs",
"zone": "se-sto1"
}'
This creates:
| Service | Engine | Role |
|---|---|---|
| my-recs-events | Kafka 3.9 | Event stream |
| my-recs-features | PostgreSQL 17 | Feature store |
| my-recs-cache | Valkey 8 | Recommendation cache |
Step 2: Set Up the Feature Store
-- The vector(128) columns and the HNSW index below require the pgvector
-- extension; without it this schema fails on a fresh database.
CREATE EXTENSION IF NOT EXISTS vector;

-- Per-user aggregate features, maintained by the event consumer.
CREATE TABLE user_features (
  user_id TEXT PRIMARY KEY,
  category_scores JSONB DEFAULT '{}',      -- per-category affinity scores
  total_views INT DEFAULT 0,
  total_purchases INT DEFAULT 0,
  last_active TIMESTAMPTZ DEFAULT NOW(),
  embedding vector(128),                   -- user taste embedding (pgvector)
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Catalog items with content embeddings used for similarity search.
CREATE TABLE items (
  item_id TEXT PRIMARY KEY,
  title TEXT NOT NULL,
  category TEXT NOT NULL,
  embedding vector(128),
  popularity_score FLOAT DEFAULT 0
);

-- HNSW index speeds up cosine nearest-neighbour search (the <=> operator).
CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);

-- Append-only interaction log written by the event consumer.
CREATE TABLE interactions (
  id BIGSERIAL PRIMARY KEY,
  user_id TEXT NOT NULL,
  item_id TEXT NOT NULL,
  action TEXT NOT NULL,                    -- e.g. "view", "purchase"
  timestamp TIMESTAMPTZ DEFAULT NOW()
);

-- Supports "this user's most recent interactions" queries, newest first.
CREATE INDEX ON interactions (user_id, timestamp DESC);
Step 3: Build the Event Consumer
"""Event consumer: streams user events from Kafka into the PostgreSQL
feature store and invalidates the per-user Valkey recommendation cache."""

import json

import psycopg2
import redis
from kafka import KafkaConsumer

# NOTE(review): SASL_PLAINTEXT sends credentials unencrypted — confirm
# whether the managed service expects SASL_SSL on port 9093.
consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers="my-recs-events.abc123.db.foundrydb.com:9093",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASSWORD",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    group_id="feature-worker",
)

conn = psycopg2.connect(
    "postgresql://app_user:PG_PASS@my-recs-features.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require"
)
cache = redis.from_url("rediss://default:VALKEY_PASS@my-recs-cache.abc123.db.foundrydb.com:6380")

for message in consumer:
    event = message.value
    user_id, item_id, action = event["user_id"], event["item_id"], event["action"]
    # Bug fix: the original incremented total_views for *every* action and
    # never updated total_purchases (declared in the schema). Count each
    # action type in its own column.
    is_view = 1 if action == "view" else 0
    is_purchase = 1 if action == "purchase" else 0
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO interactions (user_id, item_id, action) VALUES (%s, %s, %s)",
            (user_id, item_id, action),
        )
        cur.execute(
            """
            INSERT INTO user_features (user_id, total_views, total_purchases, last_active)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (user_id) DO UPDATE SET
                total_views = user_features.total_views + EXCLUDED.total_views,
                total_purchases = user_features.total_purchases + EXCLUDED.total_purchases,
                last_active = NOW()
            """,
            (user_id, is_view, is_purchase),
        )
    conn.commit()
    cache.delete(f"recs:{user_id}")  # drop stale cached recommendations for this user
Step 4: Build the Recommendation Service
"""Recommendation service: serves top-k similar items from the feature
store, with a 5-minute per-user cache in Valkey."""

import json

import psycopg2
import redis

conn = psycopg2.connect(
    "postgresql://app_user:PG_PASS@my-recs-features.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require"
)
cache = redis.from_url("rediss://default:VALKEY_PASS@my-recs-cache.abc123.db.foundrydb.com:6380")


def get_recommendations(user_id: str, top_k: int = 10) -> list[dict]:
    """Return up to ``top_k`` recommended items for ``user_id``.

    Results are served from the Valkey cache when present; otherwise they
    are computed from the average embedding of the user's 20 most recent
    items and cached for 300 seconds. A user with no scoreable history
    gets an empty list.
    """
    cached = cache.get(f"recs:{user_id}")
    if cached:
        return json.loads(cached)
    with conn.cursor() as cur:
        cur.execute("""
            WITH recent_items AS (
                -- Bug fix: the original ordered by item_id, which does not
                -- select the user's *recent* items. Take the 20 items with
                -- the newest interaction (uses the (user_id, timestamp DESC)
                -- index on interactions).
                SELECT item_id
                FROM interactions
                WHERE user_id = %s
                GROUP BY item_id
                ORDER BY MAX(timestamp) DESC
                LIMIT 20
            ),
            user_taste AS (
                SELECT AVG(i.embedding) AS avg_embedding
                FROM recent_items ri JOIN items i ON i.item_id = ri.item_id
                WHERE i.embedding IS NOT NULL
            )
            SELECT i.item_id, i.title, i.category,
                   1 - (i.embedding <=> ut.avg_embedding) AS score
            FROM items i, user_taste ut
            WHERE i.item_id NOT IN (SELECT item_id FROM recent_items)
              AND i.embedding IS NOT NULL
              -- Guard: a user with no history yields a NULL average; the
              -- original produced NULL scores that crash float() below.
              AND ut.avg_embedding IS NOT NULL
            ORDER BY i.embedding <=> ut.avg_embedding
            LIMIT %s
        """, (user_id, top_k))
        results = [
            {"item_id": r[0], "title": r[1], "category": r[2], "score": float(r[3])}
            for r in cur.fetchall()
        ]
    cache.setex(f"recs:{user_id}", 300, json.dumps(results))
    return results
Next Steps
- Add item embedding generation using your ML model
- Scale PostgreSQL with read replicas for query throughput
- Configure Valkey Sentinel for high availability
- Set up alerts for Kafka consumer lag
Cleanup
curl -u $USER:$PASS -X DELETE https://api.foundrydb.com/pipelines/$PIPELINE_ID