Our Fastest Pipeline Runs Once a Day. Our Slowest Runs Every Second.
This sounds backwards. Shouldn't real-time be faster? The answer reveals a fundamental truth about data engineering: speed isn't about how often you run; it's about how much work you do per run.
Our daily batch pipeline processes 180 million records in 23 minutes. Our "real-time" stream processes 2,000 records per second, but each record takes 450ms of individual processing. The batch pipeline moves 130,000 records per second. The stream moves 2,000.
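The throughput comparison above is simple arithmetic worth making explicit:

```python
# Batch: 180M records in 23 minutes vs. stream: 2,000 records/second
batch_records = 180_000_000
batch_seconds = 23 * 60  # 1,380 seconds

batch_throughput = batch_records / batch_seconds  # ~130,000 records/second
stream_throughput = 2_000

# The batch pipeline moves roughly 65x more records per second,
# even though it only runs once a day.
speedup = batch_throughput / stream_throughput
```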
For AI applications, understanding this distinction determines whether your training data is hours stale or weeks stale, whether your feature store updates in time for predictions, and whether your costs are €5,000/month or €50,000/month.
ETL for AI: What's Different
Traditional ETL moves data from source to warehouse. AI pipelines have additional requirements:
Requirement 1: Feature Engineering at Scale
Raw data rarely feeds directly into models. You need derived features (aggregations, transformations, embeddings) computed consistently across training and inference.
The training-serving skew problem: If features are computed differently during training versus serving, model performance degrades silently. A model trained on "average purchase value over 30 days" computed one way will fail when serving computes it differently.
Solution architecture:
┌───────────────────────────────────────────────────────────┐
│                     Feature Pipeline                      │
│  ┌───────────┐     ┌────────────┐     ┌──────────────┐    │
│  │ Raw Data  │ ──► │ Transform  │ ──► │   Feature    │    │
│  │  Sources  │     │   Logic    │     │    Store     │    │
│  └───────────┘     └────────────┘     └──────────────┘    │
└───────────────────────────────────────────────────────────┘
                             │
            ┌────────────────┴────────────────┐
            ▼                                 ▼
    ┌──────────────┐                  ┌──────────────┐
    │   Training   │                  │   Serving    │
    │   Pipeline   │                  │   Pipeline   │
    └──────────────┘                  └──────────────┘
Both training and serving read from the same feature store. The transformation logic runs once, producing features consumed by both paths.
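One way to guarantee the two paths agree is to route both through a single feature definition. A minimal sketch of the idea, where the function and field names are illustrative rather than taken from this pipeline:

```python
from datetime import date, timedelta

def avg_purchase_value_30d(purchases: list, today: date) -> float:
    """Single definition of the feature, shared by training and serving.

    Because both pipelines call this exact function, the feature cannot
    be computed "one way" for training and "differently" for serving.
    """
    cutoff = today - timedelta(days=30)
    recent = [p["amount"] for p in purchases if p["date"] >= cutoff]
    return sum(recent) / len(recent) if recent else 0.0
```

The training job calls this when materializing historical features; the serving path calls the same function (or reads its precomputed output from the feature store), eliminating skew by construction.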
Requirement 2: Lineage and Reproducibility
When a model's predictions go wrong, you need to trace back: What training data was used? What transformations were applied? What version of the feature logic was in effect?
Minimum lineage requirements:
- Every dataset versioned with immutable snapshots
- Transformation code versioned and linked to output datasets
- Model training linked to specific data versions
- Prediction logs linked to model version and input data
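A lineage record can be as simple as a small document persisted alongside each run. A hedged sketch covering the minimum fields listed above (the keys are illustrative, not a standard schema):

```python
import hashlib
import json
from datetime import datetime, timezone

def lineage_record(dataset_version, code_git_sha, model_version=None):
    """Capture minimum lineage metadata for one pipeline run."""
    record = {
        "run_at": datetime.now(timezone.utc).isoformat(),
        "dataset_version": dataset_version,  # immutable snapshot id
        "code_git_sha": code_git_sha,        # transformation code version
        "model_version": model_version,      # set when a model is trained
    }
    # A content hash makes the record itself tamper-evident.
    record["record_hash"] = hashlib.sha256(
        json.dumps(record, sort_keys=True).encode()
    ).hexdigest()
    return record
```

Writing one of these per run, and one per training job, is enough to answer "what data and code produced this model?" after the fact.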
Implementation with dbt:
-- models/features/user_features.sql
{{
  config(
    materialized='incremental',
    unique_key='user_id',
    meta={
      'owner': 'data-team',
      'lineage_tracked': true,
      'freshness_sla': '24 hours'
    }
  )
}}

SELECT
  user_id,
  COUNT(*) as total_purchases,
  AVG(purchase_amount) as avg_purchase_value,
  MAX(purchase_date) as last_purchase_date
FROM {{ ref('stg_purchases') }}
WHERE purchase_date >= DATEADD(day, -30, CURRENT_DATE)
GROUP BY user_id
Requirement 3: Data Quality Enforcement
Bad data creates bad models. Unlike traditional analytics where bad data produces wrong reports, bad AI training data produces models that confidently make wrong predictions at scale.
Quality gates we enforce:
# Expectations in the Great Expectations style, run against a configured validator
from datetime import datetime, timedelta

# Structural checks
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_unique("user_id")

# Statistical checks
validator.expect_column_mean_to_be_between("purchase_amount", min_value=10, max_value=1000)
validator.expect_column_values_to_be_between("user_age", min_value=18, max_value=120)

# Freshness checks
validator.expect_column_max_to_be_between(
    "created_at",
    min_value=datetime.now() - timedelta(hours=24),
    max_value=datetime.now()
)

# Drift detection, e.g. a divergence check against a reference distribution
validator.expect_column_kl_divergence_to_be_less_than(
    "user_segment", partition_object=reference_distribution, threshold=0.1
)
If any check fails, the pipeline halts. No silent data corruption propagating to models.
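The halt-on-failure behavior is independent of any validation library: run every gate, and raise before anything is written if one fails. A minimal sketch (the check names and row shape are illustrative):

```python
def run_quality_gates(rows):
    """Run all checks; raise before any write if one fails.

    Raising here is the point: a failed gate halts the pipeline
    instead of letting bad rows propagate into model training.
    """
    checks = {
        "user_id not null": all(r.get("user_id") is not None for r in rows),
        "user_id unique": len({r.get("user_id") for r in rows}) == len(rows),
    }
    failed = [name for name, passed in checks.items() if not passed]
    if failed:
        raise ValueError(f"Quality gates failed: {failed}")
```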
The Modern ETL Stack for AI
Layer 1: Ingestion
For batch sources (databases, files, APIs):
We use Airbyte for connectors to 300+ sources. Configuration-driven, handles schema changes gracefully, and open-source.
# airbyte/connections/salesforce.yaml
sourceDefinitionId: salesforce
configuration:
  client_id: ${SALESFORCE_CLIENT_ID}
  client_secret: ${SALESFORCE_CLIENT_SECRET}
  refresh_token: ${SALESFORCE_REFRESH_TOKEN}
streams:
  - name: opportunities
    sync_mode: incremental
    cursor_field: last_modified_date
  - name: accounts
    sync_mode: full_refresh  # Small table, always full sync
For streaming sources (events, logs, IoT):
Kafka for event ingestion. Debezium for change data capture from transactional databases.
Key design principle: Land raw data first, transform later. Store original payloads in a raw layer before any transformation. You can always re-transform; you can't recover data you never stored.
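The land-raw-first principle in miniature: write the untouched payload to a date-partitioned bronze path before any parsing. The paths and layout here are illustrative (a real pipeline would target S3/GCS rather than a local filesystem):

```python
from datetime import date
from pathlib import Path

def land_raw(payload: bytes, source: str, root: Path, ingestion_date: date) -> Path:
    """Persist the original bytes, partitioned by ingestion date."""
    partition = root / source / f"dt={ingestion_date.isoformat()}"
    partition.mkdir(parents=True, exist_ok=True)
    out = partition / "payload.json"
    out.write_bytes(payload)  # stored exactly as received; re-transform any time
    return out
```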
Layer 2: Storage
The medallion architecture:
Bronze (Raw)    ──►    Silver (Cleaned)    ──►    Gold (Features)

• Original format      • Validated, typed        • Model-ready
• Immutable snapshots  • Deduplicated            • Aggregated
• Full history         • Standard schemas        • Feature store
Technology choices:
- Bronze: Parquet files on S3/GCS, partitioned by ingestion date
- Silver: Delta Lake tables with ACID transactions and time travel
- Gold: Feature store (Feast, Tecton, or custom) with online/offline serving
Why Delta Lake for Silver:
# Time travel for reproducibility
df = spark.read.format("delta").option("versionAsOf", 42).load("/silver/users")

# Schema enforcement: fail on schema mismatch instead of silently widening
incoming_df.write \
    .format("delta") \
    .option("mergeSchema", "false") \
    .mode("append") \
    .save("/silver/users")

# ACID transactions for concurrent upserts
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Layer 3: Transformation
dbt for SQL transformations:
Our data team thinks in SQL. dbt lets them define transformations declaratively while the platform handles execution, testing, and documentation.
-- models/gold/user_360.sql
WITH purchase_stats AS (
SELECT
user_id,
COUNT(*) as purchase_count,
SUM(amount) as total_spent,
AVG(amount) as avg_order_value
FROM {{ ref('silver_purchases') }}
GROUP BY user_id
),

engagement_stats AS (
SELECT
user_id,
COUNT(*) as login_count,
MAX(login_time) as last_login
FROM {{ ref('silver_logins') }}
WHERE login_time >= DATEADD(day, -90, CURRENT_DATE)
GROUP BY user_id
)
SELECT
u.user_id,
u.email,
u.signup_date,
p.purchase_count,
p.total_spent,
p.avg_order_value,
e.login_count,
e.last_login,
DATEDIFF(day, u.signup_date, CURRENT_DATE) as days_since_signup
FROM {{ ref('silver_users') }} u
LEFT JOIN purchase_stats p ON u.user_id = p.user_id
LEFT JOIN engagement_stats e ON u.user_id = e.user_id
Spark for complex transformations:
When SQL isn't enough (ML feature engineering, complex aggregations, or processing unstructured data), we use PySpark.
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

# Feature engineering: days since last purchase
df = df.withColumn(
    "recency_score",
    F.datediff(F.current_date(), F.col("last_purchase_date"))
)

# Embedding generation (calling an external model)
def generate_embedding(text):
    # Call embedding API
    return embedding_vector

embedding_udf = F.udf(generate_embedding, ArrayType(FloatType()))
df = df.withColumn("description_embedding", embedding_udf(F.col("product_description")))
Layer 4: Orchestration
Apache Airflow remains the standard, despite its quirks. Key patterns:
Sensor-based triggers:
wait_for_upstream = S3KeySensor(
task_id='wait_for_raw_data',
bucket_name='data-lake',
bucket_key='raw/purchases/dt={{ ds }}/_SUCCESS',
timeout=3600 # 1 hour
)
Dynamic DAG generation:
# Generate one DAG per data source
for source in ['salesforce', 'hubspot', 'postgres_production']:
    dag = DAG(
        dag_id=f'ingest_{source}',
        schedule_interval='@hourly',
        default_args=default_args
    )

    with dag:
        extract = AirbyteOperator(connection_id=source)
        validate = GreatExpectationsOperator(suite=f'{source}_validation')
        transform = DbtRunOperator(models=f'+stg_{source}')
        extract >> validate >> transform

    # Register in module globals so Airflow discovers each generated DAG
    globals()[dag.dag_id] = dag
Backfill handling:
@task
def process_partition(ds):
# ds = execution_date in YYYY-MM-DD format
# Same code handles both daily runs and backfills
input_path = f"s3://raw/events/dt={ds}/"
output_path = f"s3://silver/events/dt={ds}/"
transform(input_path, output_path)
Performance Optimization Techniques
Technique 1: Partition Pruning
The single most impactful optimization. Partition data by query patterns.
Before (full scan):
SELECT * FROM events WHERE event_date = '2024-12-10'
-- Scans 2TB of data
After (partition pruning):
-- Data partitioned by dt=YYYY-MM-DD
SELECT * FROM events WHERE dt = '2024-12-10'
-- Scans 50GB of data (partition for that day only)
Technique 2: Columnar Storage with Compression
Parquet with ZSTD compression typically achieves 10:1 compression ratios.
Optimal Parquet configuration:
# 128MB row groups via parquet.block.size
df.write.format("parquet") \
    .option("compression", "zstd") \
    .option("parquet.block.size", 128 * 1024 * 1024) \
    .partitionBy("dt") \
    .save("s3://silver/events/")
Technique 3: Incremental Processing
Don't reprocess what hasn't changed.
Incremental pattern:
{% if is_incremental() %}
SELECT * FROM source_table
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% else %}
SELECT * FROM source_table
{% endif %}
Technique 4: Predicate Pushdown
Push filters to the data source, not the processing engine.
# Bad: load all data, then filter in the processing engine
df = spark.read.parquet("s3://data/events/")
filtered = df.filter(F.col("event_type") == "purchase")

# Good: filter at read time; Spark pushes this filter to the Parquet reader
df = spark.read.parquet("s3://data/events/").filter(F.col("event_type") == "purchase")
Monitoring and Alerting
Essential pipeline metrics:
- Freshness: Time since last successful run
- Completeness: Row counts vs. expected
- Latency: End-to-end processing time
- Cost: Compute and storage per pipeline
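Freshness, the first metric above, reduces to "seconds since the last successful run". A minimal sketch of evaluating it against a 24-hour SLA (function and constant names are illustrative):

```python
from datetime import datetime, timedelta

FRESHNESS_SLA = timedelta(hours=24)  # matches an 86400-second threshold

def freshness_violated(last_success: datetime, now: datetime) -> bool:
    """True when time since the last successful run exceeds the SLA."""
    return (now - last_success) > FRESHNESS_SLA
```

In practice the `last_success` timestamp would be exported as a metric (e.g. a Prometheus gauge) and the comparison done by the alerting system rather than in pipeline code.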
Alerting rules:
- alert: DataFreshnessSLA
  expr: time() - data_freshness_timestamp > 86400  # 24 hours
  labels:
    severity: critical
  annotations:
    summary: "Data freshness SLA violated for {{ $labels.table }}"

- alert: UnexpectedRowCountDrop
  expr: (yesterday_row_count - today_row_count) / yesterday_row_count > 0.2
  labels:
    severity: warning
  annotations:
    summary: "Row count dropped >20% for {{ $labels.table }}"
The best ETL pipelines are the ones nobody thinks about. They run reliably, scale automatically, and surface problems before users notice. This requires upfront investment in observability, idempotency, and testing, but that investment pays dividends every time you don't get paged at 3 AM.