CDC pipeline using Debezium Server, MySQL, and Amazon Kinesis

Noufal Rijal
Jul 11, 2021

Whenever we work with microservices, a basic principle is that each microservice has its own database.

Suppose we have a Producer microservice and a Consumer microservice. Whenever new data is added on the Producer side, it should be reflected in the Consumer microservice in real time.

So there should be a way of capturing the changes made to the source data system and propagating them to the target data systems in a reliable and scalable manner, in real time.

And this paves the way for Change Data Capture.

This article will discuss how to capture the changes from the source database and propagate them to the target database using MySQL, Debezium Server, and Amazon Kinesis.

This is how it works!

Understanding CDC, MySQL Binlogs, Debezium Server and Amazon Kinesis

So before we jump into the implementation of this system we will need to understand a few concepts.

Change Data Capture

Change Data Capture (CDC) tracks data changes (usually close to real-time). CDC can be implemented for various tasks such as auditing, copying data to another system, or processing (and reacting to) events. In MySQL, the easiest and probably most efficient way to track data changes is to use binary logs.

MySQL Binlogs

The binary log is a set of log files that contain information about data modifications made to a MySQL server instance.

Debezium

  • Debezium is a set of distributed services to capture changes in our databases so that the applications can see those changes and respond to them
  • Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred

Debezium Server

  • The Debezium server is a configurable, ready-to-use application that streams change events from a source database to a variety of messaging infrastructures
  • Change events can be serialized to different formats like JSON or Apache Avro and then will be sent to one of a variety of messaging infrastructures such as Amazon Kinesis, Google Cloud Pub/Sub, or Apache Pulsar
  • For this POC we will be using Amazon Kinesis as the data streaming means

Amazon Kinesis

  • Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service
  • We can create data-processing applications, known as Kinesis Data Streams applications, that read and process records from a stream (a minimal consumer sketch follows below)
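
To make this concrete, here is a minimal sketch of such a consumer using the boto3 SDK; the stream name, region, and single-shard handling are assumptions purely for illustration and are not part of the setup that follows.

import boto3

# Minimal sketch of a Kinesis Data Streams "application": read records from
# the first shard of a stream. Stream name and region are placeholders.
kinesis = boto3.client('kinesis', region_name='us-west-2')

shard_id = kinesis.list_shards(StreamName='example-stream')['Shards'][0]['ShardId']
iterator = kinesis.get_shard_iterator(
    StreamName='example-stream',
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON')['ShardIterator']

response = kinesis.get_records(ShardIterator=iterator, Limit=25)
for record in response['Records']:
    # boto3 returns the record data as raw bytes (already base64-decoded)
    print(record['Data'])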

Configuration Steps

Let’s dive into the tech stack configurations that are needed for this POC. They are as below:

1. MySQL

Before getting started with CDC in MySQL, we need to enable binary logging. The steps for enabling binary logging are:

Step 1: Checking the Database Status

The first step is to check whether the log_bin option is turned on and which binary log format is used. MySQL provides three binlog formats: ROW, MIXED, and STATEMENT.

SHOW VARIABLES LIKE 'log_bin'; -- Value should be ON
SHOW VARIABLES like '%binlog_format%'; -- Value should be set to ROW

Step 2: Updating the MySQL Configuration File

If the log_bin value is OFF in the above query, we need to update the MySQL configuration file (my.cnf) by adding the below lines to the [mysqld] section.

log-bin=bin.log
log-bin-index=bin-log.index
max_binlog_size=100M
binlog_format=row
socket=mysql.sock

Once Step 2 is completed, confirm the changes using the queries in Step 1.

With the above steps, we are all set with the MySQL configurations for CDC.

2. Installing and Configuring Debezium Server

Step 1. Install Java

Make sure that you have Java installed; if not, install it:

apt-get install openjdk-8-jre

Step 2. Install Debezium Server

  • For this POC we will be using Debezium Server version 1.5.1.Final. Download it using the below command:
wget -O debezium.tar.gz "https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/1.5.1.Final/debezium-server-dist-1.5.1.Final.tar.gz"
  • Unzip the downloaded content to the disk
tar -xf debezium.tar.gz 
  • A directory named debezium-server will be created, containing, among other things, the conf folder and the run.sh launch script used later in this article

Step 3. Configuring the Debezium Server

  • Create a config file called application.properties in the folder conf/
touch conf/application.properties
  • Create a new folder called data and add a new file called offsets.dat
mkdir data
touch data/offsets.dat
  • With this, the debezium-server folder now contains conf/application.properties and data/offsets.dat
  • Now in the application.properties file that we created in the previous step, add the below configurations:
debezium.sink.type=kinesis
debezium.sink.kinesis.region=us-west-2
debezium.sink.kinesis.credentials.profile=default
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=root
debezium.source.database.server.name=debezium-tutorial
debezium.source.database.include.list=TestDB
debezium.source.table.include.list=TestDB.Events

debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=history.dat

Note:

  1. debezium.sink.type: the type of messaging or streaming platform used to stream the data. For this POC we are using Amazon Kinesis.
  2. debezium.sink.kinesis.region: the AWS region in which the Kinesis streams are created.
  3. debezium.sink.kinesis.credentials.profile: the AWS credentials profile under which the Kinesis streams are created (only needed when multiple AWS profiles are configured on the system; otherwise the default profile is used).
  4. debezium.source.connector.class: specifies the type of database management system from which the changes are captured. For this POC we are using the MySQL connector. Connectors for other database management systems are available.
  5. debezium.source.offset.storage.file.filename: the file in which the connector offsets are stored.
  6. debezium.source.database.include.list: the name of the database whose changes are to be captured.
  7. debezium.source.database.server.name: the logical name of the running Debezium server, mainly used while creating streams in Amazon Kinesis.
  8. debezium.source.table.include.list: the comma-separated list of tables whose changes are to be captured, each given as database_name.table_name (for example database_name.table_name1, database_name.table_name2, ...). If this configuration is not provided, changes are captured for every table in the database, and we would need to create streams for each of those tables.
  9. debezium.source.snapshot.new.tables=parallel: this configuration makes the server snapshot tables that are newly created after the Debezium server is configured. If it is not provided, those tables’ changes won’t be captured/snapshotted.
  10. debezium.source.tombstones.on.delete=false: this configuration mainly affects the changes made by DELETE queries. By default, a delete operation is represented by a delete event and a subsequent tombstone event, which can cause the server to exit unexpectedly, so we disable tombstones.
  11. debezium.source.database.history: some of the connectors (e.g. MySQL, SQL Server, Db2, Oracle) monitor the database schema evolution over time and store the data in a database history. io.debezium.relational.history.KafkaDatabaseHistory is the default value for Kafka-based deployments; io.debezium.relational.history.FileDatabaseHistory is used for non-Kafka deployments.
  12. debezium.source.database.history.file.filename: The name and location of the file to which FileDatabaseHistory persists its data.

3. Setting up Amazon Kinesis

Step 1. Creating Kinesis Data Streams

We need to create data streams for two purposes:

  1. A stream identifying the Debezium server that sends the messages, named exactly as provided in debezium.source.database.server.name
  2. Streams capturing the changes of the individual tables, i.e. we have to create a Kinesis stream for each table listed in debezium.source.table.include.list. The naming format for the streams is:
<debezium.source.database.server.name>                    -- Stream for the server
<debezium.source.database.server.name>.<database>.<table> -- Stream per table

For example, based on our application.properties file:

debezium-tutorial               -- Stream for the server
debezium-tutorial.TestDB.Events -- Stream for the table

Steps to create a Kinesis Data Stream

  1. Go to the Amazon Kinesis page and click Create data stream
  2. Enter the data stream name
  3. Add the required number of shards; for our POC, 1 shard is enough
  4. Click Create data stream

Similarly, create the necessary streams for all the tables provided in debezium.source.table.include.list, or create them programmatically as sketched below.
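
If you prefer to script this instead of using the console, here is a rough boto3 equivalent; the region, shard count, and stream names follow our application.properties and are assumptions you may need to adjust.

import boto3

# Create the Kinesis streams needed by the Debezium server: one named after
# the server itself and one per captured table. Region and shard count are
# assumptions for this POC.
kinesis = boto3.client('kinesis', region_name='us-west-2')

for stream_name in ['debezium-tutorial', 'debezium-tutorial.TestDB.Events']:
    kinesis.create_stream(StreamName=stream_name, ShardCount=1)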

Step 2. Lambda function to read the streaming data

Once the data reaches the Kinesis streams, we can process it and route it to the respective target points.

Here we will write a Lambda function that gets triggered by the incoming stream records.

Steps to create the lambda function:

  1. Go to the AWS Lambda page and click Create function
  2. Select “Author from scratch”, provide the function name, and select the Runtime as Python 3.8
  3. Click Create function

Configuring the role created

  1. Once the Lambda function is created, go to the Configuration tab on the Lambda function page
  2. Choose the Permissions tab, where we can see the role that was created
  3. Click on the role name; this will open up the role’s summary page
  4. Click Attach policies and add “AmazonKinesisFullAccess” (the policy can also be attached programmatically, as sketched below)
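
If you want to do this step from code instead of the console, here is a rough boto3 equivalent; the role name is a placeholder for whatever execution role the Lambda console created for your function.

import boto3

# Attach the AmazonKinesisFullAccess managed policy to the Lambda's
# execution role. The role name below is a placeholder.
iam = boto3.client('iam')
iam.attach_role_policy(
    RoleName='my-cdc-lambda-role',
    PolicyArn='arn:aws:iam::aws:policy/AmazonKinesisFullAccess')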

Add Triggers

  1. Go to the Configuration tab on the Lambda function page
  2. Choose the Triggers tab, where we can add new triggers
  3. Select Kinesis from the dropdown and add the streams we created earlier from the Kinesis streams dropdown (the same trigger can also be created programmatically, as sketched below)
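
As a scripted alternative, the trigger can be created as an event source mapping with boto3; in this sketch the function name, account id, and stream ARN are placeholders.

import boto3

# Wire the Kinesis stream to the Lambda function as an event source mapping.
# Function name, account id, and stream ARN below are placeholders.
lambda_client = boto3.client('lambda', region_name='us-west-2')
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-west-2:123456789012:stream/debezium-tutorial.TestDB.Events',
    FunctionName='my-cdc-lambda',
    StartingPosition='LATEST',
    BatchSize=100)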

Add code to Lambda Function

lambda_function.py

import base64
import json

# Simple Lambda function to retrieve the data from the Kinesis Data Stream.
# In order to send the data to the respective target DBs, make the necessary
# connections (e.g. via boto3 or a database driver) and replace the print
# call below.

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis delivers the record data base64-encoded
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        data = json.loads(payload)
        # do something with the data
        print(data)
    return 'Successfully processed {} records.'.format(len(event['Records']))

Now we are ready to go with the testing…

Testing the CDC Pipeline

  1. Create a database called TestDB
Create database TestDB;

2. Create a test table called Events

create table `TestDB`.`Events` (
`Id` int not null,
`EventName` varchar(150) null,
`EventDate` datetime null,
primary key(`Id`));

3. Start the Debezium server from the debezium-server folder using

sh run.sh

4. Once the server is up, we can start performing INSERT, UPDATE, and DELETE operations on the table we created:

Insert into TestDB.Events values (1,'Test',CurDate());
Update TestDB.Events set EventName = "updated test" where Id=1;
Delete from TestDB.Events where Id=1;

We can verify the data received at the Kinesis stream by checking the CloudWatch logs of the Lambda function.

Stream data received for the INSERT operation

Similarly, we will receive change events with op values of u and d for UPDATE and DELETE operations respectively.

The received data can then be routed to the required target databases based on our requirements.
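
As a rough sketch of that routing, the Lambda handler could branch on the op field of each change event; the print calls here are placeholders for real writes to a hypothetical target database.

import base64
import json

# Illustrative-only routing helper: apply a Debezium change event to a target
# system depending on the operation type. Replace the print calls with real
# INSERT/UPDATE/DELETE logic for your target database.
def route_change_event(record):
    payload = json.loads(
        base64.b64decode(record['kinesis']['data']).decode('utf-8'))
    # With schemas enabled in the JSON serialization, the change event is
    # wrapped in an envelope under 'payload'; otherwise use the raw document.
    event = payload.get('payload', payload)
    op = event.get('op')  # 'c' = create, 'u' = update, 'd' = delete, 'r' = snapshot read
    if op in ('c', 'r', 'u'):
        print('upsert row in target:', event.get('after'))
    elif op == 'd':
        print('delete row from target:', event.get('before'))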


Hope the content was informative! :)
