
Building Efficient ETL Pipelines for AI Applications


Our Fastest Pipeline Runs Once a Day. Our Slowest Runs Every Second.

This sounds backwards. Shouldn't real-time be faster? The answer reveals a fundamental truth about data engineering: speed isn't about how often you run; it's about how much work you do per run.

Our daily batch pipeline processes 180 million records in 23 minutes. Our "real-time" stream processes 2,000 records per second, but each record takes 450ms of individual processing. The batch pipeline moves 130,000 records per second. The stream moves 2,000.
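
The arithmetic behind those figures is worth making explicit:

```python
# Throughput from the numbers above: the batch pipeline moves far more
# records per second despite running far less often.
batch_records = 180_000_000
batch_seconds = 23 * 60          # one 23-minute daily run

batch_rps = batch_records / batch_seconds   # ~130,000 records/second
stream_rps = 2_000                          # records/second

print(round(batch_rps))               # ~130,000
print(round(batch_rps / stream_rps))  # batch moves ~65x more records per second
```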

For AI applications, understanding this distinction determines whether your training data is hours stale or weeks stale, whether your feature store updates in time for predictions, and whether your costs are €5,000/month or €50,000/month.

ETL for AI: What's Different

Traditional ETL moves data from source to warehouse. AI pipelines have additional requirements:

Requirement 1: Feature Engineering at Scale

Raw data rarely feeds directly into models. You need derived features (aggregations, transformations, embeddings) computed consistently across training and inference.

The training-serving skew problem: If features are computed differently during training versus serving, model performance degrades silently. A model trained on "average purchase value over 30 days" computed one way will fail when serving computes it differently.

Solution architecture:

┌──────────────────────────────────────────────────────────┐
│                     Feature Pipeline                     │
│  ┌───────────┐     ┌─────────────┐     ┌──────────────┐  │
│  │ Raw Data  │ ──► │  Transform  │ ──► │   Feature    │  │
│  │  Sources  │     │    Logic    │     │    Store     │  │
│  └───────────┘     └─────────────┘     └──────────────┘  │
└──────────────────────────────────────────────────────────┘
                             │
            ┌────────────────┼────────────────┐
            ▼                                 ▼
    ┌──────────────┐                  ┌──────────────┐
    │   Training   │                  │   Serving    │
    │   Pipeline   │                  │   Pipeline   │
    └──────────────┘                  └──────────────┘

Both training and serving read from the same feature store. The transformation logic runs once, producing features consumed by both paths.
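
A minimal sketch of the "transformation logic runs once" idea, with hypothetical names (`compute_user_features` is illustrative, not a feature-store API):

```python
# One feature function shared by the training and serving paths, so the
# "average purchase value over 30 days" logic can never diverge.
from datetime import date, timedelta

def compute_user_features(purchases: list, as_of: date) -> dict:
    """Single source of truth for the feature logic."""
    window_start = as_of - timedelta(days=30)
    recent = [p for p in purchases if p["date"] >= window_start]
    total = sum(p["amount"] for p in recent)
    return {
        "total_purchases": len(recent),
        "avg_purchase_value": total / len(recent) if recent else 0.0,
    }

# Training batch-computes this for historical dates; serving calls the same
# function (or reads its output from the feature store).
purchases = [
    {"date": date(2024, 12, 1), "amount": 50.0},
    {"date": date(2024, 10, 1), "amount": 99.0},  # falls outside the 30-day window
]
features = compute_user_features(purchases, as_of=date(2024, 12, 10))
print(features)  # {'total_purchases': 1, 'avg_purchase_value': 50.0}
```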

Requirement 2: Lineage and Reproducibility

When a model's predictions go wrong, you need to trace back: What training data was used? What transformations were applied? What version of the feature logic was in effect?

Minimum lineage requirements:

  • Every dataset versioned with immutable snapshots

  • Transformation code versioned and linked to output datasets

  • Model training linked to specific data versions

  • Prediction logs linked to model version and input data

Implementation with dbt:

-- models/features/user_features.sql
{{
  config(
    materialized='incremental',
    unique_key='user_id',
    meta={
      'owner': 'data-team',
      'lineage_tracked': true,
      'freshness_sla': '24 hours'
    }
  )
}}

SELECT
  user_id,
  COUNT(*) as total_purchases,
  AVG(purchase_amount) as avg_purchase_value,
  MAX(purchase_date) as last_purchase_date
FROM {{ ref('stg_purchases') }}
WHERE purchase_date >= DATEADD(day, -30, CURRENT_DATE)
GROUP BY user_id

Requirement 3: Data Quality Enforcement

Bad data creates bad models. Unlike traditional analytics where bad data produces wrong reports, bad AI training data produces models that confidently make wrong predictions at scale.

Quality gates we enforce:

from datetime import datetime, timedelta
from great_expectations import expect

# Structural checks
expect.column_values_to_not_be_null("user_id")
expect.column_values_to_be_unique("user_id")

# Statistical checks
expect.column_mean_to_be_between("purchase_amount", min_value=10, max_value=1000)
expect.column_values_to_be_between("user_age", min_value=18, max_value=120)

# Freshness checks
expect.column_max_to_be_between(
    "created_at",
    min_value=datetime.now() - timedelta(hours=24),
    max_value=datetime.now()
)

# Drift detection
expect.column_distribution_to_match_reference("user_segment", reference_dataset)

If any check fails, the pipeline halts. No silent data corruption propagating to models.
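
The halt-on-failure behaviour itself is simple to implement. A minimal sketch (the check names are illustrative stand-ins, not the Great Expectations API):

```python
# A hard quality gate: run all checks, collect failures, and raise so the
# orchestrator marks the task failed and downstream models never see bad data.
class DataQualityError(Exception):
    pass

def run_quality_gate(rows: list) -> None:
    checks = {
        "user_id not null": all(r.get("user_id") is not None for r in rows),
        "user_id unique": len({r.get("user_id") for r in rows}) == len(rows),
    }
    failures = [name for name, passed in checks.items() if not passed]
    if failures:
        raise DataQualityError(f"Quality gate failed: {failures}")

run_quality_gate([{"user_id": 1}, {"user_id": 2}])  # passes silently
```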

The Modern ETL Stack for AI

Layer 1: Ingestion

For batch sources (databases, files, APIs):
We use Airbyte for connectors to 300+ sources. Configuration-driven, handles schema changes gracefully, and open-source.

# airbyte/connections/salesforce.yaml
sourceDefinitionId: salesforce
configuration:
  client_id: ${SALESFORCE_CLIENT_ID}
  client_secret: ${SALESFORCE_CLIENT_SECRET}
  refresh_token: ${SALESFORCE_REFRESH_TOKEN}
streams:
  - name: opportunities
    sync_mode: incremental
    cursor_field: last_modified_date
  - name: accounts
    sync_mode: full_refresh  # Small table, always full sync

For streaming sources (events, logs, IoT):
Kafka for event ingestion. Debezium for change data capture from transactional databases.

Key design principle: Land raw data first, transform later. Store original payloads in a raw layer before any transformation. You can always re-transform; you can't recover data you never stored.
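
A minimal sketch of the raw-first landing step, writing to a local path for illustration (in production this would be S3/GCS; the function name is hypothetical):

```python
# Persist the original payload, partitioned by ingestion date, before any
# transformation touches it. You can always re-transform from here.
import json
import tempfile
from datetime import date
from pathlib import Path

def land_raw(payload: dict, source: str, base: Path) -> Path:
    """Write the untouched payload under raw/<source>/dt=<ingestion date>/."""
    partition = base / "raw" / source / f"dt={date.today().isoformat()}"
    partition.mkdir(parents=True, exist_ok=True)
    out = partition / f"{payload['id']}.json"
    out.write_text(json.dumps(payload))
    return out

lake = Path(tempfile.mkdtemp())
path = land_raw({"id": "evt-1", "amount": 42}, source="purchases", base=lake)
print(json.loads(path.read_text()))  # the original payload, fully recoverable
```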

Layer 2: Storage

The medallion architecture:

Bronze (Raw)            →    Silver (Cleaned)       →    Gold (Features)
│ Original format            │ Validated, typed          │ Model-ready
│ Immutable snapshots        │ Deduplicated              │ Aggregated
│ Full history               │ Standard schemas          │ Feature store

Technology choices:

  • Bronze: Parquet files on S3/GCS, partitioned by ingestion date

  • Silver: Delta Lake tables with ACID transactions and time travel

  • Gold: Feature store (Feast, Tecton, or custom) with online/offline serving

Why Delta Lake for Silver:

# Time travel for reproducibility
df = spark.read.format("delta").option("versionAsOf", 42).load("/silver/users")

# Schema enforcement: reject appends whose schema doesn't match the table
new_rows_df.write \
    .format("delta") \
    .option("mergeSchema", "false") \
    .mode("append") \
    .save("/silver/users")

# ACID transactions for concurrent writes
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.user_id = source.user_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Layer 3: Transformation

dbt for SQL transformations:
Our data team thinks in SQL. dbt lets them define transformations declaratively while the platform handles execution, testing, and documentation.

-- models/gold/user_360.sql
WITH purchase_stats AS (
SELECT
user_id,
COUNT(*) as purchase_count,
SUM(amount) as total_spent,
AVG(amount) as avg_order_value
FROM {{ ref('silver_purchases') }}
GROUP BY user_id
),

engagement_stats AS (
SELECT
user_id,
COUNT(*) as login_count,
MAX(login_time) as last_login
FROM {{ ref('silver_logins') }}
WHERE login_time >= DATEADD(day, -90, CURRENT_DATE)
GROUP BY user_id
)

SELECT
u.user_id,
u.email,
u.signup_date,
p.purchase_count,
p.total_spent,
p.avg_order_value,
e.login_count,
e.last_login,
DATEDIFF(day, u.signup_date, CURRENT_DATE) as days_since_signup
FROM {{ ref('silver_users') }} u
LEFT JOIN purchase_stats p ON u.user_id = p.user_id
LEFT JOIN engagement_stats e ON u.user_id = e.user_id

Spark for complex transformations:
When SQL isn't enoughβ€”ML feature engineering, complex aggregations, or processing unstructured dataβ€”we use PySpark.

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

# Feature engineering
df = df.withColumn(
    "recency_score",
    F.datediff(F.current_date(), F.col("last_purchase_date"))
)

# Embedding generation (calling external model)
def generate_embedding(text):
    # Call embedding API and return the resulting vector
    return embedding_vector

embedding_udf = F.udf(generate_embedding, ArrayType(FloatType()))
df = df.withColumn("description_embedding", embedding_udf(F.col("product_description")))

Layer 4: Orchestration

Apache Airflow remains the standard, despite its quirks. Key patterns:

Sensor-based triggers:

wait_for_upstream = S3KeySensor(
    task_id='wait_for_raw_data',
    bucket_name='data-lake',
    bucket_key='raw/purchases/dt={{ ds }}/_SUCCESS',
    timeout=3600  # 1 hour
)

Dynamic DAG generation:

# Generate one DAG per data source
for source in ['salesforce', 'hubspot', 'postgres_production']:
    dag = DAG(
        dag_id=f'ingest_{source}',
        schedule_interval='@hourly',
        default_args=default_args
    )

    with dag:
        extract = AirbyteOperator(connection_id=source)
        validate = GreatExpectationsOperator(suite=f'{source}_validation')
        transform = DbtRunOperator(models=f'+stg_{source}')

        extract >> validate >> transform

    globals()[dag.dag_id] = dag  # Register at module level so Airflow discovers each DAG

Backfill handling:

@task
def process_partition(ds):
    # ds = execution date in YYYY-MM-DD format
    # The same code handles both daily runs and backfills
    input_path = f"s3://raw/events/dt={ds}/"
    output_path = f"s3://silver/events/dt={ds}/"
    transform(input_path, output_path)

Performance Optimization Techniques

Technique 1: Partition Pruning

The single most impactful optimization. Partition data by query patterns.

Before (full scan):

SELECT * FROM events WHERE event_date = '2024-12-10'
-- Scans 2TB of data

After (partition pruning):

-- Data partitioned by dt=YYYY-MM-DD
SELECT * FROM events WHERE dt = '2024-12-10'
-- Scans 50GB of data (partition for that day only)

Technique 2: Columnar Storage with Compression

Parquet with ZSTD compression typically achieves 10:1 compression ratios.

Optimal Parquet configuration:

# 128MB row groups, ZSTD compression, partitioned by date
df.write.format("parquet") \
    .option("compression", "zstd") \
    .option("parquet.block.size", 128 * 1024 * 1024) \
    .partitionBy("dt") \
    .save("s3://silver/events/")

Technique 3: Incremental Processing

Don't reprocess what hasn't changed.

Incremental pattern:

{% if is_incremental() %}
SELECT * FROM source_table
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% else %}
SELECT * FROM source_table
{% endif %}

Technique 4: Predicate Pushdown

Push filters to the data source, not the processing engine.

# Bad: load all data, then filter
df = spark.read.parquet("s3://data/events/")
filtered = df.filter(F.col("event_type") == "purchase")

# Good: filter at read time
df = spark.read.parquet("s3://data/events/").filter(F.col("event_type") == "purchase")
# Spark pushes this filter down to the Parquet reader

Monitoring and Alerting

Essential pipeline metrics:

  • Freshness: Time since last successful run

  • Completeness: Row counts vs. expected

  • Latency: End-to-end processing time

  • Cost: Compute and storage per pipeline
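
Freshness is the metric worth instrumenting first. A minimal sketch (where the last-success timestamp comes from your pipeline metadata store; the function name is illustrative):

```python
# Compute the freshness metric: seconds since the last successful run,
# which an alerting rule can then compare against a 24-hour SLA.
import time

SLA_SECONDS = 24 * 3600  # 24-hour freshness SLA

def freshness_seconds(last_success_epoch, now=None):
    """Seconds elapsed since the pipeline last completed successfully."""
    return (now if now is not None else time.time()) - last_success_epoch

# Example: the last successful run finished one hour ago
age = freshness_seconds(time.time() - 3600)
print(age > SLA_SECONDS)  # False: within the SLA
```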

Alerting rules:

- alert: DataFreshnessSLA
  expr: time() - data_freshness_timestamp > 86400  # 24 hours
  labels:
    severity: critical
  annotations:
    summary: "Data freshness SLA violated for {{ $labels.table }}"

- alert: UnexpectedRowCountDrop
  expr: (yesterday_row_count - today_row_count) / yesterday_row_count > 0.2
  labels:
    severity: warning
  annotations:
    summary: "Row count dropped >20% for {{ $labels.table }}"

The best ETL pipelines are the ones nobody thinks about. They run reliably, scale automatically, and surface problems before users notice. This requires upfront investment in observability, idempotency, and testing, but that investment pays dividends every time you don't get paged at 3 AM.

João Mendes

About the Author

João Mendes

Co-founder of AIOBI. Data & AI Engineer with experience in data infrastructure, intelligent products, and scalable solutions.