Partilhar
Ciência de Dados

Construir Pipelines ETL Eficientes para Aplicações de IA

Construir Pipelines ETL Eficientes para Aplicações de IA

O Nosso Pipeline Mais Rápido Corre Uma Vez por Dia. O Mais Lento Corre a Cada Segundo.

Isto parece ao contrário. O real-time não deveria ser mais rápido? A resposta revela uma verdade fundamental sobre engenharia de dados: velocidade não é sobre quantas vezes corres—é sobre quanto trabalho fazes por corrida.

O nosso pipeline batch diário processa 180 milhões de registos em 23 minutos. O nosso stream "real-time" processa 2.000 registos por segundo—mas cada registo leva 450ms de processamento individual. O pipeline batch move 130.000 registos por segundo. O stream move 2.000.

Para aplicações IA, compreender esta distinção determina se os teus dados de treino têm horas de atraso ou semanas, se a tua feature store atualiza a tempo para previsões, e se os teus custos são €5.000/mês ou €50.000/mês.

ETL para IA: O Que é Diferente

ETL tradicional move dados da fonte para o warehouse. Pipelines IA têm requisitos adicionais:

Requisito 1: Feature Engineering em Escala

Dados brutos raramente alimentam modelos diretamente. Precisas de features derivadas—agregações, transformações, embeddings—calculadas consistentemente entre treino e inferência.

O problema de training-serving skew: Se features são calculadas diferentemente durante treino versus serving, a performance do modelo degrada silenciosamente. Um modelo treinado em "valor médio de compra nos últimos 30 dias" calculado de uma forma vai falhar quando o serving calcula diferentemente.

Arquitetura de solução:

┌─────────────────────────────────────────────────────────┐
│ Feature Pipeline │
│ ┌───────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Raw Data │ ──► │ Transform │ ──► │ Feature │ │
│ │ Sources │ │ Logic │ │ Store │ │
│ └───────────┘ └────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘

┌────────────────┼────────────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Training │ │ Serving │
│ Pipeline │ │ Pipeline │
└──────────────┘ └──────────────┘

Tanto treino como serving leem da mesma feature store. A lógica de transformação corre uma vez, produzindo features consumidas por ambos os caminhos.

Requisito 2: Lineage e Reprodutibilidade

Quando as previsões de um modelo correm mal, precisas de rastrear: Que dados de treino foram usados? Que transformações foram aplicadas? Que versão da lógica de features estava em vigor?

Requisitos mínimos de lineage:

  • Cada dataset versionado com snapshots imutáveis

  • Código de transformação versionado e ligado a datasets de output

  • Treino de modelo ligado a versões específicas de dados

  • Logs de previsão ligados a versão do modelo e dados de input

Implementação com dbt:

models/features/user_features.sql

{{
config(
materialized='incremental',
unique_key='user_id',
meta={
'owner': 'data-team',
'lineage_tracked': true,
'freshness_sla': '24 hours'
}
)
}}

SELECT
user_id,
COUNT(*) as total_purchases,
AVG(purchase_amount) as avg_purchase_value,
MAX(purchase_date) as last_purchase_date
FROM {{ ref('stg_purchases') }}
WHERE purchase_date >= DATEADD(day, -30, CURRENT_DATE)
GROUP BY user_id

Requisito 3: Enforcement de Qualidade de Dados

Dados maus criam modelos maus. Ao contrário de analytics tradicional onde dados maus produzem relatórios errados, dados de treino IA maus produzem modelos que confiantemente fazem previsões erradas em escala.

Gates de qualidade que enforçamos:

from great_expectations import expect

Verificações estruturais

expect.column_values_to_not_be_null("user_id")
expect.column_values_to_be_unique("user_id")

Verificações estatísticas

expect.column_mean_to_be_between("purchase_amount", min_value=10, max_value=1000)
expect.column_values_to_be_between("user_age", min_value=18, max_value=120)

Verificações de freshness

expect.column_max_to_be_between(
"created_at",
min_value=datetime.now() - timedelta(hours=24),
max_value=datetime.now()
)

Deteção de drift

