Skip to content

Commit 01af67e

Browse files
Add files via upload
1 parent a4cc6eb commit 01af67e

File tree

1 file changed

+378
-0
lines changed

1 file changed

+378
-0
lines changed
Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": 0,
6+
"metadata": {
7+
"application/vnd.databricks.v1+cell": {
8+
"cellMetadata": {
9+
"byteLimit": 2048000,
10+
"rowLimit": 10000
11+
},
12+
"inputWidgets": {},
13+
"nuid": "6c825ee7-038b-4194-a661-59462dd72f79",
14+
"showTitle": true,
15+
"tableResultSettingsMap": {},
16+
"title": "Setup"
17+
}
18+
},
19+
"outputs": [],
20+
"source": [
21+
"spark.conf.set(\n",
22+
" \"spark.sql.streaming.stateStore.providerClass\",\n",
23+
" \"com.databricks.sql.streaming.state.RocksDBStateStoreProvider\"\n",
24+
")\n",
25+
"\n",
26+
"from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle\n",
27+
"from pyspark.sql.types import *\n",
28+
"from pyspark.sql import Row\n",
29+
"from pyspark.sql.functions import explode, lit, expr, col, current_timestamp, minute, date_trunc\n",
30+
"import pandas as pd\n",
31+
"from decimal import Decimal\n",
32+
"from typing import Iterator\n",
33+
"from datetime import datetime, timedelta\n",
34+
"import uuid\n",
35+
"import time"
36+
]
37+
},
38+
{
39+
"cell_type": "code",
40+
"execution_count": 0,
41+
"metadata": {
42+
"application/vnd.databricks.v1+cell": {
43+
"cellMetadata": {
44+
"byteLimit": 2048000,
45+
"rowLimit": 10000
46+
},
47+
"inputWidgets": {},
48+
"nuid": "d29c4c73-6d31-4350-a78c-570d7318fac9",
49+
"showTitle": true,
50+
"tableResultSettingsMap": {},
51+
"title": "Let's look at our events data"
52+
}
53+
},
54+
"outputs": [],
55+
"source": [
56+
"# spark streaming dataframe, events_df\n",
57+
"display(events_df.limit(5))"
58+
]
59+
},
60+
{
61+
"cell_type": "code",
62+
"execution_count": 0,
63+
"metadata": {
64+
"application/vnd.databricks.v1+cell": {
65+
"cellMetadata": {
66+
"byteLimit": 2048000,
67+
"rowLimit": 10000
68+
},
69+
"inputWidgets": {},
70+
"nuid": "2df8932c-3e04-4bfe-930e-7d16e5dfcb1f",
71+
"showTitle": true,
72+
"tableResultSettingsMap": {},
73+
"title": "✨🪄✨"
74+
}
75+
},
76+
"outputs": [],
77+
"source": [
78+
"class EventDataTransformer(StatefulProcessor):\n",
79+
" def init(self, handle: StatefulProcessorHandle) -> None:\n",
80+
" ############################\n",
81+
" ##### State definition #####\n",
82+
" ############################\n",
83+
" ### Define what we want to hold in the state, and what it will look like ###\n",
84+
"\n",
85+
" # 🍽️ when the first item finished being prepared\n",
86+
" first_ts_schema = StructType([StructField(\"first_ts\", TimestampType(), True)])\n",
87+
" self.first_ts = handle.getValueState(\"first_ts\", first_ts_schema)\n",
88+
"\n",
89+
" # 🍽️ when was the order finished cooking, keep this to establish pickup delays\n",
90+
" finished_ts_schema = StructType([StructField(\"finished_ts\", TimestampType(), True)])\n",
91+
" self.finished_ts = handle.getValueState(\"finished_ts\", finished_ts_schema)\n",
92+
"\n",
93+
" # 🚗 when was the order picked up by the driver, keep this to establish pickup delays\n",
94+
" pickup_ts_schema = StructType([StructField(\"pickup_ts\", TimestampType(), True)])\n",
95+
" self.pickup_ts = handle.getValueState(\"pickup_ts\", finished_ts_schema)\n",
96+
"\n",
97+
" # 🚗 the latest timestamp we have that the driver is still driving\n",
98+
" delivering_schema = StructType([StructField(\"delivering_ts\", TimestampType(), True)])\n",
99+
" self.delivering_ts = handle.getValueState(\"delivering_ts\", delivering_schema)\n",
100+
"\n",
101+
" # 💸 the price of the order\n",
102+
" price_schema = StructType([StructField(\"price\", DecimalType(10,2), True)])\n",
103+
" self.price = handle.getValueState(\"price\", price_schema)\n",
104+
"\n",
105+
" # 🔥 whether the order was cooked (no refunds for cold salads)\n",
106+
" hot_schema = StructType([StructField(\"hot_flag\", BooleanType(), True)])\n",
107+
" self.hot_flag = handle.getValueState(\"hot_flag\", hot_schema)\n",
108+
"\n",
109+
" def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:\n",
110+
" # 🚩 define flags\n",
111+
" refund_flag = pickup_delay = driver_delay = hot_bool = False\n",
112+
" # 🕣 define timestamps\n",
113+
" finished_ts = pickup_ts = delivering_ts = first_ts = max_ts = min_ts = None\n",
114+
" # 🔢 define decimals\n",
115+
" price_dec = Decimal('NaN')\n",
116+
"\n",
117+
" for pdf in rows: \n",
118+
" ########################\n",
119+
" ##### Update State #####\n",
120+
" ########################\n",
121+
"\n",
122+
" #### 🍽️ First menu item finished time ####\n",
123+
" first_pdf = pdf[pdf['event_type'] == 'completed'] \n",
124+
" if first_pdf.empty:\n",
125+
" continue\n",
126+
" #if nothing is in the state, then update it\n",
127+
" elif not self.first_ts.exists():\n",
128+
" first_ts = first_pdf['event_ts'].min() # technically there's only one\n",
129+
" self.first_ts.update((first_ts,)) \n",
130+
" # otherwise if the state is more than the latest menu item, then overwrite the state\n",
131+
" elif self.first_ts.get()[0] > first_pdf['event_ts'].min(): \n",
132+
" first_ts = first_pdf['event_ts'].min() \n",
133+
" self.first_ts.update((first_ts,)) \n",
134+
" # otherwise retrieve it from the state\n",
135+
" else:\n",
136+
" first_ts = self.first_ts.get()[0] \n",
137+
" \n",
138+
" #### 🍽️ Preparation finished time ####\n",
139+
" # now add the finished time to the state \n",
140+
" finished_pdf = pdf[pdf['event_type'] == 'finished']\n",
141+
" if finished_pdf.empty:\n",
142+
" continue\n",
143+
" # if the finished_ts doesn't exist then update it\n",
144+
" elif not self.finished_ts.exists():\n",
145+
" fin_ts = finished_pdf['event_ts'].max() # technically there's only one\n",
146+
" self.finished_ts.update((fin_ts,)) \n",
147+
" # otherwise retrieve it from the state\n",
148+
" else:\n",
149+
" fin_ts = self.finished_ts.get()[0] \n",
150+
"\n",
151+
" #### 🚗 Pickup time #### \n",
152+
" # now add the driver pickup time to the state \n",
153+
" pickup_pdf = pdf[pdf['event_type'] == 'pickup']\n",
154+
" if pickup_pdf.empty:\n",
155+
" continue\n",
156+
" # if the pickup_ts doesn't exist then update it\n",
157+
" elif not self.pickup_ts.exists():\n",
158+
" pu_ts = pickup_pdf['event_ts'].max() # technically there's only one\n",
159+
" self.pickup_ts.update((pu_ts,)) \n",
160+
" # otherwise retrieve it from the state\n",
161+
" else:\n",
162+
" pu_ts = self.pickup_ts.get()[0] \n",
163+
"\n",
164+
" #### 🚗 delivering time #### \n",
165+
" # now add the driver pickup time to the state \n",
166+
" delivering_pdf = pdf[pdf['event_type'] == 'delivering']\n",
167+
" if delivering_pdf.empty:\n",
168+
" continue\n",
169+
" # if the delivering_ts doesn't exist then update it\n",
170+
" elif not self.delivering_ts.exists():\n",
171+
" del_ts = delivering_pdf['event_ts'].max() # we want the most recent one\n",
172+
" self.delivering_ts.update((del_ts,)) \n",
173+
" # prep for edge case where data is out of order and state is larger than \n",
174+
" elif self.delivering_ts.get()[0] > delivering_pdf['event_ts'].max():\n",
175+
" del_ts = self.delivering_ts.get()[0] \n",
176+
" # otherwise update it\n",
177+
" else:\n",
178+
" del_ts = delivering_pdf['event_ts'].max()\n",
179+
" self.delivering_ts.update((del_ts,)) \n",
180+
"\n",
181+
" #### 💸 price #### \n",
182+
" # hold on to the price \n",
183+
" order_pdf = pdf[pdf['event_type'] == 'received']\n",
184+
" # if the price already exists in the state, get it\n",
185+
" if self.price.exists():\n",
186+
" price_dec = self.price.get()\n",
187+
" # if you don't have the data to update it, continue\n",
188+
" elif order_pdf.empty:\n",
189+
" continue\n",
190+
" # otherwise update it\n",
191+
" else:\n",
192+
" price_str = order_pdf['event_body'].iloc[0]\n",
193+
" price_dec = Decimal(price_str.split(':')[1].strip())\n",
194+
" self.price.update((price_dec,)) \n",
195+
"\n",
196+
" #### 🔥 hot flag #### \n",
197+
" # store whether any items were cooked\n",
198+
" order_temp_pdf = pdf[pdf['event_type'] == 'completed']\n",
199+
" if order_temp_pdf.empty:\n",
200+
" continue\n",
201+
" # if the flag already exists in the state, get it\n",
202+
" # overwrite it with hot flag if needed\n",
203+
" elif self.hot_flag.exists():\n",
204+
" hot_max = self.hot_flag.get()\n",
205+
" order_temp_pdf['hot_flag'] = order_temp_pdf['event_body'].str.extract(r'hot_flag:\\s*(\\w+)')[0]\n",
206+
" order_temp_pdf['hot_flag_bool'] = order_temp_pdf['hot_flag'].str.lower() == \"true\"\n",
207+
" hot_bool = order_temp_pdf['hot_flag_bool'].iloc[0]\n",
208+
" hot_max = max(hot_bool, hot_max)\n",
209+
" self.hot_flag.update((hot_max,))\n",
210+
" # otherwise update it\n",
211+
" else:\n",
212+
" order_temp_pdf['hot_flag'] = order_temp_pdf['event_body'].str.extract(r'hot_flag:\\s*(\\w+)')[0]\n",
213+
" order_temp_pdf['hot_flag_bool'] = order_temp_pdf['hot_flag'].str.lower() == \"true\"\n",
214+
" hot_max = order_temp_pdf['hot_flag_bool'].iloc[0] \n",
215+
" self.hot_flag.update((hot_max,))\n",
216+
"\n",
217+
" ########################\n",
218+
" ##### Refund Logic #####\n",
219+
" ########################\n",
220+
" \n",
221+
" # 🥶 find if items were cooked too far apart\n",
222+
" if fin_ts - first_ts > timedelta(minutes=20): \n",
223+
" cooking_delay = True\n",
224+
"\n",
225+
" # 🕣 figure out if the order was late being picked up\n",
226+
" if pu_ts - fin_ts > timedelta(minutes=5):\n",
227+
" pickup_delay = True\n",
228+
"\n",
229+
" # 🕣 figure out if the driver is still driving after 45 mins\n",
230+
" if del_ts - pu_ts > timedelta(minutes=45):\n",
231+
" driver_delay = True\n",
232+
"\n",
233+
" ##########################\n",
234+
" ##### State eviction #####\n",
235+
" ##########################\n",
236+
"\n",
237+
" # 💸 if the customer is eligible for a refund, then purge from the state before their order arrives\n",
238+
" if pickup_delay == True and driver_delay == True and price_dec >= 50 and hot_max == True:\n",
239+
" refund_flag = True\n",
240+
" yield pd.DataFrame([{\n",
241+
" \"order_id\": str(key[0]), \n",
242+
" \"fin_ts\": fin_ts, \n",
243+
" \"del_ts\": del_ts,\n",
244+
" \"price\": price_dec,\n",
245+
" \"refund\": refund_flag \n",
246+
" }]) \n",
247+
"\n",
248+
" # 🚗 if the order has been delivered, then purge it from the state anyway \n",
249+
" elif (pdf['event_type'] == 'delivered').any():\n",
250+
" refund_flag = False\n",
251+
" yield pd.DataFrame([{\n",
252+
" \"order_id\": str(key[0]), \n",
253+
" \"fin_ts\": fin_ts, \n",
254+
" \"del_ts\": del_ts,\n",
255+
" \"price\": price_dec,\n",
256+
" \"refund\": refund_flag \n",
257+
" }])\n",
258+
" \n",
259+
" def close(self) -> None:\n",
260+
" pass"
261+
]
262+
},
263+
{
264+
"cell_type": "code",
265+
"execution_count": 0,
266+
"metadata": {
267+
"application/vnd.databricks.v1+cell": {
268+
"cellMetadata": {
269+
"byteLimit": 2048000,
270+
"rowLimit": 10000
271+
},
272+
"inputWidgets": {},
273+
"nuid": "d0cfe63b-d82c-4899-acaa-664b54671bbe",
274+
"showTitle": true,
275+
"tableResultSettingsMap": {},
276+
"title": "Define the output schema"
277+
}
278+
},
279+
"outputs": [],
280+
"source": [
281+
"output_schema = StructType([\n",
282+
" StructField(\"order_id\", StringType()),\n",
283+
" StructField(\"fin_ts\", TimestampType()),\n",
284+
" StructField(\"del_ts\", TimestampType()),\n",
285+
" StructField(\"price\", DecimalType(10,2)),\n",
286+
" StructField(\"refund\", BooleanType())\n",
287+
"])"
288+
]
289+
},
290+
{
291+
"cell_type": "code",
292+
"execution_count": 0,
293+
"metadata": {
294+
"application/vnd.databricks.v1+cell": {
295+
"cellMetadata": {
296+
"byteLimit": 2048000,
297+
"rowLimit": 10000
298+
},
299+
"inputWidgets": {},
300+
"nuid": "c95264fe-c0b9-4a6c-97be-0206c43646dd",
301+
"showTitle": true,
302+
"tableResultSettingsMap": {},
303+
"title": "Let's put it all together!"
304+
}
305+
},
306+
"outputs": [],
307+
"source": [
308+
"display(events_df\n",
309+
" .groupBy(\"order_id\") \n",
310+
" .transformWithStateInPandas(\n",
311+
" EventDataTransformer(),\n",
312+
" outputStructType=output_schema,\n",
313+
" outputMode=\"Append\",\n",
314+
" timeMode=\"None\"\n",
315+
" )\n",
316+
" )"
317+
]
318+
},
319+
{
320+
"cell_type": "code",
321+
"execution_count": 0,
322+
"metadata": {
323+
"application/vnd.databricks.v1+cell": {
324+
"cellMetadata": {
325+
"byteLimit": 2048000,
326+
"rowLimit": 10000
327+
},
328+
"inputWidgets": {},
329+
"nuid": "7898c97b-9867-42c9-b5b0-9b76c6257ca8",
330+
"showTitle": true,
331+
"tableResultSettingsMap": {},
332+
"title": "Visualise"
333+
}
334+
},
335+
"outputs": [],
336+
"source": [
337+
"display(events_df\n",
338+
" .groupBy(\"order_id\") \n",
339+
" .transformWithStateInPandas(\n",
340+
" EventDataTransformer(),\n",
341+
" outputStructType=output_schema,\n",
342+
" outputMode=\"Append\",\n",
343+
" timeMode=\"None\"\n",
344+
" )\n",
345+
" .filter(\"del_ts > current_timestamp() - interval 1 hour\")\n",
346+
" )"
347+
]
348+
}
349+
],
350+
"metadata": {
351+
"application/vnd.databricks.v1+notebook": {
352+
"computePreferences": null,
353+
"dashboards": [],
354+
"environmentMetadata": {
355+
"base_environment": "",
356+
"environment_version": "2"
357+
},
358+
"inputWidgetPreferences": null,
359+
"language": "python",
360+
"notebookMetadata": {
361+
"mostRecentlyExecutedCommandWithImplicitDF": {
362+
"commandId": 5353035250454098,
363+
"dataframes": [
364+
"_sqldf"
365+
]
366+
},
367+
"pythonIndentUnit": 4
368+
},
369+
"notebookName": "Current LDN 2025: transformWithState",
370+
"widgets": {}
371+
},
372+
"language_info": {
373+
"name": "python"
374+
}
375+
},
376+
"nbformat": 4,
377+
"nbformat_minor": 0
378+
}

0 commit comments

Comments
 (0)