From 1dcdf9cdf290dc7ca0c7d494a5e514374f58f802 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Tue, 19 May 2026 17:39:39 +0800 Subject: [PATCH 1/2] [python] Add StartupMode enum and scan.mode option to CoreOptions Introduce StartupMode enum and scan.mode config option, aligning with Java CoreOptions.SCAN_MODE for explicit scan startup mode control. - Add StartupMode enum with all Java values including deprecated FULL - Add scan.mode ConfigOption with DEFAULT as default value - Add startup_mode() method that resolves effective mode from scan.mode plus companion options (matching Java CoreOptions.startupMode() logic) - Maps deprecated FULL to LATEST_FULL - Auto-detects mode from scan.timestamp*, scan.snapshot-id, etc. when scan.mode is DEFAULT - Add _validate_scan_mode() in TableScan to reject invalid combinations: from-timestamp without timestamp options, from-snapshot without snapshot/tag/watermark, from-snapshot-full without scan.snapshot-id, incremental without incremental-between-timestamp, scan keys that conflict with the resolved mode, and modes not yet supported in pypaimon (compacted-full, from-creation-timestamp, from-file-creation-time) --- .../pypaimon/common/options/core_options.py | 63 ++++++++++++ paimon-python/pypaimon/read/table_scan.py | 96 +++++++++++++++++++ 2 files changed, 159 insertions(+) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 1cb9e831d335..5ef851e82efb 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -56,6 +56,23 @@ class MergeEngine(str, Enum): FIRST_ROW = "first-row" +class StartupMode(str, Enum): + """ + Startup mode for scan operations. 
+ """ + DEFAULT = "default" + LATEST_FULL = "latest-full" + FULL = "full" + LATEST = "latest" + COMPACTED_FULL = "compacted-full" + FROM_TIMESTAMP = "from-timestamp" + FROM_SNAPSHOT = "from-snapshot" + FROM_SNAPSHOT_FULL = "from-snapshot-full" + FROM_CREATION_TIMESTAMP = "from-creation-timestamp" + FROM_FILE_CREATION_TIME = "from-file-creation-time" + INCREMENTAL = "incremental" + + class CoreOptions: """Core options for Paimon tables.""" # File format constants @@ -299,6 +316,21 @@ class CoreOptions: .with_description("Specify the file name prefix of data files.") ) # Scan options + SCAN_MODE: ConfigOption[StartupMode] = ( + ConfigOptions.key("scan.mode") + .enum_type(StartupMode) + .default_value(StartupMode.DEFAULT) + .with_description( + "Scan startup mode for the table. " + "'default' resolves the actual mode from other scan options. " + "'latest-full' reads the latest snapshot then streams changes. " + "'latest' only streams changes without an initial snapshot. " + "'from-timestamp' reads from a specific timestamp. " + "'from-snapshot' reads from a specific snapshot. " + "'incremental' reads incremental changes between two snapshots/tags." + ) + ) + SCAN_FALLBACK_BRANCH: ConfigOption[str] = ( ConfigOptions.key("scan.fallback-branch") .string_type() @@ -749,6 +781,37 @@ def vector_target_file_size(self, default=None): def data_file_prefix(self, default=None): return self.options.get(CoreOptions.DATA_FILE_PREFIX, default) + def scan_mode(self, default=None): + return self.options.get(CoreOptions.SCAN_MODE, default) + + def startup_mode(self) -> 'StartupMode': + """Resolve the effective startup mode, matching Java CoreOptions.startupMode(). + + If scan.mode is DEFAULT, auto-detects from other scan options. + Maps deprecated FULL to LATEST_FULL. 
+ """ + mode = self.scan_mode() + if mode == StartupMode.DEFAULT: + if (self.options.contains_key("scan.timestamp-millis") + or self.options.contains_key("scan.timestamp")): + return StartupMode.FROM_TIMESTAMP + elif (self.options.contains(CoreOptions.SCAN_SNAPSHOT_ID) + or self.options.contains(CoreOptions.SCAN_TAG_NAME) + or self.options.contains_key("scan.watermark")): + return StartupMode.FROM_SNAPSHOT + elif self.options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP): + return StartupMode.INCREMENTAL + elif self.options.contains_key("scan.file-creation-time-millis"): + return StartupMode.FROM_FILE_CREATION_TIME + elif self.options.contains_key("scan.creation-time-millis"): + return StartupMode.FROM_CREATION_TIMESTAMP + else: + return StartupMode.LATEST_FULL + elif mode == StartupMode.FULL: + return StartupMode.LATEST_FULL + else: + return mode + def scan_fallback_branch(self, default=None): return self.options.get(CoreOptions.SCAN_FALLBACK_BRANCH, default) diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 623261803503..ee9e4e34c493 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -58,6 +58,8 @@ def _create_file_scanner(self) -> FileScanner: snapshot_manager = self.table.snapshot_manager() manifest_list_manager = ManifestListManager(self.table) + self._validate_scan_mode() + from pypaimon.snapshot.time_travel_util import TimeTravelUtil, SCAN_KEYS has_time_travel = any(options.contains_key(key) for key in SCAN_KEYS) has_incremental = options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP) @@ -158,3 +160,97 @@ def with_slice(self, start_pos, end_pos) -> 'TableScan': def with_global_index_result(self, result) -> 'TableScan': self.file_scanner.with_global_index_result(result) return self + + def _validate_scan_mode(self): + """Validate scan.mode against companion options using a whitelist approach. 
+ + Each StartupMode declares exactly which scan keys are allowed. Any + scan key present but not in the whitelist for the resolved mode is + rejected. This matches Java's SchemaValidation mutual-exclusion matrix. + """ + from pypaimon.common.options.core_options import StartupMode + + core_options = self.table.options + mode = core_options.startup_mode() + options = core_options.options + + has_snapshot_id = options.contains(CoreOptions.SCAN_SNAPSHOT_ID) + has_tag_name = options.contains(CoreOptions.SCAN_TAG_NAME) + has_watermark = options.contains_key("scan.watermark") + has_timestamp_millis = options.contains_key("scan.timestamp-millis") + has_timestamp = options.contains_key("scan.timestamp") + has_incremental = options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP) + has_file_creation_time = options.contains_key("scan.file-creation-time-millis") + has_creation_time = options.contains_key("scan.creation-time-millis") + + present_keys = [] + if has_snapshot_id: + present_keys.append("scan.snapshot-id") + if has_tag_name: + present_keys.append("scan.tag-name") + if has_watermark: + present_keys.append("scan.watermark") + if has_timestamp_millis: + present_keys.append("scan.timestamp-millis") + if has_timestamp: + present_keys.append("scan.timestamp") + if has_incremental: + present_keys.append("incremental-between-timestamp") + if has_file_creation_time: + present_keys.append("scan.file-creation-time-millis") + if has_creation_time: + present_keys.append("scan.creation-time-millis") + + # scan.timestamp-millis and scan.timestamp are mutually exclusive + if has_timestamp_millis and has_timestamp: + raise ValueError( + "scan.timestamp-millis and scan.timestamp cannot both be set." 
+ ) + + # Define allowed companion keys per mode + if mode == StartupMode.FROM_TIMESTAMP: + allowed = {"scan.timestamp-millis", "scan.timestamp"} + if not (has_timestamp_millis or has_timestamp): + raise ValueError( + "scan.mode is 'from-timestamp' but neither " + "scan.timestamp-millis nor scan.timestamp is set." + ) + elif mode == StartupMode.FROM_SNAPSHOT_FULL: + allowed = {"scan.snapshot-id"} + if not has_snapshot_id: + raise ValueError( + "scan.mode is 'from-snapshot-full' but scan.snapshot-id is not set." + ) + elif mode == StartupMode.FROM_SNAPSHOT: + allowed = {"scan.snapshot-id", "scan.tag-name", "scan.watermark"} + if not (has_snapshot_id or has_tag_name or has_watermark): + raise ValueError( + "scan.mode is 'from-snapshot' but none of " + "scan.snapshot-id, scan.tag-name, or scan.watermark is set." + ) + elif mode == StartupMode.INCREMENTAL: + allowed = {"incremental-between-timestamp"} + if not has_incremental: + raise ValueError( + "scan.mode is 'incremental' but " + "incremental-between-timestamp is not set." + ) + elif mode in (StartupMode.LATEST_FULL, StartupMode.LATEST): + allowed = set() + elif mode in (StartupMode.COMPACTED_FULL, + StartupMode.FROM_CREATION_TIMESTAMP, + StartupMode.FROM_FILE_CREATION_TIME): + raise NotImplementedError( + f"scan.mode '{mode.value}' is not yet supported in pypaimon." + ) + else: + allowed = set() + + # Reject any scan key that's not in the whitelist for this mode + disallowed = [k for k in present_keys if k not in allowed] + if disallowed: + raise ValueError( + f"scan.mode '{mode.value}' conflicts with: {disallowed}. " + f"Only {sorted(allowed) if allowed else 'no scan keys'} " + f"are allowed for this mode." 
+ ) From 30ea4f5647a7e37c91d3901863649123e591bdbf Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Sun, 24 May 2026 10:39:43 +0800 Subject: [PATCH 2/2] [python] Address review: use CoreOptions constants, fix exception type, add tests - Replace raw string literals with CoreOptions constants in _validate_scan_mode() for refactoring safety - Change NotImplementedError to ValueError for unsupported scan modes - Add unit tests covering validation paths: missing required options, conflicting options, DEFAULT auto-detection, unsupported modes, and deprecated FULL mapping --- .../pypaimon/common/options/core_options.py | 34 ++++++- paimon-python/pypaimon/read/table_scan.py | 43 +++++---- .../pypaimon/tests/table_scan_mode_test.py | 94 +++++++++++++++++++ 3 files changed, 148 insertions(+), 23 deletions(-) create mode 100644 paimon-python/pypaimon/tests/table_scan_mode_test.py diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 5ef851e82efb..3840bda065ac 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -16,6 +16,7 @@ # under the License. import sys +import warnings from datetime import timedelta from enum import Enum from typing import Dict, Optional @@ -392,6 +393,24 @@ class CoreOptions: ) ) + SCAN_FILE_CREATION_TIME_MILLIS: ConfigOption[int] = ( + ConfigOptions.key("scan.file-creation-time-millis") + .long_type() + .no_default_value() + .with_description( + "After configuring this time, only the data files created after this time will be read." + ) + ) + + SCAN_CREATION_TIME_MILLIS: ConfigOption[int] = ( + ConfigOptions.key("scan.creation-time-millis") + .long_type() + .no_default_value() + .with_description( + "Optional timestamp used in case of 'from-creation-timestamp' scan mode." 
+ ) + ) + SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = ( ConfigOptions.key("source.split.target-size") .memory_type() @@ -792,22 +811,27 @@ def startup_mode(self) -> 'StartupMode': """ mode = self.scan_mode() if mode == StartupMode.DEFAULT: - if (self.options.contains_key("scan.timestamp-millis") - or self.options.contains_key("scan.timestamp")): + if (self.options.contains(CoreOptions.SCAN_TIMESTAMP_MILLIS) + or self.options.contains(CoreOptions.SCAN_TIMESTAMP)): return StartupMode.FROM_TIMESTAMP elif (self.options.contains(CoreOptions.SCAN_SNAPSHOT_ID) or self.options.contains(CoreOptions.SCAN_TAG_NAME) - or self.options.contains_key("scan.watermark")): + or self.options.contains(CoreOptions.SCAN_WATERMARK)): return StartupMode.FROM_SNAPSHOT elif self.options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP): return StartupMode.INCREMENTAL - elif self.options.contains_key("scan.file-creation-time-millis"): + elif self.options.contains(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS): return StartupMode.FROM_FILE_CREATION_TIME - elif self.options.contains_key("scan.creation-time-millis"): + elif self.options.contains(CoreOptions.SCAN_CREATION_TIME_MILLIS): return StartupMode.FROM_CREATION_TIMESTAMP else: return StartupMode.LATEST_FULL elif mode == StartupMode.FULL: + warnings.warn( + "scan.mode 'full' is deprecated, use 'latest-full' instead.", + DeprecationWarning, + stacklevel=2, + ) return StartupMode.LATEST_FULL else: return mode diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index ee9e4e34c493..03a1c8b06297 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -176,30 +176,30 @@ def _validate_scan_mode(self): has_snapshot_id = options.contains(CoreOptions.SCAN_SNAPSHOT_ID) has_tag_name = options.contains(CoreOptions.SCAN_TAG_NAME) - has_watermark = options.contains_key("scan.watermark") - has_timestamp_millis = 
options.contains_key("scan.timestamp-millis") - has_timestamp = options.contains_key("scan.timestamp") + has_watermark = options.contains(CoreOptions.SCAN_WATERMARK) + has_timestamp_millis = options.contains(CoreOptions.SCAN_TIMESTAMP_MILLIS) + has_timestamp = options.contains(CoreOptions.SCAN_TIMESTAMP) has_incremental = options.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP) - has_file_creation_time = options.contains_key("scan.file-creation-time-millis") - has_creation_time = options.contains_key("scan.creation-time-millis") + has_file_creation_time = options.contains(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS) + has_creation_time = options.contains(CoreOptions.SCAN_CREATION_TIME_MILLIS) present_keys = [] if has_snapshot_id: - present_keys.append("scan.snapshot-id") + present_keys.append(CoreOptions.SCAN_SNAPSHOT_ID.key()) if has_tag_name: - present_keys.append("scan.tag-name") + present_keys.append(CoreOptions.SCAN_TAG_NAME.key()) if has_watermark: - present_keys.append("scan.watermark") + present_keys.append(CoreOptions.SCAN_WATERMARK.key()) if has_timestamp_millis: - present_keys.append("scan.timestamp-millis") + present_keys.append(CoreOptions.SCAN_TIMESTAMP_MILLIS.key()) if has_timestamp: - present_keys.append("scan.timestamp") + present_keys.append(CoreOptions.SCAN_TIMESTAMP.key()) if has_incremental: - present_keys.append("incremental-between-timestamp") + present_keys.append(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key()) if has_file_creation_time: - present_keys.append("scan.file-creation-time-millis") + present_keys.append(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key()) if has_creation_time: - present_keys.append("scan.creation-time-millis") + present_keys.append(CoreOptions.SCAN_CREATION_TIME_MILLIS.key()) # scan.timestamp-millis and scan.timestamp are mutually exclusive if has_timestamp_millis and has_timestamp: @@ -209,27 +209,34 @@ def _validate_scan_mode(self): # Define allowed companion keys per mode if mode == StartupMode.FROM_TIMESTAMP: 
- allowed = {"scan.timestamp-millis", "scan.timestamp"} + allowed = { + CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), + CoreOptions.SCAN_TIMESTAMP.key(), + } if not (has_timestamp_millis or has_timestamp): raise ValueError( "scan.mode is 'from-timestamp' but neither " "scan.timestamp-millis nor scan.timestamp is set." ) elif mode == StartupMode.FROM_SNAPSHOT_FULL: - allowed = {"scan.snapshot-id"} + allowed = {CoreOptions.SCAN_SNAPSHOT_ID.key()} if not has_snapshot_id: raise ValueError( "scan.mode is 'from-snapshot-full' but scan.snapshot-id is not set." ) elif mode == StartupMode.FROM_SNAPSHOT: - allowed = {"scan.snapshot-id", "scan.tag-name", "scan.watermark"} + allowed = { + CoreOptions.SCAN_SNAPSHOT_ID.key(), + CoreOptions.SCAN_TAG_NAME.key(), + CoreOptions.SCAN_WATERMARK.key(), + } if not (has_snapshot_id or has_tag_name or has_watermark): raise ValueError( "scan.mode is 'from-snapshot' but none of " "scan.snapshot-id, scan.tag-name, or scan.watermark is set." ) elif mode == StartupMode.INCREMENTAL: - allowed = {"incremental-between-timestamp"} + allowed = {CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key()} if not has_incremental: raise ValueError( "scan.mode is 'incremental' but " @@ -240,7 +247,7 @@ def _validate_scan_mode(self): elif mode in (StartupMode.COMPACTED_FULL, StartupMode.FROM_CREATION_TIMESTAMP, StartupMode.FROM_FILE_CREATION_TIME): - raise NotImplementedError( + raise ValueError( f"scan.mode '{mode.value}' is not yet supported in pypaimon." ) else: diff --git a/paimon-python/pypaimon/tests/table_scan_mode_test.py b/paimon-python/pypaimon/tests/table_scan_mode_test.py new file mode 100644 index 000000000000..c33fdf9beb40 --- /dev/null +++ b/paimon-python/pypaimon/tests/table_scan_mode_test.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +import warnings +from unittest.mock import Mock + +from pypaimon.common.options.core_options import CoreOptions, StartupMode +from pypaimon.read.table_scan import TableScan + + +def _scan(options): + scan = TableScan.__new__(TableScan) + scan.table = Mock() + scan.table.options = CoreOptions.from_dict(options) + return scan + + +class TableScanModeTest(unittest.TestCase): + + def test_from_timestamp_requires_timestamp_option(self): + scan = _scan({ + CoreOptions.SCAN_MODE.key(): StartupMode.FROM_TIMESTAMP.value, + }) + + with self.assertRaisesRegex( + ValueError, + "neither scan.timestamp-millis nor scan.timestamp is set"): + scan._validate_scan_mode() + + def test_latest_conflicts_with_snapshot_id(self): + scan = _scan({ + CoreOptions.SCAN_MODE.key(): StartupMode.LATEST.value, + CoreOptions.SCAN_SNAPSHOT_ID.key(): "1", + }) + + with self.assertRaisesRegex(ValueError, "scan.snapshot-id"): + scan._validate_scan_mode() + + def test_default_with_timestamp_millis_resolves_to_from_timestamp(self): + options = CoreOptions.from_dict({ + CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value, + CoreOptions.SCAN_TIMESTAMP_MILLIS.key(): "123", + }) + + self.assertEqual(options.startup_mode(), StartupMode.FROM_TIMESTAMP) + _scan(options.options.to_map())._validate_scan_mode() + + def test_default_with_snapshot_id_resolves_to_from_snapshot(self): + options = CoreOptions.from_dict({ + 
CoreOptions.SCAN_MODE.key(): StartupMode.DEFAULT.value, + CoreOptions.SCAN_SNAPSHOT_ID.key(): "1", + }) + + self.assertEqual(options.startup_mode(), StartupMode.FROM_SNAPSHOT) + _scan(options.options.to_map())._validate_scan_mode() + + def test_unsupported_scan_modes_raise_value_error(self): + scan = _scan({ + CoreOptions.SCAN_MODE.key(): StartupMode.COMPACTED_FULL.value, + }) + + with self.assertRaisesRegex(ValueError, "not yet supported"): + scan._validate_scan_mode() + + def test_full_mode_maps_to_latest_full_with_deprecation_warning(self): + options = CoreOptions.from_dict({ + CoreOptions.SCAN_MODE.key(): StartupMode.FULL.value, + }) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + self.assertEqual(options.startup_mode(), StartupMode.LATEST_FULL) + + self.assertEqual(len(caught), 1) + self.assertTrue(issubclass(caught[0].category, DeprecationWarning)) + + +if __name__ == '__main__': + unittest.main()