fix: commit schema bug #1666

Open
parmesant wants to merge 1 commit into parseablehq:main from parmesant:commit_schema_fix

@parmesant parmesant commented Jun 2, 2026

Parseable enterprise failed the Quest test because the schema was not present in the metastore soon enough.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • Refactor
    • Enhanced event processing workflow with improved schema staging and management efficiency

coderabbitai Bot commented Jun 2, 2026

Walkthrough

Event processing now stages schemas upfront using a new reusable helper. Event::process creates the stream instance once before processing, conditionally stages the schema file on the first event, and reuses the stream for event pushes. The staging logic is extracted from prepare_parquet into stage_schema_file, which handles directory creation, schema merging, and disk writes.

Changes

Schema Staging Refactoring

| Layer / File(s) | Summary |
|---|---|
| Schema staging helper and prepare_parquet refactoring (src/parseable/streams.rs) | New public method stage_schema_file creates the stream directory, computes the relative schema file path, loads and merges existing staging schemas via Schema::try_merge, and writes the result to disk. prepare_parquet is refactored to delegate to this helper instead of containing the staging-and-merge logic inline. |
| Event processing with staged schema handling (src/event/mod.rs) | Event::process is refactored to bind the stream instance upfront via get_or_create_stream. On first event, it conditionally stages the schema file via stream.stage_schema_file(...) when stream.get_static_schema_flag() is false, then pushes the event using the pre-obtained stream instance instead of inline stream creation. |
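
The control flow summarized above can be sketched in isolation. This is a minimal in-memory model, not the PR's code: `Streams`, the field layout of `Stream` and `Event`, and the use of a `BTreeSet` of field names in place of an Arrow schema are all assumptions made for illustration. Only the method names `get_or_create_stream`, `get_static_schema_flag`, `stage_schema_file`, and `push` come from the summary.

```rust
use std::collections::{BTreeSet, HashMap};

/// Stand-in for an Arrow schema: just a set of field names (assumption).
type Schema = BTreeSet<String>;

/// Hypothetical, simplified stream; the real one lives in src/parseable/streams.rs.
#[derive(Default)]
struct Stream {
    static_schema: bool,
    staged: Option<Schema>,
    pushed: Vec<String>,
}

impl Stream {
    fn get_static_schema_flag(&self) -> bool {
        self.static_schema
    }

    /// Merge the incoming fields into whatever is already staged.
    fn stage_schema_file(&mut self, schema: &Schema) {
        self.staged
            .get_or_insert_with(Schema::new)
            .extend(schema.iter().cloned());
    }

    fn push(&mut self, record: &str) {
        self.pushed.push(record.to_owned());
    }
}

/// Hypothetical registry standing in for the global stream map.
#[derive(Default)]
struct Streams(HashMap<String, Stream>);

impl Streams {
    fn get_or_create_stream(&mut self, name: &str) -> &mut Stream {
        self.0.entry(name.to_owned()).or_default()
    }
}

struct Event {
    stream_name: String,
    is_first_event: bool,
    schema: Schema,
    record: String,
}

impl Event {
    fn process(&self, streams: &mut Streams) {
        // Bind the stream once and reuse it for staging and for the push.
        let stream = streams.get_or_create_stream(&self.stream_name);
        // Only dynamic-schema streams stage a schema, and only on the first event.
        if self.is_first_event && !stream.get_static_schema_flag() {
            stream.stage_schema_file(&self.schema);
        }
        stream.push(&self.record);
    }
}
```

In this model a later event with `is_first_event == false` pushes its record but leaves the staged schema untouched, which mirrors the conditional in the summary.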

Sequence Diagram

sequenceDiagram
  participant EventProcess as Event::process
  participant PARSEABLE
  participant Stream
  participant Filesystem
  EventProcess->>PARSEABLE: get_or_create_stream(...)
  PARSEABLE-->>EventProcess: stream
  EventProcess->>EventProcess: Check is_first_event && !static_schema_flag
  EventProcess->>Stream: stage_schema_file(schema)
  Stream->>Filesystem: Ensure stream directory exists
  Stream->>Filesystem: Load existing staging schemas (if present)
  Stream->>Stream: Merge schemas via Schema::try_merge
  Stream->>Filesystem: Write merged schema to stream_name.schema
  Stream-->>EventProcess: Ok(())
  EventProcess->>Stream: push(event_data)
  Stream-->>EventProcess: Result

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly Related PRs

  • parseablehq/parseable#1201: Refactors error handling for schema commits to use StagingError, which directly enables the new stage_schema_file method's error semantics.
  • parseablehq/parseable#1258: Optimizes stream creation contention in get_or_create_stream, which is now called upfront in the refactored Event::process.
  • parseablehq/parseable#1381: Fixes the order of stream creation relative to schema commit, aligning with this PR's refactoring to create streams before schema staging.

Suggested Reviewers

  • nikhilsinhaparseable
  • nitisht

🐰 Schemas now stage with grace,
Stream reuse keeps up the pace,
Events push clean and true,
One instance, not brand new!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ❓ Inconclusive | The description mentions the bug symptom (schema not in metastore soon enough) but lacks implementation details about how the fix works and which specific changes address the issue. | Add details explaining the chosen solution and key changes (e.g., stream instance creation timing and new stage_schema_file method) to clarify the technical approach. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title 'fix: commit schema bug' directly reflects the main change: refactoring schema commitment flow to ensure schema availability in metastore earlier. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/event/mod.rs (1)

91-95: ⚡ Quick win

Add a regression test for the new first-event staging path.

This branch is the actual fix for the Quest failure, but I don’t see coverage here asserting that a dynamic-schema stream has a staged *.schema before the first push()/parquet cycle. A focused test around Event::process would make this much harder to regress.
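
One possible shape for such a check, sketched against a stand-in rather than the real types: `FakeStream`, its fields, and the free function `process` are hypothetical and exist only to show the assertion. The idea is to have `push` record whether the `*.schema` file was already on disk, so the test can assert on ordering and not just on the final state.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical stand-in for the real `Stream`; it keeps only what the check needs.
struct FakeStream {
    data_path: PathBuf,
    stream_name: String,
    static_schema: bool,
    /// For every push, whether the schema file already existed on disk.
    schema_seen_at_push: Vec<bool>,
}

impl FakeStream {
    fn new(data_path: PathBuf, stream_name: &str, static_schema: bool) -> Self {
        Self {
            data_path,
            stream_name: stream_name.to_owned(),
            static_schema,
            schema_seen_at_push: Vec::new(),
        }
    }

    fn schema_path(&self) -> PathBuf {
        self.data_path.join(format!("{}.schema", self.stream_name))
    }

    /// Simplified staging: create the directory and write the schema bytes.
    fn stage_schema_file(&self, schema_json: &str) -> io::Result<()> {
        fs::create_dir_all(&self.data_path)?;
        fs::write(self.schema_path(), schema_json)
    }

    fn push(&mut self) {
        let seen = self.schema_path().exists();
        self.schema_seen_at_push.push(seen);
    }
}

/// Mirrors the first-event branch described for `Event::process`.
fn process(stream: &mut FakeStream, is_first_event: bool, schema_json: &str) -> io::Result<()> {
    if is_first_event && !stream.static_schema {
        stream.stage_schema_file(schema_json)?;
    }
    stream.push();
    Ok(())
}
```

A test built on this would run `process` once for a dynamic-schema stream and once for a static-schema stream in separate temporary directories, then assert that the first push saw the schema file only in the dynamic case.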

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/event/mod.rs` around lines 91 - 95, Add a regression test that exercises
the first-event staging path in Event::process: create a dynamic-schema stream
(stream.get_static_schema_flag() == false), invoke Event::process (or the flow
that sets self.is_first_event) and assert that commit_schema(&self.stream_name,
...) was called and that stream.stage_schema_file received/created the expected
"*.schema" before any push()/parquet cycle; use the Event::process entry point,
the is_first_event flag, and verify side-effects against commit_schema and
stage_schema_file (or inspect the stream's staged schema via get_schema()) to
ensure the schema file is staged on the first event.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: d98e25e0-7082-48cd-85ab-0a5c8c5731e7

📥 Commits

Reviewing files that changed from the base of the PR and between cefe210 and bdf5e17.

📒 Files selected for processing (2)
  • src/event/mod.rs
  • src/parseable/streams.rs

Comment thread src/parseable/streams.rs
Comment on lines +510 to +527
pub fn stage_schema_file(&self, mut schema: Schema) -> Result<(), StagingError> {
    // schema is dynamic, read from staging and merge if present
    fs::create_dir_all(&self.data_path)?;

    let staging_schemas = self.get_schemas_if_present();
    if let Some(mut staging_schemas) = staging_schemas {
        staging_schemas.push(schema);
        schema = Schema::try_merge(staging_schemas)?;
    }
    // need to add something before .schema to make the file have an extension of type `schema`
    let path = RelativePathBuf::from_iter([format!("{}.schema", self.stream_name)])
        .to_path(&self.data_path);

    // save the merged schema on staging disk
    // the path should be stream/.ingestor.{id}.schema
    info!("writing schema to path - {path:?}");
    write(path, to_bytes(&schema))?;
    // success value required by the signature; falls outside the commented range 510-527
    Ok(())
}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Serialize schema-file updates; the current read/merge/write can drop fields.

stage_schema_file now sits on a hotter path, but it still does an unsynchronized read/merge/write of the same *.schema file. Two concurrent callers can both merge against stale on-disk state and the later write() wins; additionally, a concurrent get_schemas_if_present() can observe a partially rewritten JSON file and silently ignore it because deserialization errors are skipped. Please guard this helper with a per-stream schema lock and publish updates via temp-file + rename so staged schemas cannot regress under concurrent ingestion.
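
The suggested pattern (a per-stream lock held across read, merge, and publish, with the write going to a temp file that is then renamed) can be sketched with the standard library alone. Everything here is illustrative: `SchemaStager`, the `Fields` alias, and the one-field-per-line file format are assumptions standing in for the real stream type and JSON-encoded Arrow schema. On POSIX filesystems a `rename` within one directory replaces the target atomically; behavior on other platforms should be checked separately.

```rust
use std::collections::BTreeSet;
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::Mutex;

/// Stand-in for a schema: a sorted set of field names, stored one per line (assumption).
type Fields = BTreeSet<String>;

/// Hypothetical holder for one stream's staged schema file and its lock.
struct SchemaStager {
    final_path: PathBuf,
    lock: Mutex<()>,
}

impl SchemaStager {
    fn new(dir: PathBuf, stream_name: &str) -> io::Result<Self> {
        fs::create_dir_all(&dir)?;
        Ok(Self {
            final_path: dir.join(format!("{stream_name}.schema")),
            lock: Mutex::new(()),
        })
    }

    /// Read the staged fields; a missing file means nothing is staged yet.
    fn read(&self) -> io::Result<Fields> {
        match fs::read_to_string(&self.final_path) {
            Ok(text) => Ok(text.lines().map(str::to_owned).collect()),
            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(Fields::new()),
            Err(e) => Err(e),
        }
    }

    fn stage(&self, incoming: &Fields) -> io::Result<()> {
        // Hold the lock across read, merge, and publish so no update is lost.
        let _guard = self.lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        let mut merged = self.read()?;
        merged.extend(incoming.iter().cloned());
        let body = merged.into_iter().collect::<Vec<_>>().join("\n");
        // Write the full contents to a sibling temp file, then publish by rename,
        // so readers see either the old file or the new one, never a partial write.
        let tmp = self.final_path.with_extension("schema.tmp");
        fs::write(&tmp, body)?;
        fs::rename(&tmp, &self.final_path)
    }
}
```

A `Mutex` only serializes callers inside one process. If several processes can write the same staging file, this sketch would need an additional cross-process mechanism, which is outside what the comment above asks for.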

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/parseable/streams.rs` around lines 510 - 527, stage_schema_file currently
does an unsynchronized read/merge/write that can lose fields or read partial
JSON; fix by serializing updates with a per-stream lock and writing via a
temp-file + atomic rename: add/use a per-stream Mutex/RwLock (e.g., a field on
the struct guarding schema ops) and acquire it at the start of
stage_schema_file, call get_schemas_if_present (or re-read the staging file)
while holding the lock, merge via Schema::try_merge, then write the merged bytes
to a temp path (e.g., same dir with a .tmp suffix or .ingestor.{id}.schema.tmp)
and atomically rename to the final schema filename (replace the current
RelativePathBuf usage so final name is the intended .ingestor.{id}.schema); also
ensure get_schemas_if_present is only used/read under the same lock or returns
errors instead of silently ignoring deserialization failures so partial writes
are not treated as absent.
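
The last point, telling a missing staged file apart from an unreadable one, might look like the sketch below. The names `Staged`, `parse_fields`, and `load_staged` are hypothetical, and the bracketed comma-separated format is a stand-in for the real JSON schema so the example needs no external crates. What matters is the three-way outcome: absent, present, or an error for malformed content.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Outcome of looking for a staged schema file.
#[derive(Debug, PartialEq)]
enum Staged {
    Absent,
    Present(Vec<String>),
}

/// Toy parser for a stand-in format such as `[a, b]`.
/// Returns `None` when a bracket is missing, as a truncated write would leave it.
fn parse_fields(text: &str) -> Option<Vec<String>> {
    let inner = text.trim().strip_prefix('[')?.strip_suffix(']')?;
    Some(
        inner
            .split(',')
            .map(|f| f.trim().to_owned())
            .filter(|f| !f.is_empty())
            .collect(),
    )
}

/// A missing file is `Absent`; a file that exists but does not parse is an error,
/// so a partial write is never mistaken for "no schema staged".
fn load_staged(path: &Path) -> io::Result<Staged> {
    let text = match fs::read_to_string(path) {
        Ok(text) => text,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Staged::Absent),
        Err(e) => return Err(e),
    };
    parse_fields(&text).map(Staged::Present).ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "staged schema is truncated or malformed",
        )
    })
}
```

With this shape, a caller merging schemas can fall back to the incoming schema only in the `Absent` case and propagate the error otherwise.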
