# Contents

# Select the RocksDB state store provider class for Structured Streaming state.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
)

from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import explode, lit, expr, col, current_timestamp, minute, date_trunc
import pandas as pd
from decimal import Decimal
from typing import Iterator
from datetime import datetime, timedelta
import uuid
import time
# `events_df` is the Spark streaming DataFrame of order events;
# render its first five rows as a quick preview.
sample_events = events_df.limit(5)
display(sample_events)
class EventDataTransformer(StatefulProcessor):
    """Track one order's lifecycle in state and emit a row once it is resolved.

    Input batches are pandas DataFrames for a single grouping key (the order
    id) and must carry ``event_type``, ``event_ts`` and ``event_body`` columns.
    The event types handled are ``received``, ``completed``, ``finished``,
    ``pickup``, ``delivering`` and ``delivered``.

    An order is resolved, and its state purged, when either:

    * it qualifies for a refund: pickup happened more than 5 minutes after
      cooking finished, the driver was still delivering more than 45 minutes
      after pickup, the price is at least 50, and at least one completed item
      was flagged hot; or
    * a ``delivered`` event has been seen (emitted with ``refund=False``).
    """

    # 🕣 refund thresholds
    PICKUP_DELAY_LIMIT = timedelta(minutes=5)
    DRIVER_DELAY_LIMIT = timedelta(minutes=45)
    # 💸 minimum order price that is eligible for a refund
    MIN_REFUND_PRICE = Decimal("50")

    def init(self, handle: StatefulProcessorHandle) -> None:
        """Register the per-order value states on the given handle."""
        ############################
        ##### State definition #####
        ############################

        # 🍽️ when the first item finished being prepared
        first_ts_schema = StructType([StructField("first_ts", TimestampType(), True)])
        self.first_ts = handle.getValueState("first_ts", first_ts_schema)

        # 🍽️ when the whole order finished cooking (start of the pickup clock)
        finished_ts_schema = StructType([StructField("finished_ts", TimestampType(), True)])
        self.finished_ts = handle.getValueState("finished_ts", finished_ts_schema)

        # 🚗 when the driver picked the order up.
        # The original registered this state with finished_ts_schema by mistake;
        # it now uses its own schema, so the stored field is named "pickup_ts".
        pickup_ts_schema = StructType([StructField("pickup_ts", TimestampType(), True)])
        self.pickup_ts = handle.getValueState("pickup_ts", pickup_ts_schema)

        # 🚗 the latest timestamp at which the driver was still driving
        delivering_schema = StructType([StructField("delivering_ts", TimestampType(), True)])
        self.delivering_ts = handle.getValueState("delivering_ts", delivering_schema)

        # 💸 the price of the order
        price_schema = StructType([StructField("price", DecimalType(10,2), True)])
        self.price = handle.getValueState("price", price_schema)

        # 🔥 whether any item of the order was cooked (no refunds for cold salads)
        hot_schema = StructType([StructField("hot_flag", BooleanType(), True)])
        self.hot_flag = handle.getValueState("hot_flag", hot_schema)

    @staticmethod
    def _peek(state):
        """Return the single value held in ``state``, or None when it is unset."""
        return state.get()[0] if state.exists() else None

    def _absorb(self, pdf: pd.DataFrame) -> None:
        """Fold one input batch into state.

        Each event type is handled independently, so a batch that contains
        only some event types still updates the state it can.
        """
        event_type = pdf["event_type"]

        #### 🍽️ completed items: earliest completion time + hot flag ####
        completed = pdf[event_type == "completed"]
        if not completed.empty:
            batch_first = completed["event_ts"].min()
            known_first = self._peek(self.first_ts)
            # keep the earliest completion time seen so far
            if known_first is None or known_first > batch_first:
                self.first_ts.update((batch_first,))

            # the order is hot as soon as ANY completed item says hot_flag: true
            flags = completed["event_body"].str.extract(r'hot_flag:\s*(\w+)')[0]
            batch_hot = bool((flags.str.lower() == "true").any())
            known_hot = bool(self._peek(self.hot_flag))
            self.hot_flag.update((known_hot or batch_hot,))

        #### 🍽️ preparation finished time (first value wins) ####
        finished = pdf[event_type == "finished"]
        if not finished.empty and not self.finished_ts.exists():
            self.finished_ts.update((finished["event_ts"].max(),))

        #### 🚗 pickup time (first value wins) ####
        pickup = pdf[event_type == "pickup"]
        if not pickup.empty and not self.pickup_ts.exists():
            self.pickup_ts.update((pickup["event_ts"].max(),))

        #### 🚗 delivering time (most recent value wins) ####
        delivering = pdf[event_type == "delivering"]
        if not delivering.empty:
            batch_latest = delivering["event_ts"].max()
            known_latest = self._peek(self.delivering_ts)
            # out-of-order data: never move the stored timestamp backwards
            if known_latest is None or not known_latest > batch_latest:
                self.delivering_ts.update((batch_latest,))

        #### 💸 price (first value wins) ####
        received = pdf[event_type == "received"]
        if not received.empty and not self.price.exists():
            # the price is parsed as the text after the first ':' of the body
            price_str = received["event_body"].iloc[0]
            self.price.update((Decimal(price_str.split(':')[1].strip()),))

    def _clear_state(self) -> None:
        """Drop every value state held for the current key."""
        for state in (
            self.first_ts,
            self.finished_ts,
            self.pickup_ts,
            self.delivering_ts,
            self.price,
            self.hot_flag,
        ):
            state.clear()

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        """Update state from the incoming batches and emit a row on resolution.

        Args:
            key: grouping key tuple; ``key[0]`` is emitted as ``order_id``.
            rows: iterator of pandas DataFrames holding this key's new events.
            timerValues: unused.

        Yields:
            At most one single-row DataFrame with ``order_id``, ``fin_ts``,
            ``del_ts``, ``price`` and ``refund``. Values that were never
            observed for the order are emitted as None.
        """
        ########################
        ##### Update State #####
        ########################
        delivered = False
        for pdf in rows:
            self._absorb(pdf)
            if (pdf["event_type"] == "delivered").any():
                delivered = True

        fin_ts = self._peek(self.finished_ts)
        pu_ts = self._peek(self.pickup_ts)
        del_ts = self._peek(self.delivering_ts)
        price_dec = self._peek(self.price)
        hot = bool(self._peek(self.hot_flag))

        ########################
        ##### Refund Logic #####
        ########################
        # A delay can only be established once both of its timestamps are known.
        # NOTE: the original also computed a "cooking_delay" flag (finished more
        # than 20 minutes after the first completed item) but never used it in
        # the refund decision; first_ts is still tracked in state.

        # 🕣 was the order picked up late?
        pickup_delay = (
            fin_ts is not None
            and pu_ts is not None
            and pu_ts - fin_ts > self.PICKUP_DELAY_LIMIT
        )

        # 🕣 was the driver still driving long after pickup?
        driver_delay = (
            pu_ts is not None
            and del_ts is not None
            and del_ts - pu_ts > self.DRIVER_DELAY_LIMIT
        )

        refund_flag = bool(
            pickup_delay
            and driver_delay
            and hot
            and price_dec is not None
            and price_dec >= self.MIN_REFUND_PRICE
        )

        ##########################
        ##### State eviction #####
        ##########################
        # 💸 refund-eligible orders are resolved before they arrive;
        # 🚗 delivered orders are resolved without a refund.
        # NOTE: events that arrive for this key after the purge start a fresh,
        # partial state (e.g. a late 'delivered' would emit a second row with
        # refund=False), so downstream consumers may want to dedupe by order_id.
        if refund_flag or delivered:
            self._clear_state()
            yield pd.DataFrame([{
                "order_id": str(key[0]),
                "fin_ts": fin_ts,
                "del_ts": del_ts,
                "price": price_dec,
                "refund": refund_flag,
            }])

    def close(self) -> None:
        """Nothing to release."""
        pass
# Schema of the rows emitted by the stateful processor, one column per
# (name, type) pair below, in this order.
output_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("order_id", StringType()),
        ("fin_ts", TimestampType()),
        ("del_ts", TimestampType()),
        ("price", DecimalType(10,2)),
        ("refund", BooleanType()),
    )
])
# Group the event stream by order, run the stateful processor on each group,
# and render whatever rows it emits.
orders_by_id = events_df.groupBy("order_id")
resolved_orders = orders_by_id.transformWithStateInPandas(
    EventDataTransformer(),
    outputStructType=output_schema,
    outputMode="Append",
    timeMode="None",
)
display(resolved_orders)
# Same stateful pipeline as above, but only keep rows whose del_ts falls
# within the last hour.
recent_grouped = events_df.groupBy("order_id")
recent_resolved = recent_grouped.transformWithStateInPandas(
    EventDataTransformer(),
    outputStructType=output_schema,
    outputMode="Append",
    timeMode="None",
)
recent_only = recent_resolved.filter("del_ts > current_timestamp() - interval 1 hour")
display(recent_only)