
Real-Time Data Processing: Stream Analytics for Modern Apps

The Fraud Alert That Arrived 47 Hours Too Late

A fintech customer reported unauthorized transactions totaling €12,400. When the fraud team investigated, the pattern was obvious: 23 small transactions across 4 countries in 6 hours, all from a single compromised card. The batch fraud detection system had flagged it, 47 hours after the last fraudulent transaction.

The batch job ran every 24 hours. By the time it processed the data, analyzed patterns, and generated alerts, the damage was done twice over. This type of incident catalyzes the shift to real-time stream processing.

With a proper streaming architecture, suspicious patterns trigger alerts in under 3 seconds. The same detection logic, fundamentally reimagined for streams.

When Real-Time Matters (And When It Doesn't)

Not everything needs real-time processing. The distinction:

True real-time requirements:

  • Fraud detection (value of prevention decreases rapidly with delay)

  • Operational monitoring (delayed alerts = extended outages)

  • Dynamic pricing (stale prices = lost revenue or margin)

  • Live personalization (recommendations must match current context)

Batch is fine for:

  • Reporting and analytics (yesterday's data is usually acceptable)

  • ML model training (training on real-time data is rarely necessary)

  • Data warehouse loading (nightly updates work for most BI)

  • Compliance reporting (regulatory timelines are usually days, not seconds)

The latency-cost tradeoff: Real-time systems cost 3-10x more than batch for equivalent throughput. Only pay that premium when latency directly impacts business value.

Stream Processing Fundamentals

The Core Mental Model

Batch processing: Compute over complete, bounded datasets
Stream processing: Compute over incomplete, unbounded data flows

Batch:
[───── Complete Dataset ─────] → Process → Result

Stream:
──event──event──event──event──event──▶ (infinite)
                 │
            ┌────▼────┐
            │ Process │
            └────┬────┘
                 │
         ─result─result─result─▶

Key Concepts

Event time vs. processing time:

  • Event time: When the event actually occurred

  • Processing time: When your system processes it

Mobile apps might emit events while offline and sync hours later. Your system receives them in arrival (processing) order, but correct results usually require reasoning in event order.
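The gap between the two timestamps is exactly what a streaming system has to manage. A minimal sketch of measuring it (the `lateness_seconds` helper is illustrative, not part of any framework):

```python
from datetime import datetime, timezone

def lateness_seconds(event_time: datetime, processing_time: datetime) -> float:
    """How far behind the event's occurrence the system actually processed it."""
    return (processing_time - event_time).total_seconds()

# An event emitted offline at 09:00 UTC but synced and processed at 12:30 UTC
event = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
processed = datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc)
lateness_seconds(event, processed)  # 12600.0 seconds, i.e. 3.5 hours
```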

Windows:
How do you aggregate an infinite stream? By defining finite windows.

Tumbling window:  |──5min──|──5min──|──5min──|
                  Non-overlapping, fixed size

Sliding window:   |──5min──|
                     |──5min──|
                        |──5min──|
                  Overlapping, slides by interval

Session window:   |──session──|    |──session──|
                  Defined by activity gaps
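The window types differ only in how a timestamp maps to window boundaries. A minimal sketch of that assignment logic over epoch-second timestamps (function names are illustrative):

```python
def tumbling_window(ts: int, size: int) -> tuple[int, int]:
    """Non-overlapping: each timestamp falls in exactly one window."""
    start = ts - (ts % size)
    return (start, start + size)

def sliding_windows(ts: int, size: int, slide: int) -> list[tuple[int, int]]:
    """Overlapping: each timestamp falls in size/slide windows."""
    last_start = ts - (ts % slide)
    return [(s, s + size) for s in range(last_start, ts - size, -slide)]

tumbling_window(130, 300)           # (0, 300): the single 5-minute bucket
len(sliding_windows(130, 300, 60))  # 5 overlapping 5-minute windows
```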

Watermarks:
How do you know when all events for a window have arrived? You don't, but watermarks provide heuristic guarantees.

A watermark at time T means: "I've probably received all events with event time ≤ T"

Events arriving after their watermark are "late." You choose whether to drop them, update previous results, or emit corrections.
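The most common heuristic is "bounded out-of-orderness": the watermark trails the maximum event time seen so far by a fixed delay. A minimal sketch of the idea (class name illustrative; Flink's built-in generators follow the same principle):

```python
class BoundedOutOfOrdernessWatermark:
    """Watermark trails the max observed event time by a fixed bound."""

    def __init__(self, max_delay: int):
        self.max_delay = max_delay
        self.max_event_time = float("-inf")

    def on_event(self, event_time: int) -> float:
        """Advance on each event; the watermark never moves backwards."""
        self.max_event_time = max(self.max_event_time, event_time)
        return self.max_event_time - self.max_delay

    def is_late(self, event_time: int) -> bool:
        """An event behind the current watermark is late."""
        return event_time < self.max_event_time - self.max_delay

wm = BoundedOutOfOrdernessWatermark(max_delay=5)
wm.on_event(100)  # watermark advances to 95
wm.on_event(98)   # out of order, but within the bound: watermark stays 95
wm.is_late(94)    # True: its window may already have fired
```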

Architecture Patterns

Pattern 1: Event Sourcing with Kafka

Store all state changes as an immutable sequence of events.

User actions → Kafka topics → Derived views

   ┌─────────────────────────────────┐
   │        Event Log (Kafka)        │
   │  [Created] [Updated] [Deleted]  │
   └─────────────────────────────────┘
         ↓           ↓           ↓
 ┌───────────┐ ┌───────────┐ ┌───────────┐
 │  Search   │ │   Cache   │ │ Analytics │
 │  Index    │ │   View    │ │   Store   │
 └───────────┘ └───────────┘ └───────────┘

Kafka configuration for durability:

Producer settings

acks=all                  # Wait for all in-sync replicas
enable.idempotence=true   # No duplicates on producer retries
retries=2147483647        # Retry indefinitely

Topic settings

min.insync.replicas=2 # Require 2 replicas for writes
retention.ms=604800000 # 7 days retention
cleanup.policy=delete # Or "compact" for state topics

Pattern 2: Stream-Table Duality

A stream is a table's changelog. A table is a stream's latest state.

Stream (changelog):
t1: {user: 123, balance: 100}
t2: {user: 123, balance: 150}
t3: {user: 123, balance: 125}

Table (current state):
user: 123 → balance: 125
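The duality is easy to demonstrate: replaying the changelog reconstructs the table. A minimal sketch using the balances above:

```python
changelog = [
    {"user": 123, "balance": 100},  # t1
    {"user": 123, "balance": 150},  # t2
    {"user": 123, "balance": 125},  # t3
]

def materialize(stream):
    """Fold a changelog stream into current state: last write per key wins."""
    table = {}
    for event in stream:
        table[event["user"]] = event["balance"]
    return table

materialize(changelog)  # {123: 125}
```

This is also why Kafka's "compact" cleanup policy works for state topics: keeping only the latest record per key preserves the table.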

Use this for stream-table joins (enrich events with reference data):

Flink SQL example

CREATE TABLE orders (
    order_id   STRING,
    user_id    STRING,
    product_id STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', ...);

CREATE TABLE products (
    product_id   STRING,
    product_name STRING,
    category     STRING,
    price        DECIMAL(10,2)
) WITH ('connector' = 'kafka', ...);

-- Stream-table join: enrich orders with product info
SELECT
    o.order_id,
    o.user_id,
    p.product_name,
    p.category,
    p.price
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id;

Pattern 3: Materialized Views with Incremental Updates

Pre-compute query results, update incrementally as new events arrive.

Traditional approach:

-- Runs every hour, scans entire table
SELECT category, COUNT(*), SUM(amount)
FROM transactions
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY category

Streaming approach:

Flink streaming aggregation

SELECT
    category,
    COUNT(*) as transaction_count,
    SUM(amount) as total_amount
FROM transactions
GROUP BY
    category,
    TUMBLE(event_time, INTERVAL '1' HOUR)

The streaming query maintains state incrementally. Each new event updates only the affected aggregationβ€”O(1) per event instead of O(n) per query.
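The incremental-state idea can be sketched in a few lines: COUNT and SUM both admit O(1) updates because a new event only touches one category's running totals (class and method names are illustrative):

```python
from collections import defaultdict

class IncrementalAggregate:
    """Per-category running COUNT and SUM, updated in O(1) per event."""

    def __init__(self):
        self.counts = defaultdict(int)
        self.sums = defaultdict(float)

    def on_event(self, category: str, amount: float) -> None:
        # Only this category's totals change -- no rescan of past events
        self.counts[category] += 1
        self.sums[category] += amount

    def result(self, category: str) -> tuple[int, float]:
        return self.counts[category], self.sums[category]

agg = IncrementalAggregate()
agg.on_event("groceries", 40.0)
agg.on_event("groceries", 60.0)
agg.result("groceries")  # (2, 100.0)
```

A real engine additionally scopes this state per window and evicts it once the watermark passes the window's end.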

Implementation Deep Dive

Kafka Streams for Simple Pipelines

For JVM-based applications, Kafka Streams provides stream processing without a separate cluster.

StreamsBuilder builder = new StreamsBuilder();

// Read from input topic
KStream<String, Transaction> transactions = builder.stream("transactions");

// Detect suspicious patterns
KStream<String, Alert> alerts = transactions
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        TransactionAccumulator::new,
        (key, transaction, accumulator) -> accumulator.add(transaction),
        Materialized.as("transaction-store")
    )
    .toStream()
    .filter((windowedKey, accumulator) -> accumulator.isSuspicious())
    .map((windowedKey, accumulator) -> KeyValue.pair(
        windowedKey.key(),
        new Alert(windowedKey.key(), accumulator.getReason())
    ));

// Write alerts to output topic
alerts.to("alerts");

Apache Flink for Complex Processing

Flink handles complex event processing, exactly-once semantics, and sophisticated windowing.

PyFlink fraud detection

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define fraud detection query
t_env.execute_sql("""
    CREATE VIEW suspicious_cards AS
    SELECT
        card_id,
        COUNT(DISTINCT country) as country_count,
        COUNT(*) as transaction_count,
        SUM(amount) as total_amount
    FROM transactions
    GROUP BY
        card_id,
        HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
    HAVING
        COUNT(DISTINCT country) >= 3
        AND COUNT(*) >= 5
""")

# Alert on suspicious patterns
t_env.execute_sql("""
    INSERT INTO alerts
    SELECT
        card_id,
        'Multiple countries in short window' as reason,
        country_count,
        transaction_count,
        total_amount
    FROM suspicious_cards
""")

State Management

Stream processing state must survive failures. Options:

RocksDB (embedded):

Flink configuration

state.backend: rocksdb
state.backend.rocksdb.localdir: /mnt/ssd/flink-state
state.checkpoints.dir: s3://bucket/checkpoints

External state stores (Redis, Cassandra):

from redis import Redis

class StatefulProcessor(ProcessFunction):
    def __init__(self):
        self.redis = Redis(host='redis-cluster')

    def process_element(self, event, ctx):
        # Read current state
        state = self.redis.hgetall(f"user:{event.user_id}")

        # Update state
        state['last_seen'] = event.timestamp
        state['event_count'] = int(state.get('event_count', 0)) + 1

        # Write back
        self.redis.hset(f"user:{event.user_id}", mapping=state)

        # Emit enriched event
        yield EnrichedEvent(event, state)

Operational Challenges

Challenge 1: Exactly-Once Processing

Distributed systems fail. How do you ensure each event is processed exactly once?

Kafka's exactly-once: Transactions across produce and consume.

producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.sendOffsetsToTransaction(offsets, consumerGroupId);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

Idempotent consumers: Design consumers to handle duplicates safely.

def process_event(event):
    # Check if already processed
    if redis.exists(f"processed:{event.id}"):
        return  # Skip duplicate

    # Process event
    do_processing(event)

    # Mark as processed, with a per-event TTL to prevent unbounded growth
    # (expiring one shared set would drop every member at once)
    redis.set(f"processed:{event.id}", 1, ex=86400)  # 24-hour TTL

Challenge 2: Handling Late Data

Events arrive out of order. Your windowed aggregation closed, but a late event just arrived.

Options:

  • Drop late data: Simple, but you lose accuracy

  • Allowed lateness: Keep windows open longer

  • Side outputs: Route late data to separate stream for reprocessing

  • Corrections: Emit updated results when late data arrives

Flink allowed lateness

.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateOutputTag)

Challenge 3: Backpressure

When consumers can't keep up with producers, what happens?

Without backpressure: Memory fills, system crashes.

With backpressure: Slow consumers signal producers to slow down.

Producer (1000 events/sec)
↓
Buffer (filling up)
↓
Consumer (500 events/sec)
↓
Backpressure signal → Producer slows to 500 events/sec
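Mechanically, backpressure is just a bounded buffer between stages: when it fills, the producer blocks instead of exhausting memory. A minimal sketch with Python's stdlib queue (illustrative only, not a Kafka or Flink API):

```python
import queue

buffer = queue.Queue(maxsize=3)  # bounded buffer between producer and consumer

# Producer outpaces the consumer: the buffer fills...
for i in range(3):
    buffer.put_nowait(i)

# ...and the next write is refused. A blocking put() would pause here
# instead -- that pause is exactly the signal that slows the producer.
overflowed = False
try:
    buffer.put_nowait(99)
except queue.Full:
    overflowed = True
```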

Monitoring backpressure in Flink:

metrics:
  - name: inputQueueLength
    alert_if: "> 1000"
  - name: backPressuredTimeMsPerSecond
    alert_if: "> 500"

Real-Time ML Inference

The intersection of streaming and AI: running models on real-time data.

Pattern: Feature Store Integration

Live events → Stream processor → Feature computation
                                        ↓
                                  Feature Store
                                        ↓
                         ML Model → Real-time predictions

Implementation:

class RealTimeFeatureProcessor(ProcessFunction):
    def __init__(self, model_client, feature_store):
        self.model = model_client
        self.features = feature_store

    def process_element(self, event, ctx):
        # Compute streaming features
        streaming_features = {
            'recent_transaction_count': self.count_recent(event.user_id),
            'velocity_score': self.calculate_velocity(event)
        }

        # Get historical features from store
        historical_features = self.features.get_features(
            entity_id=event.user_id,
            feature_names=['avg_transaction_amount', 'account_age_days']
        )

        # Merge both feature sets and score
        all_features = {**streaming_features, **historical_features}
        risk_score = self.model.predict(all_features)

        yield ScoredTransaction(event, risk_score)

The fraud detection system that started this article uses exactly this pattern. Streaming features (transaction velocity, geographic spread) combine with historical features (customer profile, typical behavior) to score each transaction in real time.

The 47-hour delay is now 2.8 seconds. Same detection logic, radically different architecture. That's the power of thinking in streams.

About the Author

João Mendes

Co-founder of AIOBI. Data & AI Engineer with experience in data infrastructure, intelligent products, and scalable solutions.