O Nosso Pipeline Mais Rápido Corre Uma Vez por Dia. O Mais Lento Corre a Cada Segundo.
Isto parece ao contrário. O real-time não deveria ser mais rápido? A resposta revela uma verdade fundamental sobre engenharia de dados: velocidade não é sobre quantas vezes corres—é sobre quanto trabalho fazes por corrida.
O nosso pipeline batch diário processa 180 milhões de registos em 23 minutos. O nosso stream "real-time" processa 2.000 registos por segundo—mas cada registo leva 450ms de processamento individual. O pipeline batch move 130.000 registos por segundo. O stream move 2.000.
Para aplicações IA, compreender esta distinção determina se os teus dados de treino têm horas de atraso ou semanas, se a tua feature store atualiza a tempo para previsões, e se os teus custos são €5.000/mês ou €50.000/mês.
ETL para IA: O Que é Diferente
ETL tradicional move dados da fonte para o warehouse. Pipelines IA têm requisitos adicionais:
Requisito 1: Feature Engineering em Escala
Dados brutos raramente alimentam modelos diretamente. Precisas de features derivadas—agregações, transformações, embeddings—calculadas consistentemente entre treino e inferência.
O problema de training-serving skew: Se features são calculadas diferentemente durante treino versus serving, a performance do modelo degrada silenciosamente. Um modelo treinado em "valor médio de compra nos últimos 30 dias" calculado de uma forma vai falhar quando o serving calcula diferentemente.
Arquitetura de solução:
┌─────────────────────────────────────────────────────────┐
│ Feature Pipeline │
│ ┌───────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Raw Data │ ──► │ Transform │ ──► │ Feature │ │
│ │ Sources │ │ Logic │ │ Store │ │
│ └───────────┘ └────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
│
┌────────────────┼────────────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Training │ │ Serving │
│ Pipeline │ │ Pipeline │
└──────────────┘ └──────────────┘
Tanto treino como serving leem da mesma feature store. A lógica de transformação corre uma vez, produzindo features consumidas por ambos os caminhos.
Requisito 2: Lineage e Reprodutibilidade
Quando as previsões de um modelo correm mal, precisas de rastrear: Que dados de treino foram usados? Que transformações foram aplicadas? Que versão da lógica de features estava em vigor?
Requisitos mínimos de lineage:
- Cada dataset versionado com snapshots imutáveis
- Código de transformação versionado e ligado a datasets de output
- Treino de modelo ligado a versões específicas de dados
- Logs de previsão ligados a versão do modelo e dados de input
Implementação com dbt:
models/features/user_features.sql
{{
config(
materialized='incremental',
unique_key='user_id',
meta={
'owner': 'data-team',
'lineage_tracked': true,
'freshness_sla': '24 hours'
}
)
}}SELECT
user_id,
COUNT(*) as total_purchases,
AVG(purchase_amount) as avg_purchase_value,
MAX(purchase_date) as last_purchase_date
FROM {{ ref('stg_purchases') }}
WHERE purchase_date >= DATEADD(day, -30, CURRENT_DATE)
GROUP BY user_id
Requisito 3: Enforcement de Qualidade de Dados
Dados maus criam modelos maus. Ao contrário de analytics tradicional onde dados maus produzem relatórios errados, dados de treino IA maus produzem modelos que confiantemente fazem previsões erradas em escala.
Gates de qualidade que enforçamos:
from great_expectations import expectVerificações estruturais
expect.column_values_to_not_be_null("user_id")
expect.column_values_to_be_unique("user_id")Verificações estatísticas
expect.column_mean_to_be_between("purchase_amount", min_value=10, max_value=1000)
expect.column_values_to_be_between("user_age", min_value=18, max_value=120)Verificações de freshness
expect.column_max_to_be_between(
"created_at",
min_value=datetime.now() - timedelta(hours=24),
max_value=datetime.now()
)Deteção de drift
expect.column_distribution_to_match_reference("user_segment", reference_dataset)
Se qualquer verificação falhar, o pipeline para. Sem corrupção silenciosa de dados a propagar para modelos.
A Stack ETL Moderna para IA
Camada 1: Ingestão
Para fontes batch (bases de dados, ficheiros, APIs):
Usamos Airbyte para conectores a 300+ fontes. Orientado por configuração, trata mudanças de schema graciosamente, e open-source.
airbyte/connections/salesforce.yaml
sourceDefinitionId: salesforce
configuration:
client_id: ${SALESFORCE_CLIENT_ID}
client_secret: ${SALESFORCE_CLIENT_SECRET}
refresh_token: ${SALESFORCE_REFRESH_TOKEN}
streams:
- name: opportunities
sync_mode: incremental
cursor_field: last_modified_date
- name: accounts
sync_mode: full_refresh # Tabela pequena, sempre full sync
Para fontes streaming (eventos, logs, IoT):
Kafka para ingestão de eventos. Debezium para change data capture de bases de dados transacionais.
Princípio chave de design: Aterrar dados raw primeiro, transformar depois. Armazenar payloads originais numa camada raw antes de qualquer transformação. Podes sempre re-transformar; não consegues recuperar dados que nunca armazenaste.
Camada 2: Armazenamento
A arquitetura medallion:
Bronze (Raw) → Silver (Limpo) → Gold (Features)
│ │ │
│ Formato original │ Validado, tipado │ Pronto para modelo
│ Snapshots imutáveis │ Deduplicado │ Agregado
│ Histórico completo │ Schemas standard │ Feature store
Escolhas tecnológicas:
- Bronze: Ficheiros Parquet no S3/GCS, particionados por data de ingestão
- Silver: Tabelas Delta Lake com transações ACID e time travel
- Gold: Feature store (Feast, Tecton, ou custom) com serving online/offline
Porquê Delta Lake para Silver:
Time travel para reprodutibilidade
df = spark.read.format("delta").option("versionAsOf", 42).load("/silver/users")Schema enforcement
spark.read.format("delta").load("/silver/users").write \
.format("delta") \
.option("mergeSchema", "false") \ # Falhar em schema mismatch
.mode("append") \
.save("/silver/users")Transações ACID para escritas concorrentes
delta_table.merge(
updates_df,
"target.user_id = source.user_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Camada 3: Transformação
dbt para transformações SQL:
A nossa equipa de dados pensa em SQL. dbt permite-lhes definir transformações declarativamente enquanto a plataforma trata de execução, teste e documentação.
-- models/gold/user_360.sql
WITH purchase_stats AS (
SELECT
user_id,
COUNT(*) as purchase_count,
SUM(amount) as total_spent,
AVG(amount) as avg_order_value
FROM {{ ref('silver_purchases') }}
GROUP BY user_id
),engagement_stats AS (
SELECT
user_id,
COUNT(*) as login_count,
MAX(login_time) as last_login
FROM {{ ref('silver_logins') }}
WHERE login_time >= DATEADD(day, -90, CURRENT_DATE)
GROUP BY user_id
)
SELECT
u.user_id,
u.email,
u.signup_date,
p.purchase_count,
p.total_spent,
p.avg_order_value,
e.login_count,
e.last_login,
DATEDIFF(day, u.signup_date, CURRENT_DATE) as days_since_signup
FROM {{ ref('silver_users') }} u
LEFT JOIN purchase_stats p ON u.user_id = p.user_id
LEFT JOIN engagement_stats e ON u.user_id = e.user_id
Spark para transformações complexas:
Quando SQL não é suficiente—feature engineering ML, agregações complexas, ou processar dados não estruturados—usamos PySpark.
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import functions as FFeature engineering
df = df.withColumn(
"recency_score",
F.datediff(F.current_date(), F.col("last_purchase_date"))
)Geração de embedding (chamando modelo externo)
def generate_embedding(text):
# Chamar API de embedding
return embedding_vectorembedding_udf = F.udf(generate_embedding, ArrayType(FloatType()))
df = df.withColumn("description_embedding", embedding_udf(F.col("product_description")))
Camada 4: Orquestração
Apache Airflow permanece o standard, apesar das suas peculiaridades. Padrões chave:
Triggers baseados em sensor:
wait_for_upstream = S3KeySensor(
task_id='wait_for_raw_data',
bucket_name='data-lake',
bucket_key='raw/purchases/dt={{ ds }}/_SUCCESS',
timeout=3600 # 1 hora
)
Geração dinâmica de DAG:
Gerar um DAG por fonte de dados
for source in ['salesforce', 'hubspot', 'postgres_production']:
dag = DAG(
dag_id=f'ingest_{source}',
schedule_interval='@hourly',
default_args=default_args
) with dag:
extract = AirbyteOperator(connection_id=source)
validate = GreatExpectationsOperator(suite=f'{source}_validation')
transform = DbtRunOperator(models=f'+stg_{source}')
extract >> validate >> transform
Tratamento de backfill:
@task
def process_partition(ds):
# ds = execution_date em formato YYYY-MM-DD
# Mesmo código trata corridas diárias e backfills
input_path = f"s3://raw/events/dt={ds}/"
output_path = f"s3://silver/events/dt={ds}/"
transform(input_path, output_path)
Técnicas de Otimização de Performance
Técnica 1: Partition Pruning
A otimização mais impactante. Particionar dados por padrões de query.
Antes (full scan):
SELECT * FROM events WHERE event_date = '2024-12-10'
-- Scaneia 2TB de dados
Depois (partition pruning):
-- Dados particionados por dt=YYYY-MM-DD
SELECT * FROM events WHERE dt = '2024-12-10'
-- Scaneia 50GB de dados (partição apenas desse dia)
Técnica 2: Armazenamento Colunar com Compressão
Parquet com compressão ZSTD tipicamente atinge rácios de compressão de 10:1.
Configuração Parquet ótima:
df.write.format("parquet") \
.option("compression", "zstd") \
.option("parquet.block.size", 128 1024 1024) \ # Row groups de 128MB
.partitionBy("dt") \
.save("s3://silver/events/")
Técnica 3: Processamento Incremental
Não reprocessar o que não mudou.
Padrão incremental:
{% if is_incremental() %}
SELECT * FROM source_table
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% else %}
SELECT * FROM source_table
{% endif %}
Técnica 4: Predicate Pushdown
Empurrar filtros para a fonte de dados, não para o motor de processamento.
Mau: Carregar todos os dados, depois filtrar
df = spark.read.parquet("s3://data/events/")
filtered = df.filter(F.col("event_type") == "purchase")Bom: Filtrar no momento de leitura
df = spark.read.parquet("s3://data/events/").filter(F.col("event_type") == "purchase")Spark empurra este filtro para o leitor Parquet
Monitorização e Alertas
Métricas essenciais de pipeline:
- Freshness: Tempo desde última corrida bem-sucedida
- Completude: Contagens de linhas vs. esperado
- Latência: Tempo de processamento end-to-end
- Custo: Compute e storage por pipeline
Regras de alerta:
- alert: DataFreshnessSLA
expr: time() - data_freshness_timestamp > 86400 # 24 horas
labels:
severity: critical
annotations:
summary: "SLA de freshness de dados violado para {{ $labels.table }}"- alert: UnexpectedRowCountDrop
expr: (yesterday_row_count - today_row_count) / yesterday_row_count > 0.2
labels:
severity: warning
annotations:
summary: "Contagem de linhas caiu >20% para {{ $labels.table }}"
Os melhores pipelines ETL são aqueles em que ninguém pensa. Correm fiavelmente, escalam automaticamente, e surfacem problemas antes dos utilizadores notarem. Isto requer investimento upfront em observabilidade, idempotência e testing—mas esse investimento paga dividendos cada vez que não és chamado às 3 da manhã.