class EventDataTransformer(StatefulProcessor):
    """Accumulate the events of one order and decide whether it earns a refund.

    The processor is keyed by order (``key[0]`` is emitted as ``order_id``).
    Each incoming pandas DataFrame is expected to carry the columns
    ``event_type``, ``event_ts`` and ``event_body``.  The event types read
    here are ``received``, ``completed``, ``finished``, ``pickup``,
    ``delivering`` and ``delivered``.

    Facts about the order are kept in value state so that events arriving in
    different micro-batches are combined.  A result row is emitted when the
    order is eligible for a refund, or when it has been delivered; in both
    cases the state of the order is cleared afterwards.
    """

    # Thresholds of the refund rule.  They are class attributes so that a
    # subclass can tune them without touching the processing logic.
    PICKUP_DELAY_LIMIT = timedelta(minutes=5)
    DRIVER_DELAY_LIMIT = timedelta(minutes=45)
    REFUND_MIN_PRICE = Decimal("50")

    def init(self, handle: StatefulProcessorHandle) -> None:
        """Register the value states that hold what is known about an order.

        Args:
            handle: State handle supplied by the streaming engine.
        """
        ############################
        ##### State definition #####
        ############################
        # 🍽️ when the first item finished being prepared
        first_ts_schema = StructType([StructField("first_ts", TimestampType(), True)])
        self.first_ts = handle.getValueState("first_ts", first_ts_schema)
        # 🍽️ when the whole order finished cooking (start of the pickup delay)
        finished_ts_schema = StructType([StructField("finished_ts", TimestampType(), True)])
        self.finished_ts = handle.getValueState("finished_ts", finished_ts_schema)
        # 🚗 when the driver picked the order up
        # (the original registered this state with finished_ts_schema by mistake)
        pickup_ts_schema = StructType([StructField("pickup_ts", TimestampType(), True)])
        self.pickup_ts = handle.getValueState("pickup_ts", pickup_ts_schema)
        # 🚗 the latest timestamp at which the driver was still driving
        delivering_schema = StructType([StructField("delivering_ts", TimestampType(), True)])
        self.delivering_ts = handle.getValueState("delivering_ts", delivering_schema)
        # 💸 the price of the order
        price_schema = StructType([StructField("price", DecimalType(10, 2), True)])
        self.price = handle.getValueState("price", price_schema)
        # 🔥 whether any item of the order was cooked (no refunds for cold salads)
        hot_schema = StructType([StructField("hot_flag", BooleanType(), True)])
        self.hot_flag = handle.getValueState("hot_flag", hot_schema)

    @staticmethod
    def _read(state):
        """Return the single value held in ``state``, or ``None`` when it is unset."""
        return state.get()[0] if state.exists() else None

    @staticmethod
    def _event_times(pdf, event_type):
        """Return the non-null ``event_ts`` values of the rows of one event type."""
        return pdf.loc[pdf["event_type"] == event_type, "event_ts"].dropna()

    def _absorb(self, pdf) -> None:
        """Fold every event found in ``pdf`` into the state.

        Each event type is handled independently: a batch that lacks one type
        must not prevent the other types from being recorded.
        """
        #### 🍽️ First menu item finished time: keep the earliest one ####
        # NOTE: first_ts is recorded but is not part of the refund rule below.
        completed_ts = self._event_times(pdf, "completed")
        if not completed_ts.empty:
            earliest = completed_ts.min()
            stored = self._read(self.first_ts)
            if stored is None or stored > earliest:
                self.first_ts.update((earliest,))

        #### 🍽️ Preparation finished time: the first value seen wins ####
        finished_ts = self._event_times(pdf, "finished")
        if not finished_ts.empty and not self.finished_ts.exists():
            self.finished_ts.update((finished_ts.max(),))

        #### 🚗 Pickup time: the first value seen wins ####
        pickup_ts = self._event_times(pdf, "pickup")
        if not pickup_ts.empty and not self.pickup_ts.exists():
            self.pickup_ts.update((pickup_ts.max(),))

        #### 🚗 Delivering time: keep the most recent one ####
        # Out-of-order data must never move the stored timestamp backwards.
        delivering_ts = self._event_times(pdf, "delivering")
        if not delivering_ts.empty:
            latest = delivering_ts.max()
            stored = self._read(self.delivering_ts)
            if stored is None or stored < latest:
                self.delivering_ts.update((latest,))

        #### 💸 Price: taken once, from the first 'received' event ####
        if not self.price.exists():
            received = pdf[pdf["event_type"] == "received"]
            if not received.empty:
                # The body is parsed as "<label>: <amount>".
                price_str = received["event_body"].iloc[0]
                self.price.update((Decimal(price_str.split(':')[1].strip()),))

        #### 🔥 Hot flag: true as soon as any completed item was cooked ####
        completed = pdf[pdf["event_type"] == "completed"]
        if not completed.empty:
            flags = completed["event_body"].str.extract(r'hot_flag:\s*(\w+)')[0]
            hot_in_batch = bool((flags.str.lower() == "true").any())
            hot_so_far = bool(self._read(self.hot_flag))
            self.hot_flag.update((hot_in_batch or hot_so_far,))

    def _purge(self) -> None:
        """Drop everything stored for the current order."""
        for state in (
            self.first_ts,
            self.finished_ts,
            self.pickup_ts,
            self.delivering_ts,
            self.price,
            self.hot_flag,
        ):
            state.clear()

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        """Update the state with new events and emit a verdict when one is due.

        Args:
            key: Grouping key; ``key[0]`` is emitted as ``order_id``.
            rows: Iterator of pandas DataFrames with the new events of the key.
            timerValues: Unused.

        Yields:
            At most one single-row DataFrame with the columns ``order_id``,
            ``fin_ts``, ``del_ts``, ``price`` and ``refund``.  A row is only
            produced once the finished, pickup and delivering timestamps, the
            price and the hot flag are all known, and either the refund rule
            holds or a ``delivered`` event arrived in this call.
        """
        ########################
        ##### Update State #####
        ########################
        delivered = False
        for pdf in rows:
            self._absorb(pdf)
            if (pdf['event_type'] == 'delivered').any():
                delivered = True

        fin_ts = self._read(self.finished_ts)
        pu_ts = self._read(self.pickup_ts)
        del_ts = self._read(self.delivering_ts)
        price_dec = self._read(self.price)
        hot = self._read(self.hot_flag)

        ########################
        ##### Refund Logic #####
        ########################
        # The rule can only be evaluated when every input is known; comparing
        # a missing timestamp or price would raise instead of answering "no".
        complete = all(
            value is not None for value in (fin_ts, pu_ts, del_ts, price_dec, hot)
        )
        refund_flag = False
        if complete:
            # 🕣 the order waited too long to be picked up
            pickup_delay = pu_ts - fin_ts > self.PICKUP_DELAY_LIMIT
            # 🕣 the driver is still driving long after the pickup
            driver_delay = del_ts - pu_ts > self.DRIVER_DELAY_LIMIT
            refund_flag = bool(
                pickup_delay
                and driver_delay
                and price_dec >= self.REFUND_MIN_PRICE
                and hot
            )

        ##########################
        ##### State eviction #####
        ##########################
        # 💸 an eligible order is purged before it arrives;
        # 🚗 a delivered order is purged whatever the verdict.
        if not (refund_flag or delivered):
            return
        # The values to emit were already read, so purge before yielding.
        self._purge()
        if complete:
            yield pd.DataFrame([{
                "order_id": str(key[0]),
                "fin_ts": fin_ts,
                "del_ts": del_ts,
                "price": price_dec,
                "refund": refund_flag
            }])

    def close(self) -> None:
        """Nothing to release when the processor shuts down."""
        pass