fix: commit schema bug #1666
Walkthrough
Event processing now stages schemas upfront using a new reusable helper.
Changes
Schema Staging Refactoring
Sequence Diagram
sequenceDiagram
participant EventProcess as Event::process
participant PARSEABLE
participant Stream
participant Filesystem
EventProcess->>PARSEABLE: get_or_create_stream(...)
PARSEABLE-->>EventProcess: stream
EventProcess->>EventProcess: Check is_first_event && !static_schema_flag
EventProcess->>Stream: stage_schema_file(schema)
Stream->>Filesystem: Ensure stream directory exists
Stream->>Filesystem: Load existing staging schemas (if present)
Stream->>Stream: Merge schemas via Schema::try_merge
Stream->>Filesystem: Write merged schema to stream_name.schema
Stream-->>EventProcess: Ok(())
EventProcess->>Stream: push(event_data)
Stream-->>EventProcess: Result
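The ordering in the diagram can be sketched in a few lines of Rust. This is a minimal in-memory illustration, not the project's code: the `Stream` and `Event` types here are hypothetical stand-ins, `Schema` is simplified to a map of field name to type name, and the merge only approximates what `Schema::try_merge` does. The `log` field exists purely so the call order can be inspected.

```rust
use std::collections::BTreeMap;

/// Stand-in for an Arrow schema: field name -> type name (a simplification).
type Schema = BTreeMap<String, String>;

/// Hypothetical in-memory stand-in for the real `Stream`.
#[derive(Default)]
struct Stream {
    static_schema_flag: bool,
    staged_schema: Option<Schema>,
    /// Order in which operations ran, so the sequence can be inspected.
    log: Vec<&'static str>,
}

impl Stream {
    /// Merge `schema` into the staged schema, rejecting conflicting types.
    fn stage_schema_file(&mut self, schema: Schema) -> Result<(), String> {
        let mut merged = self.staged_schema.clone().unwrap_or_default();
        for (name, ty) in schema {
            if let Some(existing) = merged.get(&name) {
                if *existing != ty {
                    return Err(format!("conflicting types for field `{name}`"));
                }
            }
            merged.insert(name, ty);
        }
        self.staged_schema = Some(merged);
        self.log.push("stage_schema_file");
        Ok(())
    }

    /// Placeholder for buffering event data; only records that it ran.
    fn push(&mut self) {
        self.log.push("push");
    }
}

/// Hypothetical stand-in for the real `Event`.
struct Event {
    is_first_event: bool,
    schema: Schema,
}

impl Event {
    /// Stage the schema first (dynamic-schema streams only), then push.
    fn process(&self, stream: &mut Stream) -> Result<(), String> {
        if self.is_first_event && !stream.static_schema_flag {
            stream.stage_schema_file(self.schema.clone())?;
        }
        stream.push();
        Ok(())
    }
}
```

For a first event on a dynamic-schema stream, `log` ends up as `stage_schema_file` followed by `push`; for a static-schema stream only `push` is recorded.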
🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 4 passed | ❌ 1 failed (inconclusive)
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/event/mod.rs (1)
91-95: ⚡ Quick win
Add a regression test for the new first-event staging path.
This branch is the actual fix for the Quest failure, but I don’t see coverage here asserting that a dynamic-schema stream has a staged *.schema before the first push()/parquet cycle. A focused test around Event::process would make this much harder to regress.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/event/mod.rs` around lines 91 - 95, Add a regression test that exercises the first-event staging path in Event::process: create a dynamic-schema stream (stream.get_static_schema_flag() == false), invoke Event::process (or the flow that sets self.is_first_event) and assert that commit_schema(&self.stream_name, ...) was called and that stream.stage_schema_file received/created the expected "*.schema" before any push()/parquet cycle; use the Event::process entry point, the is_first_event flag, and verify side-effects against commit_schema and stage_schema_file (or inspect the stream's staged schema via get_schema()) to ensure the schema file is staged on the first event.
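One possible shape for such a regression check is sketched below. Everything here is an assumption made for illustration: the `Stream` struct, the free function `process`, and the helper `schema_staged_before_first_push` are hypothetical stand-ins that use only the standard library, and the staging step writes the schema text as-is without the merge. The idea is that `push` records whether the `*.schema` file already existed when it first ran, which is the property the comment asks to pin down.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical stand-in for the real `Stream`; only what the check needs.
struct Stream {
    stream_name: String,
    data_path: PathBuf,
    static_schema_flag: bool,
    /// Whether the schema file existed when `push` first ran.
    schema_present_at_first_push: Option<bool>,
}

impl Stream {
    fn schema_path(&self) -> PathBuf {
        self.data_path.join(format!("{}.schema", self.stream_name))
    }

    /// Simplified staging: writes the schema text as-is (no merge step).
    fn stage_schema_file(&self, schema_json: &str) -> io::Result<()> {
        fs::create_dir_all(&self.data_path)?;
        fs::write(self.schema_path(), schema_json)
    }

    /// Records, on the first call only, whether the schema was already staged.
    fn push(&mut self) {
        let present = self.schema_path().exists();
        if self.schema_present_at_first_push.is_none() {
            self.schema_present_at_first_push = Some(present);
        }
    }
}

/// Hypothetical stand-in for `Event::process`.
fn process(stream: &mut Stream, is_first_event: bool, schema_json: &str) -> io::Result<()> {
    if is_first_event && !stream.static_schema_flag {
        stream.stage_schema_file(schema_json)?;
    }
    stream.push();
    Ok(())
}

/// Processes one first event in a scratch directory and reports whether the
/// schema file was on disk by the time `push` ran.
fn schema_staged_before_first_push(static_schema_flag: bool) -> io::Result<bool> {
    let dir = std::env::temp_dir().join(format!(
        "first-event-staging-{}-{}",
        std::process::id(),
        static_schema_flag
    ));
    let _ = fs::remove_dir_all(&dir); // start from a clean directory
    let mut stream = Stream {
        stream_name: "demo".to_string(),
        data_path: dir.clone(),
        static_schema_flag,
        schema_present_at_first_push: None,
    };
    process(&mut stream, true, r#"{"fields":[]}"#)?;
    let _ = fs::remove_dir_all(&dir); // best-effort cleanup
    Ok(stream.schema_present_at_first_push == Some(true))
}
```

A real test would exercise the actual `Event::process` against a staging directory; the sketch only shows the assertion worth making: true for a dynamic-schema stream, false for a static-schema one.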
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: d98e25e0-7082-48cd-85ab-0a5c8c5731e7
📒 Files selected for processing (2)
src/event/mod.rs
src/parseable/streams.rs
    pub fn stage_schema_file(&self, mut schema: Schema) -> Result<(), StagingError> {
        // schema is dynamic, read from staging and merge if present
        fs::create_dir_all(&self.data_path)?;

        let staging_schemas = self.get_schemas_if_present();
        if let Some(mut staging_schemas) = staging_schemas {
            staging_schemas.push(schema);
            schema = Schema::try_merge(staging_schemas)?;
        }
        // need to add something before .schema to make the file have an extension of type `schema`
        let path = RelativePathBuf::from_iter([format!("{}.schema", self.stream_name)])
            .to_path(&self.data_path);

        // save the merged schema on staging disk
        // the path should be stream/.ingestor.{id}.schema
        info!("writing schema to path - {path:?}");
        write(path, to_bytes(&schema))?;
Serialize schema-file updates; the current read/merge/write can drop fields.
stage_schema_file now sits on a hotter path, but it still does an unsynchronized read/merge/write of the same *.schema file. Two concurrent callers can both merge against stale on-disk state and the later write() wins; additionally, a concurrent get_schemas_if_present() can observe a partially rewritten JSON file and silently ignore it because deserialization errors are skipped. Please guard this helper with a per-stream schema lock and publish updates via temp-file + rename so staged schemas cannot regress under concurrent ingestion.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/parseable/streams.rs` around lines 510 - 527, stage_schema_file currently
does an unsynchronized read/merge/write that can lose fields or read partial
JSON; fix by serializing updates with a per-stream lock and writing via a
temp-file + atomic rename: add/use a per-stream Mutex/RwLock (e.g., a field on
the struct guarding schema ops) and acquire it at the start of
stage_schema_file, call get_schemas_if_present (or re-read the staging file)
while holding the lock, merge via Schema::try_merge, then write the merged bytes
to a temp path (e.g., same dir with a .tmp suffix or .ingestor.{id}.schema.tmp)
and atomically rename to the final schema filename (replace the current
RelativePathBuf usage so final name is the intended .ingestor.{id}.schema); also
ensure get_schemas_if_present is only used/read under the same lock or returns
errors instead of silently ignoring deserialization failures so partial writes
are not treated as absent.
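The approach described in this comment can be sketched with the standard library alone. The code below is an illustration under stated assumptions, not the project's implementation: `Stream` is a hypothetical stand-in, `schema_lock` is an assumed field name, `Schema` is simplified to a map of field name to type name, the on-disk format is a made-up `name=type` line format rather than JSON, and the merge is a plain map union instead of `Schema::try_merge`. It shows three things: the lock is held across read, merge, and write; a malformed file is reported as an error rather than treated as absent; and the new content is written to a temp file and then renamed over the final path.

```rust
use std::collections::BTreeMap;
use std::fs;
use std::io::{self, Write};
use std::path::PathBuf;
use std::sync::Mutex;

/// Stand-in for an Arrow schema: field name -> type name (a simplification).
type Schema = BTreeMap<String, String>;

/// Hypothetical stand-in for the real `Stream`.
struct Stream {
    stream_name: String,
    data_path: PathBuf,
    /// Assumed per-stream lock guarding every schema read/merge/write.
    schema_lock: Mutex<()>,
}

impl Stream {
    fn new(stream_name: &str, data_path: PathBuf) -> Self {
        Stream {
            stream_name: stream_name.to_string(),
            data_path,
            schema_lock: Mutex::new(()),
        }
    }

    fn schema_path(&self) -> PathBuf {
        self.data_path.join(format!("{}.schema", self.stream_name))
    }

    /// Load the staged schema. The caller must already hold `schema_lock`.
    /// A missing file is an empty schema; a malformed file is an error.
    fn load(&self) -> io::Result<Schema> {
        let text = match fs::read_to_string(self.schema_path()) {
            Ok(text) => text,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Schema::new()),
            Err(e) => return Err(e),
        };
        let mut schema = Schema::new();
        for line in text.lines() {
            let (name, ty) = line.split_once('=').ok_or_else(|| {
                io::Error::new(io::ErrorKind::InvalidData, "malformed schema line")
            })?;
            schema.insert(name.to_string(), ty.to_string());
        }
        Ok(schema)
    }

    /// Read the staged schema under the lock.
    fn read_schema(&self) -> io::Result<Schema> {
        let _guard = self.schema_lock.lock().unwrap_or_else(|e| e.into_inner());
        self.load()
    }

    /// Read, merge, and publish the schema while holding the lock.
    fn stage_schema_file(&self, incoming: &Schema) -> io::Result<()> {
        let _guard = self.schema_lock.lock().unwrap_or_else(|e| e.into_inner());
        fs::create_dir_all(&self.data_path)?;

        // Merge against the current on-disk state (simplified map union).
        let mut merged = self.load()?;
        for (name, ty) in incoming {
            merged.insert(name.clone(), ty.clone());
        }

        // Write to a temp file in the same directory, then rename into place.
        let tmp_path = self.data_path.join(format!("{}.schema.tmp", self.stream_name));
        let mut body = String::new();
        for (name, ty) in &merged {
            body.push_str(&format!("{name}={ty}\n"));
        }
        let mut tmp = fs::File::create(&tmp_path)?;
        tmp.write_all(body.as_bytes())?;
        tmp.sync_all()?;
        fs::rename(&tmp_path, self.schema_path())
    }
}
```

An in-process `Mutex` only serializes writers inside one process. If several processes can write the same staging file, a file lock or per-writer file names would be needed instead, and the rename is only atomic when the temp file and the final file live on the same filesystem.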
Parseable Enterprise failed a Quest test because the schema was not present in the metastore soon enough.