Implement CDC in a DLT Pipeline: Change Data Capture

Importance of Change Data Capture (CDC)



Change Data Capture (CDC) is the process of capturing changes made to records in a transactional database (MySQL, PostgreSQL) or data warehouse. CDC captures operations such as deletes, appends, and updates, typically as a stream used to re-materialize the table in external systems.

CDC enables incremental loading and eliminates the need for bulk reloads.

By capturing CDC events, we can re-materialize the source table as a Delta table in our Lakehouse and start running analyses on top of it (data science, BI), merging the data with external systems.


Capturing CDC



A variety of **CDC tools** are available. One of the leading open source solutions is Debezium, but other implementations exist that simplify the data source, such as Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate, and AWS DMS.

In this demo we are using CDC data coming from an external system like Debezium or DMS.

Debezium takes care of capturing every changed row. It typically sends the history of data changes to Kafka logs or saves them as files. To simplify the demo, we'll assume that our external CDC system is up and running and saving the CDC events as JSON files in our blob storage (S3, ADLS, GCS).

Our job is to ingest CDC information from the `customer` table (JSON format), make sure it is correct, and then materialize the customer table in our Lakehouse.

Materializing a table from CDC events with Delta Live Tables



In this example, we'll synchronize data from the Customers table in our MySQL database.

- We extract the changes from our transactional database using Debezium or any other tool and save them in cloud object storage (S3 folder, ADLS, GCS).
- Using Autoloader, we incrementally load the messages from cloud object storage and store the raw messages in the `customers_cdc` table. Autoloader takes care of inferring the schema and handling schema evolution for us.
- Then we add a view, `customers_cdc_clean`, to check the quality of our data using expectations, and build dashboards to track data quality. For example, the ID should never be null because we use it to run our upsert operations.
- Finally, we run APPLY CHANGES INTO (doing the upserts) on the cleaned CDC data to apply the changes to the final `customers` table.
- Extra: we'll also see how DLT can easily create a Slowly Changing Dimension of type 2 (SCD2) to keep track of all the changes.

Here is the flow we'll implement, consuming CDC data from an external database. Note that the incoming data could arrive in any format or from any source, including a message queue such as Kafka.

Accessing the DLT pipeline



Your pipeline has been created! You can directly access the Delta Live Table Pipeline for CDC.

CDC input from tools like Debezium



For each change, we receive a JSON message containing all the fields of the row being updated (customer name, email, address...). In addition, we have extra metadata, including:

- operation: an operation code, typically one of DELETE, APPEND, UPDATE
- operation_date: the date and timestamp of each operation

Tools like Debezium can produce more advanced output, such as the row value before the change, but we'll leave that out to keep the demo clear.
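
To make the message shape concrete, here is a minimal sketch of one such CDC event and how the row payload can be separated from the metadata. Only `operation` and `operation_date` come from the description above; the other field names and all values are hypothetical.

```python
import json

# Hypothetical CDC message for the customer table. Only `operation` and
# `operation_date` are named in the text; the other fields are assumptions.
raw_message = """
{
  "id": 42,
  "name": "Jane Doe",
  "email": "jane@example.com",
  "address": "1 Main St",
  "operation": "UPDATE",
  "operation_date": "2024-01-15 10:30:00"
}
"""

event = json.loads(raw_message)

# Split the CDC metadata from the actual row values.
metadata_keys = {"operation", "operation_date"}
metadata = {k: v for k, v in event.items() if k in metadata_keys}
row = {k: v for k, v in event.items() if k not in metadata_keys}
```

The `row` part is what ends up in the materialized table, while `metadata` drives how the change is applied.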

1/ Ingesting data with Autoloader





Our first step is to ingest the data from cloud storage. Again, this could be any other source, such as a message queue.

This can be challenging for multiple reasons. We have to:

- operate at scale, potentially ingesting millions of small files
- infer the schema and JSON types
- handle bad records with an incorrect JSON schema
- take care of schema evolution (e.g., a new column in the customer table)

Databricks Autoloader solves all these challenges out of the box.
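
To illustrate why these points are non-trivial, the following plain-Python sketch handles a few JSON lines by hand: it infers a schema, widens it when a new column appears, and sets malformed records aside. It is not how Autoloader is implemented or configured; the function name `ingest` and the sample data are hypothetical.

```python
import json

def ingest(lines):
    """Parse JSON lines by hand; return (rows, rescued, schema)."""
    rows, rescued, schema = [], [], {}
    for line in lines:
        try:
            record = json.loads(line)
            if not isinstance(record, dict):
                raise ValueError("not a JSON object")
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            rescued.append(line)  # keep the bad record instead of failing
            continue
        # Schema inference and evolution: remember the first type seen per column.
        for column, value in record.items():
            schema.setdefault(column, type(value).__name__)
        rows.append(record)
    return rows, rescued, schema

lines = [
    '{"id": 1, "name": "Ann", "operation": "APPEND"}',
    '{"id": 2, "name": "Bob", "operation": "APPEND", "phone": "555-0100"}',  # new column
    '{"id": 3, "name": ',                                                    # truncated JSON
]
rows, rescued, schema = ingest(lines)
```

Even this toy version ignores file discovery, checkpointing, and type conflicts, which is where a managed ingestion tool earns its keep.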

2/ Cleanup & expectations to track data quality





Next, we'll add expectations to control data quality. To do so, we'll create a view (we don't need to duplicate the data) and check the following conditions:

- the ID must never be null
- the CDC operation type must be valid
- the JSON must have been properly read by the Autoloader

If one of these conditions isn't respected, we'll drop the row.

These expectation metrics are saved as technical tables and can then be reused with Databricks SQL to track data quality over time.
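
The three conditions and the drop-on-failure behavior can be sketched in plain Python as named predicates with per-expectation failure counts. This only mirrors the semantics described above, not the DLT API. The expectation names and the use of a `_rescued_data` column as the signal for badly read JSON are assumptions.

```python
VALID_OPERATIONS = {"APPEND", "DELETE", "UPDATE"}

# Each expectation is a name plus a predicate over a row (a dict).
# Names and the `_rescued_data` column are assumptions for illustration.
EXPECTATIONS = {
    "valid_id": lambda r: r.get("id") is not None,
    "valid_operation": lambda r: r.get("operation") in VALID_OPERATIONS,
    "valid_json": lambda r: r.get("_rescued_data") is None,
}

def apply_expectations(rows):
    """Drop rows failing any expectation; count failures per expectation."""
    failures = {name: 0 for name in EXPECTATIONS}
    kept = []
    for row in rows:
        failed = [name for name, check in EXPECTATIONS.items() if not check(row)]
        for name in failed:
            failures[name] += 1
        if not failed:
            kept.append(row)
    return kept, failures

rows = [
    {"id": 1, "operation": "APPEND", "_rescued_data": None},
    {"id": None, "operation": "UPDATE", "_rescued_data": None},
    {"id": 2, "operation": "UPSERT", "_rescued_data": None},
    {"id": 3, "operation": "DELETE", "_rescued_data": '{"bad": "field"}'},
]
kept, failures = apply_expectations(rows)
```

Only the first row survives; the `failures` counts are the kind of metric a data quality dashboard would chart over time.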

3/ Materializing the silver table with APPLY CHANGES





The silver `customers` table will contain the most up-to-date view. It will be a replica of the original table.

This is non-trivial to implement manually. You need to consider things like data deduplication to keep the most recent row.

Thankfully, Delta Live Tables solves these challenges out of the box with the `APPLY CHANGES` operation.
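
As a rough illustration of what such an upsert has to do, here is a plain-Python sketch that keeps the most recent event per key, sequenced by `operation_date`, and treats DELETE as a removal. It is a simulation of the semantics under simplifying assumptions (string timestamps that sort lexicographically, a single `id` key), not the DLT implementation; the function name and sample data are hypothetical.

```python
def apply_changes(events, key="id", sequence_by="operation_date"):
    """Return the current state of the table as {key: row}."""
    latest = {}  # key -> most recent event seen for that key (deletes included)
    for event in events:
        current = latest.get(event[key])
        # Deduplicate: only a strictly newer event replaces the stored one.
        if current is None or event[sequence_by] > current[sequence_by]:
            latest[event[key]] = event
    # Keys whose most recent event is a DELETE are absent from the result.
    return {
        k: {c: v for c, v in e.items() if c not in ("operation", sequence_by)}
        for k, e in latest.items()
        if e["operation"] != "DELETE"
    }

events = [
    {"id": 1, "name": "Ann", "operation": "APPEND", "operation_date": "2024-01-01 09:00:00"},
    {"id": 2, "name": "Bob", "operation": "APPEND", "operation_date": "2024-01-01 09:05:00"},
    {"id": 1, "name": "Ann Lee", "operation": "UPDATE", "operation_date": "2024-01-02 10:00:00"},
    {"id": 2, "name": "Bob", "operation": "DELETE", "operation_date": "2024-01-03 08:00:00"},
    # Late-arriving event: older than the update already applied, so it is ignored.
    {"id": 1, "name": "Anne", "operation": "UPDATE", "operation_date": "2024-01-01 12:00:00"},
]
customers = apply_changes(events)
```

Keeping the delete event in `latest` rather than removing the key is deliberate: it prevents an older, late-arriving update from resurrecting a deleted row.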

4/ Slowly Changing Dimension of type 2 (SCD2)



Why SCD2



It's often required to create a table tracking all the changes resulting from APPEND, UPDATE and DELETE:

* History: you want to keep a history of all the changes to your table
* Traceability: you want to see which operation (APPEND, UPDATE, DELETE) produced each change

SCD2 with DLT



Delta supports CDF (Change Data Feed), and `table_changes` can be used to query table modifications in SQL or Python. However, CDF's main use case is to capture changes in a pipeline, not to create a full view of the table changes from the beginning.

Things get especially complex to implement if you have out of order events. If you need to sequence your changes by a timestamp and receive a modification which happened in the past, then you not only need to append a new entry in your SCD table, but also update the previous entries.

Delta Live Tables makes all this logic simple and lets you create a separate table containing all the modifications from the beginning of time. This table can then be used at scale, with specific partitions / ZORDER columns if required. Out-of-order events are handled out of the box based on the `sequence_by` column.

To create an SCD2 table, all we have to do is use `APPLY CHANGES` with the extra option: `STORED AS {SCD TYPE 1 | SCD TYPE 2 [WITH {TIMESTAMP|VERSION}]}`

*Note: you can also limit the columns being tracked with the option: `TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}`*
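
To show what an SCD2 history looks like and why out-of-order events complicate it, here is a plain-Python sketch that rebuilds the validity intervals per key after sorting by the sequencing column. It is an illustration of the idea under simplifying assumptions, not DLT's implementation; the function name and the `start_at` / `end_at` column names are hypothetical.

```python
from collections import defaultdict

def build_scd2(events, key="id", sequence_by="operation_date"):
    """Return one row per version, with a validity interval for each."""
    by_key = defaultdict(list)
    for event in events:
        by_key[event[key]].append(event)

    history = []
    for group in by_key.values():
        # Sorting by the sequencing column puts late events in their true place.
        group.sort(key=lambda e: e[sequence_by])
        for current, following in zip(group, group[1:] + [None]):
            if current["operation"] == "DELETE":
                continue  # a delete only closes the previous version
            values = {c: v for c, v in current.items()
                      if c not in ("operation", sequence_by)}
            history.append({
                **values,
                "start_at": current[sequence_by],
                # The latest version stays open (None) until a newer event arrives.
                "end_at": following[sequence_by] if following else None,
            })
    return history

events = [
    {"id": 1, "name": "Ann", "operation": "APPEND", "operation_date": "2024-01-01"},
    {"id": 1, "name": "Ann Lee", "operation": "UPDATE", "operation_date": "2024-01-03"},
    # Arrives late but happened earlier: it must split the first interval.
    {"id": 1, "name": "Anne", "operation": "UPDATE", "operation_date": "2024-01-02"},
    {"id": 1, "name": "Ann Lee", "operation": "DELETE", "operation_date": "2024-01-04"},
]
history = build_scd2(events)
```

The late `Anne` event shortens the `Ann` version's interval to end on 2024-01-02 instead of 2024-01-03, which is exactly the "update the previous entries" work described above. This sketch recomputes everything from scratch; an incremental pipeline has to achieve the same result by patching existing rows.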

Conclusion


We now have our DLT pipeline up and ready! Our `customers` table is materialized and we can start building BI reports to analyze and improve our business. It also opens the door to data science and ML use cases such as customer churn, segmentation, etc.

Monitoring your data quality metrics with Delta Live Tables





Delta Live Tables tracks all your data quality metrics. You can query the expectations directly as a SQL table with Databricks SQL to track your expectation metrics and send alerts as required.

This lets you build custom dashboards to track those metrics.
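
As a small illustration of the kind of aggregation such a dashboard or alert might run, the sketch below computes a failure rate per expectation from a handful of metric records. The record layout, field names, sample numbers, and the 1% alert threshold are all hypothetical; the real metrics would be queried from the pipeline's technical tables.

```python
from collections import defaultdict

# Hypothetical metric records, one per expectation per pipeline update.
metrics = [
    {"expectation": "valid_id", "passed_records": 980, "failed_records": 20},
    {"expectation": "valid_id", "passed_records": 495, "failed_records": 5},
    {"expectation": "valid_operation", "passed_records": 1000, "failed_records": 0},
]

def failure_rates(records):
    """Aggregate passed/failed counts and return the failure rate per expectation."""
    totals = defaultdict(lambda: [0, 0])  # name -> [passed, failed]
    for record in records:
        totals[record["expectation"]][0] += record["passed_records"]
        totals[record["expectation"]][1] += record["failed_records"]
    return {
        name: failed / (passed + failed)
        for name, (passed, failed) in totals.items()
        if passed + failed > 0
    }

rates = failure_rates(metrics)
# Flag expectations whose failure rate exceeds an assumed 1% threshold.
alerts = [name for name, rate in rates.items() if rate > 0.01]
```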

Data Quality Dashboard

For more detail on how to analyse Expectation metrics, open the [03-Retail_DLT_CDC_Monitoring]($./03-Retail_DLT_CDC_Monitoring) notebook.