O Alerta de Fraude Que Chegou 47 Horas Demasiado Tarde
Um cliente fintech reportou transações não autorizadas totalizando €12.400. Quando a equipa de fraude investigou, o padrão era óbvio: 23 pequenas transações em 4 países em 6 horas, todas de um único cartão comprometido. O sistema de deteção de fraude batch tinha-o sinalizado—47 horas após a última transação fraudulenta.
O job batch corria a cada 24 horas. Quando processou os dados, analisou padrões e gerou alertas, o dano estava feito duas vezes. Este tipo de incidente catalisa a mudança para processamento de stream em tempo real.
Com arquitetura streaming apropriada, padrões suspeitos disparam alertas em menos de 3 segundos. A mesma lógica de deteção, fundamentalmente reimaginada para arquitetura streaming.
Quando o Tempo Real Importa (E Quando Não)
Nem tudo precisa de processamento em tempo real. A distinção:
Requisitos de tempo real verdadeiros:
- Deteção de fraude (valor da prevenção diminui rapidamente com atraso)
- Monitorização operacional (alertas atrasados = outages prolongados)
- Preços dinâmicos (preços desatualizados = receita ou margem perdida)
- Personalização ao vivo (recomendações devem corresponder ao contexto atual)
Batch está bem para:
- Reporting e analytics (dados de ontem são geralmente aceitáveis)
- Treino de modelos ML (treinar com dados em tempo real é raramente necessário)
- Loading de data warehouse (atualizações noturnas funcionam para a maioria de BI)
- Reporting de compliance (timelines regulatórios são geralmente dias, não segundos)
O trade-off latência-custo: Sistemas em tempo real custam 3-10x mais que batch para throughput equivalente. Só paga esse prémio quando a latência impacta diretamente o valor de negócio.
Fundamentos de Stream Processing
O Modelo Mental Core
Processamento batch: Computar sobre datasets completos e limitados
Processamento stream: Computar sobre fluxos de dados incompletos e ilimitados
Batch:
[───── Dataset Completo ─────] → Processar → ResultadoStream:
──evento──evento──evento──evento──evento──▶ (infinito)
│
┌────▼────┐
│Processar│
└────┬────┘
│
─resultado─resultado─resultado─▶
Conceitos Chave
Event time vs. processing time:
- Event time: Quando o evento realmente ocorreu
- Processing time: Quando o teu sistema o processa
Apps móveis podem emitir eventos enquanto offline e sincronizar horas depois. O teu sistema recebe-os em ordem de processamento; precisas de raciocinar sobre ordem de eventos.
Windows:
Como agregas um stream infinito? Definindo windows finitas.
Tumbling window: |──5min──|──5min──|──5min──|
Não sobrepostas, tamanho fixoSliding window: |──5min──|
|──5min──|
|──5min──|
Sobrepostas, desliza por intervalo
Session window: |──sessão──| |──sessão──|
Definida por gaps de atividade
Watermarks:
Como sabes quando todos os eventos para uma window chegaram? Não sabes—mas watermarks fornecem garantias heurísticas.
Um watermark no tempo T significa: "Provavelmente recebi todos os eventos com event time ≤ T"
Eventos que chegam após o seu watermark são "atrasados." Tu escolhes se os descartas, atualizas resultados anteriores, ou emites correções.
Padrões de Arquitetura
Padrão 1: Event Sourcing com Kafka
Armazenar todas as mudanças de estado como uma sequência imutável de eventos.
Ações user → Kafka topics → Views derivadas
↓
┌─────────────────────────────────┐
│ Event Log (Kafka) │
│ [Criado] [Atualizado] [Apagado] │
└─────────────────────────────────┘
↓ ↓ ↓
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Search │ │ Cache │ │ Analytics │
│ Index │ │ View │ │ Store │
└───────────┘ └───────────┘ └───────────┘
Configuração Kafka para durabilidade:
Definições de producer
acks=all # Esperar por todas as réplicas
enable.idempotence=true # Semântica exactly-once
retries=2147483647 # Retry indefinidamenteDefinições de topic
min.insync.replicas=2 # Requerer 2 réplicas para escritas
retention.ms=604800000 # 7 dias de retenção
cleanup.policy=delete # Ou "compact" para state topics
Padrão 2: Dualidade Stream-Table
Um stream é o changelog de uma table. Uma table é o estado mais recente de um stream.
Stream (changelog):
t1: {user: 123, balance: 100}
t2: {user: 123, balance: 150}
t3: {user: 123, balance: 125}Table (estado atual):
user: 123 → balance: 125
Usa isto para stream-stream joins (enriquecer eventos com dados de referência):
Exemplo Flink SQL
CREATE TABLE orders (
order_id STRING,
user_id STRING,
product_id STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', ...);CREATE TABLE products (
product_id STRING,
product_name STRING,
category STRING,
price DECIMAL(10,2)
) WITH ('connector' = 'kafka', ...);
-- Stream-table join: enriquecer encomendas com info de produto
SELECT
o.order_id,
o.user_id,
p.product_name,
p.category,
p.price
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product_id = p.product_id;
Padrão 3: Materialized Views com Updates Incrementais
Pré-computar resultados de query, atualizar incrementalmente quando novos eventos chegam.
Abordagem tradicional:
-- Corre a cada hora, scaneia tabela inteira
SELECT category, COUNT(*), SUM(amount)
FROM transactions
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY category
Abordagem streaming:
Agregação streaming Flink
SELECT
category,
COUNT(*) as transaction_count,
SUM(amount) as total_amount
FROM transactions
GROUP BY
category,
TUMBLE(event_time, INTERVAL '1' HOUR)
A query streaming mantém estado incrementalmente. Cada novo evento atualiza apenas a agregação afetada—O(1) por evento em vez de O(n) por query.
Deep Dive de Implementação
Kafka Streams para Pipelines Simples
Para aplicações baseadas em JVM, Kafka Streams fornece processamento de stream sem um cluster separado.
StreamsBuilder builder = new StreamsBuilder();// Ler do topic de input
KStream<String, Transaction> transactions = builder.stream("transactions");
// Detetar padrões suspeitos
KStream<String, Alert> alerts = transactions
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
TransactionAccumulator::new,
(key, transaction, accumulator) -> accumulator.add(transaction),
Materialized.as("transaction-store")
)
.toStream()
.filter((windowedKey, accumulator) -> accumulator.isSuspicious())
.map((windowedKey, accumulator) -> KeyValue.pair(
windowedKey.key(),
new Alert(windowedKey.key(), accumulator.getReason())
));
// Escrever alertas para topic de output
alerts.to("alerts");
Apache Flink para Processamento Complexo
Flink trata processamento de eventos complexos, semântica exactly-once e windowing sofisticado.
PyFlink deteção de fraude
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
Definir query de deteção de fraude
t_env.execute_sql("""
CREATE VIEW suspicious_cards AS
SELECT
card_id,
COUNT(DISTINCT country) as country_count,
COUNT(*) as transaction_count,
SUM(amount) as total_amount
FROM transactions
GROUP BY
card_id,
HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTES)
HAVING
COUNT(DISTINCT country) >= 3
AND COUNT(*) >= 5
""")-- Alertar sobre padrões suspeitos
t_env.execute_sql("""
INSERT INTO alerts
SELECT
card_id,
'Múltiplos países em janela curta' as reason,
country_count,
transaction_count,
total_amount
FROM suspicious_cards
""")
Gestão de Estado
Estado de processamento de stream deve sobreviver a falhas. Opções:
RocksDB (embebido):
Configuração Flink
state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/ssd/flink-state
state.checkpoints.dir: s3://bucket/checkpoints
State stores externos (Redis, Cassandra):
class StatefulProcessor(ProcessFunction):
def __init__(self):
self.redis = Redis(host='redis-cluster') def process_element(self, event, ctx):
# Ler estado atual
state = self.redis.hgetall(f"user:{event.user_id}")
# Atualizar estado
state['last_seen'] = event.timestamp
state['event_count'] = int(state.get('event_count', 0)) + 1
# Escrever de volta
self.redis.hset(f"user:{event.user_id}", mapping=state)
# Emitir evento enriquecido
yield EnrichedEvent(event, state)
Desafios Operacionais
Desafio 1: Processamento Exactly-Once
Sistemas distribuídos falham. Como garantes que cada evento é processado exatamente uma vez?
Exactly-once do Kafka: Transações através de produce e consume.
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
Consumers idempotentes: Desenha consumers para tratar duplicados com segurança.
def process_event(event):
# Verificar se já processado
if redis.sismember("processed", event.id):
return # Saltar duplicado # Processar evento
do_processing(event)
# Marcar como processado (com TTL para prevenir crescimento ilimitado)
redis.sadd("processed", event.id)
redis.expire("processed", 86400) # TTL de 24 horas
Desafio 2: Tratar Dados Atrasados
Eventos chegam fora de ordem. A tua agregação windowed fechou, mas um evento atrasado acabou de chegar.
Opções:
- Descartar dados atrasados: Simples, mas perdes precisão
- Atraso permitido: Manter windows abertas mais tempo
- Side outputs: Encaminhar dados atrasados para stream separado para reprocessamento
- Correções: Emitir resultados atualizados quando dados atrasados chegam
Flink allowed lateness
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateOutputTag)
Desafio 3: Backpressure
Quando consumers não conseguem acompanhar producers, o que acontece?
Sem backpressure: Memória enche, sistema crasha.
Com backpressure: Consumers lentos sinalizam producers para abrandar.
Producer (1000 eventos/sec)
↓
Buffer (a encher)
↓
Consumer (500 eventos/sec)
↓
Sinal backpressure → Producer abranda para 500 eventos/sec
Monitorizar backpressure no Flink:
metrics:
- name: inputQueueLength
alert_if: > 1000
- name: backPressuredTimeMsPerSecond
alert_if: > 500
Inferência ML em Tempo Real
A interseção de streaming e IA: correr modelos em dados em tempo real.
Padrão: Integração com Feature Store
Eventos live → Stream processor → Computação de features
↓
Feature Store
↓
Modelo ML → Previsões em tempo real
Implementação:
class RealTimeFeatureProcessor(ProcessFunction):
def __init__(self, model_client, feature_store):
self.model = model_client
self.features = feature_store def process_element(self, event, ctx):
# Computar streaming features
streaming_features = {
'recent_transaction_count': self.count_recent(event.user_id),
'velocity_score': self.calculate_velocity(event)
}
# Obter features históricas da store
historical_features = self.features.get_features(
entity_id=event.user_id,
feature_names=['avg_transaction_amount', 'account_age_days']
)
# Combinar e pontuar
all_features = {streaming_features, historical_features}
risk_score = self.model.predict(all_features)
yield ScoredTransaction(event, risk_score)
O sistema de deteção de fraude que iniciou este artigo usa exatamente este padrão. Streaming features (velocidade de transação, dispersão geográfica) combinam com features históricas (perfil de cliente, comportamento típico) para pontuar cada transação em tempo real.
O atraso de 47 horas é agora 2,8 segundos. Mesma lógica de deteção, arquitetura radicalmente diferente. Este é o poder de pensar em streams.