%md-sandbox
## Simple ETL with DLT

DLT makes Data Engineering accessible for all. Just declare your transformations in SQL or Python, and DLT will handle the Data Engineering complexity for you.

<img style="float:right" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-1.png" width="700"/>

**Accelerate ETL development** <br/>
Enable analysts and data engineers to innovate rapidly with simple pipeline development and maintenance

**Remove operational complexity** <br/>
By automating complex administrative tasks and gaining broader visibility into pipeline operations

**Trust your data** <br/>
With built-in quality controls and quality monitoring to ensure accurate and useful BI, Data Science, and ML

**Simplify batch and streaming** <br/>
With self-optimization and auto-scaling data pipelines for batch or streaming processing

## Simple ingestion with Lakeflow Connect

Lakeflow Connect offers built-in data ingestion connectors for popular SaaS applications, databases and file sources, such as Salesforce, Workday, and SQL Server, to build incremental data pipelines at scale, fully integrated with Databricks.

To give it a try, check our [Lakeflow Connect Product Tour](https://www.databricks.com/resources/demos/tours/platform/discover-databricks-lakeflow-connect-demo).

## Our DLT pipeline

We'll be using as input a raw dataset containing information on our customers' loans and historical transactions. Our goal is to ingest this data in near real time and build tables for our Analyst team while ensuring data quality.

**Your DLT Pipeline is ready!** Your pipeline was started using this notebook and is <a dbdemos-pipeline-id="dlt-loans" href="/#joblist/pipelines/460f840c-9ecc-4d19-a661-f60fd3a88297">available here</a>.

<!-- Collect usage data (view). Remove it to disable collection. View README for more details. -->
<img width="1px" src="https://ppxrzfxige.execute-api.us-west-2.amazonaws.com/v1/analytics?category=data-engineering&notebook=01-DLT-Loan-pipeline-SQL&demo_name=dlt-loans&event=VIEW">
%md
Our datasets come from 3 different systems and are saved under a cloud storage folder (S3/ADLS/GCS):

* `loans/raw_transactions` (new loan transactions uploaded here every few minutes)
* `loans/ref_accounting_treatment` (reference table, mostly static)
* `loans/historical_loans` (loans from a legacy system, new data added every week)

Let's ingest this data incrementally, and then compute a couple of aggregates that we'll need for our final dashboard to report our KPIs.
-- %fs ls /Volumes/main__build/dbdemos_dlt_loan/raw_data/raw_transactions
%md-sandbox
## Bronze layer: incrementally ingest data leveraging Databricks Autoloader

<img style="float: right; padding-left: 10px" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-2.png" width="600"/>

Our raw data is being sent to blob storage.

Autoloader simplifies this ingestion, including schema inference and schema evolution, while scaling to millions of incoming files.

Autoloader is available in SQL using the `read_files` function and can be used with a variety of formats (json, csv, avro...).

For more details on Autoloader, you can see `dbdemos.install('auto-loader')`

#### STREAMING LIVE TABLE

Defining tables as `STREAMING` guarantees that you only consume new incoming data. Without `STREAMING`, you will scan and ingest all the data available at once.

See the [documentation](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-incremental-data.html) for more details
CREATE STREAMING TABLE raw_txs
  COMMENT "New raw loan data incrementally ingested from cloud object storage landing zone"
AS SELECT * FROM STREAM read_files(
  '/Volumes/main__build/dbdemos_dlt_loan/raw_data/raw_transactions',
  format => 'json',
  inferColumnTypes => true)
CREATE STREAMING TABLE ref_accounting_treatment
  COMMENT "Lookup mapping for accounting codes"
AS SELECT * FROM STREAM delta.`/Volumes/main__build/dbdemos_dlt_loan/raw_data/ref_accounting_treatment`
-- as this is only refreshed on a weekly basis, we can lower the trigger interval
CREATE STREAMING TABLE raw_historical_loans
  TBLPROPERTIES ("pipelines.trigger.interval"="6 hour")
  COMMENT "Raw historical transactions"
AS SELECT * FROM STREAM read_files(
  '/Volumes/main__build/dbdemos_dlt_loan/raw_data/historical_loans',
  format => 'csv',
  inferColumnTypes => true)
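%md
`read_files` accepts additional options beyond `format` and `inferColumnTypes`. As an illustrative sketch only (not part of this pipeline), the statement below assumes you want to pin a few column types with `schemaHints` while letting Autoloader infer the rest; the table name is hypothetical.

```sql
-- Illustrative sketch only (hypothetical table name): pin selected column types
-- with schemaHints and let Autoloader infer everything else from the incoming JSON files.
CREATE STREAMING TABLE raw_txs_with_hints
  COMMENT "Example only: raw transactions ingested with explicit schema hints"
AS SELECT * FROM STREAM read_files(
  '/Volumes/main__build/dbdemos_dlt_loan/raw_data/raw_transactions',
  format => 'json',
  inferColumnTypes => true,
  schemaHints => 'balance DOUBLE, next_payment_date DATE')
```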
%md-sandbox
## Silver layer: joining tables while ensuring data quality

<img style="float: right; padding-left: 10px" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-3.png" width="600"/>

Once the bronze layer is defined, we'll create the silver layer by joining data. Note that bronze tables are referenced directly by name within the pipeline.

To consume only the increments from a Bronze table like `raw_txs`, we'll be using the `stream` keyword: `stream(raw_txs)`

Note that we don't have to worry about compactions, DLT handles that for us.

#### Expectations

By defining expectations (`CONSTRAINT <name> EXPECT <condition>`), you can enforce and track your data quality. See the [documentation](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-expectations.html) for more details
CREATE STREAMING LIVE VIEW new_txs
  COMMENT "Livestream of new transactions"
AS SELECT txs.*, ref.accounting_treatment as accounting_treatment
  FROM stream(raw_txs) txs
  INNER JOIN ref_accounting_treatment ref ON txs.accounting_treatment_id = ref.id
CREATE STREAMING TABLE cleaned_new_txs (
  CONSTRAINT `Payments should be this year`  EXPECT (next_payment_date > date('2020-12-31')),
  CONSTRAINT `Balance should be positive`    EXPECT (balance > 0 AND arrears_balance > 0) ON VIOLATION DROP ROW,
  CONSTRAINT `Cost center must be specified` EXPECT (cost_center_code IS NOT NULL) ON VIOLATION FAIL UPDATE
)
  COMMENT "Livestream of new transactions, cleaned and compliant"
AS SELECT * from STREAM(new_txs)
-- This is the inverse condition of the above statement to quarantine incorrect data for further analysis.
CREATE STREAMING TABLE quarantine_bad_txs (
  CONSTRAINT `Payments should be this year` EXPECT (next_payment_date <= date('2020-12-31')),
  CONSTRAINT `Balance should be positive`   EXPECT (balance <= 0 OR arrears_balance <= 0) ON VIOLATION DROP ROW
)
  COMMENT "Incorrect transactions requiring human analysis"
AS SELECT * from STREAM(new_txs)
CREATE MATERIALIZED VIEW historical_txs
  COMMENT "Historical loan transactions"
AS SELECT l.*, ref.accounting_treatment as accounting_treatment
  FROM raw_historical_loans l
  INNER JOIN ref_accounting_treatment ref ON l.accounting_treatment_id = ref.id
%md-sandbox
## Gold layer

<img style="float: right; padding-left: 10px" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-4.png" width="600"/>

Our last step is to materialize the Gold Layer.

Because these tables will be queried at scale using a SQL Endpoint, we'll add Z-Ordering at the table level using `pipelines.autoOptimize.zOrderCols` to ensure faster queries, and DLT will handle the rest.
CREATE MATERIALIZED VIEW total_loan_balances
  COMMENT "Combines historical and new loan data for unified rollup of loan balances"
  TBLPROPERTIES ("pipelines.autoOptimize.zOrderCols" = "location_code")
AS SELECT sum(revol_bal) AS bal, addr_state AS location_code FROM historical_txs GROUP BY addr_state
  UNION
  SELECT sum(balance) AS bal, country_code AS location_code FROM cleaned_new_txs GROUP BY country_code
CREATE MATERIALIZED VIEW new_loan_balances_by_cost_center
  COMMENT "Live table of new loan balances for consumption by different cost centers"
AS SELECT sum(balance) as sum_balance, cost_center_code
  FROM cleaned_new_txs
  GROUP BY cost_center_code
CREATE MATERIALIZED VIEW new_loan_balances_by_country
  COMMENT "Live table of new loan balances per country"
AS SELECT sum(count) as sum_count, country_code
  FROM cleaned_new_txs
  GROUP BY country_code
%md
## Next steps

Your DLT pipeline is ready to be started. <a dbdemos-pipeline-id="dlt-loans" href="/#joblist/pipelines/460f840c-9ecc-4d19-a661-f60fd3a88297">Click here to access the pipeline</a> created for you using this notebook.

To create a new one, open the DLT menu, create a pipeline and select this notebook to run it. To generate sample data, please run the [companion notebook]($./_resources/00-Loan-Data-Generator) (make sure the paths where you read and write the data are the same!)

Data Analysts can start using DBSQL to analyze the data and track our loan metrics. Data Scientists can also access the data to start building models to predict payment default or other more advanced use-cases.
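%md
As an illustration of the kind of DBSQL query an analyst could run on the gold layer, here is a minimal sketch over `total_loan_balances`. The fully qualified name is an assumption (it depends on the catalog and schema your pipeline publishes to); adjust it to your setup.

```sql
-- Illustrative sketch: top 10 locations by total loan balance from the gold table above.
-- The catalog/schema prefix is assumed; replace with wherever your pipeline publishes its tables.
SELECT location_code, bal
FROM main__build.dbdemos_dlt_loan.total_loan_balances
ORDER BY bal DESC
LIMIT 10
```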
%md-sandbox
## Tracking data quality

Expectation stats are automatically available as a system table. This information lets you monitor your data ingestion quality.

You can leverage DBSQL to query these tables and build custom alerts based on the metrics your business is tracking.

See [how to access your DLT metrics]($./03-Log-Analysis)

<img width="500" src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/dlt/dlt-loans-dashboard.png?raw=true">

<a dbdemos-dashboard-id="dlt-expectations" href='/sql/dashboardsv3/01ef00cc36721f9e9f2028ee75723cc1' target="_blank">Data Quality Dashboard example</a>
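%md
As a minimal sketch of such a DBSQL query, the statement below aggregates expectation pass/fail counts from the pipeline event log via the `event_log` table-valued function. It assumes the pipeline publishes `cleaned_new_txs` to Unity Catalog under the same catalog/schema as the raw data; adjust the table name to your setup.

```sql
-- Illustrative sketch: aggregate expectation metrics from the DLT event log.
-- The fully qualified table name is an assumption; point it at a table your pipeline publishes.
SELECT
  row_expectations.dataset            AS dataset,
  row_expectations.name               AS expectation,
  SUM(row_expectations.passed_records) AS passing_records,
  SUM(row_expectations.failed_records) AS failing_records
FROM (
  SELECT explode(
           from_json(
             details:flow_progress.data_quality.expectations,
             "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
           )
         ) AS row_expectations
  FROM event_log(TABLE(main__build.dbdemos_dlt_loan.cleaned_new_txs))
  WHERE event_type = 'flow_progress'
)
GROUP BY row_expectations.dataset, row_expectations.name
```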