The Fraud Alert That Arrived 47 Hours Too Late
A fintech customer reported unauthorized transactions totaling €12,400. When the fraud team investigated, the pattern was obvious: 23 small transactions across 4 countries in 6 hours, all from a single compromised card. The batch fraud detection system had flagged it 47 hours after the last fraudulent transaction.
The batch job ran every 24 hours. By the time it processed the data, analyzed patterns, and generated alerts, the damage was done twice over. This type of incident catalyzes the shift to real-time stream processing.
With a proper streaming architecture, suspicious patterns trigger alerts in under 3 seconds: the same detection logic, fundamentally reimagined for streams.
When Real-Time Matters (And When It Doesn't)
Not everything needs real-time processing. The distinction:
True real-time requirements:
- Fraud detection (value of prevention decreases rapidly with delay)
- Operational monitoring (delayed alerts = extended outages)
- Dynamic pricing (stale prices = lost revenue or margin)
- Live personalization (recommendations must match current context)
Batch is fine for:
- Reporting and analytics (yesterday's data is usually acceptable)
- ML model training (training on real-time data is rarely necessary)
- Data warehouse loading (nightly updates work for most BI)
- Compliance reporting (regulatory timelines are usually days, not seconds)
The latency-cost tradeoff: Real-time systems cost 3-10x more than batch for equivalent throughput. Only pay that premium when latency directly impacts business value.
Stream Processing Fundamentals
The Core Mental Model
Batch processing: Compute over complete, bounded datasets
Stream processing: Compute over incomplete, unbounded data flows
Batch:
[───── Complete Dataset ─────] → Process → Result

Stream:
──event──event──event──event──event──▶ (infinite)
                 │
            ┌────▼────┐
            │ Process │
            └────┬────┘
                 │
          ─result─result─result─▶
Key Concepts
Event time vs. processing time:
- Event time: When the event actually occurred
- Processing time: When your system processes it
Mobile apps might emit events while offline and sync hours later. Your system receives them in processing order; you need to reason about event order.
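The gap between the two clocks is plain arithmetic. A minimal sketch of measuring lateness, with hypothetical timestamps for the offline-mobile case:

```python
from datetime import datetime, timedelta

def lateness(event_time: datetime, processing_time: datetime) -> timedelta:
    """How far behind real time this event arrived."""
    return processing_time - event_time

# An event emitted offline at 09:00 but synced and processed at 13:30
# arrived 4.5 hours late in event-time terms.
emitted = datetime(2024, 1, 1, 9, 0)
received = datetime(2024, 1, 1, 13, 30)
delay = lateness(emitted, received)  # timedelta of 4 hours 30 minutes
```

Any windowed computation keyed on `emitted` rather than `received` must be prepared to see events this stale.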
Windows:
How do you aggregate an infinite stream? By defining finite windows.
Tumbling window:  |──5min──|──5min──|──5min──|
                  Non-overlapping, fixed size

Sliding window:   |──5min──|
                     |──5min──|
                        |──5min──|
                  Overlapping, slides by interval

Session window:   |──session──|   |──session──|
                  Defined by activity gaps
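Window assignment is pure timestamp arithmetic. A sketch of mapping an event to its tumbling window, and to every sliding window that contains it (epoch-second timestamps and sizes in seconds are assumed for illustration):

```python
def tumbling_window(event_ts: int, size: int) -> tuple:
    """Map a timestamp to its single non-overlapping window [start, end)."""
    start = event_ts - (event_ts % size)
    return (start, start + size)

def sliding_windows(event_ts: int, size: int, slide: int) -> list:
    """All overlapping windows of `size` seconds, sliding by `slide`,
    that contain event_ts. There are size // slide of them."""
    last_start = event_ts - (event_ts % slide)  # latest window covering the event
    starts = range(last_start, event_ts - size, -slide)
    return [(s, s + size) for s in starts]

# A 5-minute tumbling window and 5-minute/1-minute sliding windows:
tumbling_window(610, 300)        # (600, 900)
sliding_windows(610, 300, 60)    # five windows, each containing t=610
```

Session windows cannot be computed this way: their boundaries depend on gaps in the data itself, so the processor must track per-key activity.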
Watermarks:
How do you know when all events for a window have arrived? You don't, but watermarks provide heuristic guarantees.
A watermark at time T means: "I've probably received all events with event time ≤ T"
Events arriving after their watermark are "late." You choose whether to drop them, update previous results, or emit corrections.
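A common heuristic (Flink calls this bounded out-of-orderness) tracks the maximum event time seen so far and subtracts a fixed tolerance. A minimal sketch, with integer timestamps assumed:

```python
class BoundedOutOfOrdernessWatermark:
    """Watermark = max event time seen so far, minus a fixed lateness bound."""

    def __init__(self, max_out_of_orderness: int):
        self.bound = max_out_of_orderness
        self.max_event_time = float("-inf")

    def on_event(self, event_time: int):
        """Advance the watermark on each incoming event."""
        self.max_event_time = max(self.max_event_time, event_time)
        return self.max_event_time - self.bound

    def is_late(self, event_time: int) -> bool:
        """An event is late if its time is at or behind the current watermark."""
        return event_time <= self.max_event_time - self.bound
```

With a bound of 5 seconds, seeing events at t=100 and t=103 moves the watermark to 98; an event stamped t=97 arriving afterwards is late, one stamped t=99 is not.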
Architecture Patterns
Pattern 1: Event Sourcing with Kafka
Store all state changes as an immutable sequence of events.
User actions → Kafka topics → Derived views

┌───────────────────────────────────┐
│         Event Log (Kafka)         │
│   [Created] [Updated] [Deleted]   │
└───────────────────────────────────┘
      │             │             │
┌───────────┐ ┌───────────┐ ┌───────────┐
│  Search   │ │   Cache   │ │ Analytics │
│   Index   │ │   View    │ │   Store   │
└───────────┘ └───────────┘ └───────────┘
Kafka configuration for durability:
# Producer settings
acks=all                  # Wait for all replicas
enable.idempotence=true   # Exactly-once semantics
retries=2147483647        # Retry indefinitely

# Topic settings
min.insync.replicas=2     # Require 2 replicas for writes
retention.ms=604800000    # 7 days retention
cleanup.policy=delete     # Or "compact" for state topics
Pattern 2: Stream-Table Duality
A stream is a table's changelog. A table is a stream's latest state.
Stream (changelog):
t1: {user: 123, balance: 100}
t2: {user: 123, balance: 150}
t3: {user: 123, balance: 125}

Table (current state):
user: 123 → balance: 125
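The duality is easy to see in code: replaying a keyed changelog and keeping only the latest value per key reconstructs the table. A toy sketch using the balances above:

```python
def replay(changelog):
    """Fold a keyed changelog stream into its latest-state table."""
    table = {}
    for event in changelog:
        # Later events for the same key overwrite earlier ones.
        table[event["user"]] = event["balance"]
    return table

changelog = [
    {"user": 123, "balance": 100},  # t1
    {"user": 123, "balance": 150},  # t2
    {"user": 123, "balance": 125},  # t3
]
replay(changelog)  # {123: 125}
```

This is also why a "compact" cleanup policy works for state topics: Kafka can discard all but the last record per key without losing the table.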
Use this for stream-table joins (enriching events with reference data):
-- Flink SQL example
CREATE TABLE orders (
order_id STRING,
user_id STRING,
product_id STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', ...);

CREATE TABLE products (
product_id STRING,
product_name STRING,
category STRING,
price DECIMAL(10,2)
) WITH ('connector' = 'kafka', ...);
-- Stream-table join: enrich orders with product info
SELECT
o.order_id,
o.user_id,
p.product_name,
p.category,
p.price
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product_id = p.product_id;
Pattern 3: Materialized Views with Incremental Updates
Pre-compute query results, update incrementally as new events arrive.
Traditional approach:
-- Runs every hour, scans entire table
SELECT category, COUNT(*), SUM(amount)
FROM transactions
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY category
Streaming approach:
-- Flink streaming aggregation
SELECT
category,
COUNT(*) as transaction_count,
SUM(amount) as total_amount
FROM transactions
GROUP BY
category,
TUMBLE(event_time, INTERVAL '1' HOUR)
The streaming query maintains state incrementally. Each new event updates only the affected aggregation: O(1) per event instead of O(n) per query.
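A toy version of that incremental state, assuming hourly tumbling windows keyed by category (the class and field names are illustrative, not a real framework API):

```python
from collections import defaultdict

class IncrementalAggregator:
    """Per-(category, window) count and sum, updated in O(1) per event."""

    def __init__(self, window_size: int):
        self.window_size = window_size
        # (category, window_start) -> [count, sum]
        self.state = defaultdict(lambda: [0, 0.0])

    def on_event(self, category: str, amount: float, event_ts: int):
        window_start = event_ts - (event_ts % self.window_size)
        entry = self.state[(category, window_start)]
        entry[0] += 1
        entry[1] += amount
        # Emit the updated aggregate for this window.
        return (category, window_start, entry[0], entry[1])

agg = IncrementalAggregator(3600)
agg.on_event("food", 10.0, 100)   # ('food', 0, 1, 10.0)
agg.on_event("food", 5.0, 200)    # ('food', 0, 2, 15.0)
agg.on_event("food", 7.0, 3700)   # ('food', 3600, 1, 7.0)
```

A real engine adds fault-tolerant checkpoints for `state` and evicts windows once the watermark passes them; the per-event work is the same constant-time update.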
Implementation Deep Dive
Kafka Streams for Simple Pipelines
For JVM-based applications, Kafka Streams provides stream processing without a separate cluster.
StreamsBuilder builder = new StreamsBuilder();

// Read from input topic
KStream<String, Transaction> transactions = builder.stream("transactions");
// Detect suspicious patterns
KStream<String, Alert> alerts = transactions
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
TransactionAccumulator::new,
(key, transaction, accumulator) -> accumulator.add(transaction),
Materialized.as("transaction-store")
)
.toStream()
.filter((windowedKey, accumulator) -> accumulator.isSuspicious())
.map((windowedKey, accumulator) -> KeyValue.pair(
windowedKey.key(),
new Alert(windowedKey.key(), accumulator.getReason())
));
// Write alerts to output topic
alerts.to("alerts");
Apache Flink for Complex Processing
Flink handles complex event processing, exactly-once semantics, and sophisticated windowing.
# PyFlink fraud detection
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Define fraud detection query
t_env.execute_sql("""
CREATE VIEW suspicious_cards AS
SELECT
card_id,
COUNT(DISTINCT country) as country_count,
COUNT(*) as transaction_count,
SUM(amount) as total_amount
FROM transactions
GROUP BY
card_id,
HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
HAVING
COUNT(DISTINCT country) >= 3
AND COUNT(*) >= 5
""")

# Alert on suspicious patterns
t_env.execute_sql("""
INSERT INTO alerts
SELECT
card_id,
'Multiple countries in short window' as reason,
country_count,
transaction_count,
total_amount
FROM suspicious_cards
""")
State Management
Stream processing state must survive failures. Options:
RocksDB (embedded):
# Flink configuration
state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/ssd/flink-state
state.checkpoints.dir: s3://bucket/checkpoints
External state stores (Redis, Cassandra):
from pyflink.datastream.functions import ProcessFunction
from redis import Redis

class StatefulProcessor(ProcessFunction):
    def __init__(self):
        self.redis = Redis(host='redis-cluster')

    def process_element(self, event, ctx):
        # Read current state
        state = self.redis.hgetall(f"user:{event.user_id}")
        # Update state
        state['last_seen'] = event.timestamp
        state['event_count'] = int(state.get('event_count', 0)) + 1
        # Write back
        self.redis.hset(f"user:{event.user_id}", mapping=state)
        # Emit enriched event
        yield EnrichedEvent(event, state)
Operational Challenges
Challenge 1: Exactly-Once Processing
Distributed systems fail. How do you ensure each event is processed exactly once?
Kafka's exactly-once: Transactions across produce and consume.
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
Idempotent consumers: Design consumers to handle duplicates safely.
def process_event(event):
    # Check if already processed
    if redis.sismember("processed", event.id):
        return  # Skip duplicate

    # Process event
    do_processing(event)

    # Mark as processed (with TTL to prevent unbounded growth)
    redis.sadd("processed", event.id)
    redis.expire("processed", 86400)  # 24-hour TTL
Challenge 2: Handling Late Data
Events arrive out of order. Your windowed aggregation closed, but a late event just arrived.
Options:
- Drop late data: Simple, but you lose accuracy
- Allowed lateness: Keep windows open longer
- Side outputs: Route late data to separate stream for reprocessing
- Corrections: Emit updated results when late data arrives
// Flink allowed lateness
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateOutputTag)
Challenge 3: Backpressure
When consumers can't keep up with producers, what happens?
Without backpressure: Memory fills, system crashes.
With backpressure: Slow consumers signal producers to slow down.
Producer (1000 events/sec)
          ↓
Buffer (filling up)
          ↓
Consumer (500 events/sec)
          ↓
Backpressure signal → Producer slows to 500 events/sec
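A bounded buffer is the simplest form of that signal: when the buffer fills, the producer blocks instead of overflowing memory. A single-process sketch (rates and sizes are illustrative):

```python
import queue
import threading
import time

# Bounded buffer: put() blocks when full, which is the backpressure signal.
buffer = queue.Queue(maxsize=100)

def producer(n):
    for i in range(n):
        buffer.put(i)      # blocks whenever the consumer falls behind
    buffer.put(None)       # sentinel: end of stream

def consumer(results):
    while True:
        item = buffer.get()
        if item is None:
            break
        time.sleep(0.001)  # simulate a slow consumer
        results.append(item)

results = []
t1 = threading.Thread(target=producer, args=(500,))
t2 = threading.Thread(target=consumer, args=(results,))
t1.start(); t2.start()
t1.join(); t2.join()
# The producer's effective rate was capped by the consumer's; nothing was dropped.
```

Distributed systems like Flink implement the same idea across the network with credit-based flow control rather than a shared in-memory queue.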
Monitoring backpressure in Flink:
metrics:
- name: inputQueueLength
alert_if: > 1000
- name: backPressuredTimeMsPerSecond
alert_if: > 500
Real-Time ML Inference
The intersection of streaming and AI: running models on real-time data.
Pattern: Feature Store Integration
Live events → Stream processor → Feature computation
                                        ↓
                                 Feature Store
                                        ↓
                         ML Model → Real-time predictions
Implementation:
class RealTimeFeatureProcessor(ProcessFunction):
    def __init__(self, model_client, feature_store):
        self.model = model_client
        self.features = feature_store

    def process_element(self, event, ctx):
        # Compute streaming features
        streaming_features = {
            'recent_transaction_count': self.count_recent(event.user_id),
            'velocity_score': self.calculate_velocity(event)
        }
        # Get historical features from store
        historical_features = self.features.get_features(
            entity_id=event.user_id,
            feature_names=['avg_transaction_amount', 'account_age_days']
        )
        # Combine (dict merge) and score
        all_features = {**streaming_features, **historical_features}
        risk_score = self.model.predict(all_features)
        yield ScoredTransaction(event, risk_score)
The fraud detection system that started this article uses exactly this pattern. Streaming features (transaction velocity, geographic spread) combine with historical features (customer profile, typical behavior) to score each transaction in real time.
The 47-hour delay is now 2.8 seconds. Same detection logic, radically different architecture. That's the power of thinking in streams.