Build an Event-driven Analytics Stack

Deploy a high-throughput analytics pipeline that ingests events through Kafka, indexes them in OpenSearch for real-time search, and stores aggregated metrics in PostgreSQL for SQL reporting.

Architecture

Events flow into a single Kafka topic. Two independent consumer groups fan them out: one indexes raw events into OpenSearch for real-time search, and one maintains hourly aggregates in PostgreSQL for SQL reporting.

Step 1: Deploy the Pipeline

curl -u $USER:$PASS -X POST https://api.foundrydb.com/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "template_id": "event-analytics",
    "name": "my-analytics",
    "zone": "se-sto1"
  }'

This creates:

Service                  Engine          Role             Plan
my-analytics-stream      Kafka 3.9       Event stream     tier-2, 100GB
my-analytics-search      OpenSearch 2    Search index     tier-4, 100GB
my-analytics-analytics   PostgreSQL 17   Analytics store  tier-2, 50GB

Step 2: Create OpenSearch Index

curl -k -u app_user:PASSWORD -X PUT "https://$OPENSEARCH_HOST:9200/events" \
  -H "Content-Type: application/json" \
  -d '{
    "mappings": {
      "properties": {
        "timestamp":   { "type": "date" },
        "event_type":  { "type": "keyword" },
        "source":      { "type": "keyword" },
        "user_id":     { "type": "keyword" },
        "message":     { "type": "text" },
        "duration_ms": { "type": "float" },
        "status_code": { "type": "integer" }
      }
    }
  }'
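
The indexer worker built in Step 4 will write to this index through OpenSearch's _bulk API, which expects newline-delimited JSON: one action line followed by one document line per event, with a trailing newline. A minimal sketch of building such a payload in Python (the sample event is illustrative):

```python
import json

def build_bulk_payload(events, index="events"):
    """Build an NDJSON body for the OpenSearch _bulk API:
    an action line followed by a document line per event."""
    lines = []
    for doc in events:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc))
    # The _bulk endpoint requires a trailing newline.
    return "\n".join(lines) + "\n"

events = [
    {"timestamp": "2025-01-01T00:00:00Z", "event_type": "api_call",
     "source": "gateway", "status_code": 200, "duration_ms": 12.5},
]
payload = build_bulk_payload(events)
```

POSTing this body to https://$OPENSEARCH_HOST:9200/_bulk (with Content-Type: application/x-ndjson) indexes the whole batch in one round trip instead of one request per event.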

Step 3: Create PostgreSQL Aggregation Tables

CREATE TABLE hourly_metrics (
  hour            TIMESTAMPTZ NOT NULL,
  event_type      TEXT NOT NULL,
  source          TEXT NOT NULL,
  event_count     BIGINT DEFAULT 0,
  avg_duration_ms FLOAT,
  error_count     BIGINT DEFAULT 0,
  PRIMARY KEY (hour, event_type, source)
);
CREATE INDEX ON hourly_metrics (hour DESC);
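
Because the primary key is (hour, event_type, source), the aggregator can update buckets idempotently with an upsert. One way the worker might phrase that statement is sketched below as a Python string (the %(name)s parameter style assumes a psycopg-style driver; the statement itself is an assumption, not part of the deployed template):

```python
# Hypothetical per-event upsert for the aggregator worker.
# In PostgreSQL, every expression in DO UPDATE SET sees the *old* row,
# so avg_duration_ms is recomputed as a count-weighted mean before
# event_count is bumped.
UPSERT_SQL = """
INSERT INTO hourly_metrics (hour, event_type, source,
                            event_count, avg_duration_ms, error_count)
VALUES (date_trunc('hour', %(ts)s), %(event_type)s, %(source)s,
        1, %(duration_ms)s, %(is_error)s)
ON CONFLICT (hour, event_type, source) DO UPDATE SET
  avg_duration_ms = (hourly_metrics.avg_duration_ms * hourly_metrics.event_count
                     + EXCLUDED.avg_duration_ms)
                    / (hourly_metrics.event_count + 1),
  event_count = hourly_metrics.event_count + 1,
  error_count = hourly_metrics.error_count + EXCLUDED.error_count
"""
```

Batching several events per transaction, rather than one upsert per event, is usually worth it at Kafka throughput.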

Step 4: Build Workers

You'll need two Kafka consumers:

  1. OpenSearch indexer: Consumes events and bulk-indexes them into OpenSearch
  2. PostgreSQL aggregator: Consumes events and updates hourly metric aggregates

Both workers consume the same Kafka topic but use separate consumer groups, so each receives every event independently and can fail or lag without affecting the other.

See the full worker implementations in the MkDocs tutorial.
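
Stripped of any Kafka client, the aggregator's core job is bucketing events by (hour, event_type, source). A minimal in-memory sketch (the sample events are illustrative; a real worker would flush these buckets to hourly_metrics with an upsert):

```python
from collections import defaultdict

def aggregate(events):
    """Bucket events into hourly (hour, event_type, source) metrics."""
    buckets = defaultdict(lambda: {"count": 0, "duration_sum": 0.0, "errors": 0})
    for e in events:
        # Truncate the ISO-8601 timestamp to the hour, e.g. 2025-01-01T10:00:00Z
        hour = e["timestamp"][:13] + ":00:00Z"
        b = buckets[(hour, e["event_type"], e["source"])]
        b["count"] += 1
        b["duration_sum"] += e["duration_ms"]
        b["errors"] += 1 if e["status_code"] >= 500 else 0
    return {k: {"event_count": v["count"],
                "avg_duration_ms": v["duration_sum"] / v["count"],
                "error_count": v["errors"]}
            for k, v in buckets.items()}

events = [
    {"timestamp": "2025-01-01T10:05:00Z", "event_type": "api_call",
     "source": "gateway", "duration_ms": 10.0, "status_code": 200},
    {"timestamp": "2025-01-01T10:45:00Z", "event_type": "api_call",
     "source": "gateway", "duration_ms": 30.0, "status_code": 503},
]
metrics = aggregate(events)
```

Treating status codes >= 500 as errors is an assumption here; adjust the predicate to whatever your events encode.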

Step 5: Query Your Data

Search events in OpenSearch

curl -k -u app_user:PASSWORD "https://$OPENSEARCH_HOST:9200/events/_search" \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "bool": {
        "must": [
          { "match": { "message": "timeout" } },
          { "range": { "timestamp": { "gte": "now-1h" } } }
        ]
      }
    }
  }'
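
In application code it is less error-prone to build the same bool query as a data structure and serialize it, rather than hand-writing JSON in shell. A small sketch (only constructs the request body; sending it is left to whatever HTTP client you use):

```python
import json

def events_query(term, window="now-1h"):
    """Bool query: full-text match on `message`, limited to a time window."""
    return {
        "query": {
            "bool": {
                "must": [
                    {"match": {"message": term}},
                    {"range": {"timestamp": {"gte": window}}},
                ]
            }
        }
    }

body = json.dumps(events_query("timeout"))
```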

SQL reports from PostgreSQL

SELECT event_type,
       SUM(event_count) AS total,
       ROUND(AVG(avg_duration_ms)::numeric, 1) AS avg_ms
FROM hourly_metrics
WHERE hour >= NOW() - INTERVAL '24 hours'
GROUP BY event_type
ORDER BY total DESC;

Next Steps

  • Connect OpenSearch Dashboards for visualization
  • Scale OpenSearch for higher search throughput
  • Set up Kafka topic retention policies
  • Add daily roll-up jobs

Cleanup

curl -u $USER:$PASS -X DELETE https://api.foundrydb.com/pipelines/$PIPELINE_ID