Quay lại blog
data10 phút đọc

Thiết kế Real-time ETL Pipeline: Từ Kafka đến Data Warehouse

Hướng dẫn thiết kế pipeline ETL real-time với Apache Kafka, Flink và Snowflake — xử lý hàng triệu events mỗi giây với độ trễ dưới 1 giây.

V
Bởi Ventra Rocket
·Đăng ngày 5 tháng 2, 2026
#ETL#Kafka#Real-time#Data Engineering#Snowflake

Real-time ETL pipeline cho phép doanh nghiệp ra quyết định dựa trên dữ liệu mới nhất thay vì báo cáo ngày hôm qua. Kiến trúc Lambda và Kappa là hai approach phổ biến, mỗi cái phù hợp với use case khác nhau.

1. Kiến trúc Lambda vs Kappa

Lambda Architecture — Xử lý cả batch và streaming song song:

Data Source → Kafka → Stream Processing (Flink) → Serving Layer
                    ↘ Batch Processing (Spark) ↗

Ưu điểm: Batch layer đảm bảo accuracy, stream layer đảm bảo low latency. Nhược điểm: Phải maintain hai codebase xử lý logic tương tự.

Kappa Architecture — Chỉ dùng streaming:

Data Source → Kafka (retention cao) → Stream Processing → Serving Layer

Ưu điểm: Đơn giản hơn, một codebase duy nhất. Nhược điểm: Reprocessing lịch sử tốn kém hơn.

Ventra Rocket thường chọn Kappa cho hầu hết use cases vì đơn giản hơn trong vận hành.

2. Kafka Setup cho High-Throughput Ingestion

# docker-compose.yml cho local development
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168  # 7 ngày
      KAFKA_MESSAGE_MAX_BYTES: 10485760  # 10MB

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on: [kafka]
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092

Producer với Avro Schema

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json

ORDER_SCHEMA = """
{
  "type": "record",
  "name": "OrderEvent",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "status", "type": "string"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
  ]
}
"""

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(schema_registry, ORDER_SCHEMA)

producer = Producer({"bootstrap.servers": "localhost:9092"})

def publish_order_event(order: dict):
    producer.produce(
        topic="orders.events",
        key=order["order_id"],
        value=avro_serializer(order, None),
        on_delivery=lambda err, msg: (
            print(f"Error: {err}") if err else None
        ),
    )
    producer.poll(0)
# Flink Python API (PyFlink)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, settings)

# Đọc từ Kafka
t_env.execute_sql("""
    CREATE TABLE orders_kafka (
        order_id STRING,
        customer_id STRING,
        amount DOUBLE,
        status STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders.events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'http://localhost:8081',
        'scan.startup.mode' = 'earliest-offset'
    )
""")

# Aggregate theo cửa sổ 1 phút
t_env.execute_sql("""
    CREATE TABLE orders_summary_sink (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        total_orders BIGINT,
        total_revenue DOUBLE,
        avg_order_value DOUBLE
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://localhost:5432/analytics',
        'table-name' = 'orders_minute_summary',
        'driver' = 'org.postgresql.Driver'
    )
""")

t_env.execute_sql("""
    INSERT INTO orders_summary_sink
    SELECT
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
        COUNT(*) AS total_orders,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value
    FROM orders_kafka
    WHERE status = 'completed'
    GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)
""")

4. Snowpipe cho Continuous Loading vào Snowflake

-- Tạo stage trỏ đến S3 bucket
CREATE STAGE orders_stage
  URL = 's3://your-bucket/orders/'
  CREDENTIALS = (AWS_KEY_ID='...' AWS_SECRET_KEY='...');

-- Tạo pipe tự động ingest file mới
CREATE PIPE orders_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO bronze_db.orders.raw
  FROM @orders_stage
  FILE_FORMAT = (TYPE = 'PARQUET');

-- Lấy SQS ARN để configure S3 event notification
SHOW PIPES;
-- Dùng notification_channel value để setup S3 → SQS → Snowpipe

5. Monitoring Pipeline Health

# Kafka Consumer Lag monitoring
from kafka import KafkaAdminClient
from kafka.structs import TopicPartition

def get_consumer_lag(group_id: str, topic: str) -> dict:
    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
    consumer_offsets = admin.list_consumer_group_offsets(group_id)

    lag_by_partition = {}
    for tp, offset_metadata in consumer_offsets.items():
        if tp.topic == topic:
            # Lấy end offset
            end_offsets = admin.list_offsets({tp: OffsetSpec.latest()})
            end_offset = end_offsets[tp].offset
            lag = end_offset - offset_metadata.offset
            lag_by_partition[tp.partition] = lag

    return {
        "total_lag": sum(lag_by_partition.values()),
        "by_partition": lag_by_partition,
    }

Xuất metrics vào Prometheus và alert khi lag vượt ngưỡng:

# Alerting rule
- alert: KafkaConsumerLagHigh
  expr: kafka_consumer_lag_sum > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer group {{ $labels.group }} lag is {{ $value }}"

Kết luận

Real-time ETL pipeline phức tạp hơn batch ETL nhưng mang lại giá trị kinh doanh vượt trội. Kafka đảm bảo durability và throughput, Flink xử lý streaming với exactly-once semantics, Snowpipe tải dữ liệu vào warehouse liên tục. Ventra Rocket đã triển khai pipeline xử lý 5 triệu events/giờ cho khách hàng fintech với độ trễ end-to-end dưới 3 giây.

Bài viết liên quan

Thiết kế Real-time ETL Pipeline: Từ Kafka đến Data Warehouse | Ventra Rocket