
    01-Delta-session-BRONZE

    (Python)

    First, make sure events are published to your Kafka queue

    Start the _00-Delta-session-PRODUCER notebook to send messages to your Kafka queue.
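
    If you just want to smoke-test this pipeline without the producer notebook, a minimal standalone producer could look like the sketch below. It assumes the kafka-python package and reuses the dbdemos-sessions topic consumed later in this notebook; the broker addresses are placeholders, and any event field beyond event_id and platform is hypothetical.

    # Minimal producer sketch (assumes kafka-python; this is NOT the _00-Delta-session-PRODUCER notebook)
    import json, uuid
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=["<your-broker-1>:9092", "<your-broker-2>:9092"],  # replace with your own brokers
        security_protocol="PLAINTEXT",
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: json.dumps(v).encode("utf-8"))

    for _ in range(100):
        event_id = str(uuid.uuid4())
        # the bronze stream below casts both key and value to plain strings
        producer.send("dbdemos-sessions", key=event_id, value={"event_id": event_id, "platform": "ios"})

    producer.flush()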

    %run ./_resources/00-setup $reset_all_data=false
    %sql
    CREATE TABLE IF NOT EXISTS events_raw (key string, value string) TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true);
    dbutils.fs.rm(volume_folder+"/checkpoints", True)
    # NOTE: the demo runs with Kafka, and dbdemos doesn't publicly expose its demo Kafka servers. Use your own IPs to run the demo properly.
    kafka_bootstrap_servers_tls = "b-1.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092,b-2.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092,b-3.oneenvkafka.fso631.c14.kafka.us-west-2.amazonaws.com:9092"
    #kafka_bootstrap_servers_tls = "<Replace by your own kafka servers>"
    # Also make sure to have the proper instance profile to allow the access if you're on AWS.
    
    from pyspark.sql.functions import col  # `col` is usually already imported by the 00-setup notebook
    stream = (spark
        .readStream
           #=== Configurations for Kafka streams ===
          .format("kafka")
          .option("kafka.bootstrap.servers", kafka_bootstrap_servers_tls) 
          .option("kafka.security.protocol", "PLAINTEXT") # use "SSL" for an encrypted connection
          .option("subscribe", "dbdemos-sessions") #kafka topic
          .option("startingOffsets", "latest") #Consume messages from the end
          .option("maxOffsetsPerTrigger", "10000") # Control ingestion rate - backpressure
          #.option("ignoreChanges", "true")
        .load()
        .withColumn('key', col('key').cast('string'))
        .withColumn('value', col('value').cast('string'))
        .writeStream
           # === Write to the delta table ===
          .format("delta")
          .trigger(processingTime="20 seconds")
          #.trigger(availableNow=True) # use this trigger instead on serverless
          .option("checkpointLocation", volume_folder+"/checkpoints/bronze")
          .option("mergeSchema", "true")
          .outputMode("append")
          .table("events_raw"))
    
    DBDemos.wait_for_table("events_raw")
    %sql SELECT * FROM events_raw;

    Our Raw events are now ready to be analyzed

    It's now easy to run queries against our events_raw table. Our data is saved as JSON, and Databricks makes it easy to query:

    %sql
    select count(*), value:platform as platform from events_raw group by platform;

    Searching for duplicate events

    As you can see, our producer sends incorrect messages.

    Not only do we have null event_id values from time to time, but we also have duplicate events (identical events being sent twice with the same ID and the exact same content).

    %sql
    select count(*) event_count, value:event_id event_id, first(value) from events_raw
      group by event_id
        having event_count > 1
      order by event_id;
    DBDemos.stop_all_streams(sleep_time=120)

    Next steps: Clean up data and remove duplicates

    It looks like we have duplicate events in our dataset. Let's see how we can perform some cleanup.

    In addition, reading from JSON isn't super efficient, and what if our JSON schema changes over time?

    While we can explore the dataset using Spark's JSON manipulation functions, this isn't ideal. For example, if the JSON in our messages changes after a few months, our queries will fail.

    Furthermore, performance won't be great at scale: because all our data is stored as a single JSON string, we can't leverage data skipping or a columnar format.

    That's why we need another table: A Silver Table!
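
    As a preview of that Silver step, here is a minimal sketch of one common pattern: parse the JSON value with an explicit schema using from_json, drop the null-id events, and deduplicate. The events_silver table name, the event_ts field and the one-hour watermark are illustrative assumptions, not the exact code of the next notebook.

    # Illustrative Silver sketch (assumed schema, table name and watermark; not the next notebook's exact code)
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StringType, StructField, StructType, TimestampType

    # Only event_id and platform are visible in the queries above; event_ts is a hypothetical event-time field
    event_schema = StructType([
        StructField("event_id", StringType()),
        StructField("platform", StringType()),
        StructField("event_ts", TimestampType())])

    (spark.readStream
        .table("events_raw")
        .withColumn("event", from_json(col("value"), event_schema)) # JSON string -> typed columns
        .select("event.*")
        .filter(col("event_id").isNotNull())                        # drop the malformed null-id events
        .withWatermark("event_ts", "1 hour")                        # bound how late a duplicate may arrive
        .dropDuplicates(["event_id", "event_ts"])                   # exact duplicates share id and timestamp
      .writeStream
        .option("checkpointLocation", volume_folder + "/checkpoints/silver")
        .outputMode("append")
        .table("events_silver"))

    Deduplicating on the id together with the watermarked timestamp lets Spark expire old deduplication state once the watermark passes, so the state store stays bounded instead of growing forever.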