expect.column_distribution_to_match_reference("user_segment", reference_dataset)

Se qualquer verificação falhar, o pipeline para. Sem corrupção silenciosa de dados a propagar para modelos.

A Stack ETL Moderna para IA

Camada 1: Ingestão

Para fontes batch (bases de dados, ficheiros, APIs):
Usamos Airbyte para conectores a 300+ fontes. Orientado por configuração, trata mudanças de schema graciosamente, e open-source.

airbyte/connections/salesforce.yaml

sourceDefinitionId: salesforce
configuration:
client_id: ${SALESFORCE_CLIENT_ID}
client_secret: ${SALESFORCE_CLIENT_SECRET}
refresh_token: ${SALESFORCE_REFRESH_TOKEN}
streams:
- name: opportunities
sync_mode: incremental
cursor_field: last_modified_date
- name: accounts
sync_mode: full_refresh # Tabela pequena, sempre full sync

Para fontes streaming (eventos, logs, IoT):
Kafka para ingestão de eventos. Debezium para change data capture de bases de dados transacionais.

Princípio chave de design: Aterrar dados raw primeiro, transformar depois. Armazenar payloads originais numa camada raw antes de qualquer transformação. Podes sempre re-transformar; não consegues recuperar dados que nunca armazenaste.

Camada 2: Armazenamento

A arquitetura medallion:

Bronze (Raw)    →    Silver (Limpo)    →    Gold (Features)
│ │ │
│ Formato original │ Validado, tipado │ Pronto para modelo
│ Snapshots imutáveis │ Deduplicado │ Agregado
│ Histórico completo │ Schemas standard │ Feature store

Escolhas tecnológicas:

  • Bronze: Ficheiros Parquet no S3/GCS, particionados por data de ingestão

  • Silver: Tabelas Delta Lake com transações ACID e time travel

  • Gold: Feature store (Feast, Tecton, ou custom) com serving online/offline

Porquê Delta Lake para Silver:

Time travel para reprodutibilidade

df = spark.read.format("delta").option("versionAsOf", 42).load("/silver/users")

Schema enforcement

spark.read.format("delta").load("/silver/users").write \
.format("delta") \
.option("mergeSchema", "false") \ # Falhar em schema mismatch
.mode("append") \
.save("/silver/users")

Transações ACID para escritas concorrentes

