
PostgreSQL to Kafka CDC Pipeline

Stream every insert, update, and delete from a managed PostgreSQL service into Kafka topics. The platform wires the private network path, installs Debezium on the Kafka Connect worker, creates the logical replication publication, and starts the connector. Your part is to watch a status endpoint and consume the resulting Kafka topics.

Prerequisites

Before creating a pipeline you need:

  1. A running PostgreSQL service in your organization.
  2. A running Kafka service, with the kafka-connect addon enabled, in the same organization and the same UpCloud peering region as the PostgreSQL service.
  3. Your organization ID (visible in the dashboard under Settings or via GET /organizations).
  4. API credentials with the service:create permission in the organization.

Create the services

If you do not have both services yet, create them now. The Kafka service must be created with addons: ["kafka-connect"].

# PostgreSQL source
curl -u "$USER:$PASS" -X POST https://api.foundrydb.com/managed-services \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders-pg",
    "database_type": "postgresql",
    "version": "17",
    "plan_name": "tier-2",
    "zone": "se-sto1",
    "storage_size_gb": 100,
    "storage_tier": "maxiops"
  }'

# Kafka sink (the kafka-connect addon is required)
curl -u "$USER:$PASS" -X POST https://api.foundrydb.com/managed-services \
  -H "Content-Type: application/json" \
  -d '{
    "name": "events-kafka",
    "database_type": "kafka",
    "version": "3.9",
    "plan_name": "tier-3",
    "zone": "se-sto1",
    "storage_size_gb": 200,
    "storage_tier": "maxiops",
    "addons": ["kafka-connect"]
  }'

Note the id of each service (used below as $PG_ID and $KAFKA_ID), then wait for both services to reach running status before proceeding. The pipeline stays in Pending while either service is still provisioning.

# Poll until both are running
curl -u "$USER:$PASS" https://api.foundrydb.com/managed-services/$PG_ID | jq '.status'
curl -u "$USER:$PASS" https://api.foundrydb.com/managed-services/$KAFKA_ID | jq '.status'
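The same wait can be scripted. The sketch below polls GET /managed-services/{id} for both services until each reports running. It assumes the response is a JSON object with a top-level status field (as the jq filter above implies) and compares the value case-insensitively. The HTTP fetch is passed in as a function so it can be replaced in tests; the helper names, polling interval and timeout are illustrative choices, not part of the API.

```python
import base64
import json
import time
import urllib.request
from typing import Callable, Iterable

API = "https://api.foundrydb.com"


def fetch_service(service_id: str, user: str, password: str) -> dict:
    """GET /managed-services/{id} with HTTP basic auth and decode the JSON body."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        f"{API}/managed-services/{service_id}",
        headers={"Authorization": f"Basic {token}"},
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.load(resp)


def wait_until_running(
    service_ids: Iterable[str],
    fetch: Callable[[str], dict],
    interval_s: float = 15.0,
    timeout_s: float = 1800.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Block until every service reports status 'running' (case-insensitive)."""
    pending = list(service_ids)
    waited = 0.0
    while True:
        # Keep only the services that are not running yet.
        pending = [sid for sid in pending
                   if str(fetch(sid).get("status", "")).lower() != "running"]
        if not pending:
            return
        if waited >= timeout_s:
            raise TimeoutError(f"services still not running: {pending}")
        sleep(interval_s)
        waited += interval_s
```

Typical use: `wait_until_running([pg_id, kafka_id], lambda sid: fetch_service(sid, user, password))`.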

Step 1: Create the pipeline

curl -u "$USER:$PASS" -X POST \
  "https://api.foundrydb.com/organizations/$ORG_ID/pipelines" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders-cdc",
    "pipeline_type": "cdc_pg_to_kafka",
    "source_service_id": "'$PG_ID'",
    "sink_service_id": "'$KAFKA_ID'",
    "config": {
      "database_name": "defaultdb",
      "tables": ["public.orders", "public.customers"],
      "topic_prefix": "shop",
      "snapshot_mode": "initial"
    }
  }'
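When you call the API from code rather than the shell, serializing the body with a JSON library avoids splicing $PG_ID and $KAFKA_ID into a quoted string. The sketch below uses only the Python standard library. The function name is hypothetical, and the request it returns is sent only when you pass it to urllib.request.urlopen.

```python
import base64
import json
import urllib.request

API = "https://api.foundrydb.com"


def create_pipeline_request(org_id: str, user: str, password: str, name: str,
                            source_id: str, sink_id: str,
                            config: dict) -> urllib.request.Request:
    """Build (but do not send) POST /organizations/{org_id}/pipelines."""
    body = {
        "name": name,
        "pipeline_type": "cdc_pg_to_kafka",
        "source_service_id": source_id,
        "sink_service_id": sink_id,
        "config": config,
    }
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{API}/organizations/{org_id}/pipelines",
        data=json.dumps(body).encode(),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )
```

Send it with `with urllib.request.urlopen(req) as resp: pipeline_id = json.load(resp)["id"]`. A 202 counts as success, and urlopen raises HTTPError for 4xx and 5xx responses.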

The API returns 202 Accepted with the pipeline in Pending status.

{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "organization_id": "...",
  "name": "orders-cdc",
  "pipeline_type": "cdc_pg_to_kafka",
  "source_service_id": "...",
  "sink_service_id": "...",
  "status": "Pending",
  "provision_step": null,
  "config": {
    "database_name": "defaultdb",
    "tables": ["public.orders", "public.customers"],
    "topic_prefix": "shop",
    "snapshot_mode": "initial"
  },
  "created_at": "2026-06-22T10:00:00Z"
}

Save the pipeline id. Later commands refer to it as $PIPELINE_ID.

Request fields

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Pipeline name, unique per organization. Letters, digits, hyphens, and underscores; max 64 characters. |
| pipeline_type | string | Yes | Must be cdc_pg_to_kafka. |
| source_service_id | UUID | Yes | PostgreSQL service UUID. |
| sink_service_id | UUID | Yes | Kafka service UUID. Must differ from the source. |
| config.database_name | string | No | Database to capture. Defaults to defaultdb. |
| config.tables | string[] | No | Tables as schema.table. Bare names get the public. prefix (orders becomes public.orders). An empty list means all tables. |
| config.topic_prefix | string | No | Kafka topic prefix. Topics are named <prefix>.<schema>.<table>. Defaults to the pipeline name. |
| config.snapshot_mode | string | No | Debezium snapshot.mode. Defaults to initial (snapshot existing rows, then stream changes). |
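You can check the rules in this table before calling the API. The sketch below applies them client-side: it validates the name format and the distinct service IDs, prefixes bare table names with public., fills in the documented defaults, and lists the topic names to expect. The function is hypothetical, and the API's own validation remains authoritative. When tables is empty (all tables), the topic list cannot be known without querying the database, so the sketch returns no topics.

```python
import re
from typing import Optional

# Letters, digits, hyphens, underscores; 1 to 64 characters.
NAME_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def normalize_pipeline(name: str, source_id: str, sink_id: str,
                       config: Optional[dict] = None) -> dict:
    """Apply the documented defaults; return the config plus expected topics."""
    if not NAME_RE.fullmatch(name):
        raise ValueError(f"invalid pipeline name: {name!r}")
    if source_id == sink_id:
        raise ValueError("sink_service_id must differ from source_service_id")
    cfg = dict(config or {})
    cfg.setdefault("database_name", "defaultdb")
    cfg.setdefault("topic_prefix", name)
    cfg.setdefault("snapshot_mode", "initial")
    # Bare table names are qualified with the public schema.
    cfg["tables"] = [t if "." in t else f"public.{t}" for t in cfg.get("tables", [])]
    # Topics follow <prefix>.<schema>.<table>; an empty list means "all tables".
    topics = [f"{cfg['topic_prefix']}.{t}" for t in cfg["tables"]]
    return {"config": cfg, "expected_topics": topics}
```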

Step 2: Monitor provisioning

Poll the status endpoint. Provisioning typically completes in 2 to 4 minutes.

curl -u "$USER:$PASS" \
  "https://api.foundrydb.com/pipelines/$PIPELINE_ID/status"

{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "Provisioning",
  "connector_name": null,
  "connector_state": null,
  "task_states": null,
  "source_lag_bytes": null,
  "topic_prefix": "shop",
  "last_health_check_at": null,
  "error_message": null
}

The provision_step field on the full pipeline object (returned by GET /organizations/{orgId}/pipelines/{pipelineId}) shows which step the reconciler is currently working on:

| Step | What is happening |
|---|---|
| ensure_network | Bidirectional SDN peering created between the source and sink subnets; waiting for UpCloud to activate both directions |
| install_plugin | Debezium PostgreSQL connector plugin downloaded and extracted on the Kafka Connect worker; worker restarting |
| prepare_source | REPLICATION attribute granted to the connector database user; sink subnet added to pg_hba.conf |
| create_publication | Logical replication publication created on the source, scoped to your table filter |
| create_connector | Debezium connector created; waiting for RUNNING state |
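For a progress display, you can treat the steps as an ordered list. The sketch below assumes the reconciler runs them in the order shown in the table. The table suggests this order but does not guarantee it. A null step, or a step name the sketch does not recognize, is reported as-is rather than guessed at.

```python
from typing import Optional

# Order assumed from the table above.
PROVISION_STEPS = [
    "ensure_network",
    "install_plugin",
    "prepare_source",
    "create_publication",
    "create_connector",
]


def describe_progress(status: str, provision_step: Optional[str]) -> str:
    """Render e.g. 'Provisioning: step 3/5 (prepare_source)'."""
    if provision_step is None:
        return status
    if provision_step not in PROVISION_STEPS:
        return f"{status}: {provision_step} (unknown step)"
    n = PROVISION_STEPS.index(provision_step) + 1
    return f"{status}: step {n}/{len(PROVISION_STEPS)} ({provision_step})"
```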

When provisioning completes, the status becomes Running:

{
  "status": "Running",
  "connector_name": "mdb-pipeline-a1b2c3d4",
  "connector_state": "RUNNING",
  "task_states": [{"id": 0, "state": "RUNNING"}],
  "source_lag_bytes": 0,
  "topic_prefix": "shop",
  "last_health_check_at": "2026-06-22T10:04:12Z",
  "error_message": null
}
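Automation usually needs a single "wait until ready" call after creating the pipeline. The sketch below polls until the status is Running, raises as soon as it is Failed (the terminal status named on this page), and treats every other status as still in progress. The fetch_status argument would wrap GET /pipelines/{id}/status. The 10-minute default timeout is an assumed margin over the 2 to 4 minutes quoted above.

```python
import time
from typing import Callable


def wait_for_pipeline(
    fetch_status: Callable[[], dict],
    interval_s: float = 10.0,
    timeout_s: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll the status endpoint until the pipeline is Running or Failed."""
    waited = 0.0
    while True:
        st = fetch_status()
        if st.get("status") == "Running":
            return st
        if st.get("status") == "Failed":
            raise RuntimeError(f"pipeline failed: {st.get('error_message')}")
        # Pending, Provisioning, or anything else: keep waiting.
        if waited >= timeout_s:
            raise TimeoutError(f"pipeline still {st.get('status')} after {waited:.0f}s")
        sleep(interval_s)
        waited += interval_s
```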

Step 3: Verify data is flowing

Insert a row into the PostgreSQL source and check the Kafka topic.

-- Connect to your PostgreSQL service
INSERT INTO orders (id, customer_id, total, status)
VALUES (1001, 42, 99.95, 'placed');

The change lands on topic shop.public.orders. Consume it with the Kafka CLI or a SASL-aware client:

kafka-console-consumer \
--bootstrap-server "$KAFKA_HOST:9093" \
--consumer.config /tmp/sasl.properties \
--topic shop.public.orders \
--from-beginning \
--max-messages 1

Where /tmp/sasl.properties contains:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="app_user" password="YOUR_KAFKA_PASS";

The message is a Debezium envelope with before, after, source, and op fields:

{
  "before": null,
  "after": {
    "id": 1001,
    "customer_id": 42,
    "total": 99.95,
    "status": "placed"
  },
  "source": {
    "connector": "postgresql",
    "db": "defaultdb",
    "schema": "public",
    "table": "orders",
    "lsn": 12345678
  },
  "op": "c"
}

op values: c (create), u (update), d (delete), r (read/snapshot).
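Consumers usually branch on op. The sketch below decodes a message value and returns the operation name with the row image that matters: after for creates, updates, and snapshot reads, and before for deletes. It assumes the value is the bare envelope shown above. If your converter wraps events in a schema/payload pair, the sketch unwraps payload first. A null value is treated as a tombstone, which Debezium emits after a delete by default so that compacted topics can drop the key.

```python
import json
from typing import Optional, Tuple

OPS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def decode_change(raw: Optional[bytes]) -> Tuple[str, Optional[dict]]:
    """Map a Debezium change event to (operation, row image)."""
    if raw is None:
        # Tombstone following a delete; carries no row data.
        return ("tombstone", None)
    event = json.loads(raw)
    # Unwrap {"schema": ..., "payload": ...} if the JSON converter includes schemas.
    if "payload" in event and "op" not in event:
        event = event["payload"]
    op = OPS.get(event["op"])
    if op is None:
        # Other op codes (for example truncate) are not handled here.
        raise ValueError(f"unexpected op: {event['op']!r}")
    # Deletes carry the old row in "before" (with PostgreSQL's default
    # REPLICA IDENTITY, only the primary-key columns); other ops use "after".
    row = event["before"] if op == "delete" else event["after"]
    return (op, row)
```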

Step 4: Ongoing monitoring

Check the status endpoint for connector state and source replication lag:

curl -u "$USER:$PASS" \
"https://api.foundrydb.com/pipelines/$PIPELINE_ID/status" | jq '{
status: .status,
connector_state: .connector_state,
source_lag_bytes: .source_lag_bytes,
last_health_check_at: .last_health_check_at,
error: .error_message
}'

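source_lag_bytes is the confirmed WAL lag reported by the replication slot. A value that stays above a few megabytes on a busy system, or that keeps growing, indicates the connector is falling behind. PostgreSQL retains any WAL that a replication slot has not yet confirmed, so sustained lag also consumes disk space on the source.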
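A monitoring job can turn these fields into an alert level. In the sketch below, the 4 MiB warning threshold is an assumed reading of "a few megabytes"; tune it to your write rate. A null lag (for example before the first health check) is reported as unknown rather than healthy, and any connector or task state other than RUNNING counts as not running.

```python
from typing import Optional

WARN_LAG_BYTES = 4 * 1024 * 1024  # assumption: tune to your workload


def classify(status: dict, warn_lag_bytes: int = WARN_LAG_BYTES) -> str:
    """Return 'ok', 'lagging', 'not_running' or 'unknown' for a status payload."""
    if status.get("status") != "Running" or status.get("connector_state") != "RUNNING":
        return "not_running"
    tasks = status.get("task_states") or []
    if any(t.get("state") != "RUNNING" for t in tasks):
        return "not_running"
    lag: Optional[int] = status.get("source_lag_bytes")
    if lag is None:
        return "unknown"
    return "lagging" if lag > warn_lag_bytes else "ok"
```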

Source failover behaviour

If the PostgreSQL primary fails over, the reconciler detects that the connector's database.hostname no longer matches the current primary's private IP. It updates the connector configuration, refreshes the credentials, and restarts the connector. Debezium recreates the replication slot on the new primary and resumes from its committed Kafka offsets.

You do not need to take any action. While the connector restarts, error_message briefly shows a transient error; it clears once the connector is RUNNING again. Recovery takes 30 to 90 seconds, depending on Kafka Connect restart behaviour.

See Overview: delivery guarantee for the at-least-once semantics that apply across failover.
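A failover produces a short-lived error_message, so alerting on its first appearance would page on every failover. One way to avoid this is to alert only on errors that persist longer than the recovery window. In the sketch below, the 180-second grace period is an assumption: twice the upper end of the 30 to 90 seconds quoted above. Timestamps are plain seconds supplied by the caller.

```python
from typing import Optional


class ErrorDebouncer:
    """Report an error only after it has been present for grace_s seconds."""

    def __init__(self, grace_s: float = 180.0):
        self.grace_s = grace_s
        self.first_seen: Optional[float] = None

    def observe(self, error_message: Optional[str], now: float) -> bool:
        """Feed one status sample; return True when an alert should fire."""
        if error_message is None:
            self.first_seen = None  # error cleared, e.g. connector RUNNING again
            return False
        if self.first_seen is None:
            self.first_seen = now
        return now - self.first_seen >= self.grace_s
```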

Deleting the pipeline

curl -u "$USER:$PASS" -X DELETE \
"https://api.foundrydb.com/organizations/$ORG_ID/pipelines/$PIPELINE_ID"

The API returns 202 Accepted. The reconciler then:

  1. Deletes the Debezium connector (releasing the replication slot handle).
  2. Drops the replication slot on the source primary.
  3. Drops the publication on the source.
  4. Removes the pg_hba.conf entry added for the pipeline.
  5. Soft-deletes the pipeline row.

The SDN peering between source and sink stays in place, because other pipelines between the same two services may share it. It is removed only when an underlying service is deleted.
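The delete call can also be issued from code. This is a standard-library sketch with a hypothetical function name. It builds the request without sending it; pass the result to urllib.request.urlopen, which returns normally for the 202 response and raises HTTPError for error statuses.

```python
import base64
import urllib.request

API = "https://api.foundrydb.com"


def delete_pipeline_request(org_id: str, pipeline_id: str,
                            user: str, password: str) -> urllib.request.Request:
    """Build (but do not send) DELETE /organizations/{org_id}/pipelines/{pipeline_id}."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{API}/organizations/{org_id}/pipelines/{pipeline_id}",
        method="DELETE",
        headers={"Authorization": f"Basic {token}"},
    )
```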

Troubleshooting

Status stuck in Pending

Both services must be in Running status. Check:

curl -u "$USER:$PASS" https://api.foundrydb.com/managed-services/$PG_ID | jq '.status'
curl -u "$USER:$PASS" https://api.foundrydb.com/managed-services/$KAFKA_ID | jq '.status'

The Kafka service also needs the kafka-connect addon fully running. The addon status is visible in the service detail response.

Status is Failed

Inspect error_message on the pipeline:

curl -u "$USER:$PASS" \
"https://api.foundrydb.com/organizations/$ORG_ID/pipelines/$PIPELINE_ID" \
| jq '{status: .status, error: .error_message, step: .provision_step}'

Common reasons:

| Error | Fix |
|---|---|
| sink service does not have the kafka-connect addon enabled | Delete and recreate the Kafka service with "addons": ["kafka-connect"] |
| source zone X and sink zone Y are not in the same peering region | Use services in the same UpCloud peering region (all European zones are in the same region) |
| source service must be postgresql | Swap the source and sink IDs; the PostgreSQL service is the source |
| connector task failed: ... | Check the connector task trace in error_message; the cause is often a credentials or publication mismatch |
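A runbook script can map these errors to their fixes automatically. The sketch below assumes error_message contains the phrases from the table verbatim. The table implies this, but the exact wording may differ, so an unmatched message returns None and you should read it directly.

```python
from typing import Optional

# Substrings from the troubleshooting table, paired with the documented fix.
KNOWN_ERRORS = [
    ("does not have the kafka-connect addon enabled",
     'Recreate the Kafka service with "addons": ["kafka-connect"].'),
    ("are not in the same peering region",
     "Use services in the same UpCloud peering region."),
    ("source service must be postgresql",
     "Swap source and sink IDs; the PostgreSQL service is the source."),
    ("connector task failed",
     "Read the task trace; check credentials and the publication."),
]


def suggest_fix(error_message: Optional[str]) -> Optional[str]:
    """Return the documented fix for a known error, or None."""
    if not error_message:
        return None
    for needle, fix in KNOWN_ERRORS:
        if needle in error_message:
            return fix
    return None
```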

source_lag_bytes is growing

The connector is streaming but falling behind the WAL. Check:

  • Kafka broker disk space and throughput.
  • Whether the Kafka Connect worker VM is undersized (upgrade to a larger plan).
  • Whether config.tables is too broad; consider narrowing it to the tables you actually need.
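To tell a connector that is steadily falling behind from one that is absorbing a burst, look at the trend across several samples rather than at a single value. The sketch below reports sustained growth when every sample in the window is larger than the one before it. The default window of five samples is an assumption; choose it relative to your polling interval.

```python
from collections import deque
from typing import Deque


class LagTrend:
    """Track recent source_lag_bytes samples and detect sustained growth."""

    def __init__(self, window: int = 5):
        self.samples: Deque[int] = deque(maxlen=window)

    def add(self, lag_bytes: int) -> bool:
        """Record a sample; return True if lag rose across the whole window."""
        self.samples.append(lag_bytes)
        if len(self.samples) < self.samples.maxlen:
            return False  # not enough history yet
        s = list(self.samples)
        return all(b > a for a, b in zip(s, s[1:]))
```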

What's next