Change Data Capture with MongoDB Change Streams

by Marek Chodak, Backend Chapter Lead


What is Change Data Capture?


Change Data Capture (CDC) is a pattern for identifying and recording changes made to data in a data store, in near real time, so that they can be propagated elsewhere. It is used to keep data synchronized and consistent across applications and systems.


An application that implements this pattern very often relies on the database's internal Write-Ahead Log (WAL): a log that records every change made to the database before it is applied to the data files holding tables and indexes. A CDC implementation subscribes to the changes written to this log and exposes them to downstream applications. Reading the log, instead of repeatedly querying tables or collections for changes, avoids adding query load that could degrade the database's performance.
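To make the log-based approach concrete, here is a toy, in-memory sketch (not how any real database is implemented): every write is appended to a log before it is applied to the "table", and a CDC reader remembers its position in the log and only reads entries appended since its last poll, without ever querying the table. The names ToyDatabase, LogEntry, and CdcReader are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration only: none of these names come from MongoDB or any real database.
record LogEntry(long offset, String op, String key, String value) {}

class ToyDatabase {
    private final List<LogEntry> wal = new ArrayList<>();       // append-only change log
    private final Map<String, String> table = new HashMap<>();  // stands in for the data files

    void put(String key, String value) {
        wal.add(new LogEntry(wal.size(), "upsert", key, value)); // 1. record the change in the log
        table.put(key, value);                                    // 2. then apply it to the table
    }

    void delete(String key) {
        wal.add(new LogEntry(wal.size(), "delete", key, null));
        table.remove(key);
    }

    // Read-only view of the log entries starting at the given offset.
    List<LogEntry> readLog(long fromOffset) {
        return List.copyOf(wal.subList((int) fromOffset, wal.size()));
    }
}

class CdcReader {
    private long nextOffset = 0; // position of the first entry not yet delivered

    // Returns the changes appended since the previous poll and advances the position.
    // Note that the reader only touches the log, never the table itself.
    List<LogEntry> poll(ToyDatabase db) {
        List<LogEntry> changes = db.readLog(nextOffset);
        nextOffset += changes.size();
        return changes;
    }
}
```

A real database exposes its log through its own replication or CDC interface, but the idea is the same: the consumer tracks a position in an ordered log of changes.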


Some uses of CDC

  • Application Integration: CDC can expose changes made to entities in one application to downstream applications that rely on this data.

  • Data Analytics: CDC can be used to send data changes to data analytics services.

  • Auditing: To satisfy data compliance requirements, some applications need to store every change made to some of their data. CDC is a very good fit for this.

  • Read Models: CDC is also a good tool for feeding data into a system that uses the CQRS pattern. It can send every change made by the command side of the system to the query side, which prepares the data for reading.

  • Avoiding Distributed Transactions: In a distributed system, an application often has to react to a trigger, such as a REST call or an incoming event, by both changing its database and sending a message through Kafka or another messaging service. If the application fails, or loses its connection to Kafka, after committing the database transaction but before sending the message, the database is updated but the message is never sent, leaving the system in an inconsistent state. With CDC, the application only writes to the database, and the message is produced from the committed change in the log by a separate process. Storing the change and sending the message are decoupled, so a failed send can be retried instead of being lost.
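A minimal sketch of that decoupling, under simplifying assumptions: the committed changes are modeled as an in-memory list standing in for the database log, and MessagePublisher is a hypothetical stand-in for a Kafka (or other broker) client. The relay advances its checkpoint only after a publish succeeds, so a broker outage delays messages instead of dropping them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kafka or other message broker client.
interface MessagePublisher {
    void publish(String message) throws Exception;
}

// Hypothetical relay: reads committed changes from the log and publishes them in order.
class ChangeRelay {
    private final List<String> committedChanges; // stands in for the database log
    private int checkpoint = 0;                  // index of the next change to publish

    ChangeRelay(List<String> committedChanges) {
        this.committedChanges = committedChanges;
    }

    // Publishes pending changes; on the first failure it stops and keeps the
    // checkpoint where it is, so the next run retries the same change.
    void runOnce(MessagePublisher publisher) {
        while (checkpoint < committedChanges.size()) {
            try {
                publisher.publish(committedChanges.get(checkpoint));
            } catch (Exception e) {
                return; // broker unavailable: nothing is lost, retry on the next run
            }
            checkpoint++; // advance only after a successful publish
        }
    }

    int checkpoint() {
        return checkpoint;
    }
}
```

The trade-off of this ordering is that a crash between a successful publish and the checkpoint update causes the same change to be published again on restart, so consumers of CDC-driven messages should be able to handle duplicates.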

MongoDB Change Streams


One of the most popular CDC platforms is Debezium. It supports many different databases, but it adds complexity to your infrastructure, deployments, and configuration (it typically runs on top of Kafka Connect). If you are working with MongoDB, the database itself provides a built-in mechanism for implementing the CDC pattern.


MongoDB Change Streams allow applications to subscribe to all data changes on a single collection, a database, or an entire deployment, and to react to them immediately. Change streams read from the oplog, MongoDB's replication log, which plays a role similar to the WAL described above. Because change streams are built on the aggregation framework, applications can also filter for specific changes or transform the notifications with an aggregation pipeline.
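In the MongoDB Java sync driver, the three subscription scopes correspond to watch() methods on MongoClient, MongoDatabase, and MongoCollection, and each accepts an aggregation pipeline that runs over the change events. The sketch below assumes the sync driver is on the classpath; the class and method names are illustrative. The inclusion projection keeps the event's _id implicitly, which matters because that _id is the resume token and should not be removed.

```java
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Aggregates.project;
import static com.mongodb.client.model.Filters.in;
import static com.mongodb.client.model.Projections.include;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;

// Illustrative helper: the same pipeline applied at three different scopes.
class ChangeStreamScopes {

    // Keep only insert and update events and trim each event to a few fields
    // (an inclusion projection keeps _id, i.e. the resume token, by default).
    static List<Bson> insertsAndUpdates() {
        return List.of(
                match(in("operationType", List.of("insert", "update"))),
                project(include("operationType", "documentKey", "fullDocument")));
    }

    // Changes on every database and collection of the deployment.
    static ChangeStreamIterable<Document> wholeDeployment(MongoClient client) {
        return client.watch(insertsAndUpdates());
    }

    // Changes on every collection of one database.
    static ChangeStreamIterable<Document> oneDatabase(MongoDatabase db) {
        return db.watch(insertsAndUpdates());
    }

    // Changes on a single collection.
    static ChangeStreamIterable<Document> oneCollection(MongoCollection<Document> coll) {
        return coll.watch(insertsAndUpdates());
    }
}
```

Building the pipeline does not contact the server; the change stream is only opened when the returned iterable is iterated.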


Figure: Example of Change Streams flow

Implementation with Java

To see how this works, we will subscribe to MongoDB change streams from a Java Spring Boot application.


First, we need a running MongoDB database. We will start it with Docker Compose, using the following docker-compose.yml:

version: '3.8'

services:
  mongo:
    image: mongo:4.4.3
    container_name: 'mongo'
    environment:
      - MONGO_INITDB_DATABASE=customer
      - MONGO_INITDB_ROOT_USERNAME=root
      - MONGO_INITDB_ROOT_PASSWORD=root123
      - MONGO_REPLICA_SET_NAME=rs0
    command: --replSet rs0
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u root -p root123 --quiet) -eq 1
      interval: 10s
      start_period: 30s
    restart: always
    volumes:
      - ./mongo-volume:/data/db
    ports:
      - '27017-27019:27017-27019'

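Note that MongoDB has to run as a replica set: it is started with --replSet rs0, and the healthcheck initiates the replica set with rs.initiate(). This is required because change streams read from the oplog, which exists only on replica set members, so change streams are not available on a standalone server.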
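To fail fast when the deployment is not configured for change streams, the application could check the server topology at startup. This sketch assumes the MongoDB Java sync driver and uses the isMaster command (newer servers also accept hello): replica set members include a setName field in the reply, and a mongos router of a sharded cluster replies with msg set to "isdbgrid". The class ReplicaSetCheck is hypothetical.

```java
import com.mongodb.client.MongoClient;
import org.bson.Document;

// Hypothetical startup check: change streams need a replica set or a sharded cluster.
class ReplicaSetCheck {

    // Pure function over the command reply, so it can be tested without a server.
    static boolean supportsChangeStreams(Document isMasterReply) {
        boolean replicaSetMember = isMasterReply.getString("setName") != null;
        boolean mongosRouter = "isdbgrid".equals(isMasterReply.getString("msg"));
        return replicaSetMember || mongosRouter;
    }

    // Runs isMaster against the admin database and throws if change streams are unavailable.
    static void requireChangeStreamSupport(MongoClient client) {
        Document reply = client.getDatabase("admin").runCommand(new Document("isMaster", 1));
        if (!supportsChangeStreams(reply)) {
            throw new IllegalStateException(
                    "Change streams require a replica set or a sharded cluster");
        }
    }
}
```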


To generate the base Spring Boot application, we can use Spring Initializr.


As a dependency, we need to add Spring Data MongoDB, which also brings in the MongoDB Java driver used in the code below.

Figure: Example of the application config

After we download, unzip, and open the project, we can start developing our application.

For the entity stored in the database collection, we will use a basic Customer model with two fields, firstName and lastName. We can define it as a record:

public record Customer(String firstName, String lastName) {
}

Next, let's define a class that will subscribe to the Change Stream:

@Component
public class MongoDbChangeStreamsConfig implements ApplicationListener<ApplicationReadyEvent> {
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
    }
}

This class implements the ApplicationListener<ApplicationReadyEvent> interface, so Spring calls the onApplicationEvent method once the application has started and is ready. This is where we will subscribe to the change stream.


We need to define the configuration for the MongoDB connection client:

CodecRegistry pojoCodecRegistry = fromProviders(PojoCodecProvider.builder().automatic(true).build());
CodecRegistry codecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecRegistry);
String mongoUri = "mongodb://root:root123@localhost:27017/customer?authSource=admin";
ConnectionString connectionString = new ConnectionString(mongoUri);
MongoClientSettings clientSettings = MongoClientSettings.builder()
        .applyConnectionString(connectionString)
        .codecRegistry(codecRegistry)
        .build();

First, we configure a codec registry that combines the driver's default codecs with a POJO codec provider, which lets the driver decode documents sent by MongoDB into our own Java types. Then we parse the connection string, and finally we combine both into a MongoClientSettings object.
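The database name passed to getDatabase() in the next step comes from the path of the URI (customer), and authSource=admin tells the driver to authenticate the root user against the admin database, where the official Docker image creates it. The sketch below shows how to inspect both with the Java driver's ConnectionString and, as an assumption beyond the original code, reads the URI from a MONGODB_URI environment variable (a hypothetical name) instead of hard-coding credentials, falling back to the local Docker URI. Building MongoClientSettings only parses the configuration and does not open a connection.

```java
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;

// Hypothetical helper: connection settings without hard-coded credentials.
class MongoSettingsFactory {
    // Fallback matching the docker-compose setup in this article.
    static final String LOCAL_URI =
            "mongodb://root:root123@localhost:27017/customer?authSource=admin";

    // "MONGODB_URI" is an assumed variable name, not something the driver reads itself.
    static ConnectionString connectionString() {
        String uri = System.getenv("MONGODB_URI");
        return new ConnectionString(uri != null ? uri : LOCAL_URI);
    }

    // Parses and validates the settings; no connection is opened here.
    static MongoClientSettings settings(ConnectionString cs) {
        return MongoClientSettings.builder()
                .applyConnectionString(cs)
                .build();
    }
}
```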


After that, we can connect to the database and subscribe for the changes:

try (MongoClient mongoClient = MongoClients.create(clientSettings)) {
    MongoDatabase db = mongoClient.getDatabase(connectionString.getDatabase());
    MongoCollection<Customer> customers = db.getCollection("customers", Customer.class);
    customers.watch().forEach(printEvent());
}

First, we select the database (taken from the connection string) and the collection whose changes we want to listen to. We also pass Customer.class so that the received documents are decoded into Customer objects.


Finally, customers.watch().forEach(printEvent()) subscribes to the change stream and blocks, waiting for changes on the collection. When a change arrives, the object that represents it is printed to the console. The printEvent() method is defined as follows:

private static Consumer<ChangeStreamDocument<Customer>> printEvent() {
    return System.out::println;
}
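Printing the whole ChangeStreamDocument is enough for a demo, but a handler will usually branch on getOperationType() and read getDocumentKey() and getFullDocument(). In the sketch below, which assumes the Java driver's OperationType enum and redeclares the Customer record so the block compiles on its own, the formatting logic is a plain static method that can be tested without a database; CustomerEventHandlers is a hypothetical name. For delete events, getFullDocument() returns null because the document no longer exists.

```java
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
import java.util.function.Consumer;
import org.bson.BsonDocument;

record Customer(String firstName, String lastName) {}

// Hypothetical handler that reacts differently to each kind of change.
class CustomerEventHandlers {

    // Builds a one-line summary; kept free of I/O so it is easy to test.
    static String summarize(OperationType type, BsonDocument documentKey, Customer customer) {
        // drop/invalidate events carry no documentKey
        String key = documentKey == null ? "-" : documentKey.toJson();
        return switch (type) {
            // inserts and replacements always carry the full document
            case INSERT, REPLACE -> type.getValue() + " " + key + " -> " + customer;
            // updates (by default), deletes and other events: key only
            default -> type.getValue() + " " + key;
        };
    }

    // Drop-in alternative to printEvent().
    static Consumer<ChangeStreamDocument<Customer>> logSummary() {
        return event -> System.out.println(summarize(
                event.getOperationType(), event.getDocumentKey(), event.getFullDocument()));
    }
}
```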

Let's start the database with Docker Compose:

docker-compose up -d

Then start our application. It should block and wait for changes from MongoDB.

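Next, let's log in to the MongoDB shell (if the mongo shell is not installed locally, the same command can be run inside the container with docker exec -it mongo mongo -u root -p root123):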

mongo -u root -p root123

And insert one customer into the collection:

use customer
db.customers.insert({firstName: "Jhon", lastName: "Doe"})

Then in our application logs, we should see:

ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "8262BEF66F000000012B022C0100296E5A10049F8669B1E0A0468FBE3D71E1310592CD46645F6964006462BEF66FC510FC418314423F0004"}, namespace=customer.customers, destinationNamespace=null, fullDocument=Customer[firstName=Jhon, lastName=Doe], documentKey={"_id": {"$oid": "62bef66fc510fc418314423f"}}, clusterTime=Timestamp{value=7115395417893765121, seconds=1656682095, inc=1}, updateDescription=null, txnNumber=null, lsid=null}

Here we can see all the data about the change: the operation type, the namespace (database and collection) of the changed document, its _id in documentKey, and the full document after the change.


Resuming the subscription

The change event payload contains one particularly useful field: resumeToken. After an application restart, redeployment, or crash, this token lets us resume the subscription where we left off, provided the corresponding entry is still in the oplog. To use it, replace:

customers.watch().forEach(printEvent());

with:

BsonValue value = new BsonString("<resume_token>");
BsonDocument bsonDocument = new BsonDocument("_data", value);
customers.watch().resumeAfter(bsonDocument).forEach(printEvent());

where <resume_token> is replaced with the _data value of the resumeToken printed in the log.
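Copying the token from the log by hand is fine for an experiment, but a service would rather persist the token of each processed event and read it back on startup. The sketch below assumes the MongoDB Java sync driver and, purely for illustration, stores the token as JSON in a local file; the ResumeTokenStore class and the file location are hypothetical, and a database or the service's own state store is a more typical choice. getResumeToken() already returns the complete {"_data": ...} document, so it can be passed to resumeAfter() directly.

```java
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;
import org.bson.BsonDocument;
import org.bson.Document;

// Hypothetical file-based resume token store (illustration only).
class ResumeTokenStore {
    private final Path file;

    ResumeTokenStore(Path file) {
        this.file = file;
    }

    // Overwrites the file with the latest token as JSON.
    void save(BsonDocument token) {
        try {
            Files.writeString(file, token.toJson());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns null on first start, when no token has been stored yet.
    BsonDocument load() {
        if (!Files.exists(file)) {
            return null;
        }
        try {
            return BsonDocument.parse(Files.readString(file));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Resumes after the stored token if there is one, otherwise starts from "now".
    void watch(MongoCollection<Document> collection) {
        BsonDocument token = load();
        ChangeStreamIterable<Document> stream = collection.watch();
        if (token != null) {
            stream = stream.resumeAfter(token);
        }
        Consumer<ChangeStreamDocument<Document>> handler = event -> {
            System.out.println(event);    // 1. process the event
            save(event.getResumeToken()); // 2. then record how far we got
        };
        stream.forEach(handler);
    }
}
```

Saving the token after processing means that a crash between the two steps delivers the last event again on restart, which is usually preferable to losing it.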

Next, stop the application, go back to the mongo shell, and insert two more customers:

db.customers.insert({firstName: "Jake", lastName: "Smith"})
db.customers.insert({firstName: "Anna", lastName: "Kowalski"})

When you start the application again, you should see log entries for both newly added customers, even though they were inserted while it was not running.


Listening to specific operation types

By default, the subscription receives all operation types. If, for example, you only want to listen to inserts and deletes, you can pass an aggregation pipeline with a $match stage to the watch() method (match is statically imported from com.mongodb.client.model.Aggregates, and in from com.mongodb.client.model.Filters):

List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete"))));
customers.watch(pipeline).resumeAfter(bsonDocument)
        .forEach(printEvent());
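Two details are worth knowing when choosing operation types. Delete events carry only the documentKey (fullDocument is null), and update events by default carry only an updateDescription of the changed fields; requesting FullDocument.UPDATE_LOOKUP makes the server attach the current version of the document. A sketch with the MongoDB Java sync driver, where FullDocumentWatch is a hypothetical name:

```java
import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Filters.in;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.FullDocument;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;

// Hypothetical helper: watch every change that leaves a document in the collection.
class FullDocumentWatch {

    // Inserts, replacements and updates, but not deletes.
    static List<Bson> pipeline() {
        return List.of(match(in("operationType", List.of("insert", "replace", "update"))));
    }

    // UPDATE_LOOKUP attaches the current version of the document to update events.
    // The looked-up document may be null if it was deleted in the meantime.
    static ChangeStreamIterable<Document> watch(MongoCollection<Document> customers) {
        return customers.watch(pipeline()).fullDocument(FullDocument.UPDATE_LOOKUP);
    }
}
```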

Next steps

What can we do now?


Instead of just printing the change, we should do something meaningful with it. What that is depends on your requirements. You might take the document from the change event, transform it, and store it as an audit log entry. Or you might publish the change to a topic or queue so that other applications are notified when data they are interested in changes. There are many possibilities.


Final Solution:

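import static com.mongodb.client.model.Aggregates.match;
import static com.mongodb.client.model.Filters.in;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.util.List;
import java.util.function.Consumer;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component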
public class MongoDbChangeStreamsConfig implements ApplicationListener<ApplicationReadyEvent> {
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        CodecRegistry pojoCodecRegistry = fromProviders(PojoCodecProvider.builder().automatic(true).build());
        CodecRegistry codecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecRegistry);
        String mongoUri = "mongodb://root:root123@localhost:27017/customer?authSource=admin";
        ConnectionString connectionString = new ConnectionString(mongoUri);
        MongoClientSettings clientSettings = MongoClientSettings.builder()
                .applyConnectionString(connectionString)
                .codecRegistry(codecRegistry)
                .build();
        try (MongoClient mongoClient = MongoClients.create(clientSettings)) {
            MongoDatabase db = mongoClient.getDatabase(connectionString.getDatabase());
            MongoCollection<Customer> customers = db.getCollection("customers", Customer.class);
            BsonValue value = new BsonString("RESUME_TOKEN"); // replace with the _data value of a real resume token
            BsonDocument bsonDocument = new BsonDocument("_data", value);
            List<Bson> pipeline = singletonList(match(in("operationType", asList("insert", "delete"))));
            customers.watch(pipeline).resumeAfter(bsonDocument).forEach(printEvent());
        }
    }
    private static Consumer<ChangeStreamDocument<Customer>> printEvent() {
        return System.out::println;
    }
}