Skip to main content

Build a Real-time Recommendation Engine

Deploy a recommendation pipeline that streams user events through Kafka, computes features in PostgreSQL, and serves recommendations from Valkey with sub-millisecond latency.

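Architecture

User events are published to Kafka. A consumer writes each interaction and the updated per-user features to PostgreSQL and invalidates that user's cached recommendations in Valkey. The recommendation service answers from Valkey and, on a cache miss, computes recommendations in PostgreSQL and caches the result.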

Step 1: Deploy the Pipeline

# Create the pipeline from the "recommendation-engine" template.
# NOTE: most shells preset $USER to your login name; export USER and PASS with
# your API credentials before running this.
# The credentials are quoted so a password containing spaces or glob
# characters is passed to curl unchanged.
curl -u "$USER:$PASS" -X POST https://api.foundrydb.com/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "template_id": "recommendation-engine",
    "name": "my-recs",
    "zone": "se-sto1"
  }'

This creates:

| Service | Engine | Role |
|---|---|---|
| my-recs-events | Kafka 3.9 | Event stream |
| my-recs-features | PostgreSQL 17 | Feature store |
| my-recs-cache | Valkey 8 | Recommendation cache |

Step 2: Set Up the Feature Store

-- pgvector supplies the vector column type and the hnsw index method used
-- below; IF NOT EXISTS makes this a no-op when it is already enabled.
CREATE EXTENSION IF NOT EXISTS vector;

-- One row per user: running counters plus an optional 128-dimension embedding.
CREATE TABLE user_features (
    user_id TEXT PRIMARY KEY,
    category_scores JSONB DEFAULT '{}',
    total_views INT DEFAULT 0,
    total_purchases INT DEFAULT 0,
    last_active TIMESTAMPTZ DEFAULT NOW(),
    embedding vector(128),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Catalogue of recommendable items; embedding may be NULL until it is generated.
CREATE TABLE items (
    item_id TEXT PRIMARY KEY,
    title TEXT NOT NULL,
    category TEXT NOT NULL,
    embedding vector(128),
    popularity_score FLOAT DEFAULT 0
);

-- Approximate nearest-neighbour index for cosine distance (the <=> operator).
CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);

-- Append-only log of user/item events written by the event consumer.
CREATE TABLE interactions (
    id BIGSERIAL PRIMARY KEY,
    user_id TEXT NOT NULL,
    item_id TEXT NOT NULL,
    action TEXT NOT NULL,
    timestamp TIMESTAMPTZ DEFAULT NOW()
);

-- Supports "latest interactions for a user" lookups.
CREATE INDEX ON interactions (user_id, timestamp DESC);

Step 3: Build the Event Consumer

import json
import psycopg2, redis
from kafka import KafkaConsumer

# Consume JSON-encoded events from the "user-events" topic as a member of the
# "feature-worker" consumer group.
# NOTE(review): SASL_PLAINTEXT leaves the Kafka connection unencrypted, while
# the PostgreSQL (sslmode=require) and Valkey (rediss://) connections below use
# TLS -- confirm whether the broker offers a SASL_SSL listener and prefer it.
consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers="my-recs-events.abc123.db.foundrydb.com:9093",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASSWORD",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    group_id="feature-worker",
)

conn = psycopg2.connect("postgresql://app_user:PG_PASS@my-recs-features.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require")
cache = redis.from_url("rediss://default:VALKEY_PASS@my-recs-cache.abc123.db.foundrydb.com:6380")

for message in consumer:
    event = message.value
    user_id, item_id, action = event["user_id"], event["item_id"], event["action"]

    # `with conn` commits when both statements succeed and rolls back if either
    # one raises, so a failed event never leaves the connection stuck in an
    # aborted transaction.
    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO interactions (user_id, item_id, action) VALUES (%s, %s, %s)",
            (user_id, item_id, action),
        )
        # NOTE(review): total_views is incremented for every event regardless of
        # `action`, and total_purchases is never updated -- confirm the intended
        # action values before splitting the counters.
        cur.execute("""
            INSERT INTO user_features (user_id, total_views, last_active) VALUES (%s, 1, NOW())
            ON CONFLICT (user_id) DO UPDATE SET
                total_views = user_features.total_views + 1,
                last_active = NOW(),
                updated_at = NOW()
        """, (user_id,))

    # Runs only after the commit above, so readers never re-cache stale data.
    cache.delete(f"recs:{user_id}")  # Invalidate cached recommendations

Step 4: Build the Recommendation Service

import json
import psycopg2, redis

# Module-level clients shared by get_recommendations().
# PG_PASS and VALKEY_PASS appear to be placeholders -- substitute real credentials.
# sslmode=require makes the PostgreSQL connection use TLS.
conn = psycopg2.connect("postgresql://app_user:PG_PASS@my-recs-features.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require")
# The rediss:// scheme makes the Valkey connection use TLS.
cache = redis.from_url("rediss://default:VALKEY_PASS@my-recs-cache.abc123.db.foundrydb.com:6380")

def get_recommendations(user_id: str, top_k: int = 10) -> list[dict]:
    """Return up to *top_k* recommended items for *user_id*.

    Cached results are served from the "recs:<user_id>" key when present.
    Otherwise the user's taste vector is computed as the average embedding of
    their 20 most recently interacted-with items, and the closest items by
    cosine distance that they have not interacted with recently are returned.
    The result is cached for 300 seconds.

    Args:
        user_id: Identifier of the user to recommend for.
        top_k: Maximum number of items to return.

    Returns:
        A list of dicts with keys "item_id", "title", "category" and "score"
        (1 minus cosine distance), best match first. The list is empty when
        the user has no interactions with embedded items, since no taste
        vector can be computed.
    """
    cache_key = f"recs:{user_id}"
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)

    # `with conn` ends the transaction psycopg2 opens implicitly (commit on
    # success, rollback on error) so the shared connection is never left idle
    # in a transaction or stuck in an aborted one.
    with conn, conn.cursor() as cur:
        cur.execute("""
            WITH recent_items AS (
                -- GROUP BY de-duplicates items while allowing ordering by recency.
                SELECT item_id FROM interactions
                WHERE user_id = %s
                GROUP BY item_id
                ORDER BY MAX("timestamp") DESC
                LIMIT 20
            ),
            user_taste AS (
                SELECT AVG(i.embedding) AS avg_embedding
                FROM recent_items ri JOIN items i ON i.item_id = ri.item_id
                WHERE i.embedding IS NOT NULL
            )
            SELECT i.item_id, i.title, i.category, 1 - (i.embedding <=> ut.avg_embedding) AS score
            FROM items i, user_taste ut
            -- avg_embedding is NULL for users without embedded interactions;
            -- skipping that case avoids NULL scores reaching float().
            WHERE ut.avg_embedding IS NOT NULL
              AND i.embedding IS NOT NULL
              AND i.item_id NOT IN (SELECT item_id FROM recent_items)
            ORDER BY i.embedding <=> ut.avg_embedding LIMIT %s
        """, (user_id, top_k))
        results = [
            {"item_id": row[0], "title": row[1], "category": row[2], "score": float(row[3])}
            for row in cur.fetchall()
        ]

    cache.setex(cache_key, 300, json.dumps(results))
    return results

Next Steps

  • Add item embedding generation using your ML model
  • Scale PostgreSQL with read replicas for query throughput
  • Configure Valkey Sentinel for high availability
  • Set up alerts for Kafka consumer lag

Cleanup

# Delete the pipeline; credentials and URL are quoted so special characters
# in $PASS or $PIPELINE_ID are not split or expanded by the shell.
curl -u "$USER:$PASS" -X DELETE "https://api.foundrydb.com/pipelines/$PIPELINE_ID"