From 460fa86c09682e99ad1a00b81fbce4b25e9cfa0a Mon Sep 17 00:00:00 2001 From: zeevdr Date: Mon, 25 May 2026 22:49:35 +0300 Subject: [PATCH] fix(watcher): bound WatchedField change queue to prevent unbounded memory growth Add max_queue_size parameter (default 1024) to WatchedField and AsyncWatchedField. When the queue is full, the oldest entry is dropped (drop-oldest strategy) and dropped_changes is incremented. A warning is logged on each drop. Matches the TypeScript SDK's queue overflow semantics. Closes #100 Co-Authored-By: Claude Sonnet 4.6 --- sdk/src/opendecree/async_watcher.py | 49 ++++++++++++++-- sdk/src/opendecree/watcher.py | 52 ++++++++++++++--- sdk/tests/test_async_watcher.py | 74 +++++++++++++++++++++++- sdk/tests/test_watcher.py | 90 +++++++++++++++++++++++++++-- 4 files changed, 243 insertions(+), 22 deletions(-) diff --git a/sdk/src/opendecree/async_watcher.py b/sdk/src/opendecree/async_watcher.py index f7d0073..8d5375f 100644 --- a/sdk/src/opendecree/async_watcher.py +++ b/sdk/src/opendecree/async_watcher.py @@ -22,6 +22,7 @@ import logging import random import re +from collections import deque from collections.abc import AsyncIterator, Callable from typing import Any, TypeVar @@ -41,6 +42,8 @@ _CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]") +_DEFAULT_MAX_QUEUE_SIZE = 1024 + T = TypeVar("T") @@ -57,15 +60,26 @@ def __init__( default: T, *, on_callback_error: Callable[[Exception], None] | None = None, + max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE, ) -> None: super().__init__(path, type_, default, on_callback_error=on_callback_error) - self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue() + self._max_queue_size = max_queue_size + self._dropped_changes = 0 + # _change_queue is a deque used as a bounded FIFO. _queue_event gates + # the async changes() iterator when the deque is empty. 
+ self._change_queue: deque[Change | None] = deque() + self._queue_event = asyncio.Event() @property def value(self) -> T: """The current value — always fresh.""" return self._value + @property + def dropped_changes(self) -> int: + """Number of changes dropped because the queue was full.""" + return self._dropped_changes + def __repr__(self) -> str: return f"AsyncWatchedField({self._path!r}, value={self._value!r})" @@ -75,7 +89,14 @@ async def changes(self) -> AsyncIterator[Change]: Yields Change objects until the watcher is stopped. """ while True: - change = await self._change_queue.get() + await self._queue_event.wait() + if not self._change_queue: + # Spurious wake — clear and re-wait. + self._queue_event.clear() + continue + change = self._change_queue.popleft() + if not self._change_queue: + self._queue_event.clear() if change is None: # sentinel return yield change @@ -84,7 +105,18 @@ def _update(self, raw_value: str | None, change: Change) -> None: """Update the field value from a raw string. Called by the watcher task.""" old, new = self._apply_raw(raw_value) self._fire_callbacks(old, new) - self._change_queue.put_nowait(change) + if len(self._change_queue) >= self._max_queue_size: + self._change_queue.popleft() + self._dropped_changes += 1 + logger.warning( + "AsyncWatchedField %r: change queue full (max=%d), oldest entry dropped " + "(total dropped: %d)", + self._path, + self._max_queue_size, + self._dropped_changes, + ) + self._change_queue.append(change) + self._queue_event.set() def _load_initial(self, raw_value: str) -> None: """Set initial value from snapshot. 
No callbacks fired.""" @@ -92,7 +124,8 @@ def _load_initial(self, raw_value: str) -> None: def _stop(self) -> None: """Signal the changes() iterator to stop.""" - self._change_queue.put_nowait(None) + self._change_queue.append(None) + self._queue_event.set() class AsyncConfigWatcher: @@ -126,6 +159,7 @@ def field( *, default: T, on_callback_error: Callable[[Exception], None] | None = None, + max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE, ) -> AsyncWatchedField[T]: """Register a field to watch. @@ -138,13 +172,18 @@ def field( on_callback_error: Optional hook called with the exception when an on_change callback raises. If not set, the exception is logged. The hook may re-raise to terminate the watcher's background task. + max_queue_size: Maximum number of unread changes buffered. When the + queue is full, the oldest entry is dropped and ``dropped_changes`` + is incremented. Default: 1024. Returns: An AsyncWatchedField that tracks the live value. """ if self._task is not None: raise RuntimeError("Cannot register fields after watcher has started") - watched = AsyncWatchedField(path, type_, default, on_callback_error=on_callback_error) + watched = AsyncWatchedField( + path, type_, default, on_callback_error=on_callback_error, max_queue_size=max_queue_size + ) self._fields[path] = watched return watched diff --git a/sdk/src/opendecree/watcher.py b/sdk/src/opendecree/watcher.py index 86f3c20..5302559 100644 --- a/sdk/src/opendecree/watcher.py +++ b/sdk/src/opendecree/watcher.py @@ -16,11 +16,11 @@ from __future__ import annotations import logging -import queue import random import re import threading import time +from collections import deque from collections.abc import Callable, Iterator from typing import Any, TypeVar @@ -38,6 +38,8 @@ logger = logging.getLogger("opendecree.watcher") +_DEFAULT_MAX_QUEUE_SIZE = 1024 + _CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]") T = TypeVar("T") @@ -56,10 +58,16 @@ def __init__( default: T, *, on_callback_error: Callable[[Exception], 
None] | None = None, + max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE, ) -> None: super().__init__(path, type_, default, on_callback_error=on_callback_error) self._lock = threading.Lock() - self._change_queue: queue.Queue[Change] = queue.Queue() + self._max_queue_size = max_queue_size + self._dropped_changes = 0 + # _change_queue is a deque used as a bounded FIFO. _queue_cond is used + # by changes() to block when the deque is empty. + self._change_queue: deque[Change] = deque() + self._queue_cond = threading.Condition(threading.Lock()) @property def value(self) -> T: @@ -67,6 +75,12 @@ def value(self) -> T: with self._lock: return self._value + @property + def dropped_changes(self) -> int: + """Number of changes dropped because the queue was full.""" + with self._queue_cond: + return self._dropped_changes + def __repr__(self) -> str: return f"WatchedField({self._path!r}, value={self.value!r})" @@ -77,10 +91,10 @@ def changes(self) -> Iterator[Change]: Yields Change objects with old_value and new_value as strings. """ while True: - try: - change = self._change_queue.get(timeout=1.0) - except queue.Empty: - continue + with self._queue_cond: + while not self._change_queue: + self._queue_cond.wait(timeout=1.0) + change = self._change_queue.popleft() if change is _SENTINEL_CHANGE: return yield change @@ -90,7 +104,19 @@ def _update(self, raw_value: str | None, change: Change) -> None: with self._lock: old, new = self._apply_raw(raw_value) self._fire_callbacks(old, new) - self._change_queue.put(change) + with self._queue_cond: + if len(self._change_queue) >= self._max_queue_size: + self._change_queue.popleft() + self._dropped_changes += 1 + logger.warning( + "WatchedField %r: change queue full (max=%d), oldest entry dropped " + "(total dropped: %d)", + self._path, + self._max_queue_size, + self._dropped_changes, + ) + self._change_queue.append(change) + self._queue_cond.notify() def _load_initial(self, raw_value: str) -> None: """Set initial value from snapshot. 
No callbacks fired.""" @@ -99,7 +125,9 @@ def _load_initial(self, raw_value: str) -> None: def _stop(self) -> None: """Signal the changes() iterator to stop.""" - self._change_queue.put(_SENTINEL_CHANGE) + with self._queue_cond: + self._change_queue.append(_SENTINEL_CHANGE) + self._queue_cond.notify() # Sentinel to signal the changes() iterator to stop. @@ -130,6 +158,7 @@ def field( *, default: T, on_callback_error: Callable[[Exception], None] | None = None, + max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE, ) -> WatchedField[T]: """Register a field to watch. @@ -142,13 +171,18 @@ def field( on_callback_error: Optional hook called with the exception when an on_change callback raises. If not set, the exception is logged. The hook may re-raise to terminate the watcher's background loop. + max_queue_size: Maximum number of unread changes buffered. When the + queue is full, the oldest entry is dropped and ``dropped_changes`` + is incremented. Default: 1024. Returns: A WatchedField that tracks the live value. 
""" if self._thread is not None: raise RuntimeError("Cannot register fields after watcher has started") - watched = WatchedField(path, type_, default, on_callback_error=on_callback_error) + watched = WatchedField( + path, type_, default, on_callback_error=on_callback_error, max_queue_size=max_queue_size + ) self._fields[path] = watched return watched diff --git a/sdk/tests/test_async_watcher.py b/sdk/tests/test_async_watcher.py index 76171a7..d2a9acf 100644 --- a/sdk/tests/test_async_watcher.py +++ b/sdk/tests/test_async_watcher.py @@ -72,13 +72,15 @@ def test_update_null_resets_to_default(self): @pytest.mark.asyncio async def test_changes_iterator(self): f = AsyncWatchedField("x", str, "") + f._load_initial("a") c1 = Change(field_path="x", old_value="a", new_value="b", version=1) c2 = Change(field_path="x", old_value="b", new_value="c", version=2) - f._change_queue.put_nowait(c1) - f._change_queue.put_nowait(c2) - f._change_queue.put_nowait(None) # sentinel + # Populate via the internal helpers (matching the production path). 
+ f._update("b", c1) + f._update("c", c2) + f._stop() collected = [c async for c in f.changes()] assert len(collected) == 2 @@ -140,6 +142,72 @@ def bad_cb(old: int, new: int) -> None: assert len(errors) == 1 assert isinstance(errors[0], RuntimeError) + # --- Bounded queue tests --- + + def test_dropped_changes_starts_at_zero(self): + f = AsyncWatchedField("x", str, "", max_queue_size=5) + assert f.dropped_changes == 0 + + def test_queue_fills_without_dropping_below_limit(self): + f = AsyncWatchedField("x", str, "", max_queue_size=3) + for i in range(3): + c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i) + f._update(str(i + 1), c) + + assert f.dropped_changes == 0 + assert len(f._change_queue) == 3 + + def test_oldest_entry_dropped_when_queue_full(self): + f = AsyncWatchedField("x", str, "", max_queue_size=3) + for i in range(5): + c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i) + f._update(str(i + 1), c) + + assert f.dropped_changes == 2 + assert len(f._change_queue) == 3 + versions = [c.version for c in f._change_queue] + assert versions == [2, 3, 4] + + def test_drop_logs_warning(self, caplog): + import logging + + f = AsyncWatchedField("payments.fee", str, "", max_queue_size=2) + with caplog.at_level(logging.WARNING, logger="opendecree.async_watcher"): + for i in range(4): + c = Change( + field_path="payments.fee", old_value=str(i), new_value=str(i + 1), version=i + ) + f._update(str(i + 1), c) + + assert f.dropped_changes == 2 + warning_records = [r for r in caplog.records if "dropped" in r.message] + assert len(warning_records) == 2 + assert "payments.fee" in warning_records[0].message + + def test_max_queue_size_constructor_arg(self): + f = AsyncWatchedField("x", str, "", max_queue_size=10) + assert f._max_queue_size == 10 + + def test_default_max_queue_size(self): + from opendecree.async_watcher import _DEFAULT_MAX_QUEUE_SIZE + + f = AsyncWatchedField("x", str, "") + assert f._max_queue_size == 
_DEFAULT_MAX_QUEUE_SIZE + assert _DEFAULT_MAX_QUEUE_SIZE == 1024 + + @pytest.mark.asyncio + async def test_changes_iterator_after_overflow(self): + f = AsyncWatchedField("x", str, "", max_queue_size=2) + for i in range(4): + c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i) + f._update(str(i + 1), c) + f._stop() + + collected = [c async for c in f.changes()] + assert len(collected) == 2 + assert collected[0].version == 2 + assert collected[1].version == 3 + # --- AsyncConfigWatcher unit tests --- diff --git a/sdk/tests/test_watcher.py b/sdk/tests/test_watcher.py index 3099b4f..ad874be 100644 --- a/sdk/tests/test_watcher.py +++ b/sdk/tests/test_watcher.py @@ -8,7 +8,7 @@ import grpc import pytest -from opendecree.watcher import _SENTINEL_CHANGE, ConfigWatcher, WatchedField +from opendecree.watcher import ConfigWatcher, WatchedField from tests.conftest import FakeRpcError # --- WatchedField unit tests --- @@ -85,16 +85,17 @@ def test_update_null_resets_to_default(self): def test_changes_iterator(self): f = WatchedField("x", str, "") + f._load_initial("a") from opendecree.types import Change c1 = Change(field_path="x", old_value="a", new_value="b", version=1) c2 = Change(field_path="x", old_value="b", new_value="c", version=2) - # Put changes then sentinel. - f._change_queue.put(c1) - f._change_queue.put(c2) - f._change_queue.put(_SENTINEL_CHANGE) + # Enqueue the changes, then the sentinel, via the private helpers (the production path).
+ f._update("b", c1) + f._update("c", c2) + f._stop() collected = list(f.changes()) assert len(collected) == 2 @@ -164,6 +165,85 @@ def bad_cb(old: int, new: int) -> None: assert len(errors) == 1 assert isinstance(errors[0], RuntimeError) + # --- Bounded queue tests --- + + def test_dropped_changes_starts_at_zero(self): + f = WatchedField("x", str, "", max_queue_size=5) + assert f.dropped_changes == 0 + + def test_queue_fills_without_dropping_below_limit(self): + from opendecree.types import Change + + f = WatchedField("x", str, "", max_queue_size=3) + for i in range(3): + c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i) + f._update(str(i + 1), c) + + assert f.dropped_changes == 0 + assert len(f._change_queue) == 3 + + def test_oldest_entry_dropped_when_queue_full(self): + from opendecree.types import Change + + f = WatchedField("x", str, "", max_queue_size=3) + changes = [ + Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i) + for i in range(5) + ] + for i, c in enumerate(changes): + f._update(str(i + 1), c) + + # Two oldest entries were dropped. + assert f.dropped_changes == 2 + # Queue still holds exactly max_queue_size entries (the newest 3). 
+ assert len(f._change_queue) == 3 + versions = [c.version for c in f._change_queue] + assert versions == [2, 3, 4] + + def test_drop_logs_warning(self, caplog): + import logging + + from opendecree.types import Change + + f = WatchedField("payments.fee", str, "", max_queue_size=2) + with caplog.at_level(logging.WARNING, logger="opendecree.watcher"): + for i in range(4): + c = Change( + field_path="payments.fee", old_value=str(i), new_value=str(i + 1), version=i + ) + f._update(str(i + 1), c) + + assert f.dropped_changes == 2 + warning_records = [r for r in caplog.records if "dropped" in r.message] + assert len(warning_records) == 2 + assert "payments.fee" in warning_records[0].message + + def test_max_queue_size_constructor_arg(self): + f = WatchedField("x", str, "", max_queue_size=10) + assert f._max_queue_size == 10 + + def test_default_max_queue_size(self): + from opendecree.watcher import _DEFAULT_MAX_QUEUE_SIZE + + f = WatchedField("x", str, "") + assert f._max_queue_size == _DEFAULT_MAX_QUEUE_SIZE + assert _DEFAULT_MAX_QUEUE_SIZE == 1024 + + def test_changes_iterator_after_overflow(self): + from opendecree.types import Change + + f = WatchedField("x", str, "", max_queue_size=2) + for i in range(4): + c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i) + f._update(str(i + 1), c) + f._stop() + + collected = list(f.changes()) + # Only the 2 newest changes survive. + assert len(collected) == 2 + assert collected[0].version == 2 + assert collected[1].version == 3 + # --- ConfigWatcher unit tests ---