Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions sdk/src/opendecree/_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class RetryConfig:
max_backoff: Maximum backoff duration in seconds.
multiplier: Backoff multiplier between attempts.
retryable_codes: gRPC status codes that trigger a retry.
total_timeout: Overall wall-clock budget in seconds shared across all
attempts. When set, backoff sleeps are clipped to the remaining
budget and no further attempt is made once the budget is exhausted.
None means no global limit (original behavior).
"""

max_attempts: int = 3
Expand All @@ -40,6 +44,7 @@ class RetryConfig:
grpc.StatusCode.DEADLINE_EXCEEDED,
)
)
total_timeout: float | None = None


def write_safe_config(base: RetryConfig | None) -> RetryConfig | None:
Expand All @@ -60,6 +65,7 @@ def write_safe_config(base: RetryConfig | None) -> RetryConfig | None:
max_backoff=base.max_backoff,
multiplier=base.multiplier,
retryable_codes=safe_codes,
total_timeout=base.total_timeout,
)


Expand All @@ -68,10 +74,13 @@ def with_retry(config: RetryConfig | None, fn: Callable[[], T]) -> T:
if config is None:
return fn()

deadline = time.monotonic() + config.total_timeout if config.total_timeout is not None else None
last_err: Exception | None = None
backoff = config.initial_backoff

for attempt in range(config.max_attempts):
if deadline is not None and time.monotonic() >= deadline:
break
try:
return fn()
except grpc.RpcError as e:
Expand All @@ -80,7 +89,13 @@ def with_retry(config: RetryConfig | None, fn: Callable[[], T]) -> T:
raise
last_err = e
jitter = random.uniform(0.5, 1.5)
time.sleep(backoff * jitter)
sleep_time = backoff * jitter
if deadline is not None:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise
sleep_time = min(sleep_time, remaining)
time.sleep(sleep_time)
backoff = min(backoff * config.multiplier, config.max_backoff)

raise last_err # type: ignore[misc] # pragma: no cover
Expand All @@ -91,10 +106,13 @@ async def async_with_retry(config: RetryConfig | None, fn: Callable[[], Awaitabl
if config is None:
return await fn()

deadline = time.monotonic() + config.total_timeout if config.total_timeout is not None else None
last_err: Exception | None = None
backoff = config.initial_backoff

for attempt in range(config.max_attempts):
if deadline is not None and time.monotonic() >= deadline:
break
try:
return await fn()
except grpc.aio.AioRpcError as e:
Expand All @@ -103,7 +121,13 @@ async def async_with_retry(config: RetryConfig | None, fn: Callable[[], Awaitabl
raise
last_err = e
jitter = random.uniform(0.5, 1.5)
await asyncio.sleep(backoff * jitter)
sleep_time = backoff * jitter
if deadline is not None:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise
sleep_time = min(sleep_time, remaining)
await asyncio.sleep(sleep_time)
backoff = min(backoff * config.multiplier, config.max_backoff)

raise last_err # type: ignore[misc] # pragma: no cover
74 changes: 74 additions & 0 deletions sdk/tests/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,77 @@ async def fn() -> str:
with patch("opendecree._retry.asyncio.sleep", new_callable=AsyncMock):
with pytest.raises(grpc.aio.AioRpcError):
await async_with_retry(RetryConfig(max_attempts=2), fn)


# --- Deadline budget ---


def test_deadline_clips_sleep():
    """A backoff sleep never exceeds what is left of the total_timeout budget."""
    failure = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
    call = MagicMock(side_effect=[failure, "ok"])
    recorded_sleeps: list[float] = []

    # Fake clock readings, in the order with_retry consumes them:
    #   0.0  -> deadline computation
    #   0.0  -> loop-top check before the first attempt
    #   0.05 -> remaining-budget check after the failure
    #   0.05 -> loop-top check before the second attempt
    clock_readings = [0.0, 0.0, 0.05, 0.05]

    sleep_patch = patch(
        "opendecree._retry.time.sleep", side_effect=recorded_sleeps.append
    )
    with sleep_patch:
        clock_patch = patch(
            "opendecree._retry.time.monotonic", side_effect=clock_readings
        )
        with clock_patch:
            outcome = with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), call)

    assert outcome == "ok"
    # The sleep must have been clipped to the remaining budget (tiny float slack).
    assert recorded_sleeps[0] <= 0.05 + 1e-9


def test_deadline_exhausted_raises_immediately():
    """If the budget is already spent after a failure, the error is raised with no sleep."""
    failure = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
    always_failing = MagicMock(side_effect=failure)
    recorded_sleeps: list[float] = []

    # Fake clock readings, in the order with_retry consumes them:
    #   0.0 -> deadline computation
    #   0.0 -> loop-top check before the first attempt
    #   0.2 -> remaining-budget check after the failure (already over budget)
    clock_readings = [0.0, 0.0, 0.2]

    sleep_patch = patch(
        "opendecree._retry.time.sleep", side_effect=recorded_sleeps.append
    )
    with sleep_patch:
        clock_patch = patch(
            "opendecree._retry.time.monotonic", side_effect=clock_readings
        )
        with clock_patch:
            with pytest.raises(grpc.RpcError):
                with_retry(
                    RetryConfig(max_attempts=3, total_timeout=0.1), always_failing
                )

    assert recorded_sleeps == []


def test_deadline_already_passed_before_second_attempt():
    """No further attempt is made once the loop-top check sees the budget is spent."""
    failure = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
    call = MagicMock(side_effect=[failure, "ok"])

    # Fake clock readings, in the order with_retry consumes them:
    #   0.0  -> deadline computation
    #   0.0  -> loop-top check before the first attempt
    #   0.05 -> remaining-budget check after the failure (still within budget)
    #   0.2  -> loop-top check before the second attempt (over budget)
    clock_readings = [0.0, 0.0, 0.05, 0.2]

    with patch("opendecree._retry.time.sleep"):
        clock_patch = patch(
            "opendecree._retry.time.monotonic", side_effect=clock_readings
        )
        with clock_patch:
            with pytest.raises(grpc.RpcError):
                with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), call)

    # The second ("ok") outcome must never have been requested.
    assert call.call_count == 1


def test_write_safe_config_preserves_total_timeout():
    """write_safe_config carries total_timeout over to the config it returns."""
    derived = write_safe_config(RetryConfig(total_timeout=30.0))

    assert derived is not None
    assert derived.total_timeout == 30.0


@pytest.mark.asyncio
async def test_async_deadline_exhausted_raises_immediately():
    """Async variant: a spent budget after a failure raises without sleeping."""
    failure = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
    recorded_sleeps: list[float] = []

    async def always_failing() -> str:
        raise failure

    async def record_sleep(seconds: float) -> None:
        recorded_sleeps.append(seconds)

    # Fake clock readings, in the order async_with_retry consumes them:
    #   0.0 -> deadline computation
    #   0.0 -> loop-top check before the first attempt
    #   0.2 -> remaining-budget check after the failure (already over budget)
    clock_readings = [0.0, 0.0, 0.2]

    with patch("opendecree._retry.asyncio.sleep", side_effect=record_sleep):
        clock_patch = patch(
            "opendecree._retry.time.monotonic", side_effect=clock_readings
        )
        with clock_patch:
            with pytest.raises(grpc.aio.AioRpcError):
                await async_with_retry(
                    RetryConfig(max_attempts=3, total_timeout=0.1), always_failing
                )

    assert recorded_sleeps == []