Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions sdk/src/opendecree/_watcher_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Shared base class for WatchedField and AsyncWatchedField."""

from __future__ import annotations

import logging
from collections.abc import Callable
from typing import Generic, TypeVar

from opendecree._convert import convert_value

T = TypeVar("T")

_logger = logging.getLogger("opendecree.watcher")

_RECONNECT_INITIAL = 1.0
_RECONNECT_MAX = 30.0
_RECONNECT_MULTIPLIER = 2.0


class _WatchedFieldBase(Generic[T]):
"""Common state and helpers shared by WatchedField and AsyncWatchedField."""

def __init__(self, path: str, type_: type[T], default: T) -> None:
self._path = path
self._type = type_
self._default = default
self._value: T = default
self._is_set = False
self._callbacks: list[Callable[[T, T], None]] = []

@property
def path(self) -> str:
"""The field path this value tracks."""
return self._path

def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]:
"""Register a callback for value changes. Can be used as a decorator.

The callback receives (old_value, new_value).
"""
self._callbacks.append(fn)
return fn

def __bool__(self) -> bool:
"""Truthy based on the current value. False for False, 0, '', None."""
return bool(self._value)

def _apply_raw(self, raw_value: str | None) -> tuple[T, T]:
"""Set _value/_is_set from a raw string. Returns (old, new). Caller must lock if needed."""
old = self._value
if raw_value is not None:
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
self._is_set = True
else:
self._value = self._default
self._is_set = False
return old, self._value

def _fire_callbacks(self, old: T, new: T) -> None:
"""Invoke registered callbacks when the value changes."""
if old != new:
for cb in self._callbacks:
try:
cb(old, new)
except Exception:
_logger.exception("Error in on_change callback for %s", self._path)
67 changes: 15 additions & 52 deletions sdk/src/opendecree/async_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,66 +21,44 @@
import asyncio
import logging
import random
from collections.abc import AsyncIterator, Callable
from typing import Any, Generic, TypeVar
from collections.abc import AsyncIterator
from typing import Any, TypeVar

import grpc.aio

from opendecree._convert import convert_value, typed_value_to_string
from opendecree._convert import typed_value_to_string
from opendecree._stubs import process_get_all_response
from opendecree._watcher_base import (
_RECONNECT_INITIAL,
_RECONNECT_MAX,
_RECONNECT_MULTIPLIER,
_WatchedFieldBase,
)
from opendecree.types import Change

logger = logging.getLogger("opendecree.async_watcher")

T = TypeVar("T")

# Default reconnect backoff parameters.
_RECONNECT_INITIAL = 1.0
_RECONNECT_MAX = 30.0
_RECONNECT_MULTIPLIER = 2.0


class AsyncWatchedField(Generic[T]):
"""A live, thread-safe configuration field with a typed value (async variant).
class AsyncWatchedField(_WatchedFieldBase[T]):
"""A live configuration field with a typed value (async variant).

Updated automatically by the watcher's asyncio task.
"""

def __init__(self, path: str, type_: type[T], default: T) -> None:
self._path = path
self._type = type_
self._default = default
self._value: T = default
self._is_set = False
self._callbacks: list[Callable[[T, T], None]] = []
super().__init__(path, type_, default)
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue()

@property
def path(self) -> str:
"""The field path this value tracks."""
return self._path

@property
def value(self) -> T:
"""The current value — always fresh."""
return self._value

def __bool__(self) -> bool:
"""Truthy based on the current value. False for False, 0, '', None."""
return bool(self._value)

def __repr__(self) -> str:
return f"AsyncWatchedField({self._path!r}, value={self._value!r})"

def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]:
"""Register a callback for value changes. Can be used as a decorator.

The callback receives (old_value, new_value) and is called from the
watcher's asyncio task.
"""
self._callbacks.append(fn)
return fn

