
01-DLT-Loan-pipeline-SQL


Our datasets are coming from 3 different systems and saved under a cloud storage folder (S3/ADLS/GCS):

  • loans/raw_transactions (new loans uploaded here every few minutes)
  • loans/ref_accounting_treatment (reference table, mostly static)
  • loans/historical_loans (loans from the legacy system, with new data added every week)

Let's ingest this data incrementally, and then compute a couple of aggregates that we'll need for our final dashboard to report our KPIs.

-- %fs ls /Volumes/main__build/dbdemos_dlt_loan/raw_data/raw_transactions
CREATE STREAMING TABLE raw_txs
  COMMENT "New raw loan data incrementally ingested from cloud object storage landing zone"
AS SELECT * FROM STREAM read_files('/Volumes/main__build/dbdemos_dlt_loan/raw_data/raw_transactions', format => 'json', inferColumnTypes => true);
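
read_files picks up new JSON files incrementally and infers the schema for us. If you ever need to pin a column to a specific type instead of relying on inference, read_files also accepts schema hints. A minimal sketch (raw_txs_typed is a hypothetical table name; balance is the column the expectations below rely on):

-- Same ingestion, but pinning the balance column to DOUBLE via a schema hint
CREATE STREAMING TABLE raw_txs_typed
  COMMENT "Raw loan data with an explicit type hint on the balance column"
AS SELECT * FROM STREAM read_files(
     '/Volumes/main__build/dbdemos_dlt_loan/raw_data/raw_transactions',
     format => 'json',
     inferColumnTypes => true,
     schemaHints => 'balance DOUBLE');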
CREATE STREAMING TABLE ref_accounting_treatment
  COMMENT "Lookup mapping for accounting codes"
AS SELECT * FROM STREAM delta.`/Volumes/main__build/dbdemos_dlt_loan/raw_data/ref_accounting_treatment`;

-- As this table is only refreshed on a weekly basis, we can lower the trigger interval
CREATE STREAMING TABLE raw_historical_loans
  TBLPROPERTIES ("pipelines.trigger.interval"="6 hour")
  COMMENT "Raw historical transactions"
AS SELECT * FROM STREAM read_files('/Volumes/main__build/dbdemos_dlt_loan/raw_data/historical_loans', format => 'csv', inferColumnTypes => true);
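
To double-check what is actually landing in the volume before (or while) the pipeline runs, you can list the files from a regular SQL warehouse session. A quick sketch, assuming your user has read access to the volume:

-- Inspect the CSV landing folder outside the pipeline
LIST '/Volumes/main__build/dbdemos_dlt_loan/raw_data/historical_loans';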
CREATE STREAMING LIVE VIEW new_txs
  COMMENT "Livestream of new transactions"
-- Stream-static join: each micro-batch of raw_txs is joined against the current state of the reference table
AS SELECT txs.*, ref.accounting_treatment AS accounting_treatment FROM STREAM(raw_txs) txs
  INNER JOIN ref_accounting_treatment ref ON txs.accounting_treatment_id = ref.id;

CREATE STREAMING TABLE cleaned_new_txs (
  -- EXPECT alone only tracks violations in the pipeline metrics; offending rows are kept
  CONSTRAINT `Payments should be this year`  EXPECT (next_payment_date > date('2020-12-31')),
  -- ON VIOLATION DROP ROW removes offending rows from the output
  CONSTRAINT `Balance should be positive`    EXPECT (balance > 0 AND arrears_balance > 0) ON VIOLATION DROP ROW,
  -- ON VIOLATION FAIL UPDATE stops the pipeline update if any row violates the constraint
  CONSTRAINT `Cost center must be specified` EXPECT (cost_center_code IS NOT NULL) ON VIOLATION FAIL UPDATE
)
  COMMENT "Livestream of new transactions, cleaned and compliant"
AS SELECT * FROM STREAM(new_txs);

-- These are the inverse conditions of the statement above, used to quarantine incorrect data for further analysis.
CREATE STREAMING TABLE quarantine_bad_txs (
  CONSTRAINT `Payments should be this year`  EXPECT (next_payment_date <= date('2020-12-31')),
  CONSTRAINT `Balance should be positive`    EXPECT (balance <= 0 OR arrears_balance <= 0) ON VIOLATION DROP ROW
)
  COMMENT "Incorrect transactions requiring human analysis"
AS SELECT * FROM STREAM(new_txs);
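
Expectation results are recorded in the pipeline event log, so you can track pass/fail counts per constraint over time. A sketch based on the query pattern from the Databricks documentation, assuming the pipeline publishes cleaned_new_txs to the main__build.dbdemos_dlt_loan schema:

-- Aggregate expectation metrics from the pipeline event log
SELECT
  row_expectations.dataset             AS dataset,
  row_expectations.name                AS expectation,
  SUM(row_expectations.passed_records) AS passing_records,
  SUM(row_expectations.failed_records) AS failing_records
FROM (
  SELECT explode(
           from_json(details:flow_progress.data_quality.expectations,
                     'array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>')
         ) AS row_expectations
  FROM event_log(TABLE(main__build.dbdemos_dlt_loan.cleaned_new_txs))
  WHERE event_type = 'flow_progress'
)
GROUP BY row_expectations.dataset, row_expectations.name;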
CREATE MATERIALIZED VIEW historical_txs
  COMMENT "Historical loan transactions"
AS SELECT l.*, ref.accounting_treatment AS accounting_treatment FROM raw_historical_loans l
  INNER JOIN ref_accounting_treatment ref ON l.accounting_treatment_id = ref.id;

CREATE MATERIALIZED VIEW total_loan_balances
  COMMENT "Combines historical and new loan data for unified rollup of loan balances"
  TBLPROPERTIES ("pipelines.autoOptimize.zOrderCols" = "location_code")
AS SELECT sum(revol_bal)  AS bal, addr_state   AS location_code FROM historical_txs  GROUP BY addr_state
  UNION SELECT sum(balance) AS bal, country_code AS location_code FROM cleaned_new_txs GROUP BY country_code;

CREATE MATERIALIZED VIEW new_loan_balances_by_cost_center
  COMMENT "Live table of new loan balances for consumption by different cost centers"
AS SELECT sum(balance) AS sum_balance, cost_center_code FROM cleaned_new_txs
  GROUP BY cost_center_code;

CREATE MATERIALIZED VIEW new_loan_balances_by_country
  COMMENT "Live table of new loan counts per country"
AS SELECT sum(count) AS sum_count, country_code FROM cleaned_new_txs GROUP BY country_code;

Next steps

Your DLT pipeline is ready to be started. Open the pipeline that was created for you using this notebook to access it.

To create a new one, open the DLT menu, create a pipeline, and select this notebook to run it. To generate sample data, run the companion notebook (make sure the paths where you read and write the data are the same!).

Data analysts can start using DBSQL to analyze data and track our loan metrics. Data scientists can also access the data to start building models to predict payment defaults or other more advanced use cases.
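
For example, a simple DBSQL query on the aggregate we built above could feed a top-locations widget on the KPI dashboard (assuming the pipeline publishes to main__build.dbdemos_dlt_loan):

-- Top 10 locations by total loan balance
SELECT location_code, bal
FROM main__build.dbdemos_dlt_loan.total_loan_balances
ORDER BY bal DESC
LIMIT 10;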
