Skip to content

Commit 4d33841

Browse files
authored
fix: use keyed_window.keys for task lookup in accumulator task manager (#330)
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent a66cdda commit 4d33841

3 files changed

Lines changed: 40 additions & 22 deletions

File tree

packages/pynumaflow/pynumaflow/accumulator/_dtypes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ class WindowOperation(IntEnum):
1717
Enumerate the type of Window operation received.
1818
"""
1919

20-
OPEN = (0,)
21-
CLOSE = (1,)
22-
APPEND = (2,)
20+
OPEN = 0
21+
CLOSE = 1
22+
APPEND = 2
2323

2424

2525
@dataclass(init=False, slots=True)

packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
_AccumulatorBuilderClass,
1515
AccumulatorAsyncCallable,
1616
WindowOperation,
17+
AccumulatorRequest,
1718
)
1819
from pynumaflow.proto.accumulator import accumulator_pb2
1920
from pynumaflow.shared.asynciter import NonBlockingIterator
@@ -93,7 +94,7 @@ async def stream_send_eof(self):
9394
for unified_key in task_keys:
9495
await self.tasks[unified_key].iterator.put(STREAM_EOF)
9596

96-
async def close_task(self, req):
97+
async def close_task(self, req: AccumulatorRequest):
9798
"""
9899
Closes a running accumulator task for a given key.
99100
Based on the request we compute the unique key, and then
@@ -104,8 +105,9 @@ async def close_task(self, req):
104105
3. Wait for all the results from the task to be written to the global result queue
105106
4. Remove the task from the tracker
106107
"""
107-
d = req.payload
108-
keys = d.keys
108+
# Use keyed_window.keys for task lookup since payload.keys may be empty
109+
# (e.g., CLOSE operations don't carry data, so payload.keys is not populated).
110+
keys = req.keyed_window.keys
109111
unified_key = build_unique_key_name(keys)
110112
curr_task = self.tasks.get(unified_key, None)
111113

@@ -120,14 +122,16 @@ async def close_task(self, req):
120122
# Put the exception in the result queue
121123
await self.global_result_queue.put(err)
122124

123-
async def create_task(self, req):
125+
async def create_task(self, req: AccumulatorRequest):
124126
"""
125127
Creates a new accumulator task for the given request.
126128
Based on the request we compute a unique key, and then
127129
it creates a new task or appends the request to the existing task.
128130
"""
129131
d = req.payload
130-
keys = d.keys
132+
# Use keyed_window.keys for task lookup — the authoritative key identity
133+
# for the window, consistent across all operation types (OPEN, APPEND, CLOSE).
134+
keys = req.keyed_window.keys
131135
unified_key = build_unique_key_name(keys)
132136
curr_task = self.tasks.get(unified_key, None)
133137

@@ -138,7 +142,7 @@ async def create_task(self, req):
138142
# Create a new result queue for the current task
139143
# We create a new result queue for each task, so that
140144
# the results of the accumulator operation can be sent to the
141-
# the global result queue, which in turn sends the results
145+
# global result queue, which in turn sends the results
142146
# to the client.
143147
res_queue = NonBlockingIterator()
144148

@@ -172,13 +176,14 @@ async def create_task(self, req):
172176
# Put the request in the iterator
173177
await curr_task.iterator.put(d)
174178

175-
async def send_datum_to_task(self, req):
179+
async def send_datum_to_task(self, req: AccumulatorRequest):
176180
"""
177181
Appends the request to the existing window reduce task.
178182
If the task does not exist, create it.
179183
"""
180184
d = req.payload
181-
keys = d.keys
185+
# Use keyed_window.keys for task lookup to match the key used in create_task/close_task.
186+
keys = req.keyed_window.keys
182187
unified_key = build_unique_key_name(keys)
183188
result = self.tasks.get(unified_key, None)
184189
if not result:
@@ -215,9 +220,7 @@ async def __invoke_accumulator(
215220
# Put the exception in the result queue
216221
await self.global_result_queue.put(err)
217222

218-
async def process_input_stream(
219-
self, request_iterator: AsyncIterable[accumulator_pb2.AccumulatorRequest]
220-
):
223+
async def process_input_stream(self, request_iterator: AsyncIterable[AccumulatorRequest]):
221224
# Start iterating through the request iterator and create tasks
222225
# based on the operation type received.
223226
try:
@@ -226,15 +229,15 @@ async def process_input_stream(
226229
request_count += 1
227230
# check whether the request is an open, append, or close operation
228231
match request.operation:
229-
case int(WindowOperation.OPEN):
232+
case WindowOperation.OPEN:
230233
# create a new task for the open operation and
231234
# put the request in the task iterator
232235
await self.create_task(request)
233-
case int(WindowOperation.APPEND):
236+
case WindowOperation.APPEND:
234237
# append the task data to the existing task
235238
# if the task does not exist, create a new task
236239
await self.send_datum_to_task(request)
237-
case int(WindowOperation.CLOSE):
240+
case WindowOperation.CLOSE:
238241
# close the current task for req
239242
await self.close_task(request)
240243
case _:

packages/pynumaflow/tests/accumulator/test_async_accumulator.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
def request_generator(count, request, resetkey: bool = False, send_close: bool = False):
3131
for i in range(count):
3232
if resetkey:
33-
# Clear previous keys and add new ones
33+
# Update keys on both payload and keyedWindow to match real platform behavior
3434
del request.payload.keys[:]
3535
request.payload.keys.extend([f"key-{i}"])
36+
del request.operation.keyedWindow.keys[:]
37+
request.operation.keyedWindow.keys.extend([f"key-{i}"])
3638

3739
# Set operation based on index - first is OPEN, rest are APPEND
3840
if i == 0:
@@ -52,9 +54,11 @@ def request_generator(count, request, resetkey: bool = False, send_close: bool =
5254
def request_generator_append_only(count, request, resetkey: bool = False):
5355
for i in range(count):
5456
if resetkey:
55-
# Clear previous keys and add new ones
57+
# Update keys on both payload and keyedWindow to match real platform behavior
5658
del request.payload.keys[:]
5759
request.payload.keys.extend([f"key-{i}"])
60+
del request.operation.keyedWindow.keys[:]
61+
request.operation.keyedWindow.keys.extend([f"key-{i}"])
5862

5963
# Set operation to APPEND for all requests
6064
request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND
@@ -64,9 +68,11 @@ def request_generator_append_only(count, request, resetkey: bool = False):
6468
def request_generator_mixed(count, request, resetkey: bool = False):
6569
for i in range(count):
6670
if resetkey:
67-
# Clear previous keys and add new ones
71+
# Update keys on both payload and keyedWindow to match real platform behavior
6872
del request.payload.keys[:]
6973
request.payload.keys.extend([f"key-{i}"])
74+
del request.operation.keyedWindow.keys[:]
75+
request.operation.keyedWindow.keys.extend([f"key-{i}"])
7076

7177
if i % 2 == 0:
7278
# Set operation to APPEND for even requests
@@ -107,17 +113,26 @@ def start_request() -> accumulator_pb2.AccumulatorRequest:
107113

108114
def start_request_without_open() -> accumulator_pb2.AccumulatorRequest:
109115
event_time_timestamp, watermark_timestamp = get_time_args()
110-
116+
window = accumulator_pb2.KeyedWindow(
117+
start=mock_interval_window_start(),
118+
end=mock_interval_window_end(),
119+
slot="slot-0",
120+
keys=["test_key"],
121+
)
111122
payload = accumulator_pb2.Payload(
112123
keys=["test_key"],
113124
value=mock_message(),
114125
event_time=event_time_timestamp,
115126
watermark=watermark_timestamp,
116127
id="test_id",
117128
)
118-
129+
operation = accumulator_pb2.AccumulatorRequest.WindowOperation(
130+
event=accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND,
131+
keyedWindow=window,
132+
)
119133
request = accumulator_pb2.AccumulatorRequest(
120134
payload=payload,
135+
operation=operation,
121136
)
122137
return request
123138

0 commit comments

Comments
 (0)