Kafka to PostgreSQL Streaming (Manual Setup)

Build a streaming pipeline from Kafka to PostgreSQL without templates, giving you full control over service configuration, versions, and plans.

Architecture

Events flow from a Kafka topic through a Python consumer into a PostgreSQL events table. The consumer batches its inserts and deduplicates on Kafka coordinates (topic, partition, offset), so replays after a restart never produce duplicate rows.

Step 1: Create Services Individually

# Kafka with a larger plan
curl -u $USER:$PASS -X POST https://api.foundrydb.com/managed-services \
  -H "Content-Type: application/json" \
  -d '{"name":"stream-kafka","database_type":"kafka","version":"3.9","plan_name":"tier-3","zone":"se-sto1","storage_size_gb":200,"storage_tier":"maxiops"}'

# PostgreSQL with read replica
curl -u $USER:$PASS -X POST https://api.foundrydb.com/managed-services \
  -H "Content-Type: application/json" \
  -d '{"name":"stream-pg","database_type":"postgresql","version":"17","plan_name":"tier-3","zone":"se-sto1","storage_size_gb":100,"storage_tier":"maxiops","node_count":2,"replication_mode":"async"}'

Step 2: Create PostgreSQL Schema

CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,
    event_type TEXT NOT NULL,
    payload JSONB NOT NULL,
    kafka_topic TEXT NOT NULL,
    kafka_partition INT,
    kafka_offset BIGINT,
    received_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX ON events (received_at DESC);
-- Deduplicate by Kafka offset for exactly-once semantics
CREATE UNIQUE INDEX ON events (kafka_topic, kafka_partition, kafka_offset);
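
One way to apply the schema: save the statements above to schema.sql and run them over the same connection string Step 3 uses (a sketch; PG_PASS and the hostname are the same placeholders as below):

import psycopg2

DSN = "postgresql://app_user:PG_PASS@stream-pg.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require"

conn = psycopg2.connect(DSN)
with conn, conn.cursor() as cur:  # the connection context manager commits on success
    cur.execute(open("schema.sql").read())
conn.close()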

Step 3: Stream Processor (Python)

import json
import psycopg2
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "raw-events",
    bootstrap_servers="stream-kafka.abc123.db.foundrydb.com:9093",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASS",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    group_id="pg-writer",
    enable_auto_commit=False,  # commit offsets manually, only after rows are in PostgreSQL
)

conn = psycopg2.connect("postgresql://app_user:PG_PASS@stream-pg.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require")
batch = []

for message in consumer:
    batch.append({
        "event_type": message.value.get("type", "unknown"),
        "payload": json.dumps(message.value),
        "topic": message.topic,
        "partition": message.partition,
        "offset": message.offset,
    })

    if len(batch) >= 50:
        with conn.cursor() as cur:
            for row in batch:
                cur.execute("""
                    INSERT INTO events (event_type, payload, kafka_topic, kafka_partition, kafka_offset)
                    VALUES (%(event_type)s, %(payload)s, %(topic)s, %(partition)s, %(offset)s)
                    ON CONFLICT (kafka_topic, kafka_partition, kafka_offset) DO NOTHING
                """, row)
        # Commit to PostgreSQL first, then commit Kafka offsets. If the process
        # dies between the two, replayed messages hit the unique index and are skipped.
        conn.commit()
        consumer.commit()
        batch.clear()
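
One gap in the loop above: a final batch smaller than 50 sits in memory until more messages arrive, because for message in consumer blocks indefinitely. A variant built on consumer.poll(), which returns control after a timeout, can flush partial batches when the topic goes quiet (a sketch; flush() is the same insert-then-commit sequence factored into a helper):

def flush(conn, consumer, batch):
    # Identical to the flush logic above, factored out for reuse.
    with conn.cursor() as cur:
        for row in batch:
            cur.execute("""
                INSERT INTO events (event_type, payload, kafka_topic, kafka_partition, kafka_offset)
                VALUES (%(event_type)s, %(payload)s, %(topic)s, %(partition)s, %(offset)s)
                ON CONFLICT (kafka_topic, kafka_partition, kafka_offset) DO NOTHING
            """, row)
    conn.commit()
    consumer.commit()
    batch.clear()

while True:
    # poll() returns {TopicPartition: [records]}; an empty dict means the
    # topic was quiet for the whole timeout.
    records = consumer.poll(timeout_ms=5000)
    for partition_records in records.values():
        for message in partition_records:
            batch.append({
                "event_type": message.value.get("type", "unknown"),
                "payload": json.dumps(message.value),
                "topic": message.topic,
                "partition": message.partition,
                "offset": message.offset,
            })
    if len(batch) >= 50 or (batch and not records):
        flush(conn, consumer, batch)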

Exactly-Once Semantics

Strictly speaking, the pipeline is at-least-once: offsets are committed to Kafka only after the batch is committed to PostgreSQL, so a crash between the two commits causes messages to be replayed. The unique index on (kafka_topic, kafka_partition, kafka_offset) combined with ON CONFLICT DO NOTHING makes those replays no-ops, so each event lands in the table exactly once.
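
A quick way to see this in action: insert the same (topic, partition, offset) twice and watch cur.rowcount, which psycopg2 reports as 0 when ON CONFLICT DO NOTHING skips the row (a sketch against the Step 3 connection, using a throwaway offset):

import psycopg2

conn = psycopg2.connect("postgresql://app_user:PG_PASS@stream-pg.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require")
row = {"event_type": "test", "payload": "{}", "topic": "raw-events", "partition": 0, "offset": 999999}
with conn.cursor() as cur:
    for attempt in (1, 2):
        cur.execute("""
            INSERT INTO events (event_type, payload, kafka_topic, kafka_partition, kafka_offset)
            VALUES (%(event_type)s, %(payload)s, %(topic)s, %(partition)s, %(offset)s)
            ON CONFLICT (kafka_topic, kafka_partition, kafka_offset) DO NOTHING
        """, row)
        print(f"attempt {attempt}: {cur.rowcount} row(s) inserted")  # prints 1, then 0
conn.rollback()  # discard the test row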

Scaling

  • Kafka: Add partitions for parallel consumers (see the sketch after this list)
  • PostgreSQL: Add read replicas for analytics queries
  • Consumer: Run multiple instances with the same consumer group
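
For the first bullet, partitions can be added with kafka-python's admin client. A sketch (note that partition counts can only grow, and adding partitions changes which partition a given key hashes to):

from kafka.admin import KafkaAdminClient, NewPartitions

admin = KafkaAdminClient(
    bootstrap_servers="stream-kafka.abc123.db.foundrydb.com:9093",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASS",
)
# Raise raw-events to 6 partitions, so up to 6 pg-writer instances can share the work.
admin.create_partitions({"raw-events": NewPartitions(total_count=6)})
admin.close()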

Cleanup

curl -u $USER:$PASS -X DELETE https://api.foundrydb.com/managed-services/$KAFKA_ID
curl -u $USER:$PASS -X DELETE https://api.foundrydb.com/managed-services/$PG_ID