delta_table.merge(
updates_df,
"target.user_id = source.user_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Camada 3: Transformação

dbt para transformações SQL:
A nossa equipa de dados pensa em SQL. dbt permite-lhes definir transformações declarativamente enquanto a plataforma trata de execução, teste e documentação.

-- models/gold/user_360.sql
WITH purchase_stats AS (
SELECT
user_id,
COUNT(*) as purchase_count,
SUM(amount) as total_spent,
AVG(amount) as avg_order_value
FROM {{ ref('silver_purchases') }}
GROUP BY user_id
),

engagement_stats AS (
SELECT
user_id,
COUNT(*) as login_count,
MAX(login_time) as last_login
FROM {{ ref('silver_logins') }}
WHERE login_time >= DATEADD(day, -90, CURRENT_DATE)
GROUP BY user_id
)

SELECT
u.user_id,
u.email,
u.signup_date,
p.purchase_count,
p.total_spent,
p.avg_order_value,
e.login_count,
e.last_login,
DATEDIFF(day, u.signup_date, CURRENT_DATE) as days_since_signup
FROM {{ ref('silver_users') }} u
LEFT JOIN purchase_stats p ON u.user_id = p.user_id
LEFT JOIN engagement_stats e ON u.user_id = e.user_id

Spark para transformações complexas:
Quando SQL não é suficiente—feature engineering ML, agregações complexas, ou processar dados não estruturados—usamos PySpark.

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import functions as F

Feature engineering

df = df.withColumn(
"recency_score",
F.datediff(F.current_date(), F.col("last_purchase_date"))
)

Geração de embedding (chamando modelo externo)

def generate_embedding(text):
# Chamar API de embedding
return embedding_vector

embedding_udf = F.udf(generate_embedding, ArrayType(FloatType()))
df = df.withColumn("description_embedding", embedding_udf(F.col("product_description")))

Camada 4: Orquestração

Apache Airflow permanece o standard, apesar das suas peculiaridades. Padrões chave:

Triggers baseados em sensor:

wait_for_upstream = S3KeySensor(
task_id='wait_for_raw_data',
bucket_name='data-lake',
bucket_key='raw/purchases/dt={{ ds }}/_SUCCESS',
timeout=3600 # 1 hora
)

Geração dinâmica de DAG:

Gerar um DAG por fonte de dados

for source in ['salesforce', 'hubspot', 'postgres_production']:
dag = DAG(
dag_id=f'ingest_{source}',
schedule_interval='@hourly',
default_args=default_args
)

with dag:
extract = AirbyteOperator(connection_id=source)
validate = GreatExpectationsOperator(suite=f'{source}_validation')
transform = DbtRunOperator(models=f'+stg_{source}')

extract >> validate >> transform

Tratamento de backfill:

@task
def process_partition(ds):
# ds = execution_date em formato YYYY-MM-DD
# Mesmo código trata corridas diárias e backfills
input_path = f"s3://raw/events/dt={ds}/"
output_path = f"s3://silver/events/dt={ds}/"
transform(input_path, output_path)

Técnicas de Otimização de Performance

Técnica 1: Partition Pruning

A otimização mais impactante. Particionar dados por padrões de query.

Antes (full scan):

SELECT * FROM events WHERE event_date = '2024-12-10'
-- Scaneia 2TB de dados

Depois (partition pruning):

-- Dados particionados por dt=YYYY-MM-DD
SELECT * FROM events WHERE dt = '2024-12-10'
-- Scaneia 50GB de dados (partição apenas desse dia)

Técnica 2: Armazenamento Colunar com Compressão

Parquet com compressão ZSTD tipicamente atinge rácios de compressão de 10:1.

Configuração Parquet ótima:

df.write.format("parquet") \
.option("compression", "zstd") \
.option("parquet.block.size", 128 1024 1024) \ # Row groups de 128MB
.partitionBy("dt") \
.save("s3://silver/events/")

Técnica 3: Processamento Incremental

Não reprocessar o que não mudou.

Padrão incremental:

{% if is_incremental() %}
SELECT * FROM source_table
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% else %}
SELECT * FROM source_table
{% endif %}

Técnica 4: Predicate Pushdown

Empurrar filtros para a fonte de dados, não para o motor de processamento.

Mau: Carregar todos os dados, depois filtrar

df = spark.read.parquet("s3://data/events/")
filtered = df.filter(F.col("event_type") == "purchase")

Bom: Filtrar no momento de leitura

df = spark.read.parquet("s3://data/events/").filter(F.col("event_type") == "purchase")

Spark empurra este filtro para o leitor Parquet

Monitorização e Alertas

Métricas essenciais de pipeline:

  • Freshness: Tempo desde última corrida bem-sucedida

  • Completude: Contagens de linhas vs. esperado

  • Latência: Tempo de processamento end-to-end

  • Custo: Compute e storage por pipeline

Regras de alerta:

- alert: DataFreshnessSLA
expr: time() - data_freshness_timestamp > 86400 # 24 horas
labels:
severity: critical
annotations:
summary: "SLA de freshness de dados violado para {{ $labels.table }}"

  • alert: UnexpectedRowCountDrop

expr: (yesterday_row_count - today_row_count) / yesterday_row_count > 0.2
labels:
severity: warning
annotations:
summary: "Contagem de linhas caiu >20% para {{ $labels.table }}"

Os melhores pipelines ETL são aqueles em que ninguém pensa. Correm fiavelmente, escalam automaticamente, e surfacem problemas antes dos utilizadores notarem. Isto requer investimento upfront em observabilidade, idempotência e testing—mas esse investimento paga dividendos cada vez que não és chamado às 3 da manhã.

João Mendes

Sobre o Autor

João Mendes

Cofundador da AIOBI. Engenheiro de Dados e IA com experiência em infraestrutura de dados, produtos inteligentes e soluções escaláveis.