Skip to content

[flink] Fix scan parallelism propagating to downstream operators#7914

Open
ArnavBalyan wants to merge 1 commit into
apache:masterfrom
ArnavBalyan:arnavb/flink-override
Open

[flink] Fix scan parallelism propagating to downstream operators#7914
ArnavBalyan wants to merge 1 commit into
apache:masterfrom
ArnavBalyan:arnavb/flink-override

Conversation

@ArnavBalyan
Copy link
Copy Markdown
Member

Purpose

Tests

  • UT

@ArnavBalyan
Copy link
Copy Markdown
Member Author

cc @JingsongLi thanks!

@hackergin
Copy link
Copy Markdown

@ArnavBalyan Thanks for picking this up! The direction is right, but I think there are a few gaps worth addressing before merging:

  1. Only handles explicitly configured scan.parallelism, misses the inferred path

The PR only propagates options.get(SCAN_PARALLELISM). However, Paimon's parallelism resolution has two sources — see FlinkTableSource#inferSourceParallelism:

  1. User-configured scan.parallelism (explicit)
  2. infer-scan.parallelism = true (enabled by default), which infers from bucket count / split count

This PR only covers case (1). For case (2) — which is the default behavior most users hit — the inferred value is still applied via DataStream#setParallelism() inside the
producer lambda, so ParallelismProvider#getParallelism() returns Optional.empty() and the planner-side isParallelismConfigured=true flag is never set. The forward-edge
propagation described in #7905 will still happen for any user who relies on inferred parallelism.

  1. Parallelism is now set twice — provider + producer

After this PR, parallelism is applied in two places:

  • The planner reads ParallelismProvider#getParallelism() and calls setParallelism(p, true) on the Source Transformation
  • The producer lambda still calls sourceBuilder.sourceParallelism(inferSourceParallelism(env)) / dataStreamSource.setParallelism(parallelism) internally

The DataStream#setParallelism(int) overload sets isParallelismConfigured=false, while the planner path sets it to true. Depending on execution order, the planner's
"configured" flag may be overridden back to false, which would silently re-introduce the forward-edge propagation this PR is trying to fix.

  1. Test placement and coverage
  • testScanProviderGetParallelism is placed in LineageUtilsTest, but it has nothing to do with lineage.
  • The test only asserts Optional.ofNullable behavior on the field. It doesn't actually verify the bug being fixed — there's no assertion that downstream operators no longer
    inherit the source parallelism, and no assertion on Source Transformation#isParallelismConfigured(). An end-to-end SQL test would be much more convincing, and the inferred
    path from (1) should also be covered.
  1. Flink version note

ParallelismProvider#getParallelism() only fully solves the forward-edge issue from Flink 1.19+ (FLIP-367), where the planner explicitly calls setParallelism(p, true) on the
Source Transformation. On older Flink versions the behavior is unchanged. Worth mentioning this in the PR description / a Javadoc note so users know what to expect.

@hackergin
Copy link
Copy Markdown

State compatibility / upgrade risk

This fix changes the resulting job graph topology for any existing user who has scan.parallelism set or relies on inferred parallelism:

  • Before: source and downstream operators are chained via forward edges, all running at source parallelism.
  • After: the planner inserts a rebalance edge between source and downstream, and operator chaining is broken.

For users upgrading from an older Paimon version with an existing Flink savepoint/checkpoint, this is a breaking topology change:

  • The operator IDs / chain hashes of downstream operators will change, so state restore from savepoint will fail (or silently drop state) unless users explicitly set uids on
    every operator — which most SQL users don't.
  • Even watermark/state alignment behavior of downstream stateful operators may differ once the chain is broken.

This needs to be called out explicitly in the PR description / release notes as a breaking change for stateful streaming jobs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants