Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/pynumaflow/pynumaflow/accumulator/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class AccumulatorResult:
"_result_queue",
"_consumer_future",
"_latest_watermark",
"_close_window",
)

_future: Task
Expand All @@ -242,6 +243,7 @@ class AccumulatorResult:
_result_queue: NonBlockingIterator
_consumer_future: Task
_latest_watermark: datetime
_close_window: KeyedWindow | None

@property
def future(self) -> Task:
Expand Down Expand Up @@ -310,6 +312,29 @@ def update_watermark(self, new_watermark: datetime):
raise TypeError("new_watermark must be a datetime object")
self._latest_watermark = new_watermark

@property
def close_window(self) -> KeyedWindow | None:
"""Returns the keyed window from the CLOSE request, if the task was closed.

Returns:
KeyedWindow | None: The window carried by the CLOSE request, echoed back in
the EOF response; None if the task has not received a CLOSE.
"""
return self._close_window

def set_close_window(self, window: KeyedWindow):
"""Stashes the CLOSE request's keyed window so the EOF response can echo it.

Args:
window (KeyedWindow): The keyed window from the CLOSE request.

Raises:
TypeError: If window is not a KeyedWindow object.
"""
if not isinstance(window, KeyedWindow):
raise TypeError("window must be a KeyedWindow object")
self._close_window = window


@dataclass
class AccumulatorRequest:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from collections.abc import AsyncIterable
from datetime import datetime
from datetime import datetime, timezone

