
feat(pipeline): adaptive per-endpoint SPARQL timeouts #421

Merged: ddeboer merged 6 commits into main from feat/adaptive-timeout-policy on May 28, 2026

ddeboer commented May 28, 2026

Summary

Adds adaptive per-endpoint SPARQL timeouts to @lde/pipeline, so a single failing third-party endpoint can no longer hold up the whole DKG run.

  • New TimeoutPolicy interface plus two implementations: ConstantTimeoutPolicy (current behaviour, the implicit default) and AdaptiveTimeoutPolicy (per-endpoint state machine that tightens to a short budget after threshold consecutive timeouts and relaxes on the next ok). Factory helpers constantTimeoutPolicy / adaptiveTimeoutPolicy are exported.
  • SparqlConstructExecutor and SparqlItemSelector now install a per-call AbortSignal derived from the policy. Each attempt inside pRetry calls beforeRequest, runs the request, classifies the outcome (HTTP 504 and AbortError / TimeoutError → timeout, others → error), and reports via afterRequest. The executor’s timeout: number option is replaced by timeoutPolicy?: TimeoutPolicy (breaking, pre-release per AGENTS.md).
  • PipelineOptions.timeoutPolicy?: () => TimeoutPolicy is invoked once per dataset and threaded through every stage / executor / selector via RunOptions, ExecuteOptions, and SelectOptions.
  • ProgressReporter gains optional timeoutTightened / timeoutRelaxed hooks. The pipeline subscribes to the policy at each dataset boundary and forwards transitions. ConsoleReporter prints ↘ Tightened / ↗ Relaxed lines so operators can tell a fast-failed stage from an unexpected speedup.
  • Migrates pipeline-void and pipeline-shacl-sampler to the new timeoutPolicy field.
  • Documents the mechanism in the pipeline README.

Behaviour is unchanged for callers that don’t supply a policy — the implicit default is constantTimeoutPolicy(300_000).
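
The per-attempt flow from the second bullet can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the hook signatures, the `Outcome` type, and the `classifyFailure` / `attempt` helpers are hypothetical, and it assumes a failed request surfaces as an object carrying a `name` and, for HTTP failures, a `status` field.

```typescript
// Hypothetical shapes: the PR names TimeoutPolicy, beforeRequest and
// afterRequest, but does not show their signatures.
type Outcome = 'ok' | 'timeout' | 'error';

interface TimeoutPolicy {
  /** Returns the budget in milliseconds for the next request to `endpoint`. */
  beforeRequest(endpoint: string): number;
  /** Reports how the request ended. */
  afterRequest(endpoint: string, outcome: Outcome): void;
}

/** HTTP 504 and abort/timeout errors count as 'timeout'; anything else is 'error'. */
function classifyFailure(error: unknown): Outcome {
  if (typeof error === 'object' && error !== null) {
    // Assumption: failures expose `status` and/or `name` as plain properties.
    const { name, status } = error as { name?: unknown; status?: unknown };
    if (status === 504) return 'timeout';
    if (name === 'AbortError' || name === 'TimeoutError') return 'timeout';
  }
  return 'error';
}

/** Runs one attempt under the policy's budget and reports the outcome. */
async function attempt<T>(
  policy: TimeoutPolicy,
  endpoint: string,
  request: (signal: AbortSignal) => Promise<T>,
): Promise<T> {
  const budgetMs = policy.beforeRequest(endpoint);
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), budgetMs);
  try {
    const result = await request(controller.signal);
    policy.afterRequest(endpoint, 'ok');
    return result;
  } catch (error) {
    policy.afterRequest(endpoint, classifyFailure(error));
    throw error; // rethrow so a surrounding retry loop can decide what to do
  } finally {
    clearTimeout(timer);
  }
}
```

Rethrowing after reporting keeps retry decisions with the caller, so the policy only observes outcomes and hands out budgets.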

Fix #419

ddeboer added 6 commits May 28, 2026 13:39
- Introduce TimeoutPolicy interface with ConstantTimeoutPolicy and
  AdaptiveTimeoutPolicy implementations, plus matching factories
- Inject per-call AbortSignal in SparqlConstructExecutor and
  SparqlItemSelector; classify HTTP 504 and AbortError as 'timeout'
- Thread a per-dataset TimeoutPolicy through PipelineOptions → Stage →
  executors/selectors via ExecuteOptions and SelectOptions
- Forward tighten/relax transitions to ProgressReporter; ConsoleReporter
  prints them
- Breaking: SparqlConstructExecutorOptions.timeout is replaced by
  timeoutPolicy; migrate pipeline-void and pipeline-shacl-sampler
- Document the mechanism in the pipeline README and ADR 0003

Issue #419 already captures the rationale and decisions; the ADR
duplicated rather than added context.

- default → defaultMs
- short → tightenedMs
- tightenAfterTimeouts replaces threshold

The new names carry explicit units and echo the tighten/relax
vocabulary used by the state machine, the transition events, and the
console output.
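
For orientation, here is a rough sketch of how the renamed options could drive the tighten/relax state machine. Only the three option names come from this commit; the class name, the per-endpoint map, and the method signatures are hypothetical stand-ins. It also assumes that an `error` outcome neither counts toward nor resets the timeout streak, which the PR text does not spell out.

```typescript
// Option names follow the commit above; everything else is a hypothetical
// stand-in, not the @lde/pipeline implementation.
interface AdaptiveOptions {
  defaultMs: number; // budget while an endpoint is healthy
  tightenedMs: number; // budget while an endpoint is tightened
  tightenAfterTimeouts: number; // consecutive timeouts that trigger tightening
}

interface EndpointState {
  timeouts: number;
  tightened: boolean;
}

class SketchAdaptivePolicy {
  private readonly options: AdaptiveOptions;
  // State is kept per endpoint, so one slow endpoint does not tighten another.
  private readonly state = new Map<string, EndpointState>();

  constructor(options: AdaptiveOptions) {
    this.options = options;
  }

  private stateFor(endpoint: string): EndpointState {
    let entry = this.state.get(endpoint);
    if (entry === undefined) {
      entry = { timeouts: 0, tightened: false };
      this.state.set(endpoint, entry);
    }
    return entry;
  }

  /** Budget in milliseconds for the next request to `endpoint`. */
  beforeRequest(endpoint: string): number {
    const { tightened } = this.stateFor(endpoint);
    return tightened ? this.options.tightenedMs : this.options.defaultMs;
  }

  afterRequest(endpoint: string, outcome: 'ok' | 'timeout' | 'error'): void {
    const entry = this.stateFor(endpoint);
    if (outcome === 'timeout') {
      entry.timeouts += 1;
      if (entry.timeouts >= this.options.tightenAfterTimeouts) {
        entry.tightened = true;
      }
    } else if (outcome === 'ok') {
      // A single success relaxes the endpoint back to the default budget.
      entry.timeouts = 0;
      entry.tightened = false;
    }
    // Assumption: 'error' leaves the state untouched.
  }
}
```

With `{ defaultMs: 300_000, tightenedMs: 10_000, tightenAfterTimeouts: 2 }`, two consecutive `timeout` outcomes on one endpoint drop its budget to 10 seconds, while other endpoints keep the 5-minute default.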

- PipelineOptions.timeout (factory)
- ExecuteOptions.timeout / SelectOptions.timeout / RunOptions.timeout
- SparqlConstructExecutorOptions.timeout / SparqlItemSelectorOptions.timeout
- Internal Pipeline.timeoutFactory field follows suit

The option's purpose is configuring the timeout; the value's *type* is
TimeoutPolicy. Naming the field after its purpose is shorter, less
stuttery, and reads cleaner at call sites:
  timeout: adaptiveTimeoutPolicy({...})

Also expands the README's adaptive-timeouts section to define the
healthy/tightened states upfront and tabulate outcome classification.

Timeouts now live exclusively at the Pipeline level. Drop:

- SparqlConstructExecutorOptions.timeout
- SparqlItemSelectorOptions.timeout
- VoidStageOptions.timeout (would have been silently overridden)
- ShaclSampleStagesOptions.timeout (same)

The fallback budget when no PipelineOptions.timeout is supplied is now a
module-level ConstantTimeoutPolicy(300_000). The old executor/selector
options were redundant: at runtime, Stage always forwarded the per-
dataset policy from Pipeline, which silently replaced any executor-author
ceiling. Removing the option fixes that footgun and pushes operators
toward the right altitude for timeout configuration.
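
A small sketch of the resolution order described above: the per-dataset factory if one is supplied, otherwise a shared module-level constant policy. The names `PolicySketch`, `ConstantPolicySketch`, `FALLBACK_POLICY`, and `policyForDataset` are hypothetical; only the 300_000 ms fallback and the factory shape `() => TimeoutPolicy` come from this PR.

```typescript
// Hypothetical stand-in for the policy contract.
interface PolicySketch {
  beforeRequest(endpoint: string): number;
}

class ConstantPolicySketch implements PolicySketch {
  private readonly budgetMs: number;

  constructor(budgetMs: number) {
    this.budgetMs = budgetMs;
  }

  // Same budget for every endpoint, regardless of past outcomes.
  beforeRequest(_endpoint: string): number {
    return this.budgetMs;
  }
}

// Module-level fallback. In this sketch one shared instance is fine because
// the constant policy keeps no per-endpoint state.
const FALLBACK_POLICY: PolicySketch = new ConstantPolicySketch(300_000);

/** Called once per dataset: a supplied factory yields a fresh policy each time. */
function policyForDataset(factory?: () => PolicySketch): PolicySketch {
  return factory ? factory() : FALLBACK_POLICY;
}
```

Invoking the factory per dataset is what lets a stateful policy start clean on each dataset.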
…ough adapter selectors

classSelector and subjectSelector wrapped SparqlItemSelector but dropped
the third `SelectOptions` argument that Stage now threads through. As a
result, the Pipeline's per-dataset TimeoutPolicy never reached selector
requests for VoID class partitioning or SHACL subject sampling, so
adaptive tightening silently ignored those calls.

Also clarifies the JSDoc on SparqlConstructExecutorOptions.fetcher: a
user-supplied fetcher bypasses the policy budget — the policy hooks
still fire for outcome reporting, but adaptive tightening cannot apply.
This option is intended for tests; most callers should leave it unset.
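
The passthrough bug is easy to reintroduce because TypeScript accepts a function with fewer parameters wherever a function type with more parameters is expected. A reduced sketch of the failure mode and the fix follows; the `Selector` signature and the `timeoutMs` field are hypothetical simplifications, not the real SelectOptions shape.

```typescript
// Hypothetical, simplified shapes for illustration only.
interface SelectOptionsSketch {
  timeoutMs?: number;
}

type Selector = (
  dataset: string,
  endpoint: string,
  options?: SelectOptionsSketch,
) => string;

// Buggy wrapper: forwards two arguments and silently drops `options`.
// This type-checks because a two-parameter function is assignable to Selector.
const droppingWrapper = (inner: Selector): Selector =>
  (dataset, endpoint) => inner(dataset, endpoint);

// Fixed wrapper: forwards the third argument as well.
const forwardingWrapper = (inner: Selector): Selector =>
  (dataset, endpoint, options) => inner(dataset, endpoint, options);

// Inner selector that reports which budget it saw.
const reportBudget: Selector = (_dataset, _endpoint, options) =>
  options?.timeoutMs === undefined ? 'no budget' : `budget ${options.timeoutMs}ms`;
```

Calling `droppingWrapper(reportBudget)` with `{ timeoutMs: 10_000 }` as the third argument yields `'no budget'`, while `forwardingWrapper(reportBudget)` yields `'budget 10000ms'`.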

ddeboer merged commit f783aed into main May 28, 2026
2 checks passed
ddeboer deleted the feat/adaptive-timeout-policy branch May 28, 2026 17:10
ddeboer added a commit to netwerk-digitaal-erfgoed/dataset-knowledge-graph that referenced this pull request May 28, 2026
## Summary

Enables adaptive per-endpoint SPARQL timeouts via the new
`PipelineOptions.timeout` field landing in `@lde/pipeline` (see
ldelements/lde#421).

After two consecutive `timeout` outcomes on the same endpoint,
subsequent requests fast-fail at 10s instead of waiting out the full
5-minute budget; a single successful request relaxes the endpoint back
to default. State resets per dataset, so one bad dataset cannot poison
the next.

The 2026-05-27 manual run highlighted the motivation:
`https://data.razu.nl/id/dataset/kranten` spent ~80 minutes cycling
through stage-level 504s while light queries on the same endpoint
completed in ~100ms. Expected effect with adaptive on: worst-case
wall-clock per troublesome dataset drops from ~80min to ~15min, with the
same partial output preserved.
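
A back-of-the-envelope model of why tightening helps, offered as a sketch rather than a measurement. It assumes every failing request runs sequentially and waits out its full budget, which may overstate the cost of server-side 504s. The `worstCaseMs` helper and the request count of 16 are hypothetical; 16 is picked only so that the constant-budget total comes to 80 minutes, since the actual number of requests in the run is not stated here.

```typescript
/**
 * Total wait for a run of consecutive timeouts against one endpoint.
 * The first `tightenAfterTimeouts` requests wait `defaultMs` each;
 * every later one waits `tightenedMs`.
 */
function worstCaseMs(
  failingRequests: number,
  defaultMs: number,
  tightenedMs: number,
  tightenAfterTimeouts: number,
): number {
  const atDefault = Math.min(failingRequests, tightenAfterTimeouts);
  const atTightened = failingRequests - atDefault;
  return atDefault * defaultMs + atTightened * tightenedMs;
}

// Constant 5-minute budget: tightened budget equals the default.
const constantTotal = worstCaseMs(16, 300_000, 300_000, 2); // 4_800_000 ms = 80 min

// Adaptive: 2 x 300 s + 14 x 10 s.
const adaptiveTotal = worstCaseMs(16, 300_000, 10_000, 2); // 740_000 ms, about 12.3 min
```

The model ignores time spent on requests that succeed, so it is only a lower bound on the adaptive wall-clock and should not be read as the source of the ~15min estimate.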

## Blocked on

- ldelements/lde#421 merging
- A new `@lde/pipeline` release (≥ 0.31) so this branch can compile; the
PR depends on `adaptiveTimeoutPolicy` and `PipelineOptions.timeout`
which are not in 0.30.2.

Once that release ships, the `@lde/pipeline` version range in
`package.json` will be bumped (Dependabot will handle it) and this PR
can merge.

## Defaults

`defaultMs: 300_000`, `tightenedMs: 10_000`, `tightenAfterTimeouts: 2` —
the values discussed in ldelements/lde#419. Worth tuning empirically
once enabled (collect a couple of weeks of `↘ Tightened` event counts
from `ConsoleReporter` and adjust if too aggressive or too forgiving).