Debezium Kafka CDC
Traditional database synchronization relies on polling—repeatedly querying databases to detect changes. This approach consumes resources, increases latency, and can miss rapid updates. Debezium Kafka CDC offers a superior alternative, using Change Data Capture to monitor database changes in real time without impacting performance.

The Database Synchronization Problem
Running SELECT queries every few seconds across millions of records wastes resources. Each query consumes CPU cycles, locks tables, and adds network overhead. For high-traffic systems processing thousands of events per minute, polling becomes unsustainable.
Most developers have written code like this:
```python
while True:
    new_records = db.query("SELECT * FROM orders WHERE updated_at > ?", last_check)
    process_records(new_records)
    last_check = time.time()
    time.sleep(5)
```
This pattern creates constant database load, introduces polling delays, misses changes between intervals, and struggles to scale across multiple tables.
Debezium Kafka CDC: What is Change Data Capture?
Change Data Capture monitors and captures database modifications in real time. Instead of polling, CDC reads the database transaction log—the same mechanism databases use for ACID properties and replication.
Every database maintains a transaction log. MySQL has the binary log (binlog), PostgreSQL uses Write-Ahead Logging (WAL), and MongoDB has the oplog. CDC reads these logs and converts changes into events, eliminating query overhead while capturing every modification as it happens.
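As a mental model (not Debezium's actual implementation), a log-based reader simply tails the transaction log and turns each entry into a change event. The `tail_log` generator and the entry layout below are illustrative stand-ins for a decoded binlog/WAL/oplog record:

```python
def tail_log(log_entries):
    """Illustrative sketch: convert raw transaction-log entries into change events.

    Each entry is assumed to look like
    {'op': 'INSERT', 'table': 'orders', 'row': {...}} -- a stand-in for a
    decoded log record, not a real Debezium structure.
    """
    op_codes = {'INSERT': 'c', 'UPDATE': 'u', 'DELETE': 'd'}
    for entry in log_entries:
        yield {
            'op': op_codes[entry['op']],
            'table': entry['table'],
            # Deletes carry the old row; inserts and updates carry the new one
            'after': entry['row'] if entry['op'] != 'DELETE' else None,
            'before': entry['row'] if entry['op'] == 'DELETE' else None,
        }

# Two log entries become two events -- no queries issued against the database
events = list(tail_log([
    {'op': 'INSERT', 'table': 'orders', 'row': {'order_id': 'ORD1'}},
    {'op': 'DELETE', 'table': 'orders', 'row': {'order_id': 'ORD1'}},
]))
```

The key point: the reader consumes work the database has already done for durability, so capture adds no query load.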
The Debezium Kafka CDC Stack
Debezium: The CDC Engine
Debezium is an open-source platform for change data capture. It provides connectors for MySQL, PostgreSQL, MongoDB, SQL Server, Oracle, and Cassandra. Connectors monitor transaction logs continuously, converting each row-level change into structured events containing operation type, before and after values, timestamps, and source metadata.
Apache Kafka: The Streaming Platform
Kafka provides distributed messaging infrastructure with durable event persistence, high throughput handling millions of events per second, fault-tolerant architecture, and independent producer-consumer operations.
Debezium Kafka CDC Architecture
Your source database generates changes during normal operations. Debezium connectors read transaction logs and convert changes to events. Events publish to Kafka topics organized by database and table. Consumer applications subscribe to topics and process events. Processed data reaches target systems—databases, search indexes, caches, or data warehouses.
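The topic a change lands on is derived from the connector's prefix plus the source database and table. A minimal sketch of that routing rule (the function name is ours, not Debezium's):

```python
def topic_for(prefix, database, table):
    """Debezium names topics as <topic.prefix>.<database>.<table>."""
    return f"{prefix}.{database}.{table}"

# A change to orders_db.orders under prefix 'ecommerce' lands on:
topic = topic_for('ecommerce', 'orders_db', 'orders')
# -> 'ecommerce.orders_db.orders'
```

Consumers can therefore subscribe to a single table's topic, or to a regex pattern covering a whole database.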
Setting Up Debezium Kafka CDC
Docker Infrastructure
```yaml
version: '3.8'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:3.2
    ports:
      - 2181:2181
  kafka:
    image: quay.io/debezium/kafka:3.2
    ports:
      - 9092:9092
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper
  debezium-connect:
    image: quay.io/debezium/connect:3.2
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=debezium
      - CONFIG_STORAGE_TOPIC=debezium_connect_configs
      - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
      - STATUS_STORAGE_TOPIC=debezium_connect_statuses
    depends_on:
      - kafka
```

The Connect image requires the three storage-topic variables; it stores connector configuration, offsets, and status in Kafka itself.
Configuring the Debezium Connector
```json
{
  "name": "mysql-orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-server",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "184054",
    "topic.prefix": "ecommerce",
    "database.include.list": "orders_db",
    "table.include.list": "orders_db.orders,orders_db.customers",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.orders_db"
  }
}
```
Events publish to topics named after the prefix, database, and table, such as ecommerce.orders_db.orders. A snapshot.mode of initial takes a full snapshot of existing data before streaming changes, while schema_only captures just the table structure and streams only new changes.
Debezium Kafka CDC Event Structure
When an order row is inserted:
```sql
INSERT INTO orders (order_id, customer_id, total_amount, order_status)
VALUES ('ORD12345', 1001, 299.99, 'pending');
```
Debezium produces:
```json
{
  "payload": {
    "before": null,
    "after": {
      "order_id": "ORD12345",
      "customer_id": 1001,
      "total_amount": 299.99,
      "order_status": "pending"
    },
    "source": {
      "connector": "mysql",
      "db": "orders_db",
      "table": "orders",
      "ts_ms": 1697234567890
    },
    "op": "c",
    "ts_ms": 1697234567892
  }
}
```
Operations: c for create (insert), u for update, d for delete, and r for rows read during the initial snapshot. The before field holds the original values and after holds the new values; for an insert before is null, and for a delete after is null.
Building a Debezium Kafka CDC Consumer
Basic Consumer
```python
import json

from kafka import KafkaConsumer
from sqlalchemy import create_engine


class DebeziumCDCConsumer:
    def __init__(self, kafka_servers, topic_pattern, db_connection):
        self.consumer = KafkaConsumer(
            bootstrap_servers=kafka_servers,
            group_id='cdc-processor',
            auto_offset_reset='earliest',
            # Tombstone records have a None value; skip decoding them
            value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
        )
        self.consumer.subscribe(pattern=topic_pattern)
        self.db_engine = create_engine(db_connection)

    def start(self):
        for message in self.consumer:
            if message.value is not None:
                self.process_message(message.value)

    def process_message(self, message):
        payload = message.get('payload', {})
        operation = payload.get('op')
        if operation == 'c':
            self.handle_insert(payload)
        elif operation == 'u':
            self.handle_update(payload)
        elif operation == 'd':
            self.handle_delete(payload)

    def handle_insert(self, payload):
        after = payload.get('after', {})
        # Process insert
        print(f"New record: {after}")

    def handle_update(self, payload):
        after = payload.get('after', {})
        # Process update
        print(f"Updated: {after}")

    def handle_delete(self, payload):
        before = payload.get('before', {})
        # Process delete
        print(f"Deleted: {before}")
```
Multi-Threaded Consumer for Performance
```python
import threading
from queue import Queue, Empty


class MultiThreadedCDCProcessor:
    def __init__(self, config):
        self.config = config
        self.num_workers = config.get('num_workers', 3)
        self.event_queue = Queue(maxsize=1000)
        self.running = True

    def start(self):
        # Start consumer threads
        for i in range(self.num_workers):
            thread = threading.Thread(target=self._consumer_worker, args=(i,))
            thread.start()
        # Start processor thread
        threading.Thread(target=self._processor_worker).start()

    def _consumer_worker(self, worker_id):
        consumer = KafkaConsumer(
            bootstrap_servers=self.config['kafka']['servers'],
            group_id=self.config['kafka']['group_id'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        consumer.subscribe(pattern=self.config['topic_pattern'])
        while self.running:
            messages = consumer.poll(timeout_ms=1000, max_records=100)
            for topic_partition, records in messages.items():
                for record in records:
                    event = self._extract_event(record.value)
                    if event:
                        self.event_queue.put(event)

    def _processor_worker(self):
        db_engine = create_engine(self.config['database']['connection'])
        batch = []
        batch_size = 50
        while self.running:
            try:
                # Block briefly instead of busy-waiting on an empty queue
                batch.append(self.event_queue.get(timeout=1))
            except Empty:
                pass
            # Flush full batches, and partial batches once the queue drains
            if len(batch) >= batch_size or (batch and self.event_queue.empty()):
                self._process_batch(batch, db_engine)
                batch.clear()

    def _extract_event(self, message):
        payload = message.get('payload', {})
        if payload.get('op') not in ['c', 'u']:
            return None
        return {
            'operation': payload.get('op'),
            'data': payload.get('after', {}),
            'timestamp': payload.get('ts_ms')
        }
```
Event Filtering and Transformation
Filter by Business Rules
```python
class EventFilter:
    def should_process(self, event):
        data = event['data']
        # Only orders above minimum amount
        if data.get('total_amount', 0) < 10.0:
            return False
        # Skip inactive records
        if data.get('status') == 'inactive':
            return False
        return True
```
Transform Events
```python
from datetime import datetime


class EventTransformer:
    def transform(self, event):
        data = event['data']
        return {
            'order_id': data.get('order_id'),
            'customer_id': data.get('customer_id'),
            'amount': float(data.get('total_amount', 0)),
            'status': data.get('order_status', 'unknown'),
            'processed_at': datetime.now().isoformat()
        }
```
Debezium Kafka CDC Configuration
```json
{
  "kafka": {
    "servers": ["localhost:9092"],
    "group_id": "cdc-consumer",
    "auto_offset_reset": "latest"
  },
  "topic_pattern": "ecommerce\\.orders_db\\..*",
  "num_workers": 3,
  "batch_size": 100,
  "database": {
    "connection": "postgresql://user:pass@localhost/target_db"
  }
}
```
Benefits of Debezium Kafka CDC
Zero Database Overhead – CDC reads transaction logs that databases maintain anyway. No additional query load on source databases.
Real-Time Processing – Changes appear in Kafka within milliseconds. Systems react to modifications immediately.
Guaranteed Delivery – Kafka persists events durably. Consumers resume from last offset after downtime.
Horizontal Scalability – Add partitions and consumers to scale throughput from thousands to millions of events per second.
Complete Audit Trail – Every change becomes a permanent Kafka event for compliance and analysis.
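Guaranteed delivery only holds if offsets are committed after the work completes, not before. With kafka-python this means passing enable_auto_commit=False to KafkaConsumer and calling commit() manually; the helper below is our own sketch, written against any object with the consumer's iteration and commit() interface so the pattern is clear:

```python
def process_with_manual_commit(consumer, handler):
    """Commit the offset only after the handler succeeds (at-least-once).

    If the process crashes mid-handler, the uncommitted message is
    redelivered on restart instead of being lost.
    """
    for message in consumer:
        handler(message)   # may raise; offset stays uncommitted
        consumer.commit()  # safe point: work for this message is done

# With kafka-python this would be wired up roughly as (assumed usage):
#   consumer = KafkaConsumer(..., enable_auto_commit=False)
#   process_with_manual_commit(consumer, handle_event)

# Demonstration with an in-memory stand-in for the consumer:
class FakeConsumer:
    def __init__(self, messages):
        self.messages = messages
        self.commits = 0

    def __iter__(self):
        return iter(self.messages)

    def commit(self):
        self.commits += 1

fake = FakeConsumer(['e1', 'e2'])
seen = []
process_with_manual_commit(fake, seen.append)
```

The trade-off is possible duplicate processing after a crash, so handlers should be idempotent.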
Common Debezium Kafka CDC Use Cases
Real-time data replication synchronizes databases across regions. Cache invalidation updates Redis when source data changes. Search index synchronization keeps Elasticsearch current. Event-driven microservices trigger on data changes. Data warehouses receive continuous updates for near-real-time analytics. Audit systems capture modifications for compliance.
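The cache invalidation case, for example, needs only the event's key fields. Below is a sketch of a handler that evicts the affected entry; a plain dict stands in for the cache here, and the order:<id> key scheme is our own convention (with redis-py you would call delete on the same key):

```python
def invalidate_on_change(event, cache):
    """Evict the cached copy of any row that changed at the source."""
    payload = event.get('payload', {})
    # Deletes carry the row in 'before'; inserts and updates carry it in 'after'
    row = payload.get('before') if payload.get('op') == 'd' else payload.get('after')
    if row and 'order_id' in row:
        cache.pop(f"order:{row['order_id']}", None)

# An update event for ORD12345 evicts its stale cached entry
cache = {'order:ORD12345': {'status': 'pending'}}
invalidate_on_change(
    {'payload': {'op': 'u', 'after': {'order_id': 'ORD12345'}}},
    cache,
)
```

The next read misses the cache and repopulates it from the database, so cached data is never older than the last captured change.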
Getting Started
Prerequisites
```bash
# Start infrastructure
docker-compose up -d

# Install dependencies
pip install kafka-python sqlalchemy pandas
```
Register Connector
```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql-host",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "password",
      "topic.prefix": "myapp",
      "database.include.list": "production_db"
    }
  }'
```
Run Consumer
```python
consumer = DebeziumCDCConsumer(
    kafka_servers=['localhost:9092'],
    topic_pattern='myapp\\.production_db\\..*',
    db_connection='postgresql://user:pass@localhost/target'
)
consumer.start()
```
Best Practices for Debezium Kafka CDC
Use separate topics per table for granular control. Implement unique consumer group IDs for independent pipelines. Add retry logic and dead-letter queues for failed events. Monitor consumer lag and processing rates. Handle schema changes with Kafka Schema Registry. Secure production with SSL and SASL authentication.
Monitoring Debezium Kafka CDC
Check Consumer Lag
```bash
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group cdc-consumer
```
Verify Connector Health
```python
import requests

def check_health(connector_name):
    response = requests.get(f'http://localhost:8083/connectors/{connector_name}/status')
    status = response.json()
    return status['connector']['state'] == 'RUNNING'
```
Advanced Patterns
Multi-Target Replication
```python
class MultiTargetProcessor:
    def __init__(self):
        self.targets = {
            'postgres': PostgresTarget(),
            'elasticsearch': ElasticsearchTarget(),
            'redis': RedisTarget()
        }

    def process(self, event):
        for name, target in self.targets.items():
            try:
                target.apply_change(event)
            except Exception as e:
                print(f"Error with {name}: {e}")
```
Dead Letter Queue
```python
import json

from kafka import KafkaProducer


class CDCWithDLQ:
    def __init__(self, dlq_topic):
        self.dlq_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
        self.dlq_topic = dlq_topic
        self.max_retries = 3

    def process_with_retry(self, event):
        for attempt in range(self.max_retries):
            try:
                self.process_event(event)
                return True
            except Exception as e:
                if attempt == self.max_retries - 1:
                    self.send_to_dlq(event, str(e))
        return False

    def send_to_dlq(self, event, error):
        # Preserve the failed event and its error for inspection and replay
        self.dlq_producer.send(
            self.dlq_topic,
            json.dumps({'event': event, 'error': error}).encode('utf-8')
        )
```
Troubleshooting
Database Permissions
```sql
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'%';
```
Enable Binary Logging
```sql
-- log_bin must be ON and binlog_format must be ROW for Debezium
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
```
High Consumer Lag
Scale consumers horizontally. Increase batch size. Optimize processing logic. Add Kafka partitions for parallel processing.
Conclusion
Debezium Kafka CDC transforms database change management by eliminating polling and providing real-time event streams. The technology delivers zero source database impact, true real-time processing, guaranteed delivery, horizontal scalability, and complete audit trails.
Whether building microservices, data warehouses, or event-driven architectures, Debezium Kafka CDC provides reliable, scalable data integration. Start with a simple connector, process events according to business logic, and scale as requirements grow.