diff --git a/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py b/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py index c6899529..1fe65426 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py +++ b/packages/pynumaflow/pynumaflow/accumulator/_dtypes.py @@ -234,6 +234,7 @@ class AccumulatorResult: "_result_queue", "_consumer_future", "_latest_watermark", + "_close_window", ) _future: Task @@ -242,6 +243,7 @@ class AccumulatorResult: _result_queue: NonBlockingIterator _consumer_future: Task _latest_watermark: datetime + _close_window: KeyedWindow | None @property def future(self) -> Task: @@ -310,6 +312,29 @@ def update_watermark(self, new_watermark: datetime): raise TypeError("new_watermark must be a datetime object") self._latest_watermark = new_watermark + @property + def close_window(self) -> KeyedWindow | None: + """Returns the keyed window from the CLOSE request, if the task was closed. + + Returns: + KeyedWindow | None: The window carried by the CLOSE request, echoed back in + the EOF response; None if the task has not received a CLOSE. + """ + return self._close_window + + def set_close_window(self, window: KeyedWindow): + """Stashes the CLOSE request's keyed window so the EOF response can echo it. + + Args: + window (KeyedWindow): The keyed window from the CLOSE request. + + Raises: + TypeError: If window is not a KeyedWindow object. 
+ """ + if not isinstance(window, KeyedWindow): + raise TypeError("window must be a KeyedWindow object") + self._close_window = window + @dataclass class AccumulatorRequest: diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py index ee14e3ea..8f00f474 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py @@ -1,6 +1,6 @@ import asyncio from collections.abc import AsyncIterable -from datetime import datetime +from datetime import datetime, timezone from google.protobuf import timestamp_pb2 from pynumaflow._constants import ( @@ -112,6 +112,10 @@ async def close_task(self, req: AccumulatorRequest): curr_task = self.tasks.get(unified_key, None) if curr_task: + # Stash the CLOSE request's keyed window BEFORE signalling EOF so the task's + # consumer (write_to_global_queue) can echo it back in the EOF response. Core + # uses the echoed window to identify and garbage-collect the closed window. 
+ curr_task.set_close_window(req.keyed_window) await self.tasks[unified_key].iterator.put(STREAM_EOF) await curr_task.future await curr_task.consumer_future @@ -167,7 +171,7 @@ async def create_task(self, req: AccumulatorRequest): # Create a new AccumulatorResult object to store the task information curr_task = AccumulatorResult( - task, niter, keys, res_queue, consumer, datetime.fromtimestamp(-1) + task, niter, keys, res_queue, consumer, datetime.fromtimestamp(-1), None ) # Save the result of the accumulator operation to the task list @@ -318,7 +322,7 @@ async def write_to_global_queue( watermark_pb.FromDatetime(msg.watermark) start_dt_pb = timestamp_pb2.Timestamp() - start_dt_pb.FromDatetime(datetime.fromtimestamp(0)) + start_dt_pb.FromDatetime(datetime.fromtimestamp(0, tz=timezone.utc)) end_dt_pb = timestamp_pb2.Timestamp() end_dt_pb.FromDatetime(wm) @@ -339,17 +343,39 @@ async def write_to_global_queue( tags=msg.tags, ) await output_queue.put(res) - # send EOF - start_eof_pb = timestamp_pb2.Timestamp() - start_eof_pb.FromDatetime(datetime.fromtimestamp(0)) + # Send EOF. Echo the CLOSE request's keyed window (start/end/slot/keys) so core can + # match the EOF to the window it is closing and garbage-collect it. Mirrors the + # numaflow-rs accumulator behavior (PR #177). + close_window = task.close_window + if close_window is not None: + start_eof_pb = timestamp_pb2.Timestamp() + start_eof_pb.FromDatetime(close_window.start) + + end_eof_pb = timestamp_pb2.Timestamp() + end_eof_pb.FromDatetime(close_window.end) + + eof_window = accumulator_pb2.KeyedWindow( + start=start_eof_pb, + end=end_eof_pb, + slot=close_window.slot, + keys=close_window.keys, + ) + else: + # Fallback for the stream-close/shutdown path (no CLOSE request, e.g. + # stream_send_eof on SIGTERM): synthesize the window from epoch(0) and the + # latest watermark, as before, except that start is now pinned to the UTC epoch.
+ start_eof_pb = timestamp_pb2.Timestamp() + start_eof_pb.FromDatetime(datetime.fromtimestamp(0, tz=timezone.utc)) - end_eof_pb = timestamp_pb2.Timestamp() - end_eof_pb.FromDatetime(wm) + end_eof_pb = timestamp_pb2.Timestamp() + end_eof_pb.FromDatetime(wm) - res = accumulator_pb2.AccumulatorResponse( - window=accumulator_pb2.KeyedWindow( + eof_window = accumulator_pb2.KeyedWindow( start=start_eof_pb, end=end_eof_pb, slot="slot-0", keys=task.keys - ), + ) + + res = accumulator_pb2.AccumulatorResponse( + window=eof_window, EOF=True, ) await output_queue.put(res) diff --git a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py index eb499cdf..85feac0d 100644 --- a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py +++ b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py @@ -4,6 +4,7 @@ import grpc import pytest from google.protobuf import empty_pb2 as _empty_pb2 +from google.protobuf import timestamp_pb2 from pynumaflow import setup_logging from pynumaflow.accumulator import ( @@ -87,6 +88,38 @@ def request_generator_mixed(count, request, resetkey: bool = False): yield request +# Distinct, recognizable close-window values used to prove the EOF echoes the +# CLOSE request's window rather than the synthesized fallback window. 
+CLOSE_WINDOW_START_SECONDS = 1000000000 +CLOSE_WINDOW_END_SECONDS = 2000000000 +CLOSE_WINDOW_SLOT = "slot-7" + + +def request_generator_custom_close(count, request): + """Yields OPEN + APPEND requests, then a CLOSE whose keyed window carries + distinct start/end/slot values (mirroring core sending a real + max_event_time + timeout window on close).""" + for i in range(count): + if i == 0: + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN + else: + request.operation.event = ( + accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND + ) + yield request + + # CLOSE carrying a distinct keyed window to be echoed back in the EOF response. + request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.CLOSE + request.operation.keyedWindow.start.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_START_SECONDS) + ) + request.operation.keyedWindow.end.CopyFrom( + timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_END_SECONDS) + ) + request.operation.keyedWindow.slot = CLOSE_WINDOW_SLOT + yield request + + def start_request() -> accumulator_pb2.AccumulatorRequest: event_time_timestamp, watermark_timestamp = get_time_args() window = accumulator_pb2.KeyedWindow( @@ -289,6 +322,52 @@ def test_accumulate_with_close(accumulator_stub) -> None: assert 1 == eof_count +def test_accumulate_close_echoes_eof_window(accumulator_stub) -> None: + """The EOF response must echo the exact KeyedWindow from the CLOSE request.""" + request = start_request() + generator_response = None + try: + generator_response = accumulator_stub.AccumulateFn( + request_iterator=request_generator_custom_close(count=5, request=request) + ) + except grpc.RpcError as e: + logging.error(e) + + eof_count = 0 + for r in generator_response: + if r.EOF: + eof_count += 1 + assert r.window.start.seconds == CLOSE_WINDOW_START_SECONDS + assert r.window.end.seconds == CLOSE_WINDOW_END_SECONDS + assert r.window.slot == CLOSE_WINDOW_SLOT + assert 
list(r.window.keys) == ["test_key"] + + assert 1 == eof_count + + +def test_accumulate_eof_window_fallback_without_close(accumulator_stub) -> None: + """When the stream closes without a CLOSE (e.g. shutdown), the EOF window falls + back to the synthesized window (start=epoch(0), slot='slot-0').""" + request = start_request() + generator_response = None + try: + generator_response = accumulator_stub.AccumulateFn( + request_iterator=request_generator(count=5, request=request, send_close=False) + ) + except grpc.RpcError as e: + logging.error(e) + + eof_count = 0 + for r in generator_response: + if r.EOF: + eof_count += 1 + assert r.window.start.seconds == 0 + assert r.window.slot == "slot-0" + assert list(r.window.keys) == ["test_key"] + + assert 1 == eof_count + + def test_accumulate_append_without_open(accumulator_stub) -> None: request = start_request_without_open() generator_response = None diff --git a/packages/pynumaflow/tests/accumulator/test_datatypes.py b/packages/pynumaflow/tests/accumulator/test_datatypes.py index 0f829024..5561c1e1 100644 --- a/packages/pynumaflow/tests/accumulator/test_datatypes.py +++ b/packages/pynumaflow/tests/accumulator/test_datatypes.py @@ -149,7 +149,9 @@ def test_accumulator_result_create(): consumer_future = None # In real usage, this would be an asyncio.Task watermark = datetime.fromtimestamp(1662998400, timezone.utc) - result = AccumulatorResult(future, iterator, keys, result_queue, consumer_future, watermark) + result = AccumulatorResult( + future, iterator, keys, result_queue, consumer_future, watermark, None + ) assert result.future == future assert result.iterator == iterator @@ -161,7 +163,7 @@ def test_accumulator_result_create(): def test_accumulator_result_update_watermark(): result = AccumulatorResult( - None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc) + None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc), None ) new_watermark = datetime.fromtimestamp(1662998460, 
timezone.utc) result.update_watermark(new_watermark) @@ -170,7 +172,7 @@ def test_accumulator_result_update_watermark(): def test_accumulator_result_update_watermark_invalid_type(): result = AccumulatorResult( - None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc) + None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc), None ) with pytest.raises(TypeError): result.update_watermark("not a datetime")