Introduction
Apache Kafka has become the backbone of modern event-driven architectures. Whether you are building real-time analytics pipelines, synchronizing microservices, or ingesting IoT telemetry at scale, Kafka provides a durable, high-throughput, distributed streaming platform that handles millions of events per second.
With the release of KRaft mode (Kafka Raft), ZooKeeper is no longer required — simplifying operations and reducing infrastructure complexity. KRaft has been production-ready since Kafka 3.3; ZooKeeper mode is deprecated as of Kafka 3.5 and removed entirely in Kafka 4.0, making KRaft the recommended deployment model.
In this guide, we walk through setting up Kafka in KRaft mode using Docker Compose, writing producers and consumers in both Python and Java, integrating Schema Registry and Kafka Connect, and applying production-grade performance tuning.
Kafka Architecture Overview
- Brokers — Kafka servers that store data and serve client requests. A cluster typically runs 3+ brokers for fault tolerance.
- Topics — Logical channels for organizing messages, divided into partitions for parallelism.
- Partitions — Ordered, immutable sequences of records, replicated across brokers for durability.
- Producers — Applications that publish events to topics.
- Consumers — Applications that subscribe to events. Consumers in the same consumer group share the workload.
- KRaft Controller — Replaces ZooKeeper for cluster metadata management using the Raft consensus protocol.
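Records with the same key always land in the same partition, which is what gives Kafka its per-key ordering guarantee. A minimal sketch of the idea (illustrative only: Kafka's real default partitioner uses a murmur2 hash, not the md5 stand-in used here, and `pick_partition` is our own name):

```python
import hashlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    # Stable hash of the key, modulo the partition count.
    # (Kafka itself uses murmur2; md5 is used here only for simplicity.)
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Same key -> same partition, so all of user_7's events stay ordered.
assert pick_partition(b"user_7", 6) == pick_partition(b"user_7", 6)
```

Because the partition count feeds into the hash modulo, adding partitions later changes key-to-partition mapping — one reason to size partition counts generously up front.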
KRaft Mode: Why ZooKeeper Is Gone
Key benefits of KRaft mode:
- Simplified operations — No separate ZooKeeper cluster to manage.
- Faster controller failover — Metadata recovery is significantly faster.
- Better scalability — Supports millions of partitions per cluster.
- Single security model — No need to secure both Kafka and ZooKeeper separately.
Setting Up Kafka with Docker Compose (KRaft Mode)
# docker-compose.yml
version: '3.8'

services:
  kafka-1:
    image: confluentinc/cp-kafka:7.7.1
    hostname: kafka-1
    container_name: kafka-1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-1-data:/var/lib/kafka/data

  kafka-2:
    image: confluentinc/cp-kafka:7.7.1
    hostname: kafka-2
    container_name: kafka-2
    ports:
      - "9093:9092"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-2-data:/var/lib/kafka/data

  kafka-3:
    image: confluentinc/cp-kafka:7.7.1
    hostname: kafka-3
    container_name: kafka-3
    ports:
      - "9094:9092"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    volumes:
      - kafka-3-data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
    depends_on:
      - kafka-1
      - schema-registry

volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
Start the cluster:
docker compose up -d
# Verify brokers are running
docker compose ps
# Create a test topic
docker exec kafka-1 kafka-topics --bootstrap-server localhost:29092 \
  --create --topic events --partitions 6 --replication-factor 3
CLUSTER_ID must be identical across all brokers; generate one with kafka-storage random-uuid. KAFKA_PROCESS_ROLES can be broker,controller (combined mode, suitable for small clusters) or split into dedicated broker and controller nodes for production at scale.
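If you don't have a Kafka distribution on hand to run kafka-storage, an ID of the same shape can be produced in plain Python. Kafka cluster IDs are 16 random bytes, base64url-encoded without padding, giving a 22-character string (a sketch, assuming that format; random_cluster_id is our own helper name):

```python
import base64
import uuid

def random_cluster_id() -> str:
    # 16 random bytes, base64url-encoded with the trailing '=' padding
    # stripped -> a 22-character ID, the same shape that
    # `kafka-storage random-uuid` emits.
    raw = uuid.uuid4().bytes
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")

print(random_cluster_id())  # a fresh 22-character cluster ID
```

Paste the result into CLUSTER_ID for all three brokers before the first start; reformatting storage with a different ID later requires wiping the data volumes.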
Kafka Producer and Consumer in Python
The confluent-kafka library is Confluent's officially supported Python client, built on the high-performance C library librdkafka.
Producer (Python)
from confluent_kafka import Producer
import json
import time

conf = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'client.id': 'python-producer',
    'acks': 'all',
    'retries': 5,
    'linger.ms': 10,
    'batch.size': 65536,
    'compression.type': 'lz4',
    'enable.idempotence': True,
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()}[{msg.partition()}] @ offset {msg.offset()}')

for i in range(100):
    event = {
        'event_id': i,
        'event_type': 'page_view',
        'user_id': f'user_{i % 10}',
        'timestamp': time.time(),
        'url': f'/products/{i}'
    }
    producer.produce(
        topic='events',
        key=str(event['user_id']).encode('utf-8'),
        value=json.dumps(event).encode('utf-8'),
        callback=delivery_callback
    )
    producer.poll(0)  # serve queued delivery callbacks without blocking

producer.flush(timeout=30)  # block until all buffered messages are delivered
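produce() is asynchronous and buffers messages in memory; when the local queue fills (bounded by queue.buffering.max.messages), confluent-kafka raises BufferError. A small retry wrapper handles that backpressure instead of crashing the produce loop (a sketch; safe_produce is our own helper name):

```python
def safe_produce(producer, topic, key, value, callback=None, max_retries=3):
    """Produce with backpressure handling: on BufferError, poll the
    producer to serve delivery callbacks (freeing queue space), then retry."""
    for attempt in range(max_retries + 1):
        try:
            producer.produce(topic=topic, key=key, value=value, callback=callback)
            return
        except BufferError:
            if attempt == max_retries:
                raise  # queue stayed full; surface the error to the caller
            producer.poll(1.0)  # block briefly to drain delivery reports
```

With a real confluent_kafka.Producer, this replaces the bare produce() call inside the loop above; under normal load the except branch never fires.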
Consumer (Python)
from confluent_kafka import Consumer, KafkaError
import json

conf = {
    'bootstrap.servers': 'localhost:9092,localhost:9093,localhost:9094',
    'group.id': 'event-processor-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 45000,
    'fetch.min.bytes': 1024,
}

consumer = Consumer(conf)
consumer.subscribe(['events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(f'Consumer error: {msg.error()}')
            break
        event = json.loads(msg.value().decode('utf-8'))
        print(f'Received: {event["event_type"]} from {event["user_id"]}')
        # Commit offset after successful processing
        consumer.commit(asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
With enable.auto.commit=False, you control exactly when offsets are committed. This prevents data loss: if the consumer crashed after auto-committing but before processing, those messages would be silently skipped on restart. Committing manually after processing gives you at-least-once delivery semantics.
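Committing synchronously after every message adds a broker round trip per record. A common compromise is to commit every N messages, plus one final commit on shutdown; here the poll loop is factored into a function so the batching logic is visible (a sketch: consume_with_batched_commits, handle, and commit_every are our own names, not a library API):

```python
def consume_with_batched_commits(consumer, handle, commit_every=100):
    """Poll messages, process each with handle(msg), and commit offsets
    every `commit_every` messages instead of after every single one."""
    uncommitted = 0
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise RuntimeError(msg.error())
            handle(msg)
            uncommitted += 1
            if uncommitted >= commit_every:
                consumer.commit(asynchronous=False)
                uncommitted = 0
    finally:
        if uncommitted:
            # Commit whatever was processed before shutdown or an error.
            consumer.commit(asynchronous=False)
```

The tradeoff: after a crash, up to commit_every already-processed messages may be redelivered, which is still within at-least-once semantics as long as processing is idempotent or deduplicated downstream.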
Kafka Producer and Consumer in Java
Producer (Java)
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class EventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

        try (KafkaProducer<String, String> producer =
                new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String key = "user_" + (i % 10);
                String value = String.format(
                        "{\"event_id\":%d,\"event_type\":\"page_view\",\"user_id\":\"%s\"}",
                        i, key);
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("events", key, value);
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("Sent to %s[%d] @ %d%n",
                                metadata.topic(), metadata.partition(),
                                metadata.offset());
                    }
                });
            }
        }
    }
}
Schema Registry: Enforcing Data Contracts
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_str = """
{
  "type": "record",
  "name": "PageViewEvent",
  "namespace": "com.hexobyte.events",
  "fields": [
    {"name": "event_id", "type": "int"},
    {"name": "user_id", "type": "string"},
    {"name": "url", "type": "string"},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
"""

schema_registry_client = SchemaRegistryClient({
    'url': 'http://localhost:8081'
})

avro_serializer = AvroSerializer(
    schema_registry_client,
    schema_str,
    lambda obj, ctx: obj
)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': lambda k, ctx: k.encode('utf-8'),
    'value.serializer': avro_serializer,
})

producer.produce(
    topic='page-views',
    key='user_42',
    value={
        'event_id': 1,
        'user_id': 'user_42',
        'url': '/products/kafka-guide',
        'timestamp': 1741500000000
    }
)
producer.flush()
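The serializer rejects records that don't match the schema at produce time, before anything reaches the broker. Conceptually, the first step is comparing the record's keys against the schema's declared fields; a simplified, dependency-free illustration (real Avro validation also checks types, unions, and per-field defaults; missing_fields is our own name):

```python
import json

schema = json.loads("""
{
  "type": "record",
  "name": "PageViewEvent",
  "fields": [
    {"name": "event_id", "type": "int"},
    {"name": "user_id", "type": "string"},
    {"name": "url", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
""")

def missing_fields(record: dict, schema: dict) -> list:
    # Fields declared in the schema but absent from the record.
    # (Real Avro also honors per-field defaults; this sketch does not.)
    declared = [f["name"] for f in schema["fields"]]
    return [name for name in declared if name not in record]

print(missing_fields({"event_id": 1, "user_id": "user_42"}, schema))
# -> ['url', 'timestamp']
```

This is the kind of error you want at the producer, not discovered later by every downstream consumer — the core argument for adopting Schema Registry early.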
Kafka Connect: Integrating External Systems
Common connectors:
- Debezium — Change Data Capture (CDC) from PostgreSQL, MySQL, MongoDB
- JDBC Source/Sink — Poll databases or write events to relational databases
- S3 Sink — Archive topics to Amazon S3 in Parquet or Avro format
- Elasticsearch Sink — Index events for full-text search
Example: Debezium PostgreSQL Connector
# Register a connector via the Kafka Connect REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "kafka_user",
      "database.password": "secret",
      "database.dbname": "app_db",
      "topic.prefix": "cdc",
      "table.include.list": "public.orders,public.users",
      "plugin.name": "pgoutput",
      "slot.name": "debezium_slot"
    }
  }'
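With topic.prefix set to cdc, each table's changes land on a topic named cdc.&lt;schema&gt;.&lt;table&gt; (e.g. cdc.public.orders) as a Debezium change envelope carrying before/after row images and an op code (c=create, u=update, d=delete, r=snapshot read). A sketch of unpacking one (the sample event and describe_change are hand-written for illustration):

```python
import json

# Hand-written sample of a Debezium change envelope (payload portion).
raw = json.dumps({
    "before": None,
    "after": {"id": 42, "status": "paid", "amount": 99.5},
    "op": "c",
    "ts_ms": 1741500000000,
    "source": {"table": "orders", "schema": "public"},
})

def describe_change(payload: dict) -> str:
    ops = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}
    # Deletes carry the row in "before"; everything else in "after".
    row = payload["after"] if payload["op"] != "d" else payload["before"]
    return f'{ops[payload["op"]]} on {payload["source"]["table"]}: {row}'

print(describe_change(json.loads(raw)))
# -> insert on orders: {'id': 42, 'status': 'paid', 'amount': 99.5}
```

Consumers of CDC topics should branch on the op code: inserts and updates upsert downstream state, deletes remove it.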
Performance Tuning
Producer Tuning
- linger.ms=10-50 — Batch messages for better throughput
- batch.size=65536-131072 — Larger batches reduce network round trips
- compression.type=lz4 — Best throughput-to-compression ratio
- acks=all — Required for durability with min.insync.replicas=2
- enable.idempotence=true — Prevents duplicate messages on retries
Consumer Tuning
- fetch.min.bytes=1024-65536 — Reduces fetch request frequency
- max.poll.records=500 — Tune based on per-record processing time
- session.timeout.ms=45000 — Prevents unnecessary rebalances
- Partition assignment — Use CooperativeStickyAssignor for smoother rebalances
Broker Configuration
- Partition count — 10-30 per topic for most workloads
- Replication factor of 3 — Tolerates one broker failure
- min.insync.replicas=2 — Requires 2 replicas to acknowledge writes
- Log retention — log.retention.hours=168 (7 days) is typical
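These settings interact: retained disk usage is roughly write throughput × retention window × replication factor, so it is worth doing the arithmetic before settling on a retention period. A quick estimate (the throughput figure is made up for illustration):

```python
def retained_bytes_mb(mb_per_sec: float, retention_hours: int,
                      replication_factor: int) -> float:
    """Approximate cluster-wide disk usage for one topic, in MB,
    ignoring compression, index files, and segment-rounding overhead."""
    return mb_per_sec * 3600 * retention_hours * replication_factor

# e.g. 10 MB/s sustained writes, 7-day retention, replication factor 3
total_mb = retained_bytes_mb(10, 168, 3)
print(f"{total_mb / 1024 / 1024:.1f} TB")  # -> 17.3 TB across the cluster
```

Compression (lz4 above) typically cuts this several-fold, but plan broker disks for the uncompressed worst case plus headroom.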
Security and observability: enable authentication and encryption (e.g. SASL_SSL) for all communication. Use ACLs to restrict topic access. Monitor with JMX metrics exported to Prometheus/Grafana. Alert on consumer lag, under-replicated partitions, and controller availability.
Kafka vs Alternatives
| Feature | Apache Kafka | Apache Pulsar | Redpanda |
|---|---|---|---|
| Language | Java/Scala | Java | C++ |
| Metadata | KRaft (built-in) | ZooKeeper (moving to Oxia) | Raft (built-in) |
| Multi-tenancy | Limited (topic-level ACLs) | Native (namespaces, tenants) | Limited |
| Latency | Low (2–10ms p99) | Low (5–15ms p99) | Ultra-low (<2ms p99) |
| Ecosystem | Largest (Connect, Streams, ksqlDB) | Growing (Functions, IO connectors) | Kafka-compatible ecosystem |
| Ops Complexity | Moderate | High (Kafka + BookKeeper + ZK) | Low (single binary) |
| Best For | General-purpose streaming at scale | Multi-tenant, geo-replicated workloads | Low-latency, simple operations |
Our recommendation: Choose Kafka for the broadest ecosystem and battle-tested reliability. Consider Redpanda for operational simplicity and ultra-low latency. Consider Pulsar for natively multi-tenant or geo-replicated architectures.
Conclusion
Apache Kafka remains the industry standard for real-time event streaming, and KRaft mode has made it significantly easier to operate. In this guide, we covered the full journey — from a 3-broker KRaft cluster with Docker Compose, to production-quality producers and consumers in Python and Java, to Schema Registry for data contracts and Kafka Connect for database streaming.
Key takeaways:
- Use KRaft mode — ZooKeeper mode is deprecated and removed in Kafka 4.0
- Use confluent-kafka for Python clients — built on librdkafka
- Enable idempotent producers and manual consumer commits
- Adopt Schema Registry early for schema evolution
- Monitor consumer lag and under-replicated partitions
At HexoByte Solutions, we help organizations design and implement event-driven architectures that scale. Whether you are migrating from a monolith or building a greenfield streaming platform, our team can guide you from architecture to production.