from google.protobuf import timestamp_pb2
from pynumaflow._constants import (
Expand Down Expand Up @@ -112,6 +112,10 @@ async def close_task(self, req: AccumulatorRequest):
curr_task = self.tasks.get(unified_key, None)

if curr_task:
# Stash the CLOSE request's keyed window BEFORE signalling EOF so the task's
# consumer (write_to_global_queue) can echo it back in the EOF response. Core
# uses the echoed window to identify and garbage-collect the closed window.
curr_task.set_close_window(req.keyed_window)
await self.tasks[unified_key].iterator.put(STREAM_EOF)
await curr_task.future
await curr_task.consumer_future
Expand Down Expand Up @@ -167,7 +171,7 @@ async def create_task(self, req: AccumulatorRequest):

# Create a new AccumulatorResult object to store the task information
curr_task = AccumulatorResult(
task, niter, keys, res_queue, consumer, datetime.fromtimestamp(-1)
task, niter, keys, res_queue, consumer, datetime.fromtimestamp(-1), None
)

# Save the result of the accumulator operation to the task list
Expand Down Expand Up @@ -318,7 +322,7 @@ async def write_to_global_queue(
watermark_pb.FromDatetime(msg.watermark)

start_dt_pb = timestamp_pb2.Timestamp()
start_dt_pb.FromDatetime(datetime.fromtimestamp(0))
start_dt_pb.FromDatetime(datetime.fromtimestamp(0, tz=timezone.utc))

end_dt_pb = timestamp_pb2.Timestamp()
end_dt_pb.FromDatetime(wm)
Expand All @@ -339,17 +343,39 @@ async def write_to_global_queue(
tags=msg.tags,
)
await output_queue.put(res)
# send EOF
start_eof_pb = timestamp_pb2.Timestamp()
start_eof_pb.FromDatetime(datetime.fromtimestamp(0))
# Send EOF. Echo the CLOSE request's keyed window (start/end/slot/keys) so core can
# match the EOF to the window it is closing and garbage-collect it. Mirrors the
# numaflow-rs accumulator behavior (PR #177).
close_window = task.close_window
if close_window is not None:
start_eof_pb = timestamp_pb2.Timestamp()
start_eof_pb.FromDatetime(close_window.start)

end_eof_pb = timestamp_pb2.Timestamp()
end_eof_pb.FromDatetime(close_window.end)

eof_window = accumulator_pb2.KeyedWindow(
start=start_eof_pb,
end=end_eof_pb,
slot=close_window.slot,
keys=close_window.keys,
)
else:
# Fallback for the stream-close/shutdown path (no CLOSE request, e.g.
# stream_send_eof on SIGTERM): synthesize the window from epoch(0) and the
# latest watermark, preserving prior behavior.
start_eof_pb = timestamp_pb2.Timestamp()
start_eof_pb.FromDatetime(datetime.fromtimestamp(0, tz=timezone.utc))

end_eof_pb = timestamp_pb2.Timestamp()
end_eof_pb.FromDatetime(wm)
end_eof_pb = timestamp_pb2.Timestamp()
end_eof_pb.FromDatetime(wm)

res = accumulator_pb2.AccumulatorResponse(
window=accumulator_pb2.KeyedWindow(
eof_window = accumulator_pb2.KeyedWindow(
start=start_eof_pb, end=end_eof_pb, slot="slot-0", keys=task.keys
),
)

res = accumulator_pb2.AccumulatorResponse(
window=eof_window,
EOF=True,
)
await output_queue.put(res)
Expand Down
79 changes: 79 additions & 0 deletions packages/pynumaflow/tests/accumulator/test_async_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import grpc
import pytest
from google.protobuf import empty_pb2 as _empty_pb2
from google.protobuf import timestamp_pb2

from pynumaflow import setup_logging
from pynumaflow.accumulator import (
Expand Down Expand Up @@ -87,6 +88,38 @@ def request_generator_mixed(count, request, resetkey: bool = False):
yield request


# Distinct, recognizable close-window values used to prove the EOF echoes the
# CLOSE request's window rather than the synthesized fallback window.
CLOSE_WINDOW_START_SECONDS = 1000000000
CLOSE_WINDOW_END_SECONDS = 2000000000
CLOSE_WINDOW_SLOT = "slot-7"


def request_generator_custom_close(count, request):
"""Yields OPEN + APPEND requests, then a CLOSE whose keyed window carries
distinct start/end/slot values (mirroring core sending a real
max_event_time + timeout window on close)."""
for i in range(count):
if i == 0:
request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.OPEN
else:
request.operation.event = (
accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND
)
yield request

# CLOSE carrying a distinct keyed window to be echoed back in the EOF response.
request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.CLOSE
request.operation.keyedWindow.start.CopyFrom(
timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_START_SECONDS)
)
request.operation.keyedWindow.end.CopyFrom(
timestamp_pb2.Timestamp(seconds=CLOSE_WINDOW_END_SECONDS)
)
request.operation.keyedWindow.slot = CLOSE_WINDOW_SLOT
yield request


def start_request() -> accumulator_pb2.AccumulatorRequest:
event_time_timestamp, watermark_timestamp = get_time_args()
window = accumulator_pb2.KeyedWindow(
Expand Down Expand Up @@ -289,6 +322,52 @@ def test_accumulate_with_close(accumulator_stub) -> None:
assert 1 == eof_count


def test_accumulate_close_echoes_eof_window(accumulator_stub) -> None:
"""The EOF response must echo the exact KeyedWindow from the CLOSE request."""
request = start_request()
generator_response = None
try:
generator_response = accumulator_stub.AccumulateFn(
request_iterator=request_generator_custom_close(count=5, request=request)
)
except grpc.RpcError as e:
logging.error(e)

eof_count = 0
for r in generator_response:
if r.EOF:
eof_count += 1
assert r.window.start.seconds == CLOSE_WINDOW_START_SECONDS
assert r.window.end.seconds == CLOSE_WINDOW_END_SECONDS
assert r.window.slot == CLOSE_WINDOW_SLOT
assert list(r.window.keys) == ["test_key"]

assert 1 == eof_count


def test_accumulate_eof_window_fallback_without_close(accumulator_stub) -> None:
"""When the stream closes without a CLOSE (e.g. shutdown), the EOF window falls
back to the synthesized window (start=epoch(0), slot='slot-0')."""
request = start_request()
generator_response = None
try:
generator_response = accumulator_stub.AccumulateFn(
request_iterator=request_generator(count=5, request=request, send_close=False)
)
except grpc.RpcError as e:
logging.error(e)

eof_count = 0
for r in generator_response:
if r.EOF:
eof_count += 1
assert r.window.start.seconds == 0
assert r.window.slot == "slot-0"
assert list(r.window.keys) == ["test_key"]

assert 1 == eof_count


def test_accumulate_append_without_open(accumulator_stub) -> None:
request = start_request_without_open()
generator_response = None
Expand Down
8 changes: 5 additions & 3 deletions packages/pynumaflow/tests/accumulator/test_datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ def test_accumulator_result_create():
consumer_future = None # In real usage, this would be an asyncio.Task
watermark = datetime.fromtimestamp(1662998400, timezone.utc)

result = AccumulatorResult(future, iterator, keys, result_queue, consumer_future, watermark)
result = AccumulatorResult(
future, iterator, keys, result_queue, consumer_future, watermark, None
)

assert result.future == future
assert result.iterator == iterator
Expand All @@ -161,7 +163,7 @@ def test_accumulator_result_create():

def test_accumulator_result_update_watermark():
result = AccumulatorResult(
None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc)
None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc), None
)
new_watermark = datetime.fromtimestamp(1662998460, timezone.utc)
result.update_watermark(new_watermark)
Expand All @@ -170,7 +172,7 @@ def test_accumulator_result_update_watermark():

def test_accumulator_result_update_watermark_invalid_type():
result = AccumulatorResult(
None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc)
None, None, [], None, None, datetime.fromtimestamp(1662998400, timezone.utc), None
)
with pytest.raises(TypeError):
result.update_watermark("not a datetime")
Expand Down
Loading