How to Set Up Kafka for Real-Time Event Streaming (with Code)

Introduction

Apache Kafka has become the backbone of modern event-driven architectures. Whether you are building real-time analytics pipelines, synchronizing microservices, or ingesting IoT telemetry at scale, Kafka provides a durable, high-throughput, distributed streaming platform that handles millions of events per second.

With KRaft mode (Kafka Raft), ZooKeeper is no longer required, which simplifies operations and reduces infrastructure complexity. KRaft has been production-ready since Kafka 3.3, ZooKeeper mode was deprecated in 3.5, and KRaft is now the recommended deployment model.

In this guide, we walk through setting up Kafka in KRaft mode using Docker Compose, writing producers and consumers in both Python and Java, integrating Schema Registry and Kafka Connect, and applying production-grade performance tuning.

What you will learn: Kafka architecture fundamentals → KRaft mode setup with Docker Compose → Python and Java producer/consumer code → Schema Registry & Kafka Connect → Performance tuning → Kafka vs alternatives comparison.

Kafka Architecture Overview

A Kafka cluster is a set of brokers that store streams of records in topics. Each topic is split into partitions, the unit of parallelism and ordering: records with the same key always land in the same partition, and consumers in a consumer group divide the partitions among themselves, tracking their position with committed offsets. Partitions are replicated across brokers for fault tolerance, with one replica acting as leader for reads and writes.

KRaft Mode: Why ZooKeeper Is Gone

Historically, Kafka delegated cluster metadata and controller election to an external ZooKeeper ensemble. In KRaft mode, a Raft quorum of controllers runs inside the Kafka cluster itself and stores metadata in an internal Kafka log.

Key benefits of KRaft mode:

- One system to deploy, secure, and monitor instead of two
- Faster controller failover and better metadata scalability (more partitions per cluster)
- A single configuration and security model
- Simpler local and containerized setups, as the example below shows

Setting Up Kafka with Docker Compose (KRaft Mode)

# docker-compose.yml
version: '3.8'

services:
  kafka-1:
    image: confluentinc/cp-kafka:7.7.1
    hostname: kafka-1
    container_name: kafka-1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-1-data:/var/lib/kafka/data

  kafka-2:
    image: confluentinc/cp-kafka:7.7.1
    hostname: kafka-2
    container_name: kafka-2
    ports:
      - "9093:9092"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-2-data:/var/lib/kafka/data

  kafka-3:
    image: confluentinc/cp-kafka:7.7.1
    hostname: kafka-3
    container_name: kafka-3
    ports:
      - "9094:9092"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-3-data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    depends_on:
      - kafka-1
      - schema-registry

volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:

Start the cluster:

docker compose up -d

# Verify brokers are running
docker compose ps

# Create a test topic
docker exec kafka-1 kafka-topics --bootstrap-server localhost:29092 \
  --create --topic events --partitions 6 --replication-factor 3

Key configuration note: the CLUSTER_ID must be identical across all brokers; generate one with kafka-storage random-uuid. KAFKA_PROCESS_ROLES can be broker,controller (combined mode, suitable for small clusters) or split into dedicated broker and controller nodes for production at scale.
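
With the topic in place, the console clients bundled in the cp-kafka image give a quick smoke test (requires the cluster above to be running; type a line into the producer, then read it back):

```shell
# Interactive producer: type messages, Ctrl-D to exit
docker exec -i kafka-1 kafka-console-producer \
  --bootstrap-server localhost:29092 --topic events

# Read everything from the beginning of the topic
docker exec kafka-1 kafka-console-consumer \
  --bootstrap-server localhost:29092 --topic events --from-beginning
```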

Kafka Producer and Consumer in Python

The confluent-kafka library is Confluent's officially supported Python client, built on librdkafka for performance.

Producer (Python)

from confluent_kafka import Producer
import json
import time

conf = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'client.id': 'python-producer',
    'acks': 'all',
    'retries': 5,
    'linger.ms': 10,
    'batch.size': 65536,
    'compression.type': 'lz4',
    'enable.idempotence': True,
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()}[{msg.partition()}] @ offset {msg.offset()}')

for i in range(100):
    event = {
        'event_id': i,
        'event_type': 'page_view',
        'user_id': f'user_{i % 10}',
        'timestamp': time.time(),
        'url': f'/products/{i}'
    }
    producer.produce(
        topic='events',
        key=str(event['user_id']).encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_callback
    )
    producer.poll(0)

producer.flush(timeout=30)
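
Each produce call above keys the record by user_id, which is what keeps a user's events in order: Kafka's default partitioner hashes the key to pick a partition, so the same key always maps to the same partition. A broker-free sketch of the idea (using CRC32 for simplicity; Kafka itself uses a murmur2 hash):

```python
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Toy stand-in for Kafka's default partitioner: hash the key,
    map the hash onto a partition. Kafka uses murmur2, not CRC32."""
    return zlib.crc32(key) % num_partitions

# Every event for the same user lands on the same partition,
# which is what preserves per-user ordering.
p1 = pick_partition(b'user_7', 6)
p2 = pick_partition(b'user_7', 6)
assert p1 == p2
assert 0 <= p1 < 6
```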

Consumer (Python)

from confluent_kafka import Consumer, KafkaError
import json

conf = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'group.id': 'event-processor-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
    'fetch.min.bytes': 1024,
}

consumer = Consumer(conf)
consumer.subscribe(['events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(f'Consumer error: {msg.error()}')
            break

        event = json.loads(msg.value().decode('utf-8'))
        print(f'Received: {event["event_type"]} from {event["user_id"]}')

        # Commit offset after successful processing
        consumer.commit(asynchronous=False)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Why manual commits? With enable.auto.commit=False, you control exactly when offsets are committed. This prevents data loss: if your consumer crashes after auto-committing but before processing, that message is lost. Manual commits ensure at-least-once delivery semantics.
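
To make the at-least-once guarantee concrete, here is a broker-free simulation of the commit-after-process loop (all names here are illustrative): a crash between processing and committing causes redelivery on restart, never loss.

```python
def run_consumer(messages, committed, crash_after_processing=None):
    """Process messages from the committed offset onward,
    committing only AFTER each message is processed."""
    processed = []
    for i in range(committed['offset'], len(messages)):
        processed.append(messages[i])      # process the message
        if i == crash_after_processing:
            return processed               # crash BEFORE committing
        committed['offset'] = i + 1        # commit after processing
    return processed

messages = ['m0', 'm1', 'm2', 'm3']
committed = {'offset': 0}              # stand-in for the offsets topic

first = run_consumer(messages, committed, crash_after_processing=1)
second = run_consumer(messages, committed)   # restart and resume
assert first == ['m0', 'm1']                 # m1 processed, not committed
assert second == ['m1', 'm2', 'm3']          # m1 redelivered: at-least-once
```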

Kafka Producer and Consumer in Java

Producer (Java)

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class EventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String key = "user_" + (i % 10);
                String value = String.format(
                    "{"event_id":%d,"event_type":"page_view","user_id":"%s"}",
                    i, key);

                ProducerRecord<String, String> record =
                    new ProducerRecord<>("events", key, value);

                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("Sent to %s[%d] @ %d%n",
                            metadata.topic(), metadata.partition(),
                            metadata.offset());
                    }
                });
            }
        }
    }
}
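
Consumer (Java)

A matching consumer that mirrors the Python consumer's manual-commit pattern (same topic and group id; assumes the kafka-clients dependency is on the classpath and the cluster above is running):

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class EventConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "localhost:9092,localhost:9093,localhost:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "event-processor-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("events"));
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received %s from %s[%d] @ %d%n",
                        record.value(), record.topic(),
                        record.partition(), record.offset());
                }
                // Commit only after the whole batch has been processed,
                // for the same at-least-once semantics as the Python consumer.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }
}
```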

Schema Registry: Enforcing Data Contracts

Schema Registry stores versioned Avro (or JSON/Protobuf) schemas and checks new versions for compatibility, so producers and consumers can evolve independently without breaking each other. The example below registers and uses an Avro schema from Python:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_str = """
{
  "type": "record",
  "name": "PageViewEvent",
  "namespace": "com.hexobyte.events",
  "fields": [
    {"name": "event_id", "type": "int"},
    {"name": "user_id", "type": "string"},
    {"name": "url", "type": "string"},
    {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"}
  ]
}
"""

schema_registry_client = SchemaRegistryClient({
    'url': 'http://localhost:8081'
})

avro_serializer = AvroSerializer(
    schema_registry_client,
    schema_str,
    lambda obj, ctx: obj
)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': lambda k, ctx: k.encode('utf-8'),
    'value.serializer': avro_serializer,
})

producer.produce(
    topic='page-views',
    key='user_42',
    value={
        'event_id': 1,
        'user_id': 'user_42',
        'url': '/products/kafka-guide',
        'timestamp': 1741500000000
    }
)
producer.flush()
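
The registry's value is the contract itself: a payload that does not match the registered schema fails at serialization time rather than poisoning the topic. As a broker-free illustration of that idea (a toy check, not Schema Registry's actual compatibility logic):

```python
import json

# The PageViewEvent schema from above, trimmed to its fields.
schema = json.loads("""
{
  "type": "record",
  "name": "PageViewEvent",
  "fields": [
    {"name": "event_id", "type": "int"},
    {"name": "user_id", "type": "string"},
    {"name": "url", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
""")

AVRO_TO_PY = {"int": int, "long": int, "string": str}

def conforms(record: dict, schema: dict) -> bool:
    """Toy contract check: every field present with the right primitive type."""
    fields = schema["fields"]
    if set(record) != {f["name"] for f in fields}:
        return False
    return all(isinstance(record[f["name"]], AVRO_TO_PY[f["type"]])
               for f in fields)

good = {"event_id": 1, "user_id": "user_42",
        "url": "/products/kafka-guide", "timestamp": 1741500000000}
bad = {"event_id": "1", "user_id": "user_42"}  # wrong type, missing fields

assert conforms(good, schema)
assert not conforms(bad, schema)
```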

Kafka Connect: Integrating External Systems

Kafka Connect streams data between Kafka and external systems through reusable connectors, configured over a REST API instead of custom code.

Common connectors:

- Debezium source connectors (PostgreSQL, MySQL, MongoDB) for change data capture
- JDBC source and sink connectors for relational databases
- S3 sink for data-lake archival
- Elasticsearch sink for search indexing

Example: Debezium PostgreSQL Connector

# Register a connector via the Kafka Connect REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "kafka_user",
      "database.password": "secret",
      "database.dbname": "app_db",
      "topic.prefix": "cdc",
      "table.include.list": "public.orders,public.users",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot"
    }
  }'
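
Once registered, connectors are managed entirely through the same REST API. A few useful calls, assuming a Connect worker (not included in the Compose file above) is listening on localhost:8083:

```shell
# List all registered connectors
curl http://localhost:8083/connectors

# Check connector and task health
curl http://localhost:8083/connectors/postgres-source/status

# Pause, resume, or remove the connector
curl -X PUT http://localhost:8083/connectors/postgres-source/pause
curl -X PUT http://localhost:8083/connectors/postgres-source/resume
curl -X DELETE http://localhost:8083/connectors/postgres-source
```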

Performance Tuning

Producer Tuning

- Raise linger.ms (5–50 ms) and batch.size to trade a little latency for much better batching and throughput
- Enable compression (lz4 or zstd) to cut network and disk usage
- Keep acks=all with enable.idempotence=true for durability without duplicates

Consumer Tuning

- Increase fetch.min.bytes so the broker returns fuller batches instead of many small fetches
- Scale consumer instances up to the partition count; beyond that, extra consumers sit idle
- Keep per-message processing fast, or raise max.poll.interval.ms to avoid unwanted rebalances

Broker Configuration

- Keep min.insync.replicas at replication factor minus one (e.g. 2 of 3, as in the Compose file above) so one broker can fail without blocking writes
- Size log.retention.hours and log.retention.bytes to match how far back consumers may need to replay
- Provision for sequential disk throughput and monitor disk usage; Kafka is I/O bound before it is CPU bound

Production checklist: enable TLS encryption and SASL authentication (SASL_SSL listeners) for all traffic. Use ACLs to restrict topic access. Export JMX metrics to Prometheus/Grafana. Alert on consumer lag, under-replicated partitions, and controller availability.
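
Consumer lag, the first metric to alert on, is simply the per-partition gap between the broker's log-end offset and the group's committed offset. A minimal sketch of the computation (the function name is illustrative; in practice the offsets come from the admin API or an exporter):

```python
def consumer_lag(end_offsets: dict, committed: dict) -> dict:
    """Lag per partition = log-end offset minus committed offset.
    A partition with no committed offset counts as fully behind."""
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}

# Partition 1 is 70 messages behind; partition 0 is fully caught up.
lag = consumer_lag({0: 100, 1: 250}, {0: 100, 1: 180})
assert lag == {0: 0, 1: 70}
```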

Kafka vs Alternatives

| Feature | Apache Kafka | Apache Pulsar | Redpanda |
|---|---|---|---|
| Language | Java/Scala | Java | C++ |
| Metadata | KRaft (built-in) | ZooKeeper (moving to Oxia) | Raft (built-in) |
| Multi-tenancy | Limited (topic-level ACLs) | Native (namespaces, tenants) | Limited |
| Latency | Low (2–10ms p99) | Low (5–15ms p99) | Ultra-low (<2ms p99) |
| Ecosystem | Largest (Connect, Streams, ksqlDB) | Growing (Functions, IO connectors) | Kafka-compatible ecosystem |
| Ops complexity | Moderate | High (brokers + BookKeeper + ZooKeeper) | Low (single binary) |
| Best for | General-purpose streaming at scale | Multi-tenant, geo-replicated workloads | Low-latency, simple operations |

Our recommendation: Choose Kafka for the broadest ecosystem and battle-tested reliability. Consider Redpanda for operational simplicity and ultra-low latency. Consider Pulsar for natively multi-tenant or geo-replicated architectures.

Conclusion

Apache Kafka remains the industry standard for real-time event streaming, and KRaft mode has made it significantly easier to operate. In this guide, we covered the full journey — from a 3-broker KRaft cluster with Docker Compose, to production-quality producers and consumers in Python and Java, to Schema Registry for data contracts and Kafka Connect for database streaming.

Key takeaways:

- KRaft mode removes ZooKeeper; a three-broker combined-mode cluster fits in one Docker Compose file
- On producers, use acks=all, idempotence, and compression; on consumers, use manual commits for at-least-once processing
- Schema Registry turns topics into enforceable data contracts
- Kafka Connect with Debezium streams database changes into Kafka without custom code
- Tune batching and fetch sizes against your latency budget, and alert on consumer lag first

At HexoByte Solutions, we help organizations design and implement event-driven architectures that scale. Whether you are migrating from a monolith or building a greenfield streaming platform, our team can guide you from architecture to production.