Introduction
Financial fraud costs the global economy over $500 billion annually, and that number continues to climb as digital transactions accelerate. Traditional batch-based fraud detection systems — where transactions are analyzed hours or days after the fact — are no longer sufficient. By the time a batch job flags a fraudulent transaction, the money is gone.
Modern fraud detection demands sub-second decision-making. When a customer swipes a card, initiates a wire transfer, or submits a payment, the system must evaluate that transaction against hundreds of features, multiple ML models, and a rule engine — all within 50–200 milliseconds.
In this article, we walk through the end-to-end architecture of a production-grade real-time fraud detection system, covering stream ingestion with Apache Kafka, stream processing with Apache Flink, feature engineering, ML model serving, and rule engine integration.
High-Level Architecture
| Layer | Technology | Responsibility | Latency Target |
|---|---|---|---|
| Ingestion | Apache Kafka | Receive and buffer raw transaction events | < 5ms |
| Stream Processing | Apache Flink | Enrich transactions, compute real-time features | < 50ms |
| Feature Store | Redis / Feast | Serve pre-computed and real-time features | < 10ms |
| ML Inference | TensorFlow Serving / Triton | Score transactions with trained models | < 30ms |
| Rule Engine | Custom / Drools | Apply deterministic business rules | < 5ms |
| Decision | Orchestrator service | Combine scores, decide approve/decline/review | < 10ms |
The data flow: Transaction Event → Kafka Topic → Flink Job (enrichment + features) → Feature Store Lookup → ML Model Scoring → Rule Engine → Decision → Response.
Step 1: Transaction Ingestion with Kafka
from confluent_kafka import Producer
import json
import uuid
from datetime import datetime
producer_config = {
"bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
"acks": "all",
"enable.idempotence": True,
"max.in.flight.requests.per.connection": 5,
}
producer = Producer(producer_config)
def publish_transaction(transaction: dict):
transaction["event_id"] = str(uuid.uuid4())
transaction["event_timestamp"] = datetime.utcnow().isoformat()
producer.produce(
topic="transactions.raw",
key=transaction["account_id"],
value=json.dumps(transaction).encode("utf-8"),
callback=delivery_report,
)
    producer.poll(0)  # serve delivery callbacks; a blocking flush() per message would serialize sends
def delivery_report(err, msg):
if err:
print(f"Delivery failed: {err}")
publish_transaction({
"account_id": "ACC-1029384",
"amount": 2499.99,
"currency": "USD",
"merchant_id": "MERCH-8827",
"merchant_category": "electronics",
"card_present": False,
"ip_address": "203.0.113.42",
"latitude": 40.7128,
"longitude": -74.0060,
})
Step 2: Feature Engineering
Raw transactions are enriched with real-time features computed over sliding windows:
- Velocity features: transaction count and total amount in the last 1, 5, 15, and 60 minutes
- Behavioral deviation: how far the current amount deviates from the account's historical average
- Geolocation features: distance from last transaction, impossible travel detection
- Merchant risk: fraud rate associated with the merchant category
- Device and IP features: new device detection, high-risk geography
- Time-based features: hour of day, day of week, time since last transaction
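In the pipeline these windows live in Flink keyed state; as a minimal plain-Python illustration of the velocity features (class and field names here are hypothetical, timestamps in seconds), a per-account sliding window can be sketched as:

```python
from collections import deque

class VelocityTracker:
    """Per-account sliding-window stats; an in-memory sketch of what the
    Flink job keeps in keyed state."""

    def __init__(self, window_seconds: int):
        self.window_seconds = window_seconds
        self.events = deque()  # (timestamp_s, amount) pairs, oldest first

    def add(self, timestamp_s: float, amount: float) -> None:
        self.events.append((timestamp_s, amount))

    def features(self, now_s: float) -> dict:
        # Evict events that have fallen out of the window before computing stats.
        while self.events and self.events[0][0] <= now_s - self.window_seconds:
            self.events.popleft()
        return {
            "tx_count": len(self.events),
            "total_amount": sum(amount for _, amount in self.events),
        }
```

One tracker per (account, window length) pair yields the 1-, 5-, 15-, and 60-minute counts and totals listed above.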
Impossible Travel Detection
from math import radians, sin, cos, sqrt, atan2
def haversine_distance(lat1, lon1, lat2, lon2):
"""Calculate the great-circle distance between two points in km."""
R = 6371
dlat = radians(lat2 - lat1)
dlon = radians(lon2 - lon1)
a = sin(dlat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2
return R * 2 * atan2(sqrt(a), sqrt(1-a))
from datetime import datetime

def parse_timestamp(ts: str) -> datetime:
    """Parse the ISO-8601 event_timestamp written at ingestion."""
    return datetime.fromisoformat(ts)

def detect_impossible_travel(current_tx, last_tx):
    """Flag transactions where implied travel speed exceeds 900 km/h."""
    if last_tx is None:
        return False, 0.0
distance_km = haversine_distance(
current_tx["latitude"], current_tx["longitude"],
last_tx["latitude"], last_tx["longitude"],
)
time_diff_hours = (
parse_timestamp(current_tx["event_timestamp"])
- parse_timestamp(last_tx["event_timestamp"])
).total_seconds() / 3600
    if time_diff_hours <= 0:
        # Same or out-of-order timestamps (clock skew): flag if locations differ.
        return distance_km > 1, distance_km
speed_kmh = distance_km / time_diff_hours
return speed_kmh > 900, distance_km
Step 3: Feature Store Integration
A feature store bridges batch-computed historical features and real-time features, preventing training-serving skew.
from feast import FeatureStore, Entity, FeatureView, Field
from feast.types import Float64, Int64
from datetime import timedelta
account = Entity(
name="account_id",
join_keys=["account_id"],
)
account_behavior_fv = FeatureView(
name="account_behavior",
entities=[account],
ttl=timedelta(days=90),
schema=[
Field(name="avg_daily_tx_count", dtype=Float64),
Field(name="avg_daily_tx_amount", dtype=Float64),
Field(name="std_tx_amount", dtype=Float64),
Field(name="days_since_account_open", dtype=Int64),
Field(name="historical_fraud_count", dtype=Int64),
],
    # account_batch_source: the batch source (e.g. a FileSource or warehouse
    # table) defined elsewhere in the feature repo.
    source=account_batch_source,
)
store = FeatureStore(repo_path="feature_repo/")
def get_account_features(account_id: str) -> dict:
features = store.get_online_features(
features=[
"account_behavior:avg_daily_tx_count",
"account_behavior:avg_daily_tx_amount",
"account_behavior:std_tx_amount",
"account_behavior:days_since_account_open",
"account_behavior:historical_fraud_count",
],
entity_rows=[{"account_id": account_id}],
)
return features.to_dict()
Step 4: ML Models for Fraud Detection
Model Architecture Comparison
| Model Type | Strengths | Latency | Use Case |
|---|---|---|---|
| Isolation Forest | Unsupervised, catches novel fraud patterns | < 5ms | Anomaly detection baseline |
| XGBoost / LightGBM | High accuracy, interpretable, fast inference | < 2ms | Primary scoring model |
| Neural Network (LSTM) | Captures sequential transaction patterns | < 15ms | Sequence-based detection |
| Graph Neural Network | Detects fraud rings and collusion | < 50ms | Network-based fraud |
Isolation Forest for Anomaly Detection
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import joblib

def train_isolation_forest(historical_transactions: pd.DataFrame):
    """Fit an unsupervised anomaly detector on historical transaction features."""
    feature_columns = [
        "amount", "window_tx_count_5m", "window_total_amount_5m",
        "amount_deviation", "distance_from_last_tx",
        "hour_of_day", "days_since_account_open",
    ]
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(historical_transactions[feature_columns])
model = IsolationForest(
n_estimators=200,
contamination=0.001,
max_samples="auto",
random_state=42,
n_jobs=-1,
)
model.fit(X_scaled)
joblib.dump(model, "models/isolation_forest.joblib")
joblib.dump(scaler, "models/scaler.joblib")
return model, scaler
# Load artifacts once at startup; reloading from disk on every call
# would consume most of the 30ms inference budget.
_iforest_model = joblib.load("models/isolation_forest.joblib")
_iforest_scaler = joblib.load("models/scaler.joblib")

def score_isolation_forest(features: dict) -> float:
    feature_vector = np.array([[
        features["amount"], features["window_tx_count_5m"],
        features["window_total_amount_5m"], features["amount_deviation"],
        features["distance_from_last_tx"], features["hour_of_day"],
        features["days_since_account_open"],
    ]])
    X_scaled = _iforest_scaler.transform(feature_vector)
    raw_score = _iforest_model.decision_function(X_scaled)[0]
    # decision_function is negative for anomalies; map to a rough [0, 1] scale.
    anomaly_score = max(0.0, min(1.0, 0.5 - raw_score))
    return anomaly_score
XGBoost Supervised Model
import xgboost as xgb
from sklearn.metrics import average_precision_score
def train_xgboost_fraud_model(X_train, y_train, X_val, y_val):
fraud_ratio = y_train.sum() / len(y_train)
scale_pos_weight = (1 - fraud_ratio) / fraud_ratio
params = {
"objective": "binary:logistic",
"eval_metric": ["aucpr", "logloss"],
"max_depth": 6,
"learning_rate": 0.05,
"subsample": 0.8,
"colsample_bytree": 0.8,
"scale_pos_weight": scale_pos_weight,
"min_child_weight": 5,
"tree_method": "hist",
}
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
model = xgb.train(
params, dtrain,
num_boost_round=500,
evals=[(dtrain, "train"), (dval, "val")],
early_stopping_rounds=30,
verbose_eval=50,
)
y_pred = model.predict(dval)
ap_score = average_precision_score(y_val, y_pred)
print(f"Average Precision Score: {ap_score:.4f}")
model.save_model("models/xgboost_fraud.json")
return model
Step 5: Rule Engine Layer
from dataclasses import dataclass
from typing import List, Callable
from enum import Enum
class RuleAction(Enum):
BLOCK = "block"
REVIEW = "review"
FLAG = "flag"
PASS = "pass"
@dataclass
class FraudRule:
name: str
description: str
condition: Callable[[dict], bool]
action: RuleAction
priority: int
class RuleEngine:
def __init__(self):
self.rules: List[FraudRule] = []
self._register_default_rules()
def _register_default_rules(self):
self.rules = [
FraudRule(
name="impossible_travel",
description="Transaction from impossible travel distance",
condition=lambda tx: tx.get("impossible_travel", False),
action=RuleAction.BLOCK,
priority=0,
),
FraudRule(
name="velocity_breach",
description="More than 10 transactions in 5 minutes",
condition=lambda tx: tx.get("window_tx_count_5m", 0) > 10,
action=RuleAction.BLOCK,
priority=0,
),
FraudRule(
name="high_value_cnp",
description="Card-not-present transaction over $5,000",
condition=lambda tx: (
not tx.get("card_present", True) and tx["amount"] > 5000
),
action=RuleAction.REVIEW,
priority=1,
),
FraudRule(
name="new_device_high_amount",
description="New device with transaction over $1,000",
condition=lambda tx: (
tx.get("is_new_device", False) and tx["amount"] > 1000
),
action=RuleAction.REVIEW,
priority=2,
),
]
self.rules.sort(key=lambda r: r.priority)
def evaluate(self, transaction: dict) -> tuple:
triggered = []
worst_action = RuleAction.PASS
for rule in self.rules:
if rule.condition(transaction):
triggered.append(rule.name)
if rule.action == RuleAction.BLOCK:
worst_action = RuleAction.BLOCK
elif rule.action == RuleAction.REVIEW and worst_action != RuleAction.BLOCK:
worst_action = RuleAction.REVIEW
return worst_action, triggered
Step 6: Decision Orchestrator
from dataclasses import dataclass
from enum import Enum
import time
class Decision(Enum):
APPROVE = "approve"
DECLINE = "decline"
MANUAL_REVIEW = "manual_review"
@dataclass
class FraudDecision:
decision: Decision
fraud_score: float
model_scores: dict
triggered_rules: list
latency_ms: float
explanation: str
class FraudDecisionOrchestrator:
def __init__(self):
self.rule_engine = RuleEngine()
def evaluate_transaction(self, enriched_tx: dict) -> FraudDecision:
start_time = time.monotonic()
historical_features = get_account_features(enriched_tx["account_id"])
features = {**enriched_tx, **historical_features}
isolation_score = score_isolation_forest(features)
xgboost_score = score_xgboost(features)
fraud_score = 0.3 * isolation_score + 0.7 * xgboost_score
rule_action, triggered_rules = self.rule_engine.evaluate(features)
if rule_action == RuleAction.BLOCK:
decision = Decision.DECLINE
explanation = f"Blocked by rules: {', '.join(triggered_rules)}"
elif fraud_score > 0.85:
decision = Decision.DECLINE
explanation = f"High fraud score: {fraud_score:.3f}"
elif fraud_score > 0.5 or rule_action == RuleAction.REVIEW:
decision = Decision.MANUAL_REVIEW
explanation = f"Review needed. Score: {fraud_score:.3f}"
else:
decision = Decision.APPROVE
explanation = f"Approved. Score: {fraud_score:.3f}"
latency_ms = (time.monotonic() - start_time) * 1000
return FraudDecision(
decision=decision,
fraud_score=fraud_score,
model_scores={"isolation_forest": isolation_score, "xgboost": xgboost_score},
triggered_rules=triggered_rules,
latency_ms=latency_ms,
explanation=explanation,
)
The two model scores are independent, so in production they should be computed concurrently rather than sequentially, for example with asyncio.gather(). The complete pipeline should target under 100ms at p99.
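A minimal sketch of that concurrency pattern (the scorer names and sleep durations are stand-ins for the real model calls, which being CPU-bound would be wrapped with asyncio.to_thread):

```python
import asyncio

async def score_model_a(features: dict) -> float:
    await asyncio.sleep(0.02)  # stand-in for a ~20ms model call
    return 0.12

async def score_model_b(features: dict) -> float:
    await asyncio.sleep(0.02)
    return 0.34

async def score_concurrently(features: dict) -> dict:
    # Both scorers run at the same time, so wall time is ~max, not the sum.
    a, b = await asyncio.gather(
        score_model_a(features), score_model_b(features)
    )
    return {"model_a": a, "model_b": b}

scores = asyncio.run(score_concurrently({"amount": 42.0}))
```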
Monitoring and Model Performance
- Precision at recall thresholds: At 90% recall, typical targets are 5–10% precision (10–20 false alerts per true fraud catch).
- False positive rate: Industry benchmarks target < 0.5%.
- Model latency (p50, p95, p99): Track percentile latencies, not averages.
- Feature drift: Monitor using KL divergence or Population Stability Index (PSI). PSI above 0.2 signals significant drift.
- Concept drift: Track AUC-PR on a sliding window. A decline of more than 2% should trigger retraining.
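The PSI check above fits in a few lines; this sketch bins the baseline by quantiles (the bin count and epsilon are arbitrary choices):

```python
import numpy as np

def population_stability_index(expected, actual, bins: int = 10) -> float:
    """PSI between a baseline (training-time) and a live feature sample."""
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    # Quantile bin edges from the baseline; open-ended outer bins so every
    # live value lands in some bin.
    edges = np.quantile(expected, np.linspace(0.0, 1.0, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    e_frac = np.histogram(expected, bins=edges)[0] / len(expected)
    a_frac = np.histogram(actual, bins=edges)[0] / len(actual)
    eps = 1e-6  # avoid log(0) for empty bins
    e_frac = np.clip(e_frac, eps, None)
    a_frac = np.clip(a_frac, eps, None)
    return float(np.sum((a_frac - e_frac) * np.log(a_frac / e_frac)))
```

Run this per feature against the training-time distribution on a schedule and alert when the 0.2 threshold is crossed.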
Metrics Pipeline
from prometheus_client import Histogram, Counter, Gauge
FRAUD_SCORE_HISTOGRAM = Histogram(
"fraud_score_distribution",
"Distribution of fraud scores",
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)
DECISION_COUNTER = Counter(
"fraud_decisions_total",
"Total fraud decisions by type",
["decision"],
)
PIPELINE_LATENCY = Histogram(
"fraud_pipeline_latency_ms",
"End-to-end pipeline latency",
buckets=[10, 25, 50, 75, 100, 150, 200, 500],
)
def record_decision(fraud_decision: FraudDecision):
FRAUD_SCORE_HISTOGRAM.observe(fraud_decision.fraud_score)
DECISION_COUNTER.labels(decision=fraud_decision.decision.value).inc()
PIPELINE_LATENCY.observe(fraud_decision.latency_ms)
Production Considerations
Handling Class Imbalance
- SMOTE or ADASYN for synthetic oversampling during training
- Cost-sensitive learning via `scale_pos_weight` in XGBoost
- Stratified sampling for validation splits
- AUC-PR over AUC-ROC for imbalanced datasets
Model Retraining Strategy
- Scheduled retraining: Weekly on a rolling 90-day window
- Champion-challenger deployment: Run new models alongside production before promoting
- Shadow mode: Deploy new models to score without influencing decisions
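Shadow mode can be as simple as scoring with both models while only the champion's output drives the decision (the function and log names here are illustrative):

```python
def shadow_score(features: dict, champion, challenger, shadow_log: list) -> float:
    """Score with both models; only the champion's score is acted on."""
    champion_score = champion(features)
    challenger_score = challenger(features)  # recorded, never acted on
    shadow_log.append({
        "champion": champion_score,
        "challenger": challenger_score,
    })
    return champion_score
```

Joining the logged challenger scores against later fraud labels gives offline evidence of lift before promotion.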
Conclusion
Building a real-time fraud detection pipeline is a cross-disciplinary challenge combining distributed systems (Kafka, Flink), machine learning (ensemble models, feature engineering), and domain expertise (banking regulations, fraud patterns). The key takeaways: invest heavily in feature engineering, use an ensemble approach combining unsupervised and supervised models, always pair ML with a rule engine, and build comprehensive monitoring from day one.
At HexoByte Solutions, we help financial institutions design and deploy these systems end to end. If your organization is looking to modernize its fraud detection capabilities, reach out to our team to discuss how we can help.