Build a RAG Pipeline with FoundryDB
Deploy a production-ready retrieval-augmented generation (RAG) pipeline in under 15 minutes. This tutorial sets up Kafka for document ingestion, PostgreSQL with pgvector for embedding storage and semantic search, and Valkey for low-latency query caching.
Architecture
Data flow:
- Documents are published to a Kafka topic
- An embedding worker consumes documents, generates embeddings (via OpenAI, another hosted embedding API, or a local model), and stores them in PostgreSQL with pgvector
- Query requests first check Valkey for cached results
- On cache miss, PostgreSQL performs a vector similarity search
- Results are cached in Valkey for subsequent queries
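The query path is a standard cache-aside pattern. A minimal sketch of the flow, with plain dicts standing in for Valkey and the pgvector search (the real implementations appear in Steps 5-6):

```python
# Cache-aside sketch: dicts stand in for Valkey (cache) and pgvector (store).
cache: dict[str, list[str]] = {}
store = {"doc1": "PostgreSQL supports TLS.", "doc2": "Kafka uses topics."}

def vector_search(query: str) -> list[str]:
    # Stand-in for the pgvector similarity search built in Step 6.
    return [text for text in store.values() if query.lower() in text.lower()]

def query(q: str) -> list[str]:
    if q in cache:               # cache hit: skip the database entirely
        return cache[q]
    results = vector_search(q)   # cache miss: query the vector store
    cache[q] = results           # populate the cache for next time
    return results
```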
Step 1: Deploy the Pipeline
Option A: One-Click (Dashboard)
- Navigate to Pipelines in the sidebar
- Click Deploy Pipeline
- Select RAG Pipeline
- Enter a name (e.g., my-rag) and select your zone
- Click Deploy Pipeline
Option B: REST API
```shell
curl -u $USER:$PASS -X POST https://api.foundrydb.com/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "template_id": "rag-pipeline",
    "name": "my-rag",
    "zone": "se-sto1"
  }'
```
Option C: CLI
```shell
fdb pipeline deploy --template rag-pipeline --name my-rag --zone se-sto1
```
This creates three services:
| Service | Engine | Role | Default Plan |
|---|---|---|---|
| my-rag-kafka | Kafka 3.9 | Document ingestion | tier-2, 50GB |
| my-rag-pgvector | PostgreSQL 17 + pgvector | Vector store | tier-2, 50GB |
| my-rag-cache | Valkey 8 | Query cache | tier-1, 20GB |
Step 2: Wait for Deployment
Monitor the pipeline status:
```shell
curl -s -u $USER:$PASS https://api.foundrydb.com/pipelines/$PIPELINE_ID | jq '.status'
```
Or watch the Pipeline Detail page in the dashboard. It auto-refreshes every 5 seconds during deployment. Typical deployment time: 5-8 minutes.
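If you're scripting the wait, a small polling helper works. This sketch assumes a `fetch_status` callable that wraps the status request above and returns the status string:

```python
import time

def wait_until_running(fetch_status, poll_interval: float = 10, timeout: float = 600) -> bool:
    """Poll fetch_status() until it returns "running" or the timeout expires.

    fetch_status is any zero-argument callable returning the pipeline's
    status string (e.g. a wrapper around the curl/jq call above).
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if fetch_status() == "running":
            return True
        time.sleep(poll_interval)
    return False
```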
Step 3: Get Connection Details
Once the pipeline status is running, retrieve connection parameters:
```shell
curl -s -u $USER:$PASS https://api.foundrydb.com/pipelines/$PIPELINE_ID \
  | jq '.connections[] | {role, connections}'
```
Get database credentials:
```shell
# Reveal PostgreSQL password (-r prints the raw string without quotes)
curl -s -u $USER:$PASS -X POST \
  https://api.foundrydb.com/managed-services/$PG_SERVICE_ID/database-users/app_user/reveal-password \
  | jq -r '.password'
```
Step 4: Set Up the Vector Store
Connect to PostgreSQL and create the embeddings table:
```sql
-- pgvector extension is pre-installed
CREATE EXTENSION IF NOT EXISTS vector;

-- Create embeddings table
CREATE TABLE documents (
    id BIGSERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    metadata JSONB DEFAULT '{}',
    embedding vector(1536),  -- OpenAI ada-002 dimension
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create HNSW index for fast similarity search
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
```
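HNSW recall is tunable per session via pgvector's `hnsw.ef_search` setting: higher values improve recall at the cost of latency, and the value must be at least the query's LIMIT. A quick sanity check (the vector literal is a placeholder for a real 1536-dimension query embedding):

```sql
-- Trade speed for recall on this session (pgvector default is 40)
SET hnsw.ef_search = 100;

-- Query shape used in Step 6: <=> is cosine distance, matching the
-- vector_cosine_ops index above
SELECT id, content
FROM documents
ORDER BY embedding <=> '[0.1, 0.2, ...]'::vector  -- placeholder query vector
LIMIT 5;
```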
Step 5: Build the Ingestion Worker (Python)
This worker consumes documents from Kafka, generates embeddings, and stores them in PostgreSQL.
```python
import json
import psycopg2
from kafka import KafkaConsumer
from openai import OpenAI

KAFKA_BOOTSTRAP = "my-rag-kafka.abc123.db.foundrydb.com:9093"
PG_URI = "postgresql://app_user:PASSWORD@my-rag-pgvector.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require"

openai = OpenAI()

def get_embedding(text: str) -> list[float]:
    response = openai.embeddings.create(model="text-embedding-ada-002", input=text)
    return response.data[0].embedding

consumer = KafkaConsumer(
    "documents",
    bootstrap_servers=KAFKA_BOOTSTRAP,
    security_protocol="SASL_SSL",  # SASL over TLS, matching the TLS-only siblings
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASSWORD",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    group_id="embedding-worker",
)

conn = psycopg2.connect(PG_URI)

for message in consumer:
    doc = message.value
    embedding = get_embedding(doc["content"])
    with conn.cursor() as cur:
        # psycopg2 adapts the Python list to a SQL array; the ::vector
        # cast converts it to pgvector's type
        cur.execute(
            "INSERT INTO documents (content, metadata, embedding) VALUES (%s, %s, %s::vector)",
            (doc["content"], json.dumps(doc.get("metadata", {})), embedding),
        )
    conn.commit()
```
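The worker embeds each document whole, but embedding models have input limits (roughly 8,191 tokens for ada-002), so long documents should be split before they are produced to Kafka. A minimal character-based chunker with overlap (a token-aware splitter is more precise; this is a sketch):

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into overlapping fixed-size chunks.

    Overlap preserves context across chunk boundaries, so a sentence
    straddling two chunks is still retrievable from either one.
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap
    return chunks
```

Each chunk is then published as its own document, typically with the parent document's id carried in `metadata`.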
Step 6: Build the Query Service (Python)
This service handles semantic search with Valkey caching.
```python
import json, hashlib
import psycopg2, redis
from openai import OpenAI

PG_URI = "postgresql://app_user:PASSWORD@my-rag-pgvector.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require"
VALKEY_URL = "rediss://default:VALKEY_PASSWORD@my-rag-cache.abc123.db.foundrydb.com:6380"

openai = OpenAI()
conn = psycopg2.connect(PG_URI)
cache = redis.from_url(VALKEY_URL)

def search(query: str, top_k: int = 5) -> list[dict]:
    # Key on a hash of the query so keys stay short and uniform
    cache_key = f"rag:{hashlib.sha256(query.encode()).hexdigest()[:16]}"
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)

    query_embedding = openai.embeddings.create(
        model="text-embedding-ada-002", input=query
    ).data[0].embedding

    with conn.cursor() as cur:
        # <=> is cosine distance; 1 - distance gives cosine similarity
        cur.execute("""
            SELECT content, metadata, 1 - (embedding <=> %s::vector) AS similarity
            FROM documents
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """, (query_embedding, query_embedding, top_k))
        results = [
            {"content": row[0], "metadata": row[1], "similarity": float(row[2])}
            for row in cur.fetchall()
        ]

    cache.setex(cache_key, 3600, json.dumps(results))  # cache for one hour
    return results
```
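Because the key hashes the raw query string, trivially different phrasings ("What is TLS?" vs " what is tls? ") miss the cache. One optional refinement, sketched here as a drop-in replacement for the keying line above, is to normalize before hashing:

```python
import hashlib

def cache_key(query: str) -> str:
    # Normalize case and whitespace so trivially different phrasings of
    # the same query share a cache entry
    normalized = " ".join(query.lower().split())
    return f"rag:{hashlib.sha256(normalized.encode()).hexdigest()[:16]}"
```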
Step 7: Produce Documents
```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="my-rag-kafka.abc123.db.foundrydb.com:9093",
    security_protocol="SASL_SSL",  # SASL over TLS, matching the consumer
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASSWORD",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send("documents", {
    "content": "PostgreSQL supports TLS encryption for all client connections.",
    "metadata": {"source": "docs", "category": "security"}
})
producer.flush()
```
Monitoring
Each service has full observability via the FoundryDB dashboard:
- Kafka: Topic metrics, consumer group lag, partition health
- PostgreSQL: Query statistics, index advisor, connection pooling
- Valkey: Memory usage, hit/miss ratio, connected clients
Next Steps
- Add read replicas to PostgreSQL for search throughput
- Set up alerts for Kafka consumer lag
- Export metrics to Prometheus or Datadog
- Scale individual services via the dashboard or API
Cleanup
```shell
curl -u $USER:$PASS -X DELETE https://api.foundrydb.com/pipelines/$PIPELINE_ID
```