Real-Time Fraud Detection Pipeline: Architecture and Code


Introduction

Financial fraud costs the global economy over $500 billion annually, and that number continues to climb as digital transactions accelerate. Traditional batch-based fraud detection systems — where transactions are analyzed hours or days after the fact — are no longer sufficient. By the time a batch job flags a fraudulent transaction, the money is gone.

Modern fraud detection demands sub-second decision-making. When a customer swipes a card, initiates a wire transfer, or submits a payment, the system must evaluate that transaction against hundreds of features, multiple ML models, and a rule engine — all within 50–200 milliseconds.

In this article, we walk through the end-to-end architecture of a production-grade real-time fraud detection system, covering stream ingestion with Apache Kafka, stream processing with Apache Flink, feature engineering, ML model serving, and rule engine integration.

High-Level Architecture

| Layer | Technology | Responsibility | Latency Target |
| --- | --- | --- | --- |
| Ingestion | Apache Kafka | Receive and buffer raw transaction events | < 5ms |
| Stream Processing | Apache Flink | Enrich transactions, compute real-time features | < 50ms |
| Feature Store | Redis / Feast | Serve pre-computed and real-time features | < 10ms |
| ML Inference | TensorFlow Serving / Triton | Score transactions with trained models | < 30ms |
| Rule Engine | Custom / Drools | Apply deterministic business rules | < 5ms |
| Decision | Orchestrator service | Combine scores, decide approve/decline/review | < 10ms |

The data flow: Transaction Event → Kafka Topic → Flink Job (enrichment + features) → Feature Store Lookup → ML Model Scoring → Rule Engine → Decision → Response.

Why Kafka + Flink? Kafka provides durable, partitioned message delivery at millions of events per second, with exactly-once semantics available through idempotent producers and transactions. Flink adds stateful stream processing with event-time semantics, windowed aggregations, and exactly-once state guarantees, which are critical for computing features like “number of transactions in the last 5 minutes” without data loss or duplication.

Step 1: Transaction Ingestion with Kafka

from confluent_kafka import Producer
import json
import uuid
from datetime import datetime, timezone

producer_config = {
    "bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
    "acks": "all",  # wait for all in-sync replicas before acknowledging
    "enable.idempotence": True,  # retries cannot introduce duplicates
    "max.in.flight.requests.per.connection": 5,  # the maximum allowed with idempotence
}

producer = Producer(producer_config)

def delivery_report(err, msg):
    if err:
        print(f"Delivery failed: {err}")

def publish_transaction(transaction: dict):
    transaction["event_id"] = str(uuid.uuid4())
    transaction["event_timestamp"] = datetime.now(timezone.utc).isoformat()

    producer.produce(
        topic="transactions.raw",
        key=transaction["account_id"],  # keying by account preserves per-account ordering
        value=json.dumps(transaction).encode("utf-8"),
        callback=delivery_report,
    )
    # Serve delivery callbacks without blocking. Flushing after every message
    # would serialize the producer and destroy throughput; call
    # producer.flush() only on shutdown.
    producer.poll(0)

publish_transaction({
    "account_id": "ACC-1029384",
    "amount": 2499.99,
    "currency": "USD",
    "merchant_id": "MERCH-8827",
    "merchant_category": "electronics",
    "card_present": False,
    "ip_address": "203.0.113.42",
    "latitude": 40.7128,
    "longitude": -74.0060,
})

Step 2: Feature Engineering

Raw transactions are enriched with real-time features computed over sliding windows:
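
In production these windowed aggregations live in Flink keyed state, but the underlying logic behind a feature like window_tx_count_5m can be sketched in plain Python with a per-account deque (the class and method names here are illustrative, not part of the pipeline above):

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 300  # 5-minute sliding window

class RollingWindowFeatures:
    """Per-account sliding-window aggregates: a plain-Python sketch of
    what Flink keyed state maintains for each account."""

    def __init__(self, window_seconds: int = WINDOW_SECONDS):
        self.window_seconds = window_seconds
        self._events = defaultdict(deque)  # account_id -> deque of (ts, amount)

    def update(self, account_id: str, ts: float, amount: float) -> dict:
        events = self._events[account_id]
        events.append((ts, amount))
        # Evict events that have aged out of the window.
        while events and events[0][0] < ts - self.window_seconds:
            events.popleft()
        return {
            "window_tx_count_5m": len(events),
            "window_total_amount_5m": sum(a for _, a in events),
        }
```

Flink replaces the manual eviction with sliding-window state that survives restarts via checkpoints, which is what makes the exactly-once guarantee in the architecture table possible.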

Impossible Travel Detection

from datetime import datetime
from math import radians, sin, cos, sqrt, atan2

def parse_timestamp(ts: str) -> datetime:
    """Parse the ISO-8601 event_timestamp written at ingestion."""
    return datetime.fromisoformat(ts)

def haversine_distance(lat1, lon1, lat2, lon2):
    """Calculate the great-circle distance between two points in km."""
    R = 6371
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2
    return R * 2 * atan2(sqrt(a), sqrt(1-a))

def detect_impossible_travel(current_tx, last_tx):
    """Flag transactions where travel speed exceeds 900 km/h."""
    if last_tx is None:
        return False, 0.0

    distance_km = haversine_distance(
        current_tx["latitude"], current_tx["longitude"],
        last_tx["latitude"], last_tx["longitude"],
    )
    time_diff_hours = (
        parse_timestamp(current_tx["event_timestamp"])
        - parse_timestamp(last_tx["event_timestamp"])
    ).total_seconds() / 3600

    if time_diff_hours <= 0:
        # Out-of-order or same-timestamp events: flag only if the
        # locations differ by more than 1 km.
        return distance_km > 1, distance_km

    speed_kmh = distance_km / time_diff_hours
    return speed_kmh > 900, distance_km

Step 3: Feature Store Integration

A feature store bridges batch-computed historical features and real-time features, preventing training-serving skew.

from feast import FeatureStore, Entity, FeatureView, Field
from feast.types import Float64, Int64
from datetime import timedelta

account = Entity(
    name="account_id",
    join_keys=["account_id"],
)

# account_batch_source (not shown) is the Feast batch source backing this
# view, defined elsewhere in the feature repo.
account_behavior_fv = FeatureView(
    name="account_behavior",
    entities=[account],
    ttl=timedelta(days=90),
    schema=[
        Field(name="avg_daily_tx_count", dtype=Float64),
        Field(name="avg_daily_tx_amount", dtype=Float64),
        Field(name="std_tx_amount", dtype=Float64),
        Field(name="days_since_account_open", dtype=Int64),
        Field(name="historical_fraud_count", dtype=Int64),
    ],
    source=account_batch_source,
)

store = FeatureStore(repo_path="feature_repo/")

def get_account_features(account_id: str) -> dict:
    features = store.get_online_features(
        features=[
            "account_behavior:avg_daily_tx_count",
            "account_behavior:avg_daily_tx_amount",
            "account_behavior:std_tx_amount",
            "account_behavior:days_since_account_open",
            "account_behavior:historical_fraud_count",
        ],
        entity_rows=[{"account_id": account_id}],
    )
    result = features.to_dict()
    # to_dict() returns a list per feature (one entry per entity row);
    # flatten to scalars for this single-account lookup.
    return {key: values[0] for key, values in result.items()}
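
With the historical aggregates available, a derived feature such as amount_deviation (consumed by the models below) is typically a z-score of the current amount against the account's history. A minimal sketch; the helper name is an assumption, not part of the pipeline above:

```python
def compute_amount_deviation(amount: float, avg_amount: float, std_amount: float) -> float:
    """Z-score of the current transaction amount against the account's
    historical average, guarding against zero variance for new accounts."""
    if std_amount < 1e-6:
        # No meaningful history yet: return 0 rather than dividing by zero.
        return 0.0
    return (amount - avg_amount) / std_amount
```

A $300 transaction on an account averaging $100 with a $50 standard deviation yields a deviation of 4.0, a strong anomaly signal on its own.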

Step 4: ML Models for Fraud Detection

Model Architecture Comparison

| Model Type | Strengths | Latency | Use Case |
| --- | --- | --- | --- |
| Isolation Forest | Unsupervised, catches novel fraud patterns | < 5ms | Anomaly detection baseline |
| XGBoost / LightGBM | High accuracy, interpretable, fast inference | < 2ms | Primary scoring model |
| Neural Network (LSTM) | Captures sequential transaction patterns | < 15ms | Sequence-based detection |
| Graph Neural Network | Detects fraud rings and collusion | < 50ms | Network-based fraud |

Isolation Forest for Anomaly Detection

import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

def train_isolation_forest(historical_transactions: pd.DataFrame):
    feature_columns = [
        "amount", "window_tx_count_5m", "window_total_amount_5m",
        "amount_deviation", "distance_from_last_tx",
        "hour_of_day", "days_since_account_open",
    ]

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(historical_transactions[feature_columns])

    model = IsolationForest(
        n_estimators=200,
        contamination=0.001,
        max_samples="auto",
        random_state=42,
        n_jobs=-1,
    )
    model.fit(X_scaled)

    joblib.dump(model, "models/isolation_forest.joblib")
    joblib.dump(scaler, "models/scaler.joblib")
    return model, scaler

# Load artifacts once at startup; reloading them on every call would
# dominate the latency budget.
_iso_model = joblib.load("models/isolation_forest.joblib")
_iso_scaler = joblib.load("models/scaler.joblib")

def score_isolation_forest(features: dict) -> float:
    feature_vector = np.array([[
        features["amount"], features["window_tx_count_5m"],
        features["window_total_amount_5m"], features["amount_deviation"],
        features["distance_from_last_tx"], features["hour_of_day"],
        features["days_since_account_open"],
    ]])

    X_scaled = _iso_scaler.transform(feature_vector)
    raw_score = _iso_model.decision_function(X_scaled)[0]
    # decision_function is negative for anomalies; map it to [0, 1]
    # where higher means more anomalous.
    anomaly_score = max(0.0, min(1.0, 0.5 - raw_score))
    return anomaly_score

XGBoost Supervised Model

import xgboost as xgb
from sklearn.metrics import average_precision_score

def train_xgboost_fraud_model(X_train, y_train, X_val, y_val):
    fraud_ratio = y_train.sum() / len(y_train)
    scale_pos_weight = (1 - fraud_ratio) / fraud_ratio

    params = {
        "objective": "binary:logistic",
        "eval_metric": ["aucpr", "logloss"],
        "max_depth": 6,
        "learning_rate": 0.05,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "scale_pos_weight": scale_pos_weight,
        "min_child_weight": 5,
        "tree_method": "hist",
    }

    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)

    model = xgb.train(
        params, dtrain,
        num_boost_round=500,
        evals=[(dtrain, "train"), (dval, "val")],
        early_stopping_rounds=30,
        verbose_eval=50,
    )

    y_pred = model.predict(dval)
    ap_score = average_precision_score(y_val, y_pred)
    print(f"Average Precision Score: {ap_score:.4f}")

    model.save_model("models/xgboost_fraud.json")
    return model

Step 5: Rule Engine Layer

from dataclasses import dataclass
from typing import List, Callable
from enum import Enum

class RuleAction(Enum):
    BLOCK = "block"
    REVIEW = "review"
    FLAG = "flag"
    PASS = "pass"

@dataclass
class FraudRule:
    name: str
    description: str
    condition: Callable[[dict], bool]
    action: RuleAction
    priority: int

class RuleEngine:
    def __init__(self):
        self.rules: List[FraudRule] = []
        self._register_default_rules()

    def _register_default_rules(self):
        self.rules = [
            FraudRule(
                name="impossible_travel",
                description="Transaction from impossible travel distance",
                condition=lambda tx: tx.get("impossible_travel", False),
                action=RuleAction.BLOCK,
                priority=0,
            ),
            FraudRule(
                name="velocity_breach",
                description="More than 10 transactions in 5 minutes",
                condition=lambda tx: tx.get("window_tx_count_5m", 0) > 10,
                action=RuleAction.BLOCK,
                priority=0,
            ),
            FraudRule(
                name="high_value_cnp",
                description="Card-not-present transaction over $5,000",
                condition=lambda tx: (
                    not tx.get("card_present", True) and tx["amount"] > 5000
                ),
                action=RuleAction.REVIEW,
                priority=1,
            ),
            FraudRule(
                name="new_device_high_amount",
                description="New device with transaction over $1,000",
                condition=lambda tx: (
                    tx.get("is_new_device", False) and tx["amount"] > 1000
                ),
                action=RuleAction.REVIEW,
                priority=2,
            ),
        ]
        self.rules.sort(key=lambda r: r.priority)

    def evaluate(self, transaction: dict) -> tuple:
        triggered = []
        worst_action = RuleAction.PASS

        for rule in self.rules:
            if rule.condition(transaction):
                triggered.append(rule.name)
                if rule.action == RuleAction.BLOCK:
                    worst_action = RuleAction.BLOCK
                elif rule.action == RuleAction.REVIEW and worst_action != RuleAction.BLOCK:
                    worst_action = RuleAction.REVIEW

        return worst_action, triggered

Step 6: Decision Orchestrator

from dataclasses import dataclass
from enum import Enum
import time

class Decision(Enum):
    APPROVE = "approve"
    DECLINE = "decline"
    MANUAL_REVIEW = "manual_review"

@dataclass
class FraudDecision:
    decision: Decision
    fraud_score: float
    model_scores: dict
    triggered_rules: list
    latency_ms: float
    explanation: str

class FraudDecisionOrchestrator:
    def __init__(self):
        self.rule_engine = RuleEngine()

    def evaluate_transaction(self, enriched_tx: dict) -> FraudDecision:
        start_time = time.monotonic()

        historical_features = get_account_features(enriched_tx["account_id"])
        features = {**enriched_tx, **historical_features}

        isolation_score = score_isolation_forest(features)
        xgboost_score = score_xgboost(features)

        fraud_score = 0.3 * isolation_score + 0.7 * xgboost_score

        rule_action, triggered_rules = self.rule_engine.evaluate(features)

        if rule_action == RuleAction.BLOCK:
            decision = Decision.DECLINE
            explanation = f"Blocked by rules: {', '.join(triggered_rules)}"
        elif fraud_score > 0.85:
            decision = Decision.DECLINE
            explanation = f"High fraud score: {fraud_score:.3f}"
        elif fraud_score > 0.5 or rule_action == RuleAction.REVIEW:
            decision = Decision.MANUAL_REVIEW
            explanation = f"Review needed. Score: {fraud_score:.3f}"
        else:
            decision = Decision.APPROVE
            explanation = f"Approved. Score: {fraud_score:.3f}"

        latency_ms = (time.monotonic() - start_time) * 1000

        return FraudDecision(
            decision=decision,
            fraud_score=fraud_score,
            model_scores={"isolation_forest": isolation_score, "xgboost": xgboost_score},
            triggered_rules=triggered_rules,
            latency_ms=latency_ms,
            explanation=explanation,
        )

Latency Budget: Feature store lookups should complete in under 10ms (use Redis). ML model inference should be under 15ms per model. Run the models in parallel with asyncio.gather(). The complete pipeline should target under 100ms at p99.
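
The asyncio.gather() suggestion can be sketched with asyncio.to_thread, which offloads each blocking scorer to a worker thread. The two scorer functions below are illustrative stand-ins, not the article's models:

```python
import asyncio
import time

def slow_scorer_a(features: dict) -> float:
    """Stand-in for a blocking model call (e.g. the isolation forest)."""
    time.sleep(0.05)
    return 0.2

def slow_scorer_b(features: dict) -> float:
    """Stand-in for a second blocking model call (e.g. XGBoost)."""
    time.sleep(0.05)
    return 0.8

async def score_in_parallel(features: dict) -> dict:
    # to_thread runs each blocking call in the default thread pool, so the
    # two ~50ms scorers overlap instead of running back to back.
    iso_score, xgb_score = await asyncio.gather(
        asyncio.to_thread(slow_scorer_a, features),
        asyncio.to_thread(slow_scorer_b, features),
    )
    return {"isolation_forest": iso_score, "xgboost": xgb_score}
```

With both scorers overlapped, the model-inference stage costs roughly the latency of the slowest model rather than the sum of both.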

Monitoring and Model Performance

Metrics Pipeline

from prometheus_client import Histogram, Counter, Gauge

FRAUD_SCORE_HISTOGRAM = Histogram(
    "fraud_score_distribution",
    "Distribution of fraud scores",
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
)
DECISION_COUNTER = Counter(
    "fraud_decisions_total",
    "Total fraud decisions by type",
    ["decision"],
)
PIPELINE_LATENCY = Histogram(
    "fraud_pipeline_latency_ms",
    "End-to-end pipeline latency",
    buckets=[10, 25, 50, 75, 100, 150, 200, 500],
)

def record_decision(fraud_decision: FraudDecision):
    FRAUD_SCORE_HISTOGRAM.observe(fraud_decision.fraud_score)
    DECISION_COUNTER.labels(decision=fraud_decision.decision.value).inc()
    PIPELINE_LATENCY.observe(fraud_decision.latency_ms)

Production Considerations

Handling Class Imbalance

Fraud labels are extremely rare, often well under 0.1% of transactions, so a naively trained classifier learns to predict "legitimate" every time. The XGBoost configuration above compensates with scale_pos_weight and the aucpr evaluation metric; undersampling the majority class or focal-loss variants are common alternatives.

Model Retraining Strategy

Fraud patterns drift as attackers adapt to whatever the model currently catches. Retrain on a regular cadence using confirmed labels from chargebacks and manual review, and monitor the live fraud-score distribution between retrains so that drift triggers an earlier refresh.
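
A concrete drift signal to gate retraining on is the population stability index (PSI) between the training-time score distribution and live scores. A minimal sketch; the thresholds in the docstring are a common industry rule of thumb, not something from this pipeline:

```python
import numpy as np

def population_stability_index(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """PSI between a baseline (training-time) score sample and a live sample.
    Rule of thumb: < 0.1 stable, 0.1-0.2 moderate shift, > 0.2 significant
    shift worth investigating or retraining on."""
    # Bin edges from baseline quantiles, widened to cover the live sample.
    edges = np.quantile(expected, np.linspace(0.0, 1.0, bins + 1))
    edges[0] = min(edges[0], actual.min()) - 1e-9
    edges[-1] = max(edges[-1], actual.max()) + 1e-9
    eps = 1e-6  # avoid log(0) for empty bins
    expected_pct = np.histogram(expected, bins=edges)[0] / len(expected) + eps
    actual_pct = np.histogram(actual, bins=edges)[0] / len(actual) + eps
    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))
```

Computed daily over the FRAUD_SCORE_HISTOGRAM data already collected above, a sustained PSI above the alert threshold is a stronger retraining trigger than the calendar alone.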

Conclusion

Building a real-time fraud detection pipeline is a cross-disciplinary challenge combining distributed systems (Kafka, Flink), machine learning (ensemble models, feature engineering), and domain expertise (banking regulations, fraud patterns). The key takeaways: invest heavily in feature engineering, use an ensemble approach combining unsupervised and supervised models, always pair ML with a rule engine, and build comprehensive monitoring from day one.

At HexoByte Solutions, we help financial institutions design and deploy these systems end to end. If your organization is looking to modernize its fraud detection capabilities, reach out to our team to discuss how we can help.