Real-Time ETL Pipeline Design: From Kafka to Data Warehouse
A guide to designing real-time ETL pipelines with Apache Kafka, Flink, and Snowflake — processing millions of events per hour with end-to-end latency measured in seconds.
Real-time ETL pipelines enable businesses to make decisions based on the freshest data rather than yesterday's batch report. Lambda and Kappa architectures are the two dominant approaches, each suited to different use cases.
1. Lambda vs Kappa Architecture
Lambda Architecture — Processes both batch and streaming in parallel:
Data Source → Kafka → Stream Processing (Flink) → Serving Layer
                  ↘  Batch Processing (Spark)  ↗
Advantages: the batch layer guarantees accuracy, while the stream layer delivers low latency. Drawbacks: two codebases must implement and maintain the same business logic.
Kappa Architecture — Streaming only:
Data Source → Kafka (high retention) → Stream Processing → Serving Layer
Advantages: simpler, single codebase. Drawbacks: reprocessing historical data is more expensive.
Ventra Rocket defaults to Kappa for most use cases because it is simpler to operate.
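The single-codebase advantage can be made concrete with a toy sketch: in a Kappa design, backfilling history and handling live traffic run the same aggregation logic, just replayed over different ranges of the retained log. The function and data below are illustrative only, not part of any real pipeline:

```python
# Illustrative Kappa sketch: "backfill" and "live" paths share one function.
# Reprocessing means replaying the retained Kafka log through the same code.

def aggregate_revenue(events):
    """Single piece of business logic used for both replay and live traffic."""
    return sum(e["amount"] for e in events if e["status"] == "completed")

log = [  # stand-in for the retained Kafka log, oldest first
    {"amount": 100.0, "status": "completed"},
    {"amount": 50.0, "status": "cancelled"},
    {"amount": 25.0, "status": "completed"},
]

historical = aggregate_revenue(log)      # full replay from offset 0
live = aggregate_revenue(log[-1:])       # only the newest event
print(historical, live)  # -> 125.0 25.0
```

In Lambda, `aggregate_revenue` would exist twice, once in Spark and once in Flink, and the two copies can drift apart.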
2. Kafka Setup for High-Throughput Ingestion
# docker-compose.yml for local development
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one for containers (kafka:29092), one for the host (localhost:9092)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168        # 7 days
      KAFKA_MESSAGE_MAX_BYTES: 10485760     # 10 MB
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on: [kafka]
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
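The `KAFKA_NUM_PARTITIONS: 12` above should come from a sizing exercise, not a guess: a common rule of thumb is to divide the target throughput by what a single partition can sustain on your hardware, then round up. The numbers below are illustrative assumptions, not benchmarks:

```python
import math

def partitions_needed(target_mb_s: float, per_partition_mb_s: float) -> int:
    """Rough partition-count sizing: target throughput / per-partition throughput,
    rounded up so the topic can absorb the full load."""
    return math.ceil(target_mb_s / per_partition_mb_s)

# e.g. a 100 MB/s target, assuming ~10 MB/s per partition (measure your own!)
print(partitions_needed(100, 10))  # -> 10
```

Leaving some headroom above this number is common, since repartitioning a keyed topic later breaks key ordering.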
Avro Producer
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

ORDER_SCHEMA = """
{
    "type": "record",
    "name": "OrderEvent",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_id", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "status", "type": "string"},
        {"name": "timestamp",
         "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
}
"""

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(schema_registry, ORDER_SCHEMA)
producer = Producer({"bootstrap.servers": "localhost:9092"})

def publish_order_event(order: dict):
    producer.produce(
        topic="orders.events",
        key=order["order_id"],
        # SerializationContext tells the serializer which topic and message
        # field the payload is for, so the schema subject is derived correctly
        value=avro_serializer(
            order, SerializationContext("orders.events", MessageField.VALUE)
        ),
    )
    producer.poll(0)  # serve delivery callbacks without blocking
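For high throughput, the producer above is usually tuned with batching and compression, and given a delivery callback so failed sends are not silently dropped. The config values below are illustrative starting points (all are real librdkafka settings), not recommendations for any particular workload:

```python
def delivery_report(err, msg):
    """Invoked once per message from producer.poll()/flush()."""
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")

# Batching/compression knobs for a confluent_kafka Producer
# (values are illustrative starting points; benchmark before adopting).
HIGH_THROUGHPUT_CONFIG = {
    "bootstrap.servers": "localhost:9092",
    "linger.ms": 50,               # wait up to 50 ms to fill larger batches
    "batch.num.messages": 10_000,  # cap on messages per batch
    "compression.type": "lz4",     # cheap CPU cost, good ratio for Avro/JSON
    "acks": "all",                 # wait for all in-sync replicas
    "enable.idempotence": True,    # no duplicates on retry, per-partition ordering
}
```

Pass the callback per message with `producer.produce(..., on_delivery=delivery_report)`; higher `linger.ms` trades a little latency for much better batching.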
3. Stream Processing with Apache Flink
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, settings)

# Source: Avro-encoded order events from Kafka
t_env.execute_sql("""
    CREATE TABLE orders_kafka (
        order_id STRING,
        customer_id STRING,
        amount DOUBLE,
        status STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders.events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'orders-etl',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'http://localhost:8081',
        'scan.startup.mode' = 'earliest-offset'
    )
""")

# Sink: Parquet files on S3, where Snowpipe (section 4) picks them up
t_env.execute_sql("""
    CREATE TABLE orders_summary_sink (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        total_orders BIGINT,
        total_revenue DOUBLE,
        avg_order_value DOUBLE
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3://your-bucket/orders/',
        'format' = 'parquet'
    )
""")

# One-minute tumbling-window aggregation of completed orders
t_env.execute_sql("""
    INSERT INTO orders_summary_sink
    SELECT
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
        COUNT(*) AS total_orders,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value
    FROM orders_kafka
    WHERE status = 'completed'
    GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
""")
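`TUMBLE` assigns every event to exactly one fixed-size, non-overlapping window based on its event time. The boundary arithmetic is simple enough to sketch in plain Python; the helper below is a hypothetical illustration of the semantics, not Flink's implementation:

```python
def tumble_window(event_ts_ms: int, size_ms: int = 60_000) -> tuple:
    """Return (window_start, window_end) in epoch millis for a tumbling window,
    mirroring the semantics of Flink's TUMBLE with a 1-minute size."""
    start = event_ts_ms - (event_ts_ms % size_ms)  # floor to window boundary
    return start, start + size_ms

# Two events 10 s apart land in the same 1-minute window:
print(tumble_window(120_500))  # -> (120000, 180000)
print(tumble_window(130_500))  # -> (120000, 180000)
```

The watermark declared on `orders_kafka` controls when a window is considered complete: with a 5-second bound, the 12:00:00–12:01:00 window fires once the watermark passes 12:01:05, and later events for that window are dropped by default.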
4. Snowpipe for Continuous Loading into Snowflake
-- Create a stage pointing to your S3 bucket
-- (in production, prefer a STORAGE INTEGRATION over inline credentials)
CREATE STAGE orders_stage
  URL = 's3://your-bucket/orders/'
  CREDENTIALS = (AWS_KEY_ID='...' AWS_SECRET_KEY='...');
-- Auto-ingest pipe that picks up new files automatically
CREATE PIPE orders_pipe
AUTO_INGEST = TRUE
AS
COPY INTO bronze_db.orders.raw
FROM @orders_stage
FILE_FORMAT = (TYPE = 'PARQUET');
-- Retrieve the SQS ARN to configure S3 event notification
SHOW PIPES;
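Auto-ingest relies on S3 event notifications, so the writer should emit predictably named, date-partitioned objects under the stage path; partitioned prefixes also make reprocessing a single day cheap. The layout below is an assumed convention for illustration, not something Snowpipe requires:

```python
from datetime import datetime, timezone

def s3_object_key(prefix: str, event_time: datetime, part: int) -> str:
    """Build a date-partitioned object key under the stage path
    (assumed layout: prefix/dt=YYYY-MM-DD/hour=HH/part-NNNNN.parquet)."""
    return (
        f"{prefix}/dt={event_time:%Y-%m-%d}/hour={event_time:%H}/"
        f"part-{part:05d}.parquet"
    )

ts = datetime(2024, 3, 1, 14, 30, tzinfo=timezone.utc)
print(s3_object_key("orders", ts, 1))
# -> orders/dt=2024-03-01/hour=14/part-00001.parquet
```

Snowpipe tracks which file names it has already loaded, so writers must never overwrite an object with new content; always write new keys.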
5. Monitoring Pipeline Health
from kafka import KafkaAdminClient, KafkaConsumer

def get_consumer_lag(group_id: str, topic: str) -> dict:
    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
    # Committed offsets for the group: {TopicPartition: OffsetAndMetadata}
    committed = admin.list_consumer_group_offsets(group_id)
    partitions = [tp for tp in committed if tp.topic == topic]
    # Latest offsets on the broker for the same partitions
    end_offsets = consumer.end_offsets(partitions)
    lag_by_partition = {
        tp.partition: end_offsets[tp] - committed[tp].offset
        for tp in partitions
    }
    return {
        "total_lag": sum(lag_by_partition.values()),
        "by_partition": lag_by_partition,
    }
Prometheus alerting rule for consumer lag:
- alert: KafkaConsumerLagHigh
  expr: kafka_consumer_lag_sum > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer group {{ $labels.group }} lag is {{ $value }}"
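An absolute threshold like 10000 messages means different things at different throughputs: it is minutes of delay for a slow topic and milliseconds for a fast one. Converting lag into an estimated catch-up time often makes the alert easier to reason about; a minimal sketch, assuming you can measure the group's consumption rate:

```python
def lag_seconds(total_lag: int, consume_rate_per_s: float) -> float:
    """Estimate catch-up time: messages behind / messages consumed per second."""
    if consume_rate_per_s <= 0:
        return float("inf")  # consumer stalled: lag will never drain
    return total_lag / consume_rate_per_s

print(lag_seconds(10_000, 2_000))  # -> 5.0 (five seconds behind at 2k msg/s)
```

Alerting on this derived value (e.g. "more than 60 s behind for 5 m") tends to produce fewer false positives than a raw message count.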
Conclusion
Real-time ETL pipelines are more complex than batch ETL but deliver significantly greater business value. Kafka ensures durability and throughput, Flink handles streaming with exactly-once semantics, and Snowpipe continuously loads data into the warehouse. Ventra Rocket has deployed pipelines processing 5 million events per hour for fintech clients with end-to-end latency under 3 seconds.