Building a RAG Pipeline with PostgreSQL pgvector and Kafka on FoundryDB
Retrieval-Augmented Generation (RAG) has become the standard approach for grounding LLMs in factual, up-to-date data. Instead of fine-tuning a model on your corpus (expensive, slow, stale within weeks), you retrieve relevant context at query time and feed it to the LLM alongside the user's question.
In 2026, RAG is no longer experimental. It powers customer support bots, internal knowledge search, legal document analysis, and code assistants at thousands of companies. The architecture has stabilized around a common pattern: ingest documents, generate embeddings, store vectors, retrieve at query time. What varies is how well you operate the infrastructure underneath.
This post walks through building a production RAG pipeline on FoundryDB using PostgreSQL with pgvector, Kafka for document ingestion, and Valkey for result caching.
Architecture Overview
The pipeline has four stages:
- Ingest: Documents arrive via Kafka topics. Producers can be web scrapers, file upload handlers, or webhook receivers.
- Embed: A consumer reads from Kafka, chunks the documents, calls an embedding API (OpenAI, Cohere, or a local model), and writes vectors to PostgreSQL.
- Store: PostgreSQL with pgvector stores embeddings alongside metadata. HNSW indexes enable approximate nearest neighbor search in about a millisecond.
- Query: At query time, the user's question is embedded, similar vectors are retrieved from PostgreSQL, and the results (plus the original question) are sent to the LLM. Valkey caches frequent queries to reduce latency and cost.
┌──────────┐    ┌───────┐    ┌─────────────────────┐    ┌────────┐
│ Documents├───►│ Kafka ├───►│ PostgreSQL pgvector │◄───┤ App/LLM│
└──────────┘    └───────┘    └─────────────────────┘    └───┬────┘
                                                            │
                                                       ┌────▼────┐
                                                       │ Valkey  │
                                                       │ (cache) │
                                                       └─────────┘
Deploying the Services
Create all three services using the FoundryDB CLI. Each service is production-ready with TLS, automated backups, and monitoring out of the box.
# PostgreSQL with pgvector enabled
fdb services create \
--name rag-vectors \
--engine postgresql \
--version 17 \
--plan tier-4 \
--storage 100 \
--storage-tier maxiops \
--zone eu-helsinki \
--preset pgvector
# Kafka for document ingestion
fdb services create \
--name rag-ingest \
--engine kafka \
--version 3.9 \
--plan tier-4 \
--storage 200 \
--zone eu-helsinki
# Valkey for query result caching
fdb services create \
--name rag-cache \
--engine valkey \
--version 8.1 \
--plan tier-2 \
--storage 20 \
--zone eu-helsinki
The --preset pgvector flag automatically enables the pgvector extension on your PostgreSQL instance. All three services deploy in the same EU zone, keeping inter-service latency under 1ms.
Setting Up the Vector Store
Once PostgreSQL is running, create your embeddings table and index. The schema stores the embedding vector, the original text chunk, and metadata for filtering.
-- Enable pgvector (already done if you used the pgvector preset)
CREATE EXTENSION IF NOT EXISTS vector;
-- Embeddings table: 1536 dimensions for OpenAI text-embedding-3-small
CREATE TABLE embeddings (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
content text NOT NULL,
metadata jsonb NOT NULL DEFAULT '{}',
embedding vector(1536) NOT NULL,
created_at timestamptz NOT NULL DEFAULT now()
);
-- HNSW index for approximate nearest neighbor search
-- m=24 and ef_construction=200 are good defaults for 1M+ vectors
CREATE INDEX idx_embeddings_hnsw ON embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 24, ef_construction = 200);
-- Metadata index for filtered queries
CREATE INDEX idx_embeddings_metadata ON embeddings USING gin (metadata);
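The vector_cosine_ops operator class orders rows by cosine distance, which queries access through the <=> operator. As a quick illustration of what that distance measures, here is the formula in pure Python (illustrative only, not pgvector's actual C implementation):

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance as used by vector_cosine_ops: 1 - cos(angle between a and b)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

# Vectors pointing the same direction -> distance 0; orthogonal -> distance 1
d_same = cosine_distance([1.0, 0.0], [2.0, 0.0])   # 0.0
d_orth = cosine_distance([1.0, 0.0], [0.0, 1.0])   # 1.0
```

Lower distance means more similar, which is why the query code later in this post reports 1 minus the distance as a similarity score.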
Kafka Topic for Document Ingestion
Create a topic for incoming documents. Using multiple partitions allows parallel processing by embedding workers.
# Get Kafka connection details
fdb services get rag-ingest --format json | jq '.connection_info'
# Create the documents topic with 6 partitions
fdb kafka topics create \
--service rag-ingest \
--name documents \
--partitions 6 \
--replication-factor 1 \
--config retention.ms=604800000
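For the other side of the topic, here is a minimal producer sketch using the same kafka-python library as the consumer below. The partition_key helper and its source field are illustrative choices, not something the pipeline mandates, and the connection details come from fdb services get:

```python
import json

def serialize(doc: dict) -> bytes:
    """Produce the JSON shape the consumer's value_deserializer expects."""
    return json.dumps(doc).encode("utf-8")

def partition_key(doc: dict) -> bytes:
    """Key by document source (illustrative): each source stays ordered
    within one partition while load spreads across the 6 partitions."""
    return doc.get("metadata", {}).get("source", "unknown").encode("utf-8")

if __name__ == "__main__":
    from kafka import KafkaProducer  # same client library as the consumer

    producer = KafkaProducer(
        bootstrap_servers="rag-ingest.foundrydb.com:9093",
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-256",
        sasl_plain_username="app_user",
        sasl_plain_password="your-password",
    )
    doc = {"content": "FoundryDB services deploy with TLS enabled.",
           "metadata": {"source": "docs-scraper"}}
    producer.send("documents", key=partition_key(doc), value=serialize(doc))
    producer.flush()
```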
Here is a Python consumer that reads documents from Kafka, generates embeddings, and stores them in PostgreSQL:
import json
from kafka import KafkaConsumer
from openai import OpenAI
import psycopg
# Initialize clients
openai_client = OpenAI()
pg_conn = psycopg.connect("postgresql://user:pass@rag-vectors.foundrydb.com/defaultdb?sslmode=verify-full")
consumer = KafkaConsumer(
    "documents",
    bootstrap_servers="rag-ingest.foundrydb.com:9093",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="your-password",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    group_id="embedding-workers",
)
def chunk_text(text: str, max_tokens: int = 512) -> list[str]:
    """Split text into overlapping chunks, using words as a rough proxy for tokens."""
words = text.split()
chunks = []
for i in range(0, len(words), max_tokens - 50):
chunk = " ".join(words[i:i + max_tokens])
if chunk:
chunks.append(chunk)
return chunks
for message in consumer:
doc = message.value
chunks = chunk_text(doc["content"])
# Batch embed all chunks
response = openai_client.embeddings.create(
model="text-embedding-3-small",
input=chunks
)
    # Bulk insert all chunks for this document
    with pg_conn.cursor() as cur:
        cur.executemany(
            """INSERT INTO embeddings (content, metadata, embedding)
               VALUES (%s, %s, %s::vector)""",
            [
                (chunk, json.dumps(doc.get("metadata", {})), emb_data.embedding)
                for chunk, emb_data in zip(chunks, response.data)
            ],
        )
    pg_conn.commit()
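The 50-word overlap in chunk_text means consecutive chunks share context across their boundary, so a sentence split by a chunk edge still appears whole in one of them. A quick standalone check (the function is repeated here so the snippet runs on its own):

```python
def chunk_text(text: str, max_tokens: int = 512) -> list[str]:
    """Same chunker as the consumer above: word-based, 50-word overlap."""
    words = text.split()
    chunks = []
    for i in range(0, len(words), max_tokens - 50):
        chunk = " ".join(words[i:i + max_tokens])
        if chunk:
            chunks.append(chunk)
    return chunks

words = [f"w{i}" for i in range(1000)]
chunks = chunk_text(" ".join(words))

# Stride is 512 - 50 = 462 words, so 1000 words yield 3 chunks,
# and consecutive chunks share a 50-word overlap
n_chunks = len(chunks)                                       # 3
overlap = chunks[0].split()[-50:] == chunks[1].split()[:50]  # True
```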
Querying with Caching
The query path embeds the user's question, searches PostgreSQL for similar vectors, and caches the result in Valkey. Subsequent identical queries return instantly from cache.
import hashlib
import json
import valkey
import psycopg
from openai import OpenAI
openai_client = OpenAI()
pg_conn = psycopg.connect("postgresql://user:pass@rag-vectors.foundrydb.com/defaultdb?sslmode=verify-full")
cache = valkey.Valkey(
host="rag-cache.foundrydb.com", port=6380,
username="app_user", password="your-password", ssl=True
)
def search(query: str, top_k: int = 5, cache_ttl: int = 3600) -> list[dict]:
# Check cache first
cache_key = f"rag:{hashlib.sha256(query.encode()).hexdigest()}"
cached = cache.get(cache_key)
if cached:
return json.loads(cached)
# Embed the query
response = openai_client.embeddings.create(
model="text-embedding-3-small",
input=[query]
)
query_vec = response.data[0].embedding
# Similarity search via pgvector
with pg_conn.cursor() as cur:
cur.execute(
"""SELECT content, metadata,
1 - (embedding <=> %s::vector) AS similarity
FROM embeddings
ORDER BY embedding <=> %s::vector
LIMIT %s""",
(query_vec, query_vec, top_k)
)
results = [
{"content": row[0], "metadata": row[1], "similarity": float(row[2])}
for row in cur.fetchall()
]
# Cache results
cache.set(cache_key, json.dumps(results), ex=cache_ttl)
return results
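The last step of the query path, per the architecture overview, is handing the retrieved chunks plus the original question to the LLM. A minimal sketch of that prompt assembly; the prompt wording and numbering scheme are illustrative choices, and in production you would pass the output of search() rather than a hardcoded list:

```python
def build_prompt(question: str, results: list[dict]) -> str:
    """Assemble retrieved chunks and the user's question into an LLM prompt."""
    context = "\n\n".join(f"[{i + 1}] {r['content']}" for i, r in enumerate(results))
    return (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
    )

# Example results in the shape returned by search()
example_results = [
    {"content": "Use the pgvector preset when creating the service.",
     "metadata": {}, "similarity": 0.93},
    {"content": "CREATE EXTENSION IF NOT EXISTS vector;",
     "metadata": {}, "similarity": 0.88},
]
prompt = build_prompt("How do I enable pgvector?", example_results)
```

The assembled prompt then goes to your chat completion API of choice as the user message.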
Performance: IVFFlat vs HNSW
pgvector supports two index types. Choosing the right one depends on your dataset size and query pattern.
| Index | Build Time | Query Speed | Recall | Best For |
|---|---|---|---|---|
| HNSW | Slower (hours at 10M+) | ~1ms | 99%+ | Production queries, high recall requirements |
| IVFFlat | Fast (minutes at 10M+) | ~2-5ms | 95-98% | Large datasets with acceptable recall tradeoff |
For most RAG applications, HNSW is the right choice. The recall difference matters: at 95% recall, roughly 1 in 20 relevant chunks goes unretrieved. At 99%+, missed context is rare.
Two tuning parameters to know:
- hnsw.ef_search (HNSW): higher values increase recall at the cost of latency. The default is 40; set it to 100-200 for production RAG.
- Parallel query: set max_parallel_workers_per_gather = 4 to enable parallel workers for similarity searches across large tables.
-- Tune HNSW search quality (set per session or in postgresql.conf)
SET hnsw.ef_search = 150;
-- Enable parallel workers for large scans
SET max_parallel_workers_per_gather = 4;
Why This Stack Works
This architecture handles the full RAG lifecycle without duct tape:
- Kafka decouples document ingestion from embedding generation. You can scale embedding workers independently, replay failed batches, and add new document sources without changing downstream code.
- PostgreSQL pgvector stores vectors alongside your relational data. No need for a separate vector database. You can join embeddings with user tables, filter by metadata using standard SQL, and use transactions for consistency.
- Valkey eliminates redundant embedding API calls and database queries. A 1-hour TTL on cached results cuts costs significantly for repeated queries.
- FoundryDB handles the operational overhead: automated backups, TLS everywhere, monitoring, and all three services in one dashboard.
Next Steps
Deploy this pipeline on FoundryDB and start indexing your documents. The quick start guide covers creating your first service, and the PostgreSQL documentation has details on pgvector configuration and tuning.
For production deployments, consider adding read replicas to your PostgreSQL service for query scaling, and enabling Kafka's multi-node mode for ingestion high availability.