%md-sandbox
# Streaming on Databricks with Spark and Delta Lake

Streaming on Databricks is greatly simplified using Delta Live Tables (DLT). <br/>
DLT lets you write your entire data pipeline, supporting streaming transformations in SQL or Python while removing most of the technical challenges. We strongly recommend implementing your pipelines with DLT, as this results in much more robust pipelines, enforces data quality, and greatly accelerates project delivery.<br/>
*For a DLT example, please install `dbdemos.install('dlt-loans')` or the C360 Lakehouse demo: `dbdemos.install('lakehouse-retail-churn')`*

The Spark Streaming API offers lower-level primitives for more advanced control, such as `foreachBatch` and custom stateful operations with `applyInPandasWithState`. Some advanced use cases can only be implemented with these APIs, and this is what we'll focus on here.

## Building a sessionization stream with Delta Lake and Spark Streaming

### What's sessionization?
<div style="float:right" ><img src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/product/streaming-sessionization/session_diagram.png" style="height: 200px; margin:0px 0px 0px 10px"/></div>

Sessionization is the process of finding time-bounded user sessions in a flow of events, grouping all events happening around the same time (e.g. number of clicks, most-viewed pages, etc.)

When there is a temporal gap greater than X minutes, we split the session into 2 distinct sessions (a small illustrative sketch follows right after this introduction).

### Why is that important?

Understanding sessions is critical for a lot of use cases:

- Detect cart abandonment in your online shop, and automatically trigger marketing actions as follow-up to increase your sales
- Build a better attribution model for your affiliation, based on the user actions during each session
- Understand the user journey in your website, and provide a better experience to increase your user retention
- ...

### Sessionization with Spark & Delta

Sessionization can be done in many ways. SQL windowing is often used but quickly becomes too restrictive for complex use cases.

Instead, we'll be using the following Delta Architecture:

<img src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/product/streaming-sessionization/sessionization.png" width="1200px">

Being able to process and aggregate your sessions in both a batch and a streaming fashion can be a real challenge, especially when updates are required in your historical data!

Thankfully, Delta and Spark simplify our job: we can use Spark Streaming with a custom stateful operation (the `flatMapGroupsWithState` operator), in a streaming and batch fashion.

Let's build our session job to detect cart abandonment!

*Note: again, this is an advanced demo - if you're starting with Databricks and are looking for a simple streaming pipeline we recommend going with DLT instead.*

<!-- Collect usage data (view). Remove it to disable collection or disable tracker during installation. View README for more details. -->
<img width="1px" src="https://ppxrzfxige.execute-api.us-west-2.amazonaws.com/v1/analytics?category=data-engineering&notebook=01-Delta-session-BRONZE&demo_name=streaming-sessionization&event=VIEW">
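Before wiring up the pipeline, here is a minimal, standalone sketch (not part of the original demo) of the gap-based logic described above, using pandas on toy data: a new session starts whenever the time since a user's previous event exceeds an assumed 30-minute threshold.

import pandas as pd

# Toy events for a single user; the 30-minute gap threshold is an arbitrary choice for illustration
events = pd.DataFrame({
    "user_id": ["u1", "u1", "u1", "u1", "u1"],
    "event_ts": pd.to_datetime([
        "2024-01-01 10:00", "2024-01-01 10:05", "2024-01-01 10:20",
        "2024-01-01 11:30", "2024-01-01 11:35"])  # > 30 min gap before 11:30 => new session
})

events = events.sort_values(["user_id", "event_ts"])
# Flag rows whose gap to the previous event of the same user exceeds the threshold
new_session = events.groupby("user_id")["event_ts"].diff() > pd.Timedelta(minutes=30)
# The cumulative sum of the flags gives a per-user session number
events["session_id"] = new_session.groupby(events["user_id"]).cumsum()
print(events)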
%md
## First, make sure events are published to your kafka queue

Start the [_00-Delta-session-PRODUCER]($./_00-Delta-session-PRODUCER) notebook to send messages to your kafka queue.
%run ./_resources/00-setup $reset_all_data=false
%md-sandbox
## 1/ Bronze table: store the stream as a Delta Lake table

<img style="float:right; height: 250px; margin: 0px 30px 0px 30px" src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/product/streaming-sessionization/sessionization_bronze.png">

The first step is to consume data from our streaming engine (Kafka, Kinesis, Pulsar, etc.) and save it in our Data Lake.

We won't be doing any transformation: the goal is to be able to re-process all the data and change/improve the downstream logic when needed.

#### Solving small files and compaction issues

Every time we capture kafka events, they'll be stored in our table and this will create new files. After several days, we'll end up with millions of small files leading to performance issues.<br/>
Databricks solves that with autoOptimize & autoCompact, 2 properties to set at the table level.

*Note that the table isn't created with all the columns. The engine will automatically add new columns from kafka at write time, merging the schema gracefully.*
%sql
CREATE TABLE IF NOT EXISTS events_raw (key string, value string)
  TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true);
# Reset the streaming checkpoints so the ingestion restarts from scratch (demo only)
dbutils.fs.rm(volume_folder+"/checkpoints", True)
# NOTE: the demo runs with Kafka, and dbdemos doesn't publicly expose its demo kafka servers. Use your own IPs to run the demo properly
kafka_bootstrap_servers_tls = "b-1.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092,b-2.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092,b-3.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092"
#kafka_bootstrap_servers_tls = "<Replace by your own kafka servers>"
# Also make sure to have the proper instance profile to allow the access if you're on AWS.

stream = (spark
            .readStream
              # === Configurations for Kafka streams ===
              .format("kafka")
              .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls)
              .option("kafka.security.protocol", "PLAINTEXT") #SSL
              .option("subscribe", "dbdemos-sessions") # kafka topic
              .option("startingOffsets", "latest") # Consume messages from the end
              .option("maxOffsetsPerTrigger", "10000") # Control ingestion rate - backpressure
              #.option("ignoreChanges", "true")
            .load()
            .withColumn('key', col('key').cast('string'))
            .withColumn('value', col('value').cast('string'))
            .writeStream
              # === Write to the delta table ===
              .format("delta")
              .trigger(processingTime="20 seconds")
              #.trigger(availableNow=True) # use this for serverless
              .option("checkpointLocation", volume_folder+"/checkpoints/bronze")
              .option("mergeSchema", "true")
              .outputMode("append")
              .table("events_raw"))

DBDemos.wait_for_table("events_raw")
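Once the stream is running, you can optionally check its progress from another cell with the standard Structured Streaming query API (this monitoring snippet is an illustration added here, not part of the original demo):

# Inspect the running bronze ingestion query (standard StreamingQuery API)
print(stream.status)                            # e.g. whether the query is waiting for data or processing a batch
if stream.lastProgress:                         # None until the first micro-batch completes
    print(stream.lastProgress["numInputRows"])  # rows ingested in the last micro-batch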
%sql SELECT * FROM events_raw;
%md
### Our raw events are now ready to be analyzed

It's now easy to run queries on our events_raw table. Our data is saved as JSON, and Databricks makes it easy to query:
%sql select count(*), value:platform as platform from events_raw group by platform;
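If you prefer the DataFrame API, the same aggregation can be written in PySpark; here is a small equivalent sketch using `get_json_object` (a standard Spark function) instead of the `value:platform` SQL shortcut:

from pyspark.sql.functions import get_json_object, col

# Same aggregation as the SQL above: count events per platform, extracted from the raw JSON value
platform_counts = (spark.table("events_raw")
                        .withColumn("platform", get_json_object(col("value"), "$.platform"))
                        .groupBy("platform")
                        .count())
display(platform_counts)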
%md
## Searching for duplicate events

As you can see, our producer sends incorrect messages.

Not only do we get null event_id values from time to time, but we also get duplicate events (identical events being sent twice with the same ID and the exact same content).
%sql select count(*) as event_count, value:event_id as event_id, first(value) from events_raw group by event_id having event_count > 1 order by event_id;
# Stop all the streams before moving on to the next notebook
DBDemos.stop_all_streams(sleep_time=120)
%md
## Next steps: clean up the data and remove duplicates

It looks like we have duplicate events in our dataset. Let's see how we can perform some cleanup.

In addition, reading from raw JSON isn't very efficient, and what if our JSON changes over time?

While we can explore the dataset using Spark JSON manipulation, this isn't ideal. For example, if the JSON in our messages changes after a few months, our queries will fail.

Furthermore, performance won't be great at scale: because all our data is stored as a single JSON string, we can't leverage data skipping and a columnar format.

That's why we need another table: **[A Silver Table!]($./02-Delta-session-SILVER)**
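As a simple preview of that cleanup, here is a batch-only sketch (assuming the JSON payload exposes the `event_id` field used in the query above; the actual Silver implementation may handle this differently in streaming):

from pyspark.sql.functions import get_json_object, col

# Batch sketch: parse event_id from the raw JSON, drop null IDs and duplicate events
deduped = (spark.table("events_raw")
                .withColumn("event_id", get_json_object(col("value"), "$.event_id"))
                .filter(col("event_id").isNotNull())
                .dropDuplicates(["event_id"]))
print(deduped.count())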