Kafka to PostgreSQL Streaming (Manual Setup)
Build a streaming pipeline from Kafka to PostgreSQL without templates, giving you full control over service configuration, versions, and plans.
Architecture
raw-events (Kafka topic) → Python consumer (consumer group pg-writer) → PostgreSQL events table
Step 1: Create Services Individually
# Kafka with a larger plan
curl -u $USER:$PASS -X POST https://api.foundrydb.com/managed-services \
-H "Content-Type: application/json" \
-d '{"name":"stream-kafka","database_type":"kafka","version":"3.9","plan_name":"tier-3","zone":"se-sto1","storage_size_gb":200,"storage_tier":"maxiops"}'
# PostgreSQL with read replica
curl -u $USER:$PASS -X POST https://api.foundrydb.com/managed-services \
-H "Content-Type: application/json" \
-d '{"name":"stream-pg","database_type":"postgresql","version":"17","plan_name":"tier-3","zone":"se-sto1","storage_size_gb":100,"storage_tier":"maxiops","node_count":2,"replication_mode":"async"}'
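Both services take a few minutes to provision, so it helps to wait until each one reports it is running before continuing. The sketch below is hedged: the GET endpoint shape and the "state" field with a "running" value are assumptions about the foundrydb API, not documented behavior — adapt them to the actual response.

```python
import json
import time
import urllib.request

API = "https://api.foundrydb.com/managed-services"

def service_ready(svc: dict) -> bool:
    # Assumed response shape: provisioning is done once "state" is "running".
    return svc.get("state") == "running"

def wait_for_service(service_id: str, auth_header: str, timeout: int = 600) -> dict:
    # Poll GET /managed-services/<id> until the service reports running.
    deadline = time.time() + timeout
    while time.time() < deadline:
        req = urllib.request.Request(
            f"{API}/{service_id}", headers={"Authorization": auth_header}
        )
        with urllib.request.urlopen(req) as resp:
            svc = json.load(resp)
        if service_ready(svc):
            return svc
        time.sleep(10)
    raise TimeoutError(f"service {service_id} not running after {timeout}s")
```

The service IDs come back in the creation responses from Step 1; keep them for the cleanup step as well.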
Step 2: Create PostgreSQL Schema
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
kafka_topic TEXT NOT NULL,
kafka_partition INT,
kafka_offset BIGINT,
received_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX ON events (received_at DESC);
-- Deduplicate by Kafka offset for exactly-once semantics
CREATE UNIQUE INDEX ON events (kafka_topic, kafka_partition, kafka_offset);
Step 3: Stream Processor (Python)
import json
import psycopg2
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "raw-events",
    bootstrap_servers="stream-kafka.abc123.db.foundrydb.com:9093",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="app_user",
    sasl_plain_password="KAFKA_PASS",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    group_id="pg-writer",
    enable_auto_commit=False,  # commit offsets only after rows are safely in PostgreSQL
)

conn = psycopg2.connect(
    "postgresql://app_user:PG_PASS@stream-pg.abc123.db.foundrydb.com:5432/defaultdb?sslmode=require"
)

batch = []
for message in consumer:
    batch.append({
        "event_type": message.value.get("type", "unknown"),
        "payload": json.dumps(message.value),
        "topic": message.topic,
        "partition": message.partition,
        "offset": message.offset,
    })
    if len(batch) >= 50:
        with conn.cursor() as cur:
            for row in batch:
                cur.execute("""
                    INSERT INTO events (event_type, payload, kafka_topic, kafka_partition, kafka_offset)
                    VALUES (%(event_type)s, %(payload)s, %(topic)s, %(partition)s, %(offset)s)
                    ON CONFLICT (kafka_topic, kafka_partition, kafka_offset) DO NOTHING
                """, row)
        conn.commit()      # 1. rows durable in PostgreSQL
        consumer.commit()  # 2. only then advance the Kafka offsets
        batch.clear()
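One caveat in the loop above: it flushes only when 50 events have accumulated, so during a traffic lull a partial batch can sit unwritten indefinitely. A common fix is to flush on size or age, whichever comes first. The helper below is an illustrative sketch; the MAX_BATCH and MAX_WAIT_S thresholds are arbitrary, and to use it you would switch the loop to consumer.poll(timeout_ms=1000) so it wakes up even when no messages arrive.

```python
MAX_BATCH = 50    # flush when this many events are buffered...
MAX_WAIT_S = 5.0  # ...or when the batch has been waiting this long

def should_flush(batch_len: int, last_flush: float, now: float) -> bool:
    # Flush on batch size or elapsed time, whichever trips first;
    # an empty batch never needs flushing.
    if batch_len == 0:
        return False
    return batch_len >= MAX_BATCH or (now - last_flush) >= MAX_WAIT_S
```

After each flush, reset last_flush to the current time so the age check restarts.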
Exactly-Once Semantics
Strictly speaking, the pipeline is at-least-once delivery made idempotent. Because conn.commit() runs before consumer.commit(), a crash between the two can only cause replays, never lost messages. The unique index on (kafka_topic, kafka_partition, kafka_offset) combined with ON CONFLICT DO NOTHING silently discards those replayed rows, so the events table records each message exactly once.
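The guarantee is easy to see in miniature. The sketch below stands in for the database: a set plays the role of the unique index, and a replayed offset is dropped instead of inserted twice.

```python
seen = set()  # stands in for the unique index on (topic, partition, offset)
table = []    # stands in for the events table

def insert_on_conflict_do_nothing(topic, partition, offset, payload):
    key = (topic, partition, offset)
    if key in seen:  # duplicate key -> DO NOTHING
        return False
    seen.add(key)
    table.append({"topic": topic, "partition": partition,
                  "offset": offset, "payload": payload})
    return True

# First delivery, then a replay of the same offset after a "restart":
insert_on_conflict_do_nothing("raw-events", 0, 42, {"type": "signup"})
insert_on_conflict_do_nothing("raw-events", 0, 42, {"type": "signup"})  # dropped
```

The table ends up with a single row for offset 42, which is exactly what the unique index enforces in PostgreSQL.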
Scaling
- Kafka: Add partitions for parallel consumers
- PostgreSQL: Add read replicas for analytics queries
- Consumer: Run multiple instances with the same consumer group
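The consumer-group bullet works because Kafka gives each partition to exactly one consumer in the group, so parallelism is capped by the partition count. The function below is illustrative only: the real assignment is done by Kafka's configurable partition assignor, but a simple round-robin conveys the shape of the result.

```python
def assign_round_robin(partitions, consumers):
    # Illustrative only: map each partition to exactly one consumer.
    # With more consumers than partitions, the extras sit idle.
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment
```

With 4 partitions and 2 consumers, each instance handles 2 partitions; a third instance beyond 4 total would receive nothing.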
Cleanup
curl -u $USER:$PASS -X DELETE https://api.foundrydb.com/managed-services/$KAFKA_ID
curl -u $USER:$PASS -X DELETE https://api.foundrydb.com/managed-services/$PG_ID