PostgreSQL to Kafka CDC Pipeline
Stream every insert, update, and delete from a managed PostgreSQL service into Kafka topics. The platform wires the private network path, installs Debezium on the Kafka Connect worker, creates the logical replication publication, and starts the connector. Your part is to watch a status endpoint and consume the Kafka topics.
Prerequisites
Before creating a pipeline, you need:
- A running PostgreSQL service in your organization.
- A running Kafka service with the kafka-connect addon enabled, in the same organization and the same UpCloud peering region as the PostgreSQL service.
- Your organization ID (visible in the dashboard under Settings or via GET /organizations).
- API credentials with the service:create permission in the organization.
Create the services
If you do not have both services yet, create them now. The Kafka service must be created with addons: ["kafka-connect"].
# PostgreSQL source
curl -u "$USER:$PASS" -X POST https://api.foundrydb.com/managed-services \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders-pg",
    "database_type": "postgresql",
    "version": "17",
    "plan_name": "tier-2",
    "zone": "se-sto1",
    "storage_size_gb": 100,
    "storage_tier": "maxiops"
  }'
# Note the service ID in the response; later commands refer to it as $PG_ID
# Kafka sink (the kafka-connect addon is required)
curl -u "$USER:$PASS" -X POST https://api.foundrydb.com/managed-services \
  -H "Content-Type: application/json" \
  -d '{
    "name": "events-kafka",
    "database_type": "kafka",
    "version": "3.9",
    "plan_name": "tier-3",
    "zone": "se-sto1",
    "storage_size_gb": 200,
    "storage_tier": "maxiops",
    "addons": ["kafka-connect"]
  }'
# Note the service ID in the response; later commands refer to it as $KAFKA_ID
Wait for both services to reach running status before proceeding. The pipeline reconciler stays in Pending while either service is still provisioning.
# Check both services; repeat until each reports running
curl -u "$USER:$PASS" https://api.foundrydb.com/managed-services/$PG_ID | jq '.status'
curl -u "$USER:$PASS" https://api.foundrydb.com/managed-services/$KAFKA_ID | jq '.status'
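The commands above check status once. If you script the wait, a loop like the following works. It is a minimal Python sketch in which fetch_status is a hypothetical callable you supply (for example, one that calls GET /managed-services/{id} and returns the .status field). Statuses are compared case-insensitively because this page writes the service state as running, and the interval and timeout defaults are assumptions.

```python
import time
from typing import Callable, Iterable


def wait_until_running(
    fetch_status: Callable[[str], str],  # hypothetical: returns .status for a service ID
    service_ids: Iterable[str],
    interval_s: float = 10.0,            # assumed polling interval
    timeout_s: float = 1800.0,           # assumed overall timeout
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll until every service reports a running status, or raise TimeoutError."""
    pending = list(service_ids)
    waited = 0.0
    while True:
        # Keep only the services that are not running yet.
        pending = [sid for sid in pending if fetch_status(sid).lower() != "running"]
        if not pending:
            return
        if waited >= timeout_s:
            raise TimeoutError(f"services still not running: {pending}")
        sleep(interval_s)
        waited += interval_s
```

Injecting sleep keeps the loop testable and lets you swap in an async or jittered wait.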
Step 1: Create the pipeline
curl -u "$USER:$PASS" -X POST \
  "https://api.foundrydb.com/organizations/$ORG_ID/pipelines" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders-cdc",
    "pipeline_type": "cdc_pg_to_kafka",
    "source_service_id": "'$PG_ID'",
    "sink_service_id": "'$KAFKA_ID'",
    "config": {
      "database_name": "defaultdb",
      "tables": ["public.orders", "public.customers"],
      "topic_prefix": "shop",
      "snapshot_mode": "initial"
    }
  }'
The API returns 202 Accepted with the pipeline in Pending status.
{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "organization_id": "...",
  "name": "orders-cdc",
  "pipeline_type": "cdc_pg_to_kafka",
  "source_service_id": "...",
  "sink_service_id": "...",
  "status": "Pending",
  "provision_step": null,
  "config": {
    "database_name": "defaultdb",
    "tables": ["public.orders", "public.customers"],
    "topic_prefix": "shop",
    "snapshot_mode": "initial"
  },
  "created_at": "2026-06-22T10:00:00Z"
}
Save the pipeline id; later commands refer to it as $PIPELINE_ID.
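If you drive the API from code rather than curl, the same request can be built with Python's standard library. This is a sketch, not an official client: create_pipeline_request is a hypothetical helper, and it only constructs the request so you can inspect it before sending.

```python
import base64
import json
import urllib.request

API = "https://api.foundrydb.com"


def create_pipeline_request(org_id: str, user: str, password: str, body: dict) -> urllib.request.Request:
    """Build (but do not send) POST /organizations/{org_id}/pipelines.

    Hypothetical helper; it mirrors the curl command above.
    """
    # HTTP Basic credentials, equivalent to curl -u "$USER:$PASS".
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{API}/organizations/{org_id}/pipelines",
        data=json.dumps(body).encode(),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )
```

To send it, pass the request to urllib.request.urlopen, parse the JSON response, and keep its id field as the pipeline ID.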
Request fields
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Pipeline name, unique per organization. Letters, digits, hyphens, underscores. Max 64 chars. |
| pipeline_type | string | Yes | Must be cdc_pg_to_kafka. |
| source_service_id | UUID | Yes | PostgreSQL service UUID. |
| sink_service_id | UUID | Yes | Kafka service UUID. Must differ from source. |
| config.database_name | string | No | Database to capture. Defaults to defaultdb. |
| config.tables | string[] | No | Tables as schema.table. Bare names get the public. schema prefix (orders becomes public.orders). Empty means all tables. |
| config.topic_prefix | string | No | Kafka topic prefix. Topics are <prefix>.<schema>.<table>. Defaults to the pipeline name. |
| config.snapshot_mode | string | No | Debezium snapshot.mode. Defaults to initial (snapshot existing rows, then stream). |
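The table rules can be checked client-side before you submit. The sketch below mirrors the rules above under stated assumptions (the API remains the authority): validate_request and expected_topics are hypothetical helpers, and expected_topics returns an empty list when tables is empty, because capturing all tables means the topic set depends on the database contents.

```python
import re

# Pipeline names: letters, digits, hyphens, underscores; at most 64 characters.
NAME_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def validate_request(body: dict) -> list[str]:
    """Return the problems found in a create-pipeline body (empty list if none)."""
    problems = []
    if not NAME_RE.fullmatch(body.get("name", "")):
        problems.append("name must be 1-64 letters, digits, hyphens or underscores")
    if body.get("pipeline_type") != "cdc_pg_to_kafka":
        problems.append("pipeline_type must be cdc_pg_to_kafka")
    src, sink = body.get("source_service_id"), body.get("sink_service_id")
    if not src or not sink:
        problems.append("source_service_id and sink_service_id are required")
    elif src == sink:
        problems.append("sink_service_id must differ from source_service_id")
    return problems


def normalize_tables(tables: list[str]) -> list[str]:
    """Qualify bare table names with the public schema."""
    return [t if "." in t else f"public.{t}" for t in tables]


def expected_topics(body: dict) -> list[str]:
    """Topics named <prefix>.<schema>.<table> for an explicit table list.

    The prefix defaults to the pipeline name. An empty table list means
    "all tables", which cannot be resolved from the request alone, so []
    is returned in that case.
    """
    config = body.get("config", {})
    prefix = config.get("topic_prefix") or body["name"]
    return [f"{prefix}.{t}" for t in normalize_tables(config.get("tables", []))]
```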
Step 2: Monitor provisioning
Poll the status endpoint. Provisioning typically completes in 2 to 4 minutes.
curl -u "$USER:$PASS" \
"https://api.foundrydb.com/pipelines/$PIPELINE_ID/status"
{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "Provisioning",
  "connector_name": null,
  "connector_state": null,
  "task_states": null,
  "source_lag_bytes": null,
  "topic_prefix": "shop",
  "last_health_check_at": null,
  "error_message": null
}
The provision_step field of the full pipeline object (GET /organizations/{orgId}/pipelines/{pipelineId}) shows which step the reconciler is on:
| Step | What is happening |
|---|---|
| ensure_network | Bidirectional SDN peering created between source and sink subnets; waiting for UpCloud to activate both directions |
| install_plugin | Debezium PostgreSQL connector plugin downloaded and extracted on the Kafka Connect worker; worker restarting |
| prepare_source | REPLICATION attribute granted to the connector database user; sink subnet added to pg_hba.conf |
| create_publication | Logical replication publication created on the source; scoped to your table filter |
| create_connector | Debezium connector created; waiting for RUNNING state |
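To print readable progress while polling, you can map provision_step to its position in the table above. This sketch assumes the reconciler runs the steps in the order the table lists them; describe_progress is a hypothetical helper that takes the full pipeline object as a dict.

```python
# Assumed order: the sequence in which the table above lists the steps.
STEPS = [
    "ensure_network",
    "install_plugin",
    "prepare_source",
    "create_publication",
    "create_connector",
]


def describe_progress(pipeline: dict) -> str:
    """Render e.g. 'Provisioning: step 3/5 (prepare_source)'."""
    status = pipeline.get("status")
    step = pipeline.get("provision_step")
    if status != "Provisioning" or step not in STEPS:
        # Outside provisioning (or an unknown step), the status alone is the answer.
        return str(status)
    return f"{status}: step {STEPS.index(step) + 1}/{len(STEPS)} ({step})"
```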
When provisioning completes, the status becomes Running:
{
  "status": "Running",
  "connector_name": "mdb-pipeline-a1b2c3d4",
  "connector_state": "RUNNING",
  "task_states": [{"id": 0, "state": "RUNNING"}],
  "source_lag_bytes": 0,
  "topic_prefix": "shop",
  "last_health_check_at": "2026-06-22T10:04:12Z",
  "error_message": null
}
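A monitoring script can treat the pipeline as healthy only when every relevant field in the response above agrees. is_healthy is a hypothetical helper over the parsed status JSON; requiring error_message to be null is a deliberately strict choice.

```python
def is_healthy(status: dict) -> bool:
    """True when the pipeline, the connector, and every task are running without error."""
    tasks = status.get("task_states") or []  # null while provisioning
    return (
        status.get("status") == "Running"
        and status.get("connector_state") == "RUNNING"
        and len(tasks) > 0
        and all(t.get("state") == "RUNNING" for t in tasks)
        and status.get("error_message") is None
    )
```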
Step 3: Verify data is flowing
Insert a row into the PostgreSQL source and check the Kafka topic.
-- Connect to your PostgreSQL service
INSERT INTO orders (id, customer_id, total, status)
VALUES (1001, 42, 99.95, 'placed');
The change lands on topic shop.public.orders. Consume it with the Kafka CLI or a SASL-aware client:
kafka-console-consumer \
--bootstrap-server "$KAFKA_HOST:9093" \
--consumer.config /tmp/sasl.properties \
--topic shop.public.orders \
--from-beginning \
--max-messages 1
Where /tmp/sasl.properties contains:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="app_user" password="YOUR_KAFKA_PASS";
The message value is a Debezium change-event envelope with before, after, source, and op fields (abridged here; Debezium also includes further metadata such as timestamps):
{
  "before": null,
  "after": {
    "id": 1001,
    "customer_id": 42,
    "total": 99.95,
    "status": "placed"
  },
  "source": {
    "connector": "postgresql",
    "db": "defaultdb",
    "schema": "public",
    "table": "orders",
    "lsn": 12345678
  },
  "op": "c"
}
op values: c (create), u (update), d (delete), r (read/snapshot).
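Consumers that materialize a table from these events can follow the op codes directly: upsert the after image for c, r, and u, and remove the key taken from before for d. This sketch assumes the message value is the plain JSON envelope shown above (no schema wrapper), that id is the primary key, and that the default replica identity leaves the key columns in before on deletes; apply_change is a hypothetical name. It also skips the null-valued tombstone that Debezium emits after a delete by default. Because an upsert by key gives the same result when an event is replayed, this style of consumer tolerates the duplicates that at-least-once delivery allows.

```python
import json
from typing import Optional


def apply_change(table: dict, message_value: Optional[bytes], key: str = "id") -> None:
    """Apply one Debezium change event to an in-memory table keyed by `key`."""
    if message_value is None:
        # Tombstone following a delete: nothing to apply.
        return
    event = json.loads(message_value)
    op = event["op"]
    if op in ("c", "r", "u"):
        row = event["after"]
        table[row[key]] = row  # upsert, so replaying the same event is harmless
    elif op == "d":
        table.pop(event["before"][key], None)  # assumes the key is present in before
    else:
        raise ValueError(f"unsupported op {op!r}")
```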
Step 4: Ongoing monitoring
Check the status endpoint for connector state and source replication lag:
curl -u "$USER:$PASS" \
  "https://api.foundrydb.com/pipelines/$PIPELINE_ID/status" | jq '{
    status: .status,
    connector_state: .connector_state,
    source_lag_bytes: .source_lag_bytes,
    last_health_check_at: .last_health_check_at,
    error: .error_message
  }'
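source_lag_bytes is the WAL lag of the connector's replication slot, in bytes: how far the slot's confirmed position trails the current WAL position on the source. A sustained value above a few megabytes on a busy system indicates that the connector is falling behind.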
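To make the unit concrete: PostgreSQL writes WAL positions (LSNs) as two hexadecimal halves, high/low, and the byte distance between two positions is what pg_wal_lsn_diff computes. The sketch below assumes source_lag_bytes is the distance between the current WAL position and the slot's confirmed flush position; the platform's exact query is not documented here, and the function names are hypothetical.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a pg_lsn string such as '16/B374D848' to an absolute byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """WAL bytes written but not yet confirmed by the slot's consumer (assumed definition)."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)
```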
Source failover behaviour
If the PostgreSQL primary fails over, the reconciler detects that the connector's database.hostname no longer matches the current primary's private IP. It updates the connector configuration, refreshes the credentials, and restarts the connector. Debezium recreates the replication slot on the new primary and resumes from its committed Kafka offsets.
You do not need to take any action. While the connector restarts, error_message briefly shows a transient error; it clears once the connector is RUNNING again. Recovery takes 30 to 90 seconds, depending on how quickly Kafka Connect restarts the connector.
See Overview: delivery guarantee for the at-least-once semantics that apply across failover.
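If you alert on error_message, a short grace period avoids paging on the transient error that a failover produces. This is a sketch with a hypothetical class name; the 120-second default is an assumption chosen to sit above the 30 to 90 second recovery window described above.

```python
from datetime import datetime, timedelta
from typing import Optional


class TransientErrorFilter:
    """Alert only when error_message persists beyond a grace period (hypothetical helper)."""

    def __init__(self, grace: timedelta = timedelta(seconds=120)):  # assumed default
        self.grace = grace
        self.first_seen: Optional[datetime] = None

    def should_alert(self, error_message: Optional[str], now: datetime) -> bool:
        if error_message is None:
            # Error cleared (connector RUNNING again): reset the timer.
            self.first_seen = None
            return False
        if self.first_seen is None:
            self.first_seen = now
        return now - self.first_seen > self.grace
```

Call should_alert with each polled error_message and the poll time; only a persistent error returns True.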
Deleting the pipeline
curl -u "$USER:$PASS" -X DELETE \
"https://api.foundrydb.com/organizations/$ORG_ID/pipelines/$PIPELINE_ID"
Returns 202 Accepted. The reconciler then:
- Deletes the Debezium connector (releasing the replication slot handle).
- Drops the replication slot on the source primary.
- Drops the publication on the source.
- Removes the pg_hba.conf entry added for the pipeline.
- Soft-deletes the pipeline row.
The SDN peering between source and sink is kept in place. Other pipelines between the same two services may share it, and it is removed when the underlying service is deleted.
Troubleshooting
Status stuck in Pending
Both services must be in Running status. Check:
curl -u "$USER:$PASS" https://api.foundrydb.com/managed-services/$PG_ID | jq '.status'
curl -u "$USER:$PASS" https://api.foundrydb.com/managed-services/$KAFKA_ID | jq '.status'
The Kafka service also needs the kafka-connect addon fully running. The addon status is visible in the service detail response.
Status is Failed
Inspect error_message on the pipeline:
curl -u "$USER:$PASS" \
"https://api.foundrydb.com/organizations/$ORG_ID/pipelines/$PIPELINE_ID" \
| jq '{status: .status, error: .error_message, step: .provision_step}'
Common reasons:
| Error | Fix |
|---|---|
| sink service does not have the kafka-connect addon enabled | Delete and recreate the Kafka service with "addons": ["kafka-connect"] |
| source zone X and sink zone Y are not in the same peering region | Use services in the same UpCloud peering region (all European zones are in the same region) |
| source service must be postgresql | Swap source and sink IDs; the PostgreSQL service is the source |
| connector task failed: ... | Read the connector task trace in error_message; the cause is often a credentials or publication mismatch |
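For automated runbooks, the table above can be turned into a substring lookup on error_message. suggest_fix and the matched fragments are hypothetical and based only on the messages listed here; unknown messages fall through to a generic hint.

```python
from typing import Optional

# (substring of error_message, suggested fix), taken from the table above.
FIXES = [
    ("sink service does not have the kafka-connect addon enabled",
     'Delete and recreate the Kafka service with "addons": ["kafka-connect"].'),
    ("not in the same peering region",
     "Use services in the same UpCloud peering region."),
    ("source service must be postgresql",
     "Swap source and sink IDs; the PostgreSQL service is the source."),
    ("connector task failed",
     "Read the task trace; check credentials and the publication."),
]


def suggest_fix(error_message: Optional[str]) -> Optional[str]:
    """Return a suggested fix for a pipeline error_message, or None if there is no error."""
    if not error_message:
        return None
    for fragment, fix in FIXES:
        if fragment in error_message:
            return fix
    return "Unrecognized error; inspect error_message and provision_step."
```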
source_lag_bytes is growing
The connector is streaming but falling behind the WAL. Check:
- Kafka broker disk space and throughput.
- Whether the Kafka Connect worker VM is undersized (if so, upgrade to a larger plan).
- Whether config.tables is too broad; consider filtering to high-traffic tables only.
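To tell a short spike from a connector that keeps falling behind, sample source_lag_bytes from the status endpoint over time and check the trend. This sketch fits a least-squares slope to (seconds, lag_bytes) samples; lag_is_growing, the five-sample minimum, and the 1024 bytes-per-second threshold are all assumptions to tune for your workload.

```python
def lag_slope(samples: list[tuple[float, float]]) -> float:
    """Least-squares slope, in bytes per second, of (seconds, lag_bytes) samples."""
    n = len(samples)
    mean_t = sum(t for t, _ in samples) / n
    mean_lag = sum(lag for _, lag in samples) / n
    num = sum((t - mean_t) * (lag - mean_lag) for t, lag in samples)
    den = sum((t - mean_t) ** 2 for t, _ in samples)
    return num / den if den else 0.0


def lag_is_growing(
    samples: list[tuple[float, float]],
    min_samples: int = 5,        # assumed: too few points give a noisy slope
    min_slope: float = 1024.0,   # assumed threshold in bytes per second
) -> bool:
    """True when source_lag_bytes trends upward faster than min_slope."""
    if len(samples) < min_samples:
        return False
    return lag_slope(samples) > min_slope
```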
What's next
- Data Pipelines overview — lifecycle, network model, and delivery guarantees
- Data Pipelines API reference — full schema and all endpoints
- Kafka event streaming — creating and configuring Kafka services
- Pipeline Templates — deploy a pre-wired multi-service topology (RAG, event analytics, recommendation)