Build a RAG Pipeline with FoundryDB

Deploy a production-ready retrieval-augmented generation (RAG) pipeline in under 15 minutes. This tutorial sets up Kafka for document ingestion, PostgreSQL with pgvector for embedding storage and semantic search, and Valkey for low-latency query caching.

Architecture

Data flow:

  1. Documents are published to a Kafka topic
  2. An embedding worker consumes documents, generates embeddings (via a hosted embedding API such as OpenAI's, or a local model), and stores them in PostgreSQL with pgvector
  3. Query requests first check Valkey for cached results
  4. On cache miss, PostgreSQL performs a vector similarity search
  5. Results are cached in Valkey for subsequent queries
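The query path (steps 3-5) is a standard cache-aside pattern. A minimal sketch with the cache and vector search abstracted as plain callables (the names here are illustrative, not part of the FoundryDB API):

```python
import hashlib
import json
from typing import Callable, Optional

def cached_search(
    query: str,
    cache_get: Callable[[str], Optional[str]],
    cache_set: Callable[[str, str], None],
    vector_search: Callable[[str], list],
) -> list:
    """Cache-aside: check the cache first, fall back to the vector store on a miss."""
    key = f"rag:{hashlib.sha256(query.encode()).hexdigest()[:16]}"
    hit = cache_get(key)
    if hit is not None:
        return json.loads(hit)           # step 3: cache hit
    results = vector_search(query)       # step 4: similarity search on miss
    cache_set(key, json.dumps(results))  # step 5: populate the cache
    return results
```

Steps 5 and 6 below fill in these callables with real Valkey and pgvector clients.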

Step 1: Deploy the Pipeline

Option A: One-Click (Dashboard)

  1. Navigate to Pipelines in the sidebar
  2. Click Deploy Pipeline
  3. Select RAG Pipeline
  4. Enter a name (e.g., my-rag) and select your zone
  5. Click Deploy Pipeline

Option B: REST API

curl -u $USER:$PASS -X POST https://api.foundrydb.com/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "template_id": "rag-pipeline",
    "name": "my-rag",
    "zone": "se-sto1"
  }'

Option C: CLI

fdb pipeline deploy --template rag-pipeline --name my-rag --zone se-sto1

This creates three services:

Service          | Engine                    | Role               | Default Plan
-----------------|---------------------------|--------------------|-------------
my-rag-kafka     | Kafka 3.9                 | Document ingestion | tier-2, 50GB
my-rag-pgvector  | PostgreSQL 17 + pgvector  | Vector store       | tier-2, 50GB
my-rag-cache     | Valkey 8                  | Query cache        | tier-1, 20GB

Step 2: Wait for Deployment

Monitor the pipeline status:

curl -s -u $USER:$PASS https://api.foundrydb.com/pipelines/$PIPELINE_ID | jq '.status'

Or watch the Pipeline Detail page in the dashboard. It auto-refreshes every 5 seconds during deployment. Typical deployment time: 5-8 minutes.
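If you are scripting the deployment, a small polling loop saves the manual checking. A sketch that wraps the status call above; the fetch function is injected so you can back it with `requests`, `curl`, or the CLI (the `running` status value is the one the pipeline reports when ready):

```python
import time
from typing import Callable

def wait_until_running(
    fetch_status: Callable[[], str],
    timeout_s: float = 600,
    poll_interval_s: float = 5,
) -> str:
    """Poll the pipeline status until it reaches 'running' or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = fetch_status()
        if status == "running":
            return status
        time.sleep(poll_interval_s)
    raise TimeoutError("pipeline did not reach 'running' within the timeout")
```

For example, `fetch_status` could be `lambda: requests.get(f"https://api.foundrydb.com/pipelines/{pipeline_id}", auth=(user, password)).json()["status"]`.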

Step 3: Get Connection Details

Once the pipeline status is running, retrieve connection parameters:

curl -s -u $USER:$PASS https://api.foundrydb.com/pipelines/$PIPELINE_ID \
| jq '.connections[] | {role, connections}'

Get database credentials:

# Reveal PostgreSQL password
curl -s -u $USER:$PASS -X POST \
https://api.foundrydb.com/managed-services/$PG_SERVICE_ID/database-users/app_user/reveal-password \
| jq '.password'

Step 4: Set Up the Vector Store

Connect to PostgreSQL and create the embeddings table:

-- pgvector extension is pre-installed
CREATE EXTENSION IF NOT EXISTS vector;

-- Create embeddings table
CREATE TABLE documents (
    id BIGSERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    metadata JSONB DEFAULT '{}',
    embedding vector(1536),  -- OpenAI text-embedding-ada-002 dimension
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Create HNSW index for fast similarity search
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
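One wrinkle when writing to this table from Python: psycopg2 does not adapt Python lists to the `vector` type out of the box (the `pgvector` Python package ships an adapter for that). pgvector also accepts a plain text literal such as `'[0.1,0.2,0.3]'`, which a small formatter can produce:

```python
def to_pgvector(values: list) -> str:
    """Format a list of floats as a pgvector text literal, e.g. '[0.1,0.2]'."""
    return "[" + ",".join(repr(float(v)) for v in values) + "]"
```

Bind the result to a `%s::vector` placeholder, as the worker and query code below do.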

Step 5: Build the Ingestion Worker (Python)

This worker consumes documents from Kafka, generates embeddings, and stores them in PostgreSQL.

import json
import psycopg2
from kafka import KafkaConsumer
from openai import OpenAI

KAFKA_BOOTSTRAP = "my-rag-kafka.abc123.db.foundrydb.com:9093"
PG_URI = "postgresql://app_user:PASSWORD@my-rag-pgvector.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require"

openai = OpenAI()

def get_embedding(text: str) -> list[float]:
    response = openai.embeddings.create(model="text-embedding-ada-002", input=text)
    return response.data[0].embedding

consumer = KafkaConsumer(
    "documents",
    bootstrap_servers=KAFKA_BOOTSTRAP,
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASSWORD",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    group_id="embedding-worker",
)

conn = psycopg2.connect(PG_URI)

for message in consumer:
    doc = message.value
    embedding = get_embedding(doc["content"])
    with conn.cursor() as cur:
        # Pass the embedding in pgvector's text form ('[0.1, 0.2, ...]');
        # psycopg2 does not adapt Python lists to the vector type by default
        cur.execute(
            "INSERT INTO documents (content, metadata, embedding) VALUES (%s, %s, %s::vector)",
            (doc["content"], json.dumps(doc.get("metadata", {})), str(embedding)),
        )
    conn.commit()

Step 6: Build the Query Service (Python)

This service handles semantic search with Valkey caching.

import json, hashlib
import psycopg2, redis
from openai import OpenAI

PG_URI = "postgresql://app_user:PASSWORD@my-rag-pgvector.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require"
VALKEY_URL = "rediss://default:VALKEY_PASSWORD@my-rag-cache.abc123.db.foundrydb.com:6380"

openai = OpenAI()
conn = psycopg2.connect(PG_URI)
cache = redis.from_url(VALKEY_URL)

def search(query: str, top_k: int = 5) -> list[dict]:
    # Cache key: short SHA-256 prefix of the query text
    cache_key = f"rag:{hashlib.sha256(query.encode()).hexdigest()[:16]}"
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)

    query_embedding = openai.embeddings.create(
        model="text-embedding-ada-002", input=query
    ).data[0].embedding

    # Pass the embedding in pgvector's text form; psycopg2 does not adapt
    # Python lists to the vector type by default
    vec = str(query_embedding)
    with conn.cursor() as cur:
        cur.execute("""
            SELECT content, metadata, 1 - (embedding <=> %s::vector) AS similarity
            FROM documents
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """, (vec, vec, top_k))
        results = [
            {"content": row[0], "metadata": row[1], "similarity": float(row[2])}
            for row in cur.fetchall()
        ]

    cache.setex(cache_key, 3600, json.dumps(results))
    return results
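Cached results live for an hour (the `setex(..., 3600, ...)` call), so freshly ingested documents may not appear in search immediately. After a bulk ingest, one option is to sweep the `rag:` keys; a sketch using `scan_iter`, redis-py's incremental key scan:

```python
def invalidate_rag_cache(cache, prefix: str = "rag:") -> int:
    """Delete all cached search results under the given prefix; returns the count."""
    deleted = 0
    for key in cache.scan_iter(match=f"{prefix}*"):
        cache.delete(key)
        deleted += 1
    return deleted
```

`SCAN` visits keys incrementally, so this avoids the blocking behavior of `KEYS` on a busy cache.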

Step 7: Produce Documents

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="my-rag-kafka.abc123.db.foundrydb.com:9093",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASSWORD",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send("documents", {
    "content": "PostgreSQL supports TLS encryption for all client connections.",
    "metadata": {"source": "docs", "category": "security"},
})
producer.flush()

Monitoring

Each service has full observability via the FoundryDB dashboard:

  • Kafka: Topic metrics, consumer group lag, partition health
  • PostgreSQL: Query statistics, index advisor, connection pooling
  • Valkey: Memory usage, hit/miss ratio, connected clients
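The Valkey hit/miss ratio can also be computed client-side from `INFO stats`; the `keyspace_hits` and `keyspace_misses` fields are standard Redis/Valkey counters:

```python
def cache_hit_ratio(info: dict) -> float:
    """Compute the cache hit ratio from an INFO stats dict; 0.0 if no lookups yet."""
    hits = info.get("keyspace_hits", 0)
    misses = info.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0
```

With the query-service client from Step 6, call it as `cache_hit_ratio(cache.info("stats"))`.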

Next Steps

  • Add read replicas to PostgreSQL for search throughput
  • Set up alerts for Kafka consumer lag
  • Export metrics to Prometheus or Datadog
  • Scale individual services via the dashboard or API

Cleanup

curl -u $USER:$PASS -X DELETE https://api.foundrydb.com/pipelines/$PIPELINE_ID