# Kafka
Apache Kafka is a distributed event streaming platform used in the Greenfield Cluster for building real-time data pipelines and streaming applications.
## Overview
The Greenfield Cluster includes Kafka with:
- Distributed Setup: 3-broker Kafka cluster
- Zookeeper Ensemble: 3-node Zookeeper for coordination
- Persistent Storage: Dedicated volumes for data retention
- High Throughput: Optimized for high-volume messaging
- Fault Tolerance: Built-in replication and failover
## Architecture

### Configuration
| Component | Replicas | Resources | Storage |
|---|---|---|---|
| Kafka Brokers | 3 | 500m CPU, 1Gi RAM | 10Gi each |
| Zookeeper | 3 | 250m CPU, 512Mi RAM | 5Gi each |
## Usage

### Connection Information
```text
# Kafka bootstrap servers (per-broker addresses via the headless service)
kafka-0.kafka.greenfield.svc.cluster.local:9092
kafka-1.kafka.greenfield.svc.cluster.local:9092
kafka-2.kafka.greenfield.svc.cluster.local:9092

# Or use the service
kafka.greenfield.svc.cluster.local:9092

# Zookeeper ensemble
zookeeper-0.zookeeper.greenfield.svc.cluster.local:2181
zookeeper-1.zookeeper.greenfield.svc.cluster.local:2181
zookeeper-2.zookeeper.greenfield.svc.cluster.local:2181
```
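To verify connectivity from a pod inside the cluster, a minimal sketch with kafka-python (the client library used in the examples below); listing topics confirms that the bootstrap service resolves and the brokers answer:

```python
from kafka import KafkaConsumer

# Connect via the service address and list visible topics as a smoke test
consumer = KafkaConsumer(
    bootstrap_servers=['kafka.greenfield.svc.cluster.local:9092']
)
print(consumer.topics())
consumer.close()
```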
### Producing Messages

#### Python (kafka-python)
```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka.greenfield.svc.cluster.local:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# send() is asynchronous; flush() blocks until buffered records are delivered
producer.send('events', {'type': 'user_signup', 'user_id': '123'})
producer.flush()
```
#### Node.js (kafkajs)
```javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka.greenfield.svc.cluster.local:9092']
});

const producer = kafka.producer();
await producer.connect();
await producer.send({
  topic: 'events',
  messages: [
    { value: JSON.stringify({ type: 'user_signup', user_id: '123' }) }
  ]
});
await producer.disconnect();
```
### Consuming Messages

#### Python
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['kafka.greenfield.svc.cluster.local:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='my-consumer-group'
)

# Iterating the consumer blocks and yields messages as they arrive
for message in consumer:
    print(f"Received: {message.value}")
```
#### Node.js
```javascript
const consumer = kafka.consumer({ groupId: 'my-consumer-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'events' });
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log('Received:', JSON.parse(message.value.toString()));
  }
});
```
### Using the Kafka CLI
```bash
# Create topic (replication factor 2 shown; use 3 for production topics)
kubectl exec -it kafka-0 -n greenfield -- kafka-topics \
  --create --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 2

# List topics
kubectl exec -it kafka-0 -n greenfield -- kafka-topics \
  --list --bootstrap-server localhost:9092

# Describe topic
kubectl exec -it kafka-0 -n greenfield -- kafka-topics \
  --describe --topic my-topic \
  --bootstrap-server localhost:9092

# Produce messages interactively
kubectl exec -it kafka-0 -n greenfield -- kafka-console-producer \
  --topic my-topic \
  --bootstrap-server localhost:9092

# Consume messages from the beginning of the topic
kubectl exec -it kafka-0 -n greenfield -- kafka-console-consumer \
  --topic my-topic \
  --from-beginning \
  --bootstrap-server localhost:9092
```
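Topic administration can also be done programmatically. A minimal sketch with kafka-python's `KafkaAdminClient`, mirroring the create-topic command above (the topic name and settings are illustrative):

```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(
    bootstrap_servers=['kafka.greenfield.svc.cluster.local:9092']
)

# Equivalent of: kafka-topics --create --topic my-topic \
#   --partitions 3 --replication-factor 2
admin.create_topics([
    NewTopic(name='my-topic', num_partitions=3, replication_factor=2)
])
admin.close()
```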
## Operations

### Topic Management
```bash
# List consumer groups
kubectl exec kafka-0 -n greenfield -- kafka-consumer-groups \
  --list --bootstrap-server localhost:9092

# Describe consumer group
kubectl exec kafka-0 -n greenfield -- kafka-consumer-groups \
  --describe --group my-consumer-group \
  --bootstrap-server localhost:9092

# Delete topic
kubectl exec kafka-0 -n greenfield -- kafka-topics \
  --delete --topic my-topic \
  --bootstrap-server localhost:9092
```
### Monitoring
```bash
# Check broker logs
kubectl logs kafka-0 -n greenfield

# Verify registered brokers in Zookeeper
kubectl exec zookeeper-0 -n greenfield -- zkCli.sh ls /brokers/ids

# Monitor consumer lag
kubectl exec kafka-0 -n greenfield -- kafka-consumer-groups \
  --describe --group my-group --bootstrap-server localhost:9092
```
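For programmatic lag monitoring, a minimal sketch with kafka-python that compares each partition's committed offset against its log-end offset; the group and topic names are illustrative:

```python
from kafka import KafkaConsumer, TopicPartition

# Query the group's committed offsets without actually consuming
consumer = KafkaConsumer(
    bootstrap_servers=['kafka.greenfield.svc.cluster.local:9092'],
    group_id='my-consumer-group',
    enable_auto_commit=False
)

partitions = [
    TopicPartition('events', p)
    for p in consumer.partitions_for_topic('events')
]
end_offsets = consumer.end_offsets(partitions)

for tp in partitions:
    committed = consumer.committed(tp) or 0
    print(f"{tp.topic}[{tp.partition}] lag = {end_offsets[tp] - committed}")

consumer.close()
```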
## Best Practices
- Partitioning Strategy: Choose partition keys that spread load evenly while keeping related events on the same partition (see the keyed-producer sketch after this list)
- Replication Factor: Use a replication factor of 3 for production topics
- Consumer Groups: Use consumer groups to distribute partitions across consumer instances
- Retention Policy: Configure retention based on how long downstream consumers need the data
- Monitoring: Track consumer lag and broker health
- Idempotent Producers: Enable idempotence for exactly-once delivery semantics
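As referenced in the partitioning item above, a minimal keyed-producer sketch with kafka-python: records that share a key hash to the same partition, so per-key ordering is preserved. The topic and key are illustrative, and idempotence settings vary by client library, so they are not shown here:

```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka.greenfield.svc.cluster.local:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all'  # wait for all in-sync replicas before acknowledging
)

# All events keyed by user 123 land on the same partition, in order
producer.send('events', key='123',
              value={'type': 'user_signup', 'user_id': '123'})
producer.flush()
```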
## Common Use Cases

### Event Sourcing
```python
# Append immutable events to a topic (uses the producer configured above)
producer.send('user-events', {
    'event_type': 'user_created',
    'user_id': '123',
    'timestamp': '2024-01-01T12:00:00Z',
    'data': {'name': 'John', 'email': 'john@example.com'}
})
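```

Event sourcing usually also involves rebuilding state by replaying the topic from the beginning. A minimal sketch, assuming the `user-events` schema above (the in-memory state shape and timeout are illustrative):

```python
from kafka import KafkaConsumer
import json

# Replay the topic from the start, outside any consumer group
replay = KafkaConsumer(
    'user-events',
    bootstrap_servers=['kafka.greenfield.svc.cluster.local:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000  # stop iterating once caught up (illustrative)
)

state = {}
for message in replay:
    event = message.value
    if event['event_type'] == 'user_created':
        state[event['user_id']] = event['data']
```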
### Stream Processing
```python
# Process events in real time (uses the consumer configured above)
for message in consumer:
    event = message.value
    if event['type'] == 'order_created':
        process_order(event['order_id'])  # process_order is application-defined
```
### Log Aggregation
```python
# Ship structured log records to a central topic
producer.send('application-logs', {
    'service': 'api',
    'level': 'ERROR',
    'message': 'Failed to process request',
    'timestamp': '2024-01-01T12:00:00Z'
})
```