feat(pipeline): adaptive per-endpoint SPARQL timeouts #421
Merged
- Introduce TimeoutPolicy interface with ConstantTimeoutPolicy and AdaptiveTimeoutPolicy implementations, plus matching factories
- Inject per-call AbortSignal in SparqlConstructExecutor and SparqlItemSelector; classify HTTP 504 and AbortError as 'timeout'
- Thread a per-dataset TimeoutPolicy through PipelineOptions → Stage → executors/selectors via ExecuteOptions and SelectOptions
- Forward tighten/relax transitions to ProgressReporter; ConsoleReporter prints them
- Breaking: SparqlConstructExecutorOptions.timeout is replaced by timeoutPolicy; migrate pipeline-void and pipeline-shacl-sampler
- Document the mechanism in the pipeline README and ADR 0003
Issue #419 already captures the rationale and decisions; the ADR duplicated rather than added context.
- default → defaultMs
- short → tightenedMs
- tightenAfterTimeouts replaces threshold

The new names carry explicit units and echo the tighten/relax vocabulary used by the state machine, the transition events, and the console output.
- PipelineOptions.timeout (factory)
- ExecuteOptions.timeout / SelectOptions.timeout / RunOptions.timeout
- SparqlConstructExecutorOptions.timeout / SparqlItemSelectorOptions.timeout
- Internal Pipeline.timeoutFactory field follows suit
The option's purpose is configuring the timeout; the value's *type* is
TimeoutPolicy. Naming the field after its purpose is shorter, less
stuttery, and reads cleaner at call sites:
timeout: adaptiveTimeoutPolicy({...})
Also expands the README's adaptive-timeouts section to define the
healthy/tightened states upfront and tabulate outcome classification.
Timeouts now live exclusively at the Pipeline level. Drop:
- SparqlConstructExecutorOptions.timeout
- SparqlItemSelectorOptions.timeout
- VoidStageOptions.timeout (would have been silently overridden)
- ShaclSampleStagesOptions.timeout (same)

The fallback budget when no PipelineOptions.timeout is supplied is now a module-level ConstantTimeoutPolicy(300_000).

The old executor/selector options were redundant: at runtime, Stage always forwarded the per-dataset policy from Pipeline, which silently replaced any executor-author ceiling. Removing the option fixes that footgun and pushes operators toward the right altitude for timeout configuration.
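The fallback and per-dataset resolution described above can be sketched roughly as follows. This is a minimal illustration, not the library's code: the `budgetMs` method, `FALLBACK_POLICY`, `policyForDataset`, and `policiesFor` are hypothetical names, and only the 300_000 ms fallback and the factory-per-dataset idea come from the commit messages.

```typescript
// Hypothetical shape; the real TimeoutPolicy interface may expose different methods.
interface TimeoutPolicy {
  budgetMs(endpoint: string): number;
}

// Module-level fallback mirroring the 300_000 ms budget named in the commit message.
const FALLBACK_POLICY: TimeoutPolicy = { budgetMs: () => 300_000 };

// Resolve the policy for one dataset: call the factory if supplied, else fall back.
function policyForDataset(factory?: () => TimeoutPolicy): TimeoutPolicy {
  return factory ? factory() : FALLBACK_POLICY;
}

// Invoke the factory once per dataset so a supplied factory yields fresh state each time.
function policiesFor(
  datasets: string[],
  factory?: () => TimeoutPolicy,
): Map<string, TimeoutPolicy> {
  const policies = new Map<string, TimeoutPolicy>();
  for (const dataset of datasets) {
    policies.set(dataset, policyForDataset(factory));
  }
  return policies;
}
```

Because the factory runs once per dataset, any state a policy accumulates for one dataset is discarded before the next one starts.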
…ough adapter selectors

classSelector and subjectSelector wrapped SparqlItemSelector but dropped the third `SelectOptions` argument that Stage now threads through. As a result, the Pipeline's per-dataset TimeoutPolicy never reached selector requests for VoID class partitioning or SHACL subject sampling, so adaptive tightening silently ignored those calls.

Also clarifies the JSDoc on SparqlConstructExecutorOptions.fetcher: a user-supplied fetcher bypasses the policy budget. The policy hooks still fire for outcome reporting, but adaptive tightening cannot apply. This option is intended for tests; most callers should leave it unset.
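To illustrate how an adapter can silently drop a trailing options argument, here is a minimal sketch. The `ItemSelector` and `SelectOptions` shapes, the `timeoutBudgetMs` field, and both adapter functions are hypothetical stand-ins, not the actual `@lde/pipeline` signatures. TypeScript accepts a two-parameter function where a three-parameter one is expected, which is presumably why this kind of omission passes type checking.

```typescript
// Hypothetical stand-ins; the real @lde/pipeline signatures may differ.
interface SelectOptions {
  timeoutBudgetMs?: number;
}

interface ItemSelector {
  select(dataset: string, batchSize: number, options?: SelectOptions): string[];
}

// Records the options it receives so the forwarding can be observed.
class RecordingSelector implements ItemSelector {
  received: (SelectOptions | undefined)[] = [];

  select(_dataset: string, _batchSize: number, options?: SelectOptions): string[] {
    this.received.push(options);
    return [];
  }
}

// Buggy adapter: declares only two parameters, so the options never reach `inner`.
function droppingAdapter(inner: ItemSelector): ItemSelector {
  return {
    select: (dataset, batchSize) => inner.select(dataset, batchSize),
  };
}

// Fixed adapter: forwards the third argument unchanged.
function forwardingAdapter(inner: ItemSelector): ItemSelector {
  return {
    select: (dataset, batchSize, options) => inner.select(dataset, batchSize, options),
  };
}
```

With the dropping adapter the inner selector sees `undefined` options and falls back to whatever default it has, which matches the symptom described above: the per-dataset policy never reaches the request.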
ddeboer added a commit to netwerk-digitaal-erfgoed/dataset-knowledge-graph that referenced this pull request on May 28, 2026
## Summary

Enables adaptive per-endpoint SPARQL timeouts via the new `PipelineOptions.timeout` field landing in `@lde/pipeline` (see ldelements/lde#421). After two consecutive `timeout` outcomes on the same endpoint, subsequent requests fast-fail at 10s instead of waiting out the full 5-minute budget; a single successful request relaxes the endpoint back to default. State resets per dataset, so one bad dataset cannot poison the next.

The 2026-05-27 manual run highlighted the motivation: `https://data.razu.nl/id/dataset/kranten` spent ~80 minutes cycling through stage-level 504s while light queries on the same endpoint completed in ~100ms. Expected effect with adaptive on: worst-case wall-clock per troublesome dataset drops from ~80min to ~15min, with the same partial output preserved.

## Blocked on

- ldelements/lde#421 merging
- A new `@lde/pipeline` release (≥ 0.31) so this branch can compile; the PR depends on `adaptiveTimeoutPolicy` and `PipelineOptions.timeout`, which are not in 0.30.2.

Once that release ships, the `@lde/pipeline` version range in `package.json` will be bumped (Dependabot will handle it) and this PR can merge.

## Defaults

`defaultMs: 300_000`, `tightenedMs: 10_000`, `tightenAfterTimeouts: 2`: the values discussed in ldelements/lde#419. Worth tuning empirically once enabled (collect a couple of weeks of `↘ Tightened` event counts from `ConsoleReporter` and adjust if too aggressive or too forgiving).
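The tighten/relax behaviour with these defaults can be sketched as a small per-endpoint state machine. This is an illustration under assumptions, not the `AdaptiveTimeoutPolicy` implementation: the class name, `budgetMs`, and `report` are hypothetical, and leaving state untouched on an `error` outcome is a guess that the description does not confirm.

```typescript
type Outcome = 'ok' | 'timeout' | 'error';

interface AdaptiveOptions {
  defaultMs: number;
  tightenedMs: number;
  tightenAfterTimeouts: number;
}

// Hypothetical sketch of a per-endpoint adaptive budget.
class AdaptiveTimeoutSketch {
  private readonly options: AdaptiveOptions;
  private readonly consecutiveTimeouts = new Map<string, number>();
  private readonly tightened = new Set<string>();

  constructor(options: AdaptiveOptions) {
    this.options = options;
  }

  // Budget for the next request to this endpoint.
  budgetMs(endpoint: string): number {
    return this.tightened.has(endpoint)
      ? this.options.tightenedMs
      : this.options.defaultMs;
  }

  // Record the outcome of a finished request.
  report(endpoint: string, outcome: Outcome): void {
    if (outcome === 'ok') {
      // A single success relaxes the endpoint back to the default budget.
      this.consecutiveTimeouts.set(endpoint, 0);
      this.tightened.delete(endpoint);
    } else if (outcome === 'timeout') {
      const count = (this.consecutiveTimeouts.get(endpoint) ?? 0) + 1;
      this.consecutiveTimeouts.set(endpoint, count);
      if (count >= this.options.tightenAfterTimeouts) {
        this.tightened.add(endpoint);
      }
    }
    // Assumption: an 'error' outcome leaves the state untouched.
  }
}
```

Creating a fresh instance for each dataset gives the per-dataset reset mentioned in the summary, and keying state by endpoint keeps one slow endpoint from tightening the others.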
## Summary

Adds adaptive per-endpoint SPARQL timeouts to `@lde/pipeline`, so a single failing third-party endpoint can no longer hold up the whole DKG run.

- `TimeoutPolicy` interface plus two implementations: `ConstantTimeoutPolicy` (current behaviour, the implicit default) and `AdaptiveTimeoutPolicy` (per-endpoint state machine that tightens to a short budget after `threshold` consecutive timeouts and relaxes on the next `ok`). Factory helpers `constantTimeoutPolicy` / `adaptiveTimeoutPolicy` are exported.
- `SparqlConstructExecutor` and `SparqlItemSelector` now install a per-call `AbortSignal` derived from the policy. Each attempt inside `pRetry` calls `beforeRequest`, runs the request, classifies the outcome (HTTP 504 and `AbortError` / `TimeoutError` → `timeout`, others → `error`), and reports via `afterRequest`. The executor's `timeout: number` option is replaced by `timeoutPolicy?: TimeoutPolicy` (breaking, pre-release per `AGENTS.md`).
- `PipelineOptions.timeoutPolicy?: () => TimeoutPolicy` is invoked once per dataset and threaded through every stage / executor / selector via `RunOptions`, `ExecuteOptions`, and `SelectOptions`.
- `ProgressReporter` gains optional `timeoutTightened` / `timeoutRelaxed` hooks. The pipeline subscribes to the policy at each dataset boundary and forwards transitions. `ConsoleReporter` prints `↘ Tightened` / `↗ Relaxed` lines so operators can tell a fast-failed stage from an unexpected speedup.
- Migrates `pipeline-void` and `pipeline-shacl-sampler` to the new `timeoutPolicy` field.

Behaviour is unchanged for callers that don't supply a policy; the implicit default is `constantTimeoutPolicy(300_000)`.

Fix #419
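The outcome classification and per-call abort described in the summary could look roughly like the sketch below. The function names `classifyOutcome` and `attemptWithBudget` are hypothetical, and treating any 2xx status as `ok` is an assumption; only the rule that HTTP 504 and `AbortError` / `TimeoutError` count as `timeout` comes from the description. The sketch uses the standard `AbortController` with a timer rather than whatever mechanism the library actually uses.

```typescript
type Outcome = 'ok' | 'timeout' | 'error';

// Read an error's name without assuming it is an Error instance.
function errorName(error: unknown): string {
  if (typeof error === 'object' && error !== null && 'name' in error) {
    const name = (error as { name: unknown }).name;
    return typeof name === 'string' ? name : '';
  }
  return '';
}

// Classify one attempt from either its HTTP status or the error it threw.
function classifyOutcome(result: { status?: number; error?: unknown }): Outcome {
  if (result.error !== undefined) {
    const name = errorName(result.error);
    return name === 'AbortError' || name === 'TimeoutError' ? 'timeout' : 'error';
  }
  if (result.status === 504) return 'timeout';
  // Assumption: any 2xx status counts as success.
  if (result.status !== undefined && result.status >= 200 && result.status < 300) {
    return 'ok';
  }
  return 'error';
}

// Run one attempt under a budget; the signal aborts when the budget elapses.
async function attemptWithBudget(
  budgetMs: number,
  run: (signal: AbortSignal) => Promise<{ status: number }>,
): Promise<Outcome> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), budgetMs);
  try {
    const response = await run(controller.signal);
    return classifyOutcome({ status: response.status });
  } catch (error) {
    return classifyOutcome({ error });
  } finally {
    clearTimeout(timer);
  }
}
```

A caller would pass something like `(signal) => fetch(url, { signal })` as `run` and feed the returned outcome back into the policy, so a gateway-level 504 and a client-side abort both count toward tightening.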