async def changes(self) -> AsyncIterator[Change]:
"""Async iterator that yields Change events for this field.

Expand All @@ -94,28 +72,13 @@ async def changes(self) -> AsyncIterator[Change]:

def _update(self, raw_value: str | None, change: Change) -> None:
"""Update the field value from a raw string. Called by the watcher task."""
old = self._value
if raw_value is not None:
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
self._is_set = True
else:
self._value = self._default
self._is_set = False

new = self._value
if old != new:
for cb in self._callbacks:
try:
cb(old, new)
except Exception:
logger.exception("Error in on_change callback for %s", self._path)

old, new = self._apply_raw(raw_value)
self._fire_callbacks(old, new)
self._change_queue.put_nowait(change)

def _load_initial(self, raw_value: str) -> None:
"""Set initial value from snapshot. No callbacks fired."""
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
self._is_set = True
self._apply_raw(raw_value)

def _stop(self) -> None:
"""Signal the changes() iterator to stop."""
Expand Down
66 changes: 14 additions & 52 deletions sdk/src/opendecree/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,68 +20,46 @@
import random
import threading
import time
from collections.abc import Callable, Iterator
from typing import Any, Generic, TypeVar
from collections.abc import Iterator
from typing import Any, TypeVar

import grpc

from opendecree._convert import convert_value, typed_value_to_string
from opendecree._convert import typed_value_to_string
from opendecree._stubs import process_get_all_response
from opendecree._watcher_base import (
_RECONNECT_INITIAL,
_RECONNECT_MAX,
_RECONNECT_MULTIPLIER,
_WatchedFieldBase,
)
from opendecree.types import Change

logger = logging.getLogger("opendecree.watcher")

T = TypeVar("T")

# Default reconnect backoff parameters.
_RECONNECT_INITIAL = 1.0
_RECONNECT_MAX = 30.0
_RECONNECT_MULTIPLIER = 2.0


class WatchedField(Generic[T]):
class WatchedField(_WatchedFieldBase[T]):
"""A live, thread-safe configuration field with a typed value.

Attributes are updated automatically by the watcher's background thread.
"""

def __init__(self, path: str, type_: type[T], default: T) -> None:
self._path = path
self._type = type_
self._default = default
self._value: T = default
self._is_set = False
super().__init__(path, type_, default)
self._lock = threading.Lock()
self._callbacks: list[Callable[[T, T], None]] = []
self._change_queue: queue.Queue[Change] = queue.Queue()

@property
def path(self) -> str:
"""The field path this value tracks."""
return self._path

@property
def value(self) -> T:
"""The current value — always fresh, thread-safe."""
with self._lock:
return self._value

def __bool__(self) -> bool:
"""Truthy based on the current value. False for False, 0, '', None."""
return bool(self.value)

def __repr__(self) -> str:
return f"WatchedField({self._path!r}, value={self.value!r})"

def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]:
"""Register a callback for value changes. Can be used as a decorator.

The callback receives (old_value, new_value) and is called from the
watcher's background thread.
"""
self._callbacks.append(fn)
return fn

def changes(self) -> Iterator[Change]:
"""Blocking iterator that yields Change events for this field.

Expand All @@ -100,30 +78,14 @@ def changes(self) -> Iterator[Change]:
def _update(self, raw_value: str | None, change: Change) -> None:
"""Update the field value from a raw string. Called by the watcher thread."""
with self._lock:
old = self._value
if raw_value is not None:
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
self._is_set = True
else:
self._value = self._default
self._is_set = False

# Notify callbacks (outside the lock to avoid deadlocks).
new = self._value
if old != new:
for cb in self._callbacks:
try:
cb(old, new)
except Exception:
logger.exception("Error in on_change callback for %s", self._path)

old, new = self._apply_raw(raw_value)
self._fire_callbacks(old, new)
self._change_queue.put(change)

def _load_initial(self, raw_value: str) -> None:
"""Set initial value from snapshot. No callbacks fired."""
with self._lock:
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
self._is_set = True
self._apply_raw(raw_value)

def _stop(self) -> None:
"""Signal the changes() iterator to stop."""
Expand Down