# Build an Event-driven Analytics Stack
Deploy a high-throughput analytics pipeline that ingests events through Kafka, indexes them in OpenSearch for real-time search, and stores aggregated metrics in PostgreSQL for SQL reporting.
## Architecture

Events flow from producers into Kafka, where two independent consumer groups fan them out: one indexes raw events into OpenSearch for real-time search, the other writes hourly aggregates to PostgreSQL for SQL reporting.
## Step 1: Deploy the Pipeline
```bash
curl -u "$USER:$PASS" -X POST https://api.foundrydb.com/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "template_id": "event-analytics",
    "name": "my-analytics",
    "zone": "se-sto1"
  }'
```
This creates:
| Service | Engine | Role | Plan |
|---|---|---|---|
| my-analytics-stream | Kafka 3.9 | Event stream | tier-2, 100 GB |
| my-analytics-search | OpenSearch 2 | Search index | tier-4, 100 GB |
| my-analytics-analytics | PostgreSQL 17 | Analytics store | tier-2, 50 GB |
## Step 2: Create the OpenSearch Index
```bash
curl -k -u app_user:PASSWORD -X PUT "https://$OPENSEARCH_HOST:9200/events" \
  -H "Content-Type: application/json" \
  -d '{
    "mappings": {
      "properties": {
        "timestamp":   { "type": "date" },
        "event_type":  { "type": "keyword" },
        "source":      { "type": "keyword" },
        "user_id":     { "type": "keyword" },
        "message":     { "type": "text" },
        "duration_ms": { "type": "float" },
        "status_code": { "type": "integer" }
      }
    }
  }'
```

The `-k` flag skips TLS certificate verification; remove it once the cluster's CA certificate is trusted by your client.
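At high throughput, the indexer worker from Step 4 should write through OpenSearch's `_bulk` API rather than one document per request. A minimal sketch of building a bulk payload for the `events` index; the helper name and sample event values are illustrative, and the event fields follow the mapping above:

```python
import json

def build_bulk_payload(events, index="events"):
    """Build an NDJSON body for OpenSearch's _bulk API.

    Each document becomes two lines: an action/metadata line and the
    document source. The body must end with a trailing newline.
    """
    lines = []
    for event in events:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(event))
    return "\n".join(lines) + "\n"

# Illustrative event matching the Step 2 mapping
sample = {
    "timestamp": "2025-01-15T12:00:00Z",
    "event_type": "api_request",
    "source": "checkout-service",
    "user_id": "u-123",
    "message": "request completed",
    "duration_ms": 42.5,
    "status_code": 200,
}
payload = build_bulk_payload([sample])
# POST the payload to https://$OPENSEARCH_HOST:9200/_bulk with
# Content-Type: application/x-ndjson
```

Batching a few hundred to a few thousand events per `_bulk` request is a common starting point; tune against your cluster.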
## Step 3: Create PostgreSQL Aggregation Tables
```sql
CREATE TABLE hourly_metrics (
    hour            TIMESTAMPTZ NOT NULL,
    event_type      TEXT NOT NULL,
    source          TEXT NOT NULL,
    event_count     BIGINT DEFAULT 0,
    avg_duration_ms FLOAT,
    error_count     BIGINT DEFAULT 0,
    PRIMARY KEY (hour, event_type, source)
);

CREATE INDEX ON hourly_metrics (hour DESC);
```
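The aggregator worker from Step 4 buckets each event into its hour and upserts against the `(hour, event_type, source)` primary key. A sketch, assuming a psycopg-style driver with `%s` placeholders; the running-average update and the ≥500 error rule are illustrative choices, not requirements:

```python
from datetime import datetime, timezone

# ON CONFLICT upsert against the (hour, event_type, source) primary key;
# the average is updated as a weighted mean of the old and new values.
UPSERT_SQL = """
INSERT INTO hourly_metrics
    (hour, event_type, source, event_count, avg_duration_ms, error_count)
VALUES (%s, %s, %s, 1, %s, %s)
ON CONFLICT (hour, event_type, source) DO UPDATE SET
    event_count     = hourly_metrics.event_count + 1,
    avg_duration_ms = (hourly_metrics.avg_duration_ms * hourly_metrics.event_count
                       + EXCLUDED.avg_duration_ms) / (hourly_metrics.event_count + 1),
    error_count     = hourly_metrics.error_count + EXCLUDED.error_count
"""

def hour_bucket(ts: datetime) -> datetime:
    """Truncate a timestamp to the start of its hour."""
    return ts.replace(minute=0, second=0, microsecond=0)

def upsert_params(event: dict) -> tuple:
    """Map one event onto the UPSERT_SQL parameters."""
    ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    is_error = 1 if event.get("status_code", 200) >= 500 else 0
    return (hour_bucket(ts), event["event_type"], event["source"],
            event.get("duration_ms"), is_error)
```

In the worker, each consumed event would be executed as `cursor.execute(UPSERT_SQL, upsert_params(event))`; batching several events per transaction reduces commit overhead.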
## Step 4: Build Workers
You'll need two Kafka consumers:
- OpenSearch indexer: Consumes events and bulk-indexes them into OpenSearch
- PostgreSQL aggregator: Consumes events and updates hourly metric aggregates
Both consume from the same Kafka topic using different consumer groups.
See the full worker implementations in the MkDocs tutorial.
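Because each worker uses its own consumer group, both receive every event on the topic independently. A minimal sketch of the two configurations, assuming the `confluent-kafka` client; the topic name, group names, and broker address are illustrative:

```python
# Both workers subscribe to the same topic; distinct group.id values mean
# each group independently receives the full event stream.
TOPIC = "events"  # illustrative topic name

def consumer_config(group_id: str, bootstrap: str) -> dict:
    return {
        "bootstrap.servers": bootstrap,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,  # commit only after a successful write
    }

indexer_cfg = consumer_config("opensearch-indexer", "KAFKA_HOST:9092")
aggregator_cfg = consumer_config("pg-aggregator", "KAFKA_HOST:9092")

# With confluent-kafka, each worker's loop would look roughly like:
#   from confluent_kafka import Consumer
#   c = Consumer(indexer_cfg)
#   c.subscribe([TOPIC])
#   while True:
#       msg = c.poll(1.0)
#       if msg is None or msg.error():
#           continue
#       ...write to OpenSearch or PostgreSQL, then c.commit(msg)
```

Committing offsets only after the downstream write succeeds gives at-least-once delivery; make the writes idempotent (the `ON CONFLICT` upsert already is) to tolerate redelivery.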
## Step 5: Query Your Data
### Search events in OpenSearch
```bash
curl -k -u app_user:PASSWORD "https://$OPENSEARCH_HOST:9200/events/_search" \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "bool": {
        "must": [
          { "match": { "message": "timeout" } },
          { "range": { "timestamp": { "gte": "now-1h" } } }
        ]
      }
    }
  }'
```
### SQL reports from PostgreSQL
```sql
SELECT event_type,
       SUM(event_count) AS total,
       ROUND(AVG(avg_duration_ms)::numeric, 1) AS avg_ms
FROM hourly_metrics
WHERE hour >= NOW() - INTERVAL '24 hours'
GROUP BY event_type
ORDER BY total DESC;
```

Note that `AVG(avg_duration_ms)` averages the hourly averages; if traffic varies a lot between hours, weight by `event_count` instead.
## Next Steps
- Connect OpenSearch Dashboards for visualization
- Scale OpenSearch for higher search throughput
- Set up Kafka topic retention policies
- Add daily roll-up jobs
## Cleanup
```bash
curl -u "$USER:$PASS" -X DELETE "https://api.foundrydb.com/pipelines/$PIPELINE_ID"
```