
Real-Time ETL Pipeline Design: From Kafka to Data Warehouse

A guide to designing real-time ETL pipelines with Apache Kafka, Flink, and Snowflake — processing millions of events per hour with end-to-end latency measured in seconds.

By Ventra Rocket
Published on 5 February 2026
#ETL #Kafka #Real-time #DataEngineering #Snowflake

Real-time ETL pipelines enable businesses to make decisions based on the freshest data rather than yesterday's batch report. Lambda and Kappa architectures are the two dominant approaches, each suited to different use cases.

1. Lambda vs Kappa Architecture

Lambda Architecture — Processes both batch and streaming in parallel:

Data Source → Kafka → Stream Processing (Flink) → Serving Layer
                    ↘ Batch Processing (Spark)  ↗

Advantages: the batch layer guarantees accuracy, while the stream layer delivers low latency. Drawbacks: two codebases that must implement the same business logic.

Kappa Architecture — Streaming only:

Data Source → Kafka (high retention) → Stream Processing → Serving Layer

Advantages: simpler, single codebase. Drawbacks: reprocessing historical data is more expensive.

Ventra Rocket typically chooses Kappa for most use cases due to simpler operations.
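Reprocessing under Kappa means rewinding the consumer group's offsets and replaying from Kafka's retained history. A sketch using the standard Kafka CLI (the group and topic names are illustrative; the group must be inactive during the reset):

```shell
# Dry-run first: show what the new offsets would be
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group flink-etl --topic orders.events \
  --reset-offsets --to-earliest --dry-run

# Apply the reset, then restart the consumers to replay
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group flink-etl --topic orders.events \
  --reset-offsets --to-earliest --execute
```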

2. Kafka Setup for High-Throughput Ingestion

# docker-compose.yml for local development
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_MESSAGE_MAX_BYTES: 10485760

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on: [kafka]
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
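The 168-hour retention configured above translates directly into broker disk: retained bytes ≈ throughput × message size × retention × replication. A quick sizing sketch (the throughput figures are assumptions, not measurements):

```python
def retention_bytes(events_per_sec: int, avg_event_bytes: int,
                    retention_hours: int, replication_factor: int) -> int:
    """Approximate disk needed to hold the full retention window."""
    return (events_per_sec * avg_event_bytes
            * retention_hours * 3600 * replication_factor)

# e.g. 5,000 events/s of 1 KiB each, 168 h retention, replication factor 1
size = retention_bytes(5_000, 1_024, 168, 1)
print(f"{size / 1e12:.2f} TB")  # → 3.10 TB
```

At production replication factor 3, the same workload needs roughly three times that, which is why retention is usually the first knob to revisit when broker disks fill up.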

Avro Producer

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

ORDER_SCHEMA = """
{
  "type": "record",
  "name": "OrderEvent",
  "fields": [
    {"name": "order_id",    "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount",      "type": "double"},
    {"name": "status",      "type": "string"},
    {"name": "timestamp",   "type": "long", "doc": "epoch millis"}
  ]
}
"""

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(schema_registry, ORDER_SCHEMA)
producer        = Producer({"bootstrap.servers": "localhost:9092"})

def _delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")

def publish_order_event(order: dict):
    producer.produce(
        topic="orders.events",
        key=order["order_id"],
        # The serializer needs a SerializationContext to resolve the subject name
        value=avro_serializer(order, SerializationContext("orders.events", MessageField.VALUE)),
        on_delivery=_delivery_report,
    )
    producer.poll(0)  # serve delivery callbacks without blocking
3. Stream Processing with Flink

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env      = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env    = StreamTableEnvironment.create(env, settings)

t_env.execute_sql("""
    CREATE TABLE orders_kafka (
        order_id    STRING,
        customer_id STRING,
        amount      DOUBLE,
        status      STRING,
        event_time  TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector'                   = 'kafka',
        'topic'                       = 'orders.events',
        'properties.bootstrap.servers'= 'localhost:9092',
        'format'                      = 'avro-confluent',
        'avro-confluent.url'          = 'http://localhost:8081',
        'scan.startup.mode'           = 'earliest-offset'
    )
""")
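The INSERT that follows writes to orders_summary_sink, which must be defined first. One possible sketch, registered via t_env.execute_sql like the statement above (a JDBC sink with hypothetical connection details; any supported connector works here):

```sql
CREATE TABLE orders_summary_sink (
    window_start    TIMESTAMP(3),
    window_end      TIMESTAMP(3),
    total_orders    BIGINT,
    total_revenue   DOUBLE,
    avg_order_value DOUBLE
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:postgresql://localhost:5432/analytics',
    'table-name' = 'orders_summary',
    'username'   = 'flink',
    'password'   = '...'
)
```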

t_env.execute_sql("""
    INSERT INTO orders_summary_sink
    SELECT
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
        TUMBLE_END(event_time,   INTERVAL '1' MINUTE) AS window_end,
        COUNT(*)   AS total_orders,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value
    FROM orders_kafka
    WHERE status = 'completed'
    GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
""")
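TUMBLE assigns every event to exactly one fixed-width, non-overlapping window; the assignment reduces to modular arithmetic, sketched here outside Flink for illustration:

```python
def tumble_window(event_time_ms: int, size_ms: int = 60_000) -> tuple[int, int]:
    """Return the [start, end) window an event's timestamp falls into."""
    start = event_time_ms - (event_time_ms % size_ms)
    return start, start + size_ms

# An event at t=125,000 ms lands in the [120,000, 180,000) window
print(tumble_window(125_000))  # → (120000, 180000)
```

The watermark declared on the source table tells Flink when a window can be closed: once the watermark passes window_end, the aggregate is emitted and late events (beyond the 5-second bound) are dropped.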

4. Snowpipe for Continuous Loading into Snowflake

-- Create a stage pointing to your S3 bucket
-- (inline keys shown for brevity; a STORAGE INTEGRATION is preferred in production)
CREATE STAGE orders_stage
  URL = 's3://your-bucket/orders/'
  CREDENTIALS = (AWS_KEY_ID='...' AWS_SECRET_KEY='...');

-- Auto-ingest pipe that picks up new files automatically
CREATE PIPE orders_pipe
  AUTO_INGEST = TRUE
AS
  COPY INTO bronze_db.orders.raw
  FROM @orders_stage
  FILE_FORMAT = (TYPE = 'PARQUET');

-- Retrieve the SQS ARN to configure S3 event notification
SHOW PIPES;
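Once the pipe is wired to S3 event notifications, its health can be checked from SQL; both functions below are part of Snowflake's standard interface:

```sql
-- Current state of the pipe (executionState, pendingFileCount, ...)
SELECT SYSTEM$PIPE_STATUS('orders_pipe');

-- Load history for the target table over the last hour
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'bronze_db.orders.raw',
    START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())
));
```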

5. Monitoring Pipeline Health

from kafka import KafkaAdminClient, KafkaConsumer

def get_consumer_lag(group_id: str, topic: str) -> dict:
    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092")

    # Committed offsets for the group: {TopicPartition: OffsetAndMetadata}
    committed = admin.list_consumer_group_offsets(group_id)
    partitions = [tp for tp in committed if tp.topic == topic]

    # Latest (end) offsets for the same partitions
    end_offsets = consumer.end_offsets(partitions)

    lag_by_partition = {
        tp.partition: end_offsets[tp] - committed[tp].offset
        for tp in partitions
    }
    return {
        "total_lag": sum(lag_by_partition.values()),
        "by_partition": lag_by_partition,
    }

Prometheus alerting rule for consumer lag:

- alert: KafkaConsumerLagHigh
  expr: kafka_consumer_lag_sum > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer group {{ $labels.group }} lag is {{ $value }}"

Conclusion

Real-time ETL pipelines are more complex than batch ETL but deliver significantly greater business value. Kafka ensures durability and throughput, Flink handles streaming with exactly-once semantics, and Snowpipe continuously loads data into the warehouse. Ventra Rocket has deployed pipelines processing 5 million events per hour for fintech clients with end-to-end latency under 3 seconds.
