Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions sdk/src/opendecree/async_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import asyncio
import logging
import random
import re
from collections.abc import AsyncIterator, Callable
from typing import Any, TypeVar

Expand All @@ -38,6 +39,8 @@

logger = logging.getLogger("opendecree.async_watcher")

_CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]")

T = TypeVar("T")


Expand Down Expand Up @@ -152,9 +155,8 @@ async def start(self) -> None:

await self._load_snapshot()
self._stopped = False
self._task = asyncio.create_task(
self._subscribe_loop(), name=f"decree-watcher-{self._tenant_id}"
)
safe_id = _CONTROL_CHARS_RE.sub("", self._tenant_id)
self._task = asyncio.create_task(self._subscribe_loop(), name=f"decree-watcher-{safe_id}")

async def stop(self) -> None:
"""Stop watching and cancel the background task."""
Expand Down
6 changes: 5 additions & 1 deletion sdk/src/opendecree/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import queue
import random
import re
import threading
import time
from collections.abc import Callable, Iterator
Expand All @@ -37,6 +38,8 @@

logger = logging.getLogger("opendecree.watcher")

_CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]")

T = TypeVar("T")


Expand Down Expand Up @@ -156,8 +159,9 @@ def start(self) -> None:

self._load_snapshot()
self._stop_event.clear()
safe_id = _CONTROL_CHARS_RE.sub("", self._tenant_id)
self._thread = threading.Thread(
target=self._subscribe_loop, daemon=True, name=f"decree-watcher-{self._tenant_id}"
target=self._subscribe_loop, daemon=True, name=f"decree-watcher-{safe_id}"
)
self._thread.start()

Expand Down
22 changes: 22 additions & 0 deletions sdk/tests/test_async_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,25 @@ async def empty_stream():
stub.Subscribe.assert_called_once()
_, sub_kwargs = stub.Subscribe.call_args
assert sub_kwargs.get("metadata") == auth_meta

@pytest.mark.asyncio
async def test_task_name_sanitizes_control_chars(self):
stub = MagicMock()
pb2 = MagicMock()
mock_resp = MagicMock()
mock_resp.config.values = []
stub.GetConfig = AsyncMock(return_value=mock_resp)

async def empty_stream():
return
yield

stub.Subscribe.return_value = empty_stream()

w = AsyncConfigWatcher(stub, pb2, "tenant\x00evil\x1f", timeout=5.0)
await w.start()
assert w._task is not None
assert "\x00" not in w._task.get_name()
assert "\x1f" not in w._task.get_name()
assert "tenantevil" in w._task.get_name()
await w.stop()
16 changes: 16 additions & 0 deletions sdk/tests/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,19 @@ def cancel(self):
# Thread must have joined within the timeout.
assert not thread_ref.is_alive()
assert w._thread is None

def test_thread_name_sanitizes_control_chars(self):
stub = MagicMock()
pb2 = MagicMock()
mock_resp = MagicMock()
mock_resp.config.values = []
stub.GetConfig.return_value = mock_resp
stub.Subscribe.return_value = iter([])

w = ConfigWatcher(stub, pb2, "tenant\x00evil\x1f", timeout=5.0)
w.start()
assert w._thread is not None
assert "\x00" not in w._thread.name
assert "\x1f" not in w._thread.name
assert "tenantevil" in w._thread.name
w.stop()