Problem
Source connectors today write to one stream and one topic, picked at startup.
Say you have a Postgres table with 50 event types and you want each type in its own Iggy topic. today you either split into 50 tables (bad schema, no one uses databases like this) or run 50 connector copies. the 50-copy version means 50x DB load, 50 configs, 50 state files; for CDC, 50 replication slots, each pinning WAL until its restart_lsn advances - one stuck consumer pins WAL for the whole cluster.
Using iggy connectors should not force users into bad schema or design choices.
The row should say where it goes; the connector should send it there.
Proposal TL;DR
Messages carry destination: Option<Destination>; runtime groups a batch by (stream, topic) and dispatches each group via one unbound IggyProducer::send_to. Postgres source derives destination from a column or table name; any source can use the new Route transform to map a payload field through a static lookup. Source gains required commit() (runs after atomic state persist; owns slot-advance, deferred DELETE) and mode() (Cdc | Polling { destructive } | Opaque, drives commit-failure policy). Destination growth bounded by max_destinations cap, allowlist, and per-destination circuit breaker. Bundled: fix producer task leak on stream switch, atomic FileStateProvider::save (tmp+rename+fsync), switch Postgres CDC slot from pgoutput to test_decoding so the existing parser actually matches.
Scope of changes
the two changes below break the ABI - we're under 1.0.0, so no migrators, etc. v1 favors simple-and-correct over flexible. things deliberately left for later live in Future so we don't accidentally close those doors.
Step 1 - messages carry their own destination
ProducedMessage and DecodedMessage get destination: Option<Destination> where Destination { stream: Identifier, topic: Identifier, partition_key: Option<Partitioning> }. partition_key = None means balanced (server picks).
the runtime groups a batch by (stream, topic) and sends each group through one shared unbound IggyProducer. partition_key rides through to send_to. IggyProducer::send_to(stream, topic, messages, partitioning) already takes them per call, so the producer doesn't need to be bound.
cap + allowlist checked before any send. a typo or a weird row value can't accidentally create thousands of topics.
fits CDC and any source that already knows where each row belongs (table name, tenant ID, event-type column). the Postgres source gains config keys for column-derived destinations, per-table mapping, and a flag to strip routing columns from the payload.
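a minimal sketch of the grouping step. `Identifier` is simplified to `String`, and `Partitioning`, `Destination` and `ProducedMessage` are hypothetical stand-ins, not the real SDK types. each map entry would become one `send_to(stream, topic, ...)` call. messages without a destination come back separately, so the bound/unbound check or `on_missing_destination` can handle them. groups whose messages carry different partition keys are not handled here.

```rust
use std::collections::BTreeMap;

/// Stand-in for the SDK's `Partitioning` (simplified, hypothetical shape).
#[derive(Clone, Debug, PartialEq)]
pub enum Partitioning {
    MessagesKey(Vec<u8>),
    PartitionId(u32),
}

/// Mirrors the proposed `Destination`; `Identifier` is simplified to `String`.
#[derive(Clone, Debug, PartialEq)]
pub struct Destination {
    pub stream: String,
    pub topic: String,
    pub partition_key: Option<Partitioning>, // None = balanced, server picks
}

#[derive(Clone, Debug, PartialEq)]
pub struct ProducedMessage {
    pub payload: Vec<u8>,
    pub destination: Option<Destination>,
}

pub type Groups = BTreeMap<(String, String), Vec<ProducedMessage>>;

/// Splits a polled batch into one group per (stream, topic), preserving poll
/// order inside each group. Messages with no destination come back separately.
pub fn group_by_destination(batch: Vec<ProducedMessage>) -> (Groups, Vec<ProducedMessage>) {
    let mut groups = Groups::new();
    let mut missing = Vec::new();
    for msg in batch {
        let key = msg
            .destination
            .as_ref()
            .map(|d| (d.stream.clone(), d.topic.clone()));
        match key {
            // Each entry later becomes one send_to(stream, topic, ...) call.
            Some(k) => groups.entry(k).or_default().push(msg),
            None => missing.push(msg),
        }
    }
    (groups, missing)
}
```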
bonus: kills a real bug. configuring several streams for one source builds one producer per stream and keeps the last; each discarded producer leaves a spawned task at producer.rs:153 that only exits on DiagnosticEvent::Shutdown from the underlying client. routing makes this impossible by design (one producer per source plugin), but the orphan leak should still be fixed independently with a cancellation token on ProducerCore - it bites any SDK user, not just connectors.
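a std-only sketch of the leak fix. the real `ProducerCore` runs a tokio task and would likely use a cancellation token such as tokio-util's `CancellationToken`. here a thread and an `Arc<AtomicBool>` stand in for both, and `DiagnosticEvent` is a simplified stand-in. the point is the second exit path: dropping a discarded producer now cancels and joins its background task, so it no longer waits forever for a client-level `Shutdown`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Simplified stand-in for the client's diagnostic events.
#[allow(dead_code)]
pub enum DiagnosticEvent {
    Shutdown,
    Disconnected,
}

pub struct ProducerCore {
    cancel: Arc<AtomicBool>, // stand-in for a cancellation token
    worker: Option<JoinHandle<&'static str>>,
}

impl ProducerCore {
    pub fn new(events: Receiver<DiagnosticEvent>) -> Self {
        let cancel = Arc::new(AtomicBool::new(false));
        let token = Arc::clone(&cancel);
        let worker = thread::spawn(move || loop {
            // Exit path added by the fix: the owning producer went away.
            if token.load(Ordering::Acquire) {
                return "cancelled";
            }
            match events.recv_timeout(Duration::from_millis(10)) {
                // Exit path that exists today: client-level shutdown.
                Ok(DiagnosticEvent::Shutdown) => return "shutdown",
                Ok(_) | Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => return "client gone",
            }
        });
        Self { cancel, worker: Some(worker) }
    }

    /// Cancels the background task and reports why it exited.
    pub fn stop(mut self) -> &'static str {
        self.cancel.store(true, Ordering::Release);
        self.worker
            .take()
            .map(|h| h.join().unwrap_or("panicked"))
            .unwrap_or("already stopped")
    }
}

impl Drop for ProducerCore {
    fn drop(&mut self) {
        // A discarded producer no longer leaves an orphaned task behind.
        self.cancel.store(true, Ordering::Release);
        if let Some(h) = self.worker.take() {
            let _ = h.join();
        }
    }
}
```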
Step 2 - a routing transform
a new Route transform reads a field from the payload, looks the value up in a static mapping, and writes the destination onto the message (same field Step 1 added). the mapping is served by the existing config HTTP server (one new endpoint, same auth).
Route replaces the whole destination atomically. mixing plugin-set partition_key with Route-set topic isn't allowed: different topic = different partition count = different key->partition mapping. one wins, all-or-nothing.
Route always overwrites if it matches; no precedence option. a source configured with both any routing.* key and a Route transform fails init - explicit refusal at startup.
fits HTTP webhook sources, file tailers, bridges from other messaging systems - anything where the plugin dumps opaque data and the operator owns routing.
topic_field uses dotted-path syntax over JSON objects: a.b.c walks Object -> Object -> leaf. no arrays, no key names containing '.'. this is a new grammar, scoped to Route only; the existing filter_fields transform stays top-level-key-only.
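a minimal sketch of the dotted-path walk plus the static lookup. a tiny `Json` enum stands in for whatever JSON value type the plugin actually uses (e.g. serde_json's). the mapping shape `value -> (stream, topic)` is an assumption. a non-string leaf, an array on the path, or an unmapped value yields `None`, so Route leaves the message's existing destination alone.

```rust
use std::collections::BTreeMap;

/// Minimal JSON stand-in (hypothetical; a plugin would use its real JSON type).
#[derive(Debug, Clone, PartialEq)]
pub enum Json {
    Null,
    Str(String),
    Array(Vec<Json>),
    Object(BTreeMap<String, Json>),
}

/// Walks `a.b.c` through nested objects only; arrays are never indexed.
pub fn lookup<'a>(root: &'a Json, path: &str) -> Option<&'a Json> {
    let mut cur = root;
    for seg in path.split('.') {
        cur = match cur {
            Json::Object(map) => map.get(seg)?,
            _ => return None, // arrays and scalars end the walk
        };
    }
    Some(cur)
}

/// Route: read `topic_field`, map its string value through the frozen table.
/// `None` means no match, so any existing destination is kept.
pub fn route<'m>(
    payload: &Json,
    topic_field: &str,
    mapping: &'m BTreeMap<String, (String, String)>,
) -> Option<&'m (String, String)> {
    match lookup(payload, topic_field)? {
        Json::Str(value) => mapping.get(value),
        _ => None,
    }
}
```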
dispatch order:
1. plugin sets destination.
2. transforms run; Route may overwrite.
3. admission checks the result against cap and allowlist.
4. anything still missing follows on_missing_destination.
non-Route transforms mutate DecodedMessage in place today, so they preserve destination for free. only test helpers and trait docs need updating.
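the four-step dispatch order can be sketched as a pure function. `Dest`, `Dispatch` and the `admit` closure are hypothetical. `admit` bundles cap, allowlist/denylist and breaker checks into one predicate. Route's result, when present, replaces the plugin-set value wholesale, matching the all-or-nothing rule above.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct Dest {
    pub stream: String,
    pub topic: String,
}

#[derive(Debug, PartialEq)]
pub enum Dispatch {
    Send(Dest),
    AdmissionFailure(Dest), // handled per on_admission_failure
    Missing,                // handled per on_missing_destination
}

pub fn resolve(
    plugin_dest: Option<Dest>,
    route_dest: Option<Dest>,
    admit: impl Fn(&Dest) -> bool,
) -> Dispatch {
    // 1 + 2: plugin value first; a Route match overwrites it atomically.
    let dest = route_dest.or(plugin_dest);
    match dest {
        // 3: admission sees only the final destination.
        Some(d) if admit(&d) => Dispatch::Send(d),
        Some(d) => Dispatch::AdmissionFailure(d),
        // 4: nothing set by plugin or Route.
        None => Dispatch::Missing,
    }
}
```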
SDK changes (ABI break)
Step 1 cannot land on the current SDK; batched into the same release:
IggyProducer::unbound() constructor + builder. today IggyProducerBuilder::new requires stream/topic up front and ProducerCore::init creates them. unbound skips that and exposes only send_to; calling send() errors. the runtime enforces the split at startup: bound producer rejects messages with a destination, unbound rejects messages without one.
unbound() is direct-only. builder refuses SendMode::Background - background ack means "queued into the linger buffer", not "server acked", and the commit story below needs real acks. losing the distinction silently breaks CDC. background batching for routed sources is a future design with explicit ack callbacks.
unbound() forces send_retries_count = None. retries live in the per-destination circuit breaker.
the Source trait gains two required methods, no defaults:
async fn commit(&self) -> Result<(), Error> - runs after the runtime has atomically persisted state (tmp + rename + parent-dir fsync; today's FileStateProvider::save truncates-then-writes, leaving the file zero-byte on a mid-write crash - fix bundled in this release). plugin drains Mutex<Option<PendingBatch>> and runs side effects (slot-advance, deferred DELETE). if commit is skipped or fails, the next poll filters rows already covered by persisted state (CDC: drop rows where lsn <= last_persisted_lsn, then re-run slot-advance). required, no default impl - silent no-op corrupts state. no state arg: PendingBatch is the single typed home.
fn mode(&self) -> SourceMode - returns Cdc | Polling { destructive: bool } | Opaque. drives on_admission_failure default and commit-failure policy. no default - plugin author picks at code time, not config time. runtime cross-checks at startup: Polling { destructive: false } with delete_after_read = true or processed_column set = init fails. config can't escalate non-destructive into destructive; the reverse (overdeclared destructive) is allowed.
ProducedMessage and DecodedMessage gain destination: Option<Destination>. sink-side ConsumedMessage unchanged.
ConnectorState stays pub Vec<u8>. the plugin puts a state-shape version as the first field of its own envelope. version mismatch -> typed error from Source::init -> connector refuses to start; operator clears the state file deliberately. silent reset is opt-in behind an explicit plugin flag (off by default). the runtime never interprets state contents.
ConnectorStatus gains Degraded, slotted between Running and Error. today's enum (Starting | Running | Stopping | Stopped | Error) treats Error as terminal and can't carry "still retrying, not progressing" which the CDC commit-retry loop needs.
runtime call site: process_messages returns Vec<(DestinationKey, IggyMessage)> instead of Vec<IggyMessage>. source_forwarding_loop replaces the single producer.send(...) with a loop that issues one send_to per destination group, waits for all to finish, persists state atomically, then calls commit.
in-tree plugins (random, http, postgres, elasticsearch, influxdb) gain commit: Ok(()) + correct mode() impls in the same release. plugins ship as independent .so files - the runtime gates dlopen on iggy_source_abi_version() -> u32 matching its compiled-in constant and refuses to load on mismatch with a typed error. without this gate, postcard would silently misdecode ProducedMessage (postcard is not self-describing, so the new destination field changes the wire format with no marker) and every batch from an old .so would be dropped.
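a sketch of the two state-file rules from the list above. the first is the atomic save: tmp file, fsync, rename, then parent-dir fsync. the second is a plugin-owned envelope whose leading version is checked before anything else. the 4-byte little-endian version prefix and `STATE_VERSION` are assumptions, since the plugin picks its own encoding. the directory fsync assumes a Unix-like OS.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical state-shape version a plugin bumps on layout changes.
pub const STATE_VERSION: u32 = 2;

#[derive(Debug, PartialEq)]
pub enum StateError {
    Truncated,
    VersionMismatch { seen: u32, expected: u32 },
}

/// A crash leaves either the old file or the new one, never a zero-byte file.
pub fn atomic_save(path: &Path, bytes: &[u8]) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    let mut file = File::create(&tmp)?;
    file.write_all(bytes)?;
    file.sync_all()?; // data durable before it becomes visible under `path`
    drop(file);
    fs::rename(&tmp, path)?; // atomic replace on the same filesystem
    let parent = match path.parent() {
        Some(p) if !p.as_os_str().is_empty() => p,
        _ => Path::new("."),
    };
    File::open(parent)?.sync_all()?; // make the rename itself durable (Unix)
    Ok(())
}

/// Plugin-owned envelope: version first, then the body.
pub fn encode_state(body: &[u8]) -> Vec<u8> {
    let mut out = STATE_VERSION.to_le_bytes().to_vec();
    out.extend_from_slice(body);
    out
}

/// Mismatch becomes a typed error from Source::init, never a silent reset.
pub fn decode_state(bytes: &[u8]) -> Result<&[u8], StateError> {
    if bytes.len() < 4 {
        return Err(StateError::Truncated);
    }
    let seen = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
    if seen != STATE_VERSION {
        return Err(StateError::VersionMismatch { seen, expected: STATE_VERSION });
    }
    Ok(&bytes[4..])
}
```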
State and atomicity
this decides whether the design is safe to ship.
batch = unit of state advance. commit runs only after every per-destination send has acked.
strictly sequential per plugin: poll -> send fan-out -> commit -> next poll. the runtime holds one in-flight batch per plugin. pipelining gets added later behind max_in_flight_batches if anyone needs the throughput.
per-destination sends inside a batch run in parallel, bounded by max_concurrent_destinations (default 16). this caps in-flight requests, not wire bandwidth - all sends share one client and one TCP/QUIC writer, so the knob mostly hides latency.
a single failed destination fails the whole batch. state is not persisted, commit is skipped, same rows return on next poll.
state persist happens only after every send acks, and commit runs only after state persist returns. a failed commit leaves state durable but side effects skipped - next poll filters covered rows and re-runs the side effect. crash between send-ack and state persist -> re-poll, no duplicates beyond the in-flight batch. crash between state persist and commit -> next poll sees state covering the batch, plugin filters and replays side effects. duplicates on restart accepted.
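the ordering rules above reduce to a short control-flow sketch. sends, persist and commit are injected as closures, and `BatchOutcome` is hypothetical. the proposal runs per-destination sends in parallel, bounded by max_concurrent_destinations. this sketch sends sequentially and stops at the first failure, which is enough to show that persist never runs after a failed send and commit never runs after a failed persist. the persist-failure handling (skip commit) is an assumption; the text above does not spell it out.

```rust
/// Result of one poll -> send fan-out -> persist -> commit cycle.
#[derive(Debug, PartialEq)]
pub enum BatchOutcome {
    SendFailed(String),    // nothing persisted, commit skipped: rows re-polled
    PersistFailed(String), // sends acked but state not durable: commit skipped
    CommitFailed(String),  // state durable, side effects pending (mode policy applies)
    Committed,
}

pub fn process_batch<G>(
    groups: &[G],
    mut send: impl FnMut(&G) -> Result<(), String>,
    persist: impl FnOnce() -> Result<(), String>,
    commit: impl FnOnce() -> Result<(), String>,
) -> BatchOutcome {
    // One failed destination fails the whole batch.
    for group in groups {
        if let Err(e) = send(group) {
            return BatchOutcome::SendFailed(e);
        }
    }
    // Atomic state persist only after every send has acked.
    if let Err(e) = persist() {
        return BatchOutcome::PersistFailed(e);
    }
    // Side effects (slot-advance, deferred DELETE) only after persist returns.
    match commit() {
        Ok(()) => BatchOutcome::Committed,
        Err(e) => BatchOutcome::CommitFailed(e),
    }
}
```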
commit-failure policy by source mode:
| mode | policy |
|---|---|
| Cdc | retry with exp backoff capped at 30s, never gives up - stopping = WAL backs up on the server. after commit_failure_threshold (default 16) consecutive failures, status flips Running -> Degraded. one successful commit clears the counter and flips back. no hysteresis - flapping is visible in status-change history; alerts fire on entry to Degraded. |
| Polling | re-poll on next tick (today's behavior). rows stay in the source table, safe by construction. |
| Opaque (webhook/file) | stop the connector and surface the error. no buffer behind it. |
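a sketch of the Cdc row: doubling backoff capped at 30s, and a Running -> Degraded flip once consecutive failures reach the threshold. the base delay is an assumption, since the text gives only the cap. `Status` is a two-variant stand-in for `ConnectorStatus`.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Status {
    Running,
    Degraded,
}

/// Tracks consecutive CDC commit failures; one success resets everything.
pub struct CommitRetry {
    base: Duration, // assumption: not specified in the proposal
    threshold: u32, // commit_failure_threshold
    consecutive_failures: u32,
    pub status: Status,
}

impl CommitRetry {
    pub const CAP: Duration = Duration::from_secs(30);

    pub fn new(base: Duration, threshold: u32) -> Self {
        Self { base, threshold, consecutive_failures: 0, status: Status::Running }
    }

    /// Records a failed commit and returns how long to sleep before retrying.
    pub fn on_failure(&mut self) -> Duration {
        self.consecutive_failures += 1;
        if self.consecutive_failures >= self.threshold {
            self.status = Status::Degraded;
        }
        // base * 2^(n-1), saturating to the cap on overflow.
        let exp = (self.consecutive_failures - 1).min(31);
        let delay = self.base.checked_mul(1u32 << exp).unwrap_or(Self::CAP);
        delay.min(Self::CAP)
    }

    pub fn on_success(&mut self) {
        self.consecutive_failures = 0;
        self.status = Status::Running; // no hysteresis
    }
}
```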
stop during CDC commit-retry: operator stop interrupts the backoff sleep and waits up to commit_stop_grace (default 30s) for the in-flight commit to finish. miss the window -> runtime exits without further commits; restart re-polls from last persisted state, duplicates for the in-flight batch accepted. no force-kill mid-commit in v1.
escape hatch for prolonged outage: POST /sources/{key}/cdc/abandon-batch advances the slot to PendingBatch.last_lsn without sending, clears the pending batch, surfaces last_abandon_at in status. documented data-loss path - the only mechanism that keeps WAL from filling PG storage if Iggy is unreachable for hours. restart cannot do this on its own (state still points at the stuck batch).
CDC requirement: switch pg_logical_slot_get_changes (auto-advances confirmed_flush_lsn) to pg_logical_slot_peek_changes plus explicit pg_replication_slot_advance(slot, last_lsn) from inside commit. PendingBatch carries last_lsn; persisted state carries last_persisted_lsn. on poll, drop rows with lsn <= last_persisted_lsn before forming the batch - closes the window where state is durable but the slot hasn't been advanced yet. requires PG >= 11.
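a sketch of the poll-side filtering and the commit-side advance. `Change`, `PendingBatch` and `Polled` are simplified stand-ins. `replay_advance_to` models the "re-run slot-advance" case, where the slot still lags behind persisted state. the SQL uses PostgreSQL's `pg_replication_slot_advance(slot_name, upto_lsn)`; how it is bound and executed depends on the Postgres client and is not shown. `lsn_text` prints an LSN in PostgreSQL's `hi/lo` hex form.

```rust
/// One decoded test_decoding change with its LSN (simplified).
#[derive(Debug, Clone, PartialEq)]
pub struct Change {
    pub lsn: u64,
    pub data: String,
}

#[derive(Debug, PartialEq)]
pub struct PendingBatch {
    pub last_lsn: u64,
}

pub struct Polled {
    pub fresh: Vec<Change>,             // rows to send in this batch
    pub replay_advance_to: Option<u64>, // slot lags persisted state: advance again
    pub pending: Option<PendingBatch>,  // what commit() advances the slot to
}

/// $1 = slot name, $2 = LSN text; requires PG >= 11.
pub const ADVANCE_SQL: &str = "SELECT pg_replication_slot_advance($1, $2::pg_lsn)";

/// `peeked` came from pg_logical_slot_peek_changes, so the slot has not moved.
pub fn form_batch(peeked: Vec<Change>, last_persisted_lsn: u64) -> Polled {
    let covered = peeked.iter().any(|c| c.lsn <= last_persisted_lsn);
    let fresh: Vec<Change> = peeked
        .into_iter()
        .filter(|c| c.lsn > last_persisted_lsn) // drop rows state already covers
        .collect();
    let pending = fresh.iter().map(|c| c.lsn).max().map(|last_lsn| PendingBatch { last_lsn });
    let replay_advance_to = if covered { Some(last_persisted_lsn) } else { None };
    Polled { fresh, replay_advance_to, pending }
}

/// Formats a 64-bit LSN as PostgreSQL prints pg_lsn (upper/lower 32 bits, hex).
pub fn lsn_text(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}
```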
polling requirement: every destructive op moves from poll to commit. that's delete_after_read = true (DELETE) and processed_column (UPDATE ... SET col = TRUE). today both run in mark_or_delete_processed_rows during poll; both move.
duplicates accepted as the cost of correctness. plugins should pick stable ids (row PK hash, LSN+xid) instead of Uuid::v4 so downstream can dedupe. the Iggy server does not dedupe; the runtime treats id as opaque. filtering replays is a downstream consumer's job.
Producer model
sources without routing.* keep current behavior: the runtime builds a bound IggyProducer from stream/topic config and rejects messages that set destination. adding routing.* flips the source to unbound. one mode per source, picked at startup.
one IggyProducer::unbound() per source plugin. direct-only.
destinations resolved via DashMap<(stream, topic), AdmissionRecord>. AdmissionRecord holds a tokio::sync::OnceCell<DestinationMeta> (captures partitions_count during the one-time ensure call, used for partition_id bounds checks) plus circuit-breaker state. DashMap::entry makes get-or-insert atomic within the process.
the ensure call is idempotent: IggyClient::create_stream then create_topic, treating StreamNameAlreadyExists / TopicNameAlreadyExists as success. server-side both errors leave no partial state behind, so concurrent first-touches are safe.
allowlist destinations are pre-warmed at startup, in parallel. individual pre-warm failures log and continue (next send_to retries on the slow path). startup_prewarm_deadline (default 60s) bounds total time; on deadline, un-warmed destinations log warnings and the connector starts anyway. open / denylist modes pay the round-trip on first touch per new destination - documented tail-latency event.
no eviction. max_destinations is a hard cap on live admission records; hitting it = admission failure regardless of mode. allowlist bounds the universe at config time; open mode is "set cap to expected universe; hitting it = config error, not a hot-path event". eviction would need breaker-pinning carveouts and re-touch semantics easy to get wrong.
per-destination circuit breaker. counts send failures, opens after 5 consecutive failures (circuit_breaker.failure_threshold), holds open for 30s (circuit_breaker.cool_down), then half-opens (one probe; success closes, failure reopens for another full cool-down). hitting an open breaker = admission failure with reason circuit_open -> on_admission_failure. does not silently reroute to default_* - those are NULL/missing defaults applied before admission, not a fallback for healthy-but-busy destinations. breaker skips errors whose recursively-unwrapped cause is CannotSendMessagesDueToClientDisconnection (the SDK boxes every send error in ProducerSendFailed { cause, ... }, so this is a structural unwrap, not a variant match) - those hit every destination at once and would trip every breaker on a transient outage. proper transient-vs-permanent classification waits on IggyError::is_retryable (future).
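a sketch of the per-destination breaker: closed, then open after `failure_threshold` consecutive failures, then a single half-open probe after `cool_down`. time is passed in as a `Duration` from an arbitrary origin instead of `Instant`, so the sketch is deterministic. `SendError::ClientDisconnected` stands for an error whose recursively-unwrapped cause is `CannotSendMessagesDueToClientDisconnection`. such errors are not counted. the choice to release the half-open probe slot on a disconnection is an assumption.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Breaker {
    Closed { failures: u32 },
    Open { until: Duration },
    HalfOpen { probing: bool },
}

#[derive(Debug, PartialEq)]
pub enum SendError {
    ClientDisconnected, // shared outage: must not trip per-destination breakers
    Other(String),
}

pub struct CircuitBreaker {
    pub state: Breaker,
    failure_threshold: u32, // circuit_breaker.failure_threshold (default 5)
    cool_down: Duration,    // circuit_breaker.cool_down (default 30s)
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, cool_down: Duration) -> Self {
        Self { state: Breaker::Closed { failures: 0 }, failure_threshold, cool_down }
    }

    /// Admission check; `false` maps to the `circuit_open` rejection reason.
    pub fn admit(&mut self, now: Duration) -> bool {
        let state = self.state;
        match state {
            Breaker::Closed { .. } => true,
            Breaker::Open { until } if now >= until => {
                self.state = Breaker::HalfOpen { probing: true }; // one probe
                true
            }
            Breaker::Open { .. } | Breaker::HalfOpen { probing: true } => false,
            Breaker::HalfOpen { probing: false } => {
                self.state = Breaker::HalfOpen { probing: true };
                true
            }
        }
    }

    pub fn record(&mut self, now: Duration, result: &Result<(), SendError>) {
        let reopen = Breaker::Open { until: now + self.cool_down };
        self.state = match (self.state, result) {
            (_, Ok(())) => Breaker::Closed { failures: 0 },
            (Breaker::HalfOpen { .. }, Err(SendError::ClientDisconnected)) => Breaker::HalfOpen { probing: false },
            (state, Err(SendError::ClientDisconnected)) => state,
            (Breaker::Closed { failures }, Err(_)) if failures + 1 >= self.failure_threshold => reopen,
            (Breaker::Closed { failures }, Err(_)) => Breaker::Closed { failures: failures + 1 },
            (Breaker::HalfOpen { .. }, Err(_)) => reopen, // another full cool-down
            (state @ Breaker::Open { .. }, Err(_)) => state,
        };
    }
}
```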
partition_key_encoding maps to Partitioning variants: bytes -> messages_key(&[u8]); key_u32 -> messages_key_u32(u32) (server hashes, client doesn't pre-hash); partition_id -> partition_id(u32) (direct partition selection, no hashing). partition_id requires routing.topic_defaults.partitions_count > 1. admission bounds-checks each value against partitions_count from the cached DestinationMeta and rejects OOB rows as partition_id_out_of_range - counted separately, doesn't feed the breaker, so a bad ID column can't block a healthy topic.
NULL / missing values degrade gracefully: missing partition_key_column -> Partitioning::None (server picks); missing stream_column / topic_column -> default_stream / default_topic. config validation requires default_* whenever the matching *_column is set - otherwise the only alternative is silently dropping every NULL-keyed row.
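a sketch of the key-encoding mapping, the NULL fallback and the partition_id bounds check. `KeyEncoding`, `Partitioning`, `KeyError` and the `Unparseable` case are hypothetical stand-ins; the variant comments name the SDK constructors listed above. partition numbering is an assumption captured in `FIRST_PARTITION_ID`.

```rust
/// Hypothetical mirror of partition_key_encoding = bytes | key_u32 | partition_id.
#[derive(Debug, Clone, Copy)]
pub enum KeyEncoding {
    Bytes,
    KeyU32,
    PartitionId,
}

/// Simplified stand-in for the SDK's partitioning choices.
#[derive(Debug, PartialEq)]
pub enum Partitioning {
    Balanced,         // partition_key = None: server picks
    Key(Vec<u8>),     // messages_key(&[u8])
    KeyU32(u32),      // messages_key_u32(u32): server hashes
    PartitionId(u32), // partition_id(u32): direct selection
}

#[derive(Debug, PartialEq)]
pub enum KeyError {
    Unparseable,
    PartitionIdOutOfRange { id: u32, partitions_count: u32 },
}

/// Assumption: partitions are numbered from 0; set to 1 if the server counts from 1.
pub const FIRST_PARTITION_ID: u32 = 0;

/// `raw` is the partition_key_column value; None means NULL or missing.
pub fn to_partitioning(enc: KeyEncoding, raw: Option<&str>, partitions_count: u32) -> Result<Partitioning, KeyError> {
    // NULL degrades to balanced and increments no error counter.
    let raw = match raw {
        Some(r) => r,
        None => return Ok(Partitioning::Balanced),
    };
    match enc {
        KeyEncoding::Bytes => Ok(Partitioning::Key(raw.as_bytes().to_vec())),
        KeyEncoding::KeyU32 => raw.parse().map(Partitioning::KeyU32).map_err(|_| KeyError::Unparseable),
        KeyEncoding::PartitionId => {
            let id: u32 = raw.parse().map_err(|_| KeyError::Unparseable)?;
            // partitions_count comes from the cached DestinationMeta; this
            // rejection is counted separately and never feeds the breaker.
            let in_range = id
                .checked_sub(FIRST_PARTITION_ID)
                .map_or(false, |offset| offset < partitions_count);
            if in_range {
                Ok(Partitioning::PartitionId(id))
            } else {
                Err(KeyError::PartitionIdOutOfRange { id, partitions_count })
            }
        }
    }
}
```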
the mapping is fetched once at startup from the config HTTP server. to reload, bump config.version and restart. hot-reload is future; v1 contract is "mapping is frozen for the connector's lifetime". if the fetch fails (HTTP error, 404, malformed JSON), the connector refuses to start - empty mapping is never an acceptable fallback because Route would silently route every message through default_topic.
routing.topic_defaults: one block per source; every auto-created topic inherits it. per-allowlist-entry overrides are future.
admission
```toml
[plugin_config.routing.admission]
mode = "allowlist"                 # open | allowlist | denylist
max_destinations = 256             # per plugin instance, in-memory, resets on restart
max_concurrent_destinations = 16
on_missing_destination = "default" # default | drop | error
on_admission_failure = "error"     # drop | error (default per source mode, see below)
allowlist = [
  { stream = "commerce", topic = "orders" },
  { stream = "commerce", topic = "*" },
  { stream = "identity", topic = "users" },
]
```
allowlist grammar is narrow: each entry is { stream: SEG, topic: SEG } where SEG is either a literal identifier ([a-zA-Z0-9._-]+, dots literal) or * as the whole segment. no prefix/suffix globs, no regex. { stream = "*", topic = "*" } is rejected by config validation - use mode = "open" instead, the same thing said deliberately.
denylist uses the same grammar. destinations matching any entry are rejected and follow on_admission_failure; everything else passes. { stream = "*", topic = "*" } rejected here too - it would refuse every message, a config error not a routing decision.
max_destinations is a hard ceiling. hitting it = admission failure regardless of mode. true in open mode too: open means "no allowlist", it does not mean "unbounded".
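a sketch of the narrow allowlist/denylist grammar. each segment is either `*` alone or a literal over `[a-zA-Z0-9._-]+`, and `{*, *}` is refused at config time. the types and error strings are hypothetical. the max_destinations cap is deliberately left out because it applies on top of every mode.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Seg {
    Any,
    Lit(String),
}

#[derive(Debug, Clone, PartialEq)]
pub struct Rule {
    pub stream: Seg,
    pub topic: Seg,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Mode {
    Open,
    Allowlist,
    Denylist,
}

/// `*` as the whole segment, or a literal; no partial globs, no regex.
pub fn parse_seg(s: &str) -> Result<Seg, String> {
    if s == "*" {
        return Ok(Seg::Any);
    }
    let literal = !s.is_empty()
        && s.chars().all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-'));
    if literal { Ok(Seg::Lit(s.to_string())) } else { Err(format!("invalid segment {:?}", s)) }
}

/// Config validation: `{*, *}` is refused in both list modes.
pub fn parse_rule(stream: &str, topic: &str) -> Result<Rule, String> {
    let rule = Rule { stream: parse_seg(stream)?, topic: parse_seg(topic)? };
    if rule.stream == Seg::Any && rule.topic == Seg::Any {
        return Err("{*, *} rejected; use mode = \"open\"".to_string());
    }
    Ok(rule)
}

fn seg_matches(seg: &Seg, value: &str) -> bool {
    match seg {
        Seg::Any => true,
        Seg::Lit(l) => l.as_str() == value, // dots are literal
    }
}

/// List check only; the max_destinations cap applies separately in every mode.
pub fn admitted(mode: Mode, rules: &[Rule], stream: &str, topic: &str) -> bool {
    let hit = rules.iter().any(|r| seg_matches(&r.stream, stream) && seg_matches(&r.topic, topic));
    match mode {
        Mode::Open => true,
        Mode::Allowlist => hit,
        Mode::Denylist => !hit,
    }
}
```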
on_admission_failure default by source mode:
| source mode | default | why |
|---|---|---|
| CDC | error | WAL already consumed; drop = silent data loss |
| polling + delete_after_read / processed_column | error | once commit runs the destructive op, drop = silent data loss |
| polling without destructive ops | drop | rows stay in source table; re-poll catches them |
| webhook / file / opaque | drop | upstream owns the buffer |
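the table and the `mode()` startup cross-check from the SDK section fit in two small functions. `SourceMode` follows the proposed shape; `OnAdmissionFailure`, the function names and the error text are hypothetical. a plugin that declares non-destructive polling cannot be configured with destructive ops, while over-declaring destructive is allowed and simply gets the stricter default.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SourceMode {
    Cdc,
    Polling { destructive: bool },
    Opaque,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OnAdmissionFailure {
    Drop,
    Error,
}

/// Default on_admission_failure per source mode.
pub fn default_on_admission_failure(mode: SourceMode) -> OnAdmissionFailure {
    match mode {
        SourceMode::Cdc | SourceMode::Polling { destructive: true } => OnAdmissionFailure::Error,
        SourceMode::Polling { destructive: false } | SourceMode::Opaque => OnAdmissionFailure::Drop,
    }
}

/// Startup cross-check: config may not escalate non-destructive into destructive.
pub fn check_mode_config(
    mode: SourceMode,
    delete_after_read: bool,
    processed_column: Option<&str>,
) -> Result<(), String> {
    let destructive_config = delete_after_read || processed_column.is_some();
    match mode {
        SourceMode::Polling { destructive: false } if destructive_config => {
            Err("plugin declares non-destructive polling but config enables destructive ops".to_string())
        }
        _ => Ok(()),
    }
}
```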
Telemetry
hot-path counters stay low-cardinality (connector_key only, no stream/topic labels). per-destination detail is exposed via a pull admin endpoint instead of fanned out across Prometheus labels, so a runaway routing config can't blow up the metrics backend.
iggy_connector_destinations_active{connector_key} (gauge, bounded by max_destinations)
iggy_connector_destinations_rejected_total{connector_key, reason} - reason in {cap, denylist, unknown, circuit_open, partition_id_out_of_range}
iggy_connector_routing_unmatched_total{connector_key, action} - action in {default, drop, error}
iggy_connector_destination_create_latency_seconds (histogram, no destination labels)
iggy_connector_destination_circuit_open{connector_key} (gauge, count of open breakers)
GET /connectors/{key}/destinations returns the live admission table: stream, topic, breaker state, last error, message counts. ops who need per-destination detail look there instead of in Prometheus.
What this design is not
not a generic SMT framework; Route is the only routing-aware transform.
not a dynamic topic factory - cap + allowlist + circuit breaker bound destination growth and keep it observable.
not a full fix for the broken Postgres CDC backend, but includes the smallest unblock: today's slot uses 'pgoutput' (binary) but the parser expects INSERT: / UPDATE: / DELETE: prefixes that only test_decoding emits, so it never matches. the routing release switches the slot to 'test_decoding' (one-line change) so CDC users can exercise routing. proper binary pgoutput decode lands behind cdc_pg_replicate later.
not hot-reload of Route mapping; restart-connector is the v1 reload mechanism.
not cross-destination ordering within a batch; parallel send_to calls ack out of order. operators who need cross-topic causality opt in with max_concurrent_destinations = 1.
not background-dispatcher friendly; routed producers are direct-only.
not per-destination schemas; one encoder per source plugin. revisit if anyone hits it.
not sink-side fan-out (one sink writing to N tables/indices); separate design.
Future, not in v1
deliberately out of scope:
pipelined poll -> commit (max_in_flight_batches)
hot-reload of Route mapping with epoch tag on routed messages
bounded per-destination Prometheus labels under allowlist mode
sink-side multi-destination dispatch (mirror of Route for sinks)
per-allowlist-entry topic-defaults overrides
background dispatch for routed producers (needs explicit ack callbacks)
Verification
end-to-end checks before merging:
existing single-stream sources still work unchanged (random_source, influxdb_source without routing config).
Postgres column routing: configure topic_column, populate with three event types, assert three topics receive correctly.
Postgres CDC routing: switch slot to test_decoding, insert into two tables, assert by_table routing lands rows and confirmed_flush_lsn advances only on commit.
Route transform: webhook source feeding {"event": {"type": "..."}}, mapping resolves to three topics, default catches unknown.
crash recovery: kill the runtime between send-ack and commit, restart, assert no data loss (duplicates expected for CDC and destructive-polling).
atomic state file: kill the runtime mid FileStateProvider::save (between truncate and fsync), restart, assert state is the previous full version (tmp+rename leaves either old or new, never empty).
ABI mismatch: load an .so built with old SDK, assert dlopen returns a typed IncompatibleAbi error before any FFI call - no postcard decode attempt.
abandon-batch: black-hole Iggy, drive a CDC connector into Degraded, hit /cdc/abandon-batch, assert slot advances to PendingBatch.last_lsn, status returns to Running, last_abandon_at is set, next poll proceeds.
partition_id OOB: configure partition_id encoding + partitions_count = 4, feed a row with id = 99, assert admission rejects with partition_id_out_of_range, breaker count is unchanged, other rows in the batch still send.
circuit breaker: black-hole one destination, assert N consecutive failures trip the breaker, traffic to other destinations is unaffected, breaker closes after cool-down.
background-dispatcher refusal: try to build an unbound producer with SendMode::Background, assert builder errors at build time.
config validation: source with both routing.* and a Route transform, assert init fails before start.
CDC commit Degraded flip: black-hole Iggy during commit, assert retries with capped backoff, flips to Degraded after commit_failure_threshold, never silently sits as Running.
cap-hit path: max_destinations = 2, feed three distinct destinations, assert the third routes through on_admission_failure regardless of admission mode.
state version mismatch: write a state file with bumped version, restart, assert Source::init returns the typed error and status reports Error with version-seen-vs-expected - not silent reset.
Route mapping fetch failure: point at config HTTP endpoint returning 500 (or 404, or malformed JSON), assert startup fails with upstream error surfaced - connector does not boot with empty mapping.
partition_key NULL fallback: configure partition_key_column, feed a row with NULL, assert Partitioning::None and no error counter increment.
Degraded visibility: drive a CDC connector past commit_failure_threshold, assert status API returns Degraded (not Running, not Error) and the alertable metric reflects it.
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
-
Problem
Source connectors today write to one stream and one topic, picked at startup.
Say you have a Postgres table with 50 event types and you want each type in its own Iggy topic. today you either split into 50 tables (bad schema, no one uses databases like this) or run 50 connector copies. the 50-copy version means 50x DB load, 50 configs, 50 state files; for CDC, 50 replication slots, each pinning WAL until its
restart_lsnadvances - one stuck consumer pins WAL for the whole cluster.Using iggy connectors should not force its users to bad schema or design choices.
The row should say where it goes; the connector should send it there.
Proposal TL;DR
Messages carry
destination: Option<Destination>; runtime groups a batch by(stream, topic)and dispatches each group via one unboundIggyProducer::send_to. Postgres source derives destination from a column or table name; any source can use the newRoutetransform to map a payload field through a static lookup.Sourcegains requiredcommit()(runs after atomic state persist; owns slot-advance, deferred DELETE) andmode()(Cdc | Polling { destructive } | Opaque, drives commit-failure policy). Destination growth bounded bymax_destinationscap, allowlist, and per-destination circuit breaker. Bundled: fix producer task leak on stream switch, atomicFileStateProvider::save(tmp+rename+fsync), switch Postgres CDC slot frompgoutputtotest_decodingso the existing parser actually matches.Scope of changes
two changes that make ABI break - we're under 1.0.0 no migrators, etc. v1 favors simple-and-correct over flexible. things deliberately left for later live in Future so we don't accidentally close those doors.
Step 1 - messages carry their own destination
ProducedMessageandDecodedMessagegetdestination: Option<Destination>whereDestination { stream: Identifier, topic: Identifier, partition_key: Option<Partitioning> }.partition_key = None= balanced (server picks).(stream, topic)and sends each group through one shared unboundIggyProducer.partition_keyrides through tosend_to.IggyProducer::send_to(stream, topic, messages, partitioning)already takes them per call, so the producer doesn't need to be bound.fits CDC and any source that already knows where each row belongs (table name, tenant ID, event-type column). the Postgres source gains config keys for column-derived destinations, per-table mapping, and a flag to strip routing columns from the payload.
bonus: kills a real bug. configuring several streams for one source builds one producer per stream and keeps the last; each discarded producer leaves a spawned task at
producer.rs:153that only exits onDiagnosticEvent::Shutdownfrom the underlying client. routing makes this impossible by design (one producer per source plugin), but the orphan leak should still be fixed independently with a cancellation token onProducerCore- it bites any SDK user, not just connectors.Step 2 - a routing transform
Routetransform reads a field from the payload, looks the value up in a static mapping, and writes the destination onto the message (same field Step 1 added). the mapping is served by the existing config HTTP server (one new endpoint, same auth).Routereplaces the wholedestinationatomically. mixing plugin-setpartition_keywith Route-set topic isn't allowed: different topic = different partition count = different key->partition mapping. one wins, all-or-nothing.Routealways overwrites if it matches; no precedence option. a source configured with both anyrouting.*key and aRoutetransform fails init - explicit refusal at startup.fits HTTP webhook sources, file tailers, bridges from other messaging systems - anything where the plugin dumps opaque data and the operator owns routing.
topic_fielduses dotted-path syntax over JSON objects:a.b.cwalks Object -> Object -> leaf. no arrays, no key names containing.. new grammar, scoped toRouteonly; the existingfilter_fieldstransform stays top-level-key-only.dispatch order:
Routemay overwrite.on_missing_destination.non-Route transforms mutate
DecodedMessagein place today, so they preservedestinationfor free. only test helpers and trait docs need updating.SDK changes (ABI break)
Step 1 cannot land on the current SDK; batched into the same release:
IggyProducer::unbound()constructor + builder. todayIggyProducerBuilder::newrequires stream/topic up front andProducerCore::initcreates them. unbound skips that and exposes onlysend_to; callingsend()errors. the runtime enforces the split at startup: bound producer rejects messages with adestination, unbound rejects messages without one.unbound()is direct-only. builder refusesSendMode::Background- background ack means "queued into the linger buffer", not "server acked", and the commit story below needs real acks. losing the distinction silently breaks CDC. background batching for routed sources is a future design with explicit ack callbacks.unbound()forcessend_retries_count = None. retries live in the per-destination circuit breaker.Sourcetrait gains two required methods, no defaults:async fn commit(&self) -> Result<(), Error>- runs after the runtime has atomically persisted state (tmp + rename + parent-dir fsync; today'sFileStateProvider::savetruncates-then-writes, leaving the file zero-byte on a mid-write crash - fix bundled in this release). plugin drainsMutex<Option<PendingBatch>>and runs side effects (slot-advance, deferred DELETE). ifcommitis skipped or fails, the nextpollfilters rows already covered by persisted state (CDC: drop rows wherelsn <= last_persisted_lsn, then re-run slot-advance). required, no default impl - silent no-op corrupts state. nostatearg:PendingBatchis the single typed home.fn mode(&self) -> SourceMode- returnsCdc | Polling { destructive: bool } | Opaque. driveson_admission_failuredefault and commit-failure policy. no default - plugin author picks at code time, not config time. runtime cross-checks at startup:Polling { destructive: false }withdelete_after_read = trueorprocessed_columnset = init fails. config can't escalate non-destructive into destructive; the reverse (overdeclared destructive) is allowed.ProducedMessageandDecodedMessagegaindestination: Option<Destination>. sink-sideConsumedMessageunchanged.ConnectorStatestayspub Vec<u8>. 
the plugin puts a state-shape version as the first field of its own envelope. version mismatch -> typed error fromSource::init-> connector refuses to start; operator clears the state file deliberately. silent reset is opt-in behind an explicit plugin flag (off by default). the runtime never interprets state contents.ConnectorStatusgainsDegraded, slotted betweenRunningandError. today's enum (Starting | Running | Stopping | Stopped | Error) treatsErroras terminal and can't carry "still retrying, not progressing" which the CDC commit-retry loop needs.process_messagesreturnsVec<(DestinationKey, IggyMessage)>instead ofVec<IggyMessage>.source_forwarding_loopreplaces the singleproducer.send(...)with a loop that issues onesend_toper destination group, waits for all to finish, persists state atomically, then callscommit.commit: Ok(())+ correctmode()impls in the same release. plugins ship as independent.sofiles - the runtime gatesdlopenoniggy_source_abi_version() -> u32matching its compiled-in constant and refuses load on mismatch with a typed error. without this gate, postcard would silently misdecodeProducedMessage(newdestinationfield is non-self-describing wire-format change) and every batch from an old.sowould drop.State and atomicity
this decides whether the design is safe to ship.
commitruns only after every per-destination send has acked.poll -> send fan-out -> commit -> next poll. the runtime holds one in-flight batch per plugin. pipelining gets added later behindmax_in_flight_batchesif anyone needs the throughput.max_concurrent_destinations(default 16). this caps in-flight requests, not wire bandwidth - all sends share one client and one TCP/QUIC writer, so the knob mostly hides latency.commitis skipped, same rows return on next poll.commitruns only after state persist returns. a failedcommitleaves state durable but side effects skipped - nextpollfilters covered rows and re-runs the side effect. crash between send-ack and state persist -> re-poll, no duplicates beyond the in-flight batch. crash between state persist andcommit-> nextpollsees state covering the batch, plugin filters and replays side effects. duplicates on restart accepted.commit-failure policy by source mode:
Cdccommit_failure_threshold(default 16) consecutive failures, status flipsRunning -> Degraded. one successfulcommitclears the counter and flips back. no hysteresis - flapping is visible in status-change history; alerts fire on entry toDegraded.PollingOpaque(webhook/file)stop during CDC commit-retry: operator stop interrupts the backoff sleep and waits up to
commit_stop_grace(default 30s) for the in-flightcommitto finish. miss the window -> runtime exits without further commits; restart re-polls from last persisted state, duplicates for the in-flight batch accepted. no force-kill mid-commitin v1.escape hatch for prolonged outage:
POST /sources/{key}/cdc/abandon-batch advances the slot to PendingBatch.last_lsn without sending, clears the pending batch, and surfaces last_abandon_at in status. documented data-loss path - the only mechanism that keeps WAL from filling PG storage if Iggy is unreachable for hours. restart cannot do this on its own (state still points at the stuck batch).
CDC requirement: switch pg_logical_slot_get_changes (auto-advances confirmed_flush_lsn) to pg_logical_slot_peek_changes plus an explicit pg_replication_slot_advance(slot, last_lsn) from inside commit. PendingBatch carries last_lsn; persisted state carries last_persisted_lsn. on poll, drop rows with lsn <= last_persisted_lsn before forming the batch - this closes the window where state is durable but the slot hasn't been advanced yet. requires PG >= 11.
polling requirement: every destructive op moves from poll to commit. that's delete_after_read = true (DELETE) and processed_column (UPDATE ... SET col = TRUE). today both run in mark_or_delete_processed_rows during poll; both move.
duplicates accepted as the cost of correctness. plugins should pick stable ids (row PK hash, LSN+xid) instead of Uuid::v4 so downstream can dedupe. the Iggy server does not dedupe; the runtime treats id as opaque. filtering replays is a downstream consumer's job.
Producer model
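The breaker rules in this section (trip on send failures, stay open for circuit_breaker.cool_down, then allow a single half-open probe) and the cause-chain check that keeps disconnections from counting fit in a small state machine. A sketch under stated assumptions: Disconnected and SendFailed are hypothetical stand-ins for the SDK's CannotSendMessagesDueToClientDisconnection and ProducerSendFailed, the walk uses std::error::Error::source (which assumes the wrapper exposes its cause that way; the proposal describes a structural unwrap of the cause field), and counting consecutive failures is an assumed trip rule.

```rust
use std::error::Error;
use std::fmt;
use std::time::{Duration, Instant};

#[derive(Debug)]
struct Disconnected;
impl fmt::Display for Disconnected {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "client disconnected")
    }
}
impl Error for Disconnected {}

/// Hypothetical wrapper that boxes every send error, like ProducerSendFailed.
#[derive(Debug)]
struct SendFailed {
    cause: Box<dyn Error + Send + Sync>,
}
impl fmt::Display for SendFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "send failed: {}", self.cause)
    }
}
impl Error for SendFailed {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&*self.cause)
    }
}

/// Walk the whole cause chain; true if any link is a disconnection.
fn caused_by_disconnect(err: &(dyn Error + 'static)) -> bool {
    let mut cur = Some(err);
    while let Some(e) = cur {
        if e.is::<Disconnected>() {
            return true;
        }
        cur = e.source();
    }
    false
}

#[derive(Debug, Clone, Copy)]
enum State {
    Closed,
    Open(Instant), // open until this instant
    HalfOpen,      // the single probe is in flight
}

struct Breaker {
    threshold: u32,
    cool_down: Duration,
    failures: u32,
    state: State,
}

impl Breaker {
    fn new(threshold: u32, cool_down: Duration) -> Self {
        Self { threshold, cool_down, failures: 0, state: State::Closed }
    }

    /// Admission check; false means circuit_open -> on_admission_failure.
    fn try_acquire(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed => true,
            State::Open(until) if now >= until => {
                self.state = State::HalfOpen; // this caller becomes the probe
                true
            }
            State::Open(_) | State::HalfOpen => false,
        }
    }

    fn on_success(&mut self) {
        self.failures = 0;
        self.state = State::Closed;
    }

    fn on_error(&mut self, err: &(dyn Error + 'static), now: Instant) {
        if caused_by_disconnect(err) {
            // Hits every destination at once: never counted. A probe lost this
            // way is re-offered instead of reopening for a full cool-down.
            if matches!(self.state, State::HalfOpen) {
                self.state = State::Open(now);
            }
            return;
        }
        match self.state {
            State::Closed => {
                self.failures += 1;
                if self.failures >= self.threshold {
                    self.state = State::Open(now + self.cool_down);
                }
            }
            State::HalfOpen => self.state = State::Open(now + self.cool_down),
            State::Open(_) => {} // late errors from sends already in flight
        }
    }
}
```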
sources without routing.* keep current behavior: the runtime builds a bound IggyProducer from stream/topic config and rejects messages that set destination. adding routing.* flips the source to unbound. one mode per source, picked at startup.
one IggyProducer::unbound() per source plugin. direct-only.
DashMap<(stream, topic), AdmissionRecord>. AdmissionRecord holds a tokio::sync::OnceCell<DestinationMeta> (captures partitions_count during the one-time ensure call, used for partition_id bounds checks) plus circuit-breaker state. DashMap::entry makes get-or-insert atomic within the process.
IggyClient::create_stream then create_topic, treating StreamNameAlreadyExists / TopicNameAlreadyExists as success. server-side, both errors leave no partial state behind, so concurrent first-touches are safe.
send_to retries on the slow path.
startup_prewarm_deadline (default 60s) bounds total time; on deadline, un-warmed destinations log warnings and the connector starts anyway. open / denylist modes pay the round-trip on first touch per new destination - a documented tail-latency event.
max_destinations is a hard cap on live admission records; hitting it = admission failure regardless of mode. allowlist bounds the universe at config time; open mode is "set cap to expected universe; hitting it = config error, not a hot-path event". eviction would need breaker-pinning carveouts and re-touch semantics that are easy to get wrong.
per-destination circuit breaker: trips on send failures (circuit_breaker.failure_threshold), holds open for 30s (circuit_breaker.cool_down), then half-opens (one probe; success closes, failure reopens for another full cool-down). hitting an open breaker = admission failure with reason circuit_open -> on_admission_failure. it does not silently reroute to default_* - those are NULL/missing defaults applied before admission, not a fallback for healthy-but-busy destinations. the breaker skips errors whose recursively-unwrapped cause is CannotSendMessagesDueToClientDisconnection (the SDK boxes every send error in ProducerSendFailed { cause, ... }, so this is a structural unwrap, not a variant match) - those hit every destination at once and would trip every breaker on a transient outage. proper transient-vs-permanent classification waits on IggyError::is_retryable (future).
Config
Postgres - destination from columns
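Column-driven resolution, as this section describes it, reduces to two small functions: pick the partitioning from partition_key_column under partition_key_encoding, and fall back to default_stream / default_topic for NULL routing columns. A sketch with hypothetical names: PartitionChoice is a local enum mirroring the SDK's Partitioning constructors (not the iggy type), column values are assumed to arrive as text, and partition ids are assumed to be 0-based, so the bounds check is id < partitions_count.

```rust
/// Local stand-in for the SDK's Partitioning constructors (not the iggy type).
#[derive(Debug, PartialEq)]
enum PartitionChoice {
    MessagesKey(Vec<u8>), // bytes -> messages_key(&[u8])
    MessagesKeyU32(u32),  // key_u32 -> messages_key_u32(u32), server hashes
    PartitionId(u32),     // partition_id -> partition_id(u32), no hashing
}

#[derive(Debug, Clone, Copy)]
enum Encoding {
    Bytes,
    KeyU32,
    PartitionId,
}

#[derive(Debug, PartialEq)]
enum RowError {
    BadKey(String),
    PartitionIdOutOfRange { id: u32, partitions_count: u32 },
}

/// Ok(None) for a NULL / missing value means balanced: the server picks.
fn partition_key(raw: Option<&str>, enc: Encoding, partitions_count: u32) -> Result<Option<PartitionChoice>, RowError> {
    let Some(raw) = raw else { return Ok(None) };
    let parse = |s: &str| s.parse::<u32>().map_err(|_| RowError::BadKey(s.to_string()));
    Ok(Some(match enc {
        Encoding::Bytes => PartitionChoice::MessagesKey(raw.as_bytes().to_vec()),
        Encoding::KeyU32 => PartitionChoice::MessagesKeyU32(parse(raw)?),
        Encoding::PartitionId => {
            let id = parse(raw)?;
            // Assumes 0-based ids. Rejected per row; never fed to the breaker.
            if id >= partitions_count {
                return Err(RowError::PartitionIdOutOfRange { id, partitions_count });
            }
            PartitionChoice::PartitionId(id)
        }
    }))
}

/// NULL stream / topic columns fall back to default_stream / default_topic.
fn destination(stream_col: Option<&str>, topic_col: Option<&str>, default_stream: &str, default_topic: &str) -> (String, String) {
    (
        stream_col.unwrap_or(default_stream).to_string(),
        topic_col.unwrap_or(default_topic).to_string(),
    )
}
```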
partition_key_encoding maps to Partitioning constructors: bytes -> messages_key(&[u8]); key_u32 -> messages_key_u32(u32) (server hashes, client doesn't pre-hash); partition_id -> partition_id(u32) (direct partition selection, no hashing). partition_id requires routing.topic_defaults.partitions_count > 1. admission bounds-checks each value against partitions_count from the cached DestinationMeta and rejects out-of-range rows as partition_id_out_of_range - counted separately and not fed to the breaker, so a bad ID column can't block a healthy topic.
NULL / missing values degrade gracefully: a missing partition_key_column value -> partition_key = None (balanced, server picks); missing stream_column / topic_column -> default_stream / default_topic. config validation requires default_* whenever the matching *_column is set - otherwise the only alternative is silently dropping every NULL-keyed row.
Postgres - destination from table name (CDC)
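Routing by table name means the CDC poll turns each row returned by pg_logical_slot_peek_changes (lsn, xid, data) into a destination, after skipping rows already covered by last_persisted_lsn. test_decoding prints DML as lines such as "table public.orders: INSERT: id[integer]:1", with separate BEGIN / COMMIT records. The sketch below assumes that shape: route_change and RoutedChange are hypothetical, "topic = bare table name" is an illustrative rule rather than the proposal's by_table config, and packing LSN and xid into a u128 is one way to get the stable ids the proposal asks for. The slot itself would only move inside commit, via pg_replication_slot_advance(slot, last_lsn).

```rust
/// Parse the textual pg_lsn form ("16/B374D848") into a u64.
fn parse_lsn(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some(((hi as u64) << 32) | lo as u64)
}

#[derive(Debug, PartialEq)]
struct RoutedChange {
    id: u128, // stable across replays: same LSN + xid -> same id
    topic: String,
    op: String,
}

/// Map one peeked row to a routed change. Returns None for rows already
/// covered by persisted state and for non-DML records (BEGIN / COMMIT).
/// A real parser should surface malformed rows as errors, not skip them.
fn route_change(lsn: &str, xid: u32, data: &str, last_persisted_lsn: u64) -> Option<RoutedChange> {
    let lsn = parse_lsn(lsn)?;
    if lsn <= last_persisted_lsn {
        return None; // state is durable but the slot was not advanced yet
    }
    // "table public.orders: INSERT: id[integer]:1"
    let rest = data.strip_prefix("table ")?;
    let (qualified, rest) = rest.split_once(": ")?;
    let (op, _columns) = rest.split_once(':')?;
    if !matches!(op, "INSERT" | "UPDATE" | "DELETE") {
        return None;
    }
    // Hypothetical rule: topic = table name without the schema.
    let table = qualified.rsplit_once('.').map_or(qualified, |(_, t)| t);
    Some(RoutedChange {
        id: ((lsn as u128) << 32) | xid as u128,
        topic: table.to_string(),
        op: op.to_string(),
    })
}
```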
any source - routing transform
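The Route transform this section describes (map one payload field through a static lookup, fall back to default_topic, refuse to start on an empty mapping) can be sketched without dependencies. Value, lookup and Route are hypothetical names, a real transform would walk a parsed JSON value instead of this tiny enum, and the field names field / mapping are illustrative rather than the transform's actual config keys.

```rust
use std::collections::HashMap;

/// Tiny JSON-like value so the sketch has no dependencies.
enum Value {
    Str(String),
    Obj(HashMap<String, Value>),
}

/// Follow a dotted path such as "event.type" down to a string leaf.
fn lookup<'a>(v: &'a Value, path: &str) -> Option<&'a str> {
    let mut cur = v;
    for seg in path.split('.') {
        match cur {
            Value::Obj(fields) => cur = fields.get(seg)?,
            Value::Str(_) => return None,
        }
    }
    match cur {
        Value::Str(s) => Some(s.as_str()),
        Value::Obj(_) => None,
    }
}

/// Hypothetical shape of the frozen Route config.
struct Route {
    field: String,
    mapping: HashMap<String, String>, // field value -> topic
    default_topic: Option<String>,
}

impl Route {
    /// Startup check: an empty mapping is refused, never used as a fallback.
    fn new(field: &str, mapping: HashMap<String, String>, default_topic: Option<String>) -> Result<Self, String> {
        if mapping.is_empty() {
            return Err("route mapping is empty; refusing to start".to_string());
        }
        Ok(Self { field: field.to_string(), mapping, default_topic })
    }

    /// Topic for one payload; None = unmatched with no default configured.
    fn topic_for(&self, payload: &Value) -> Option<&str> {
        lookup(payload, &self.field)
            .and_then(|key| self.mapping.get(key))
            .map(String::as_str)
            .or(self.default_topic.as_deref())
    }
}
```

a None from topic_for is where the unmatched action (drop or error) would apply.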
the mapping is fetched once at startup from the config HTTP server. to reload, bump config.version and restart. hot-reload is future; the v1 contract is "mapping is frozen for the connector's lifetime". if the fetch fails (HTTP error, 404, malformed JSON), the connector refuses to start - an empty mapping is never an acceptable fallback because Route would silently route every message through default_topic.
topic defaults (auto-created destinations)
one block per source. every auto-created topic inherits. per-allowlist-entry overrides are future.
admission
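The narrow entry grammar this section defines (each segment is a literal [a-zA-Z0-9._-]+ or a bare *, and { *, * } is rejected) is small enough to validate and match by hand. A sketch; Seg, Entry and the *_admits helpers are hypothetical names.

```rust
/// One allowlist / denylist segment: a literal identifier or a bare "*".
#[derive(Debug, Clone, PartialEq)]
enum Seg {
    Any,
    Literal(String),
}

fn parse_seg(s: &str) -> Result<Seg, String> {
    if s == "*" {
        return Ok(Seg::Any);
    }
    let literal_ok = !s.is_empty()
        && s.chars().all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-'));
    if literal_ok {
        Ok(Seg::Literal(s.to_string())) // dots are literal, not separators
    } else {
        Err(format!("invalid segment {s:?}: expected [a-zA-Z0-9._-]+ or a bare *"))
    }
}

#[derive(Debug)]
struct Entry {
    stream: Seg,
    topic: Seg,
}

/// Validate one entry; { "*", "*" } is rejected in both lists.
fn parse_entry(stream: &str, topic: &str) -> Result<Entry, String> {
    let entry = Entry { stream: parse_seg(stream)?, topic: parse_seg(topic)? };
    if entry.stream == Seg::Any && entry.topic == Seg::Any {
        return Err("{ stream = \"*\", topic = \"*\" } is rejected by validation".to_string());
    }
    Ok(entry)
}

fn seg_matches(seg: &Seg, value: &str) -> bool {
    match seg {
        Seg::Any => true,
        Seg::Literal(l) => l.as_str() == value,
    }
}

impl Entry {
    fn matches(&self, stream: &str, topic: &str) -> bool {
        seg_matches(&self.stream, stream) && seg_matches(&self.topic, topic)
    }
}

/// Allowlist: admitted only if some entry matches.
fn allowlist_admits(list: &[Entry], stream: &str, topic: &str) -> bool {
    list.iter().any(|e| e.matches(stream, topic))
}

/// Denylist: admitted unless some entry matches.
fn denylist_admits(list: &[Entry], stream: &str, topic: &str) -> bool {
    !list.iter().any(|e| e.matches(stream, topic))
}
```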
allowlist grammar is narrow: each entry is { stream: SEG, topic: SEG } where SEG is either a literal identifier ([a-zA-Z0-9._-]+, dots literal) or * as the whole segment. no prefix/suffix globs, no regex. { stream = "*", topic = "*" } is rejected by config validation - use mode = "open" instead, the same thing said deliberately.
denylist uses the same grammar. destinations matching any entry are rejected and follow on_admission_failure; everything else passes. { stream = "*", topic = "*" } is rejected here too - it would refuse every message, a config error, not a routing decision.
max_destinations is a hard ceiling. hitting it = admission failure regardless of mode. this holds in open mode too: open means "no allowlist", it does not mean "unbounded".
on_admission_failure default by source mode:
Cdc: error
Polling with delete_after_read / processed_column: error - commit runs the destructive op, so drop = silent data loss
Polling without destructive ops: drop
Opaque: drop
Telemetry
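Keeping the metrics low-cardinality means the reason label draws from a fixed set and per-destination counts live in the admission table rather than in Prometheus labels. A sketch of that split, also showing max_destinations as a hard cap with no eviction; Admission and RejectReason are hypothetical names, and a plain HashMap behind &mut self stands in for the proposal's DashMap.

```rust
use std::collections::HashMap;

/// The fixed set of rejection reasons: the only values the reason label
/// can take, so the metric's cardinality is known up front.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RejectReason {
    Cap,
    Denylist,
    Unknown,
    CircuitOpen,
    PartitionIdOutOfRange,
}

impl RejectReason {
    fn label(self) -> &'static str {
        match self {
            RejectReason::Cap => "cap",
            RejectReason::Denylist => "denylist",
            RejectReason::Unknown => "unknown",
            RejectReason::CircuitOpen => "circuit_open",
            RejectReason::PartitionIdOutOfRange => "partition_id_out_of_range",
        }
    }
}

/// Per-destination detail lives here (what the admin endpoint would serve);
/// Prometheus only sees per-connector totals.
struct Admission {
    max_destinations: usize,
    live: HashMap<(String, String), u64>, // (stream, topic) -> messages routed
    rejected: [u64; 5],                   // indexed by RejectReason as usize
}

impl Admission {
    fn new(max_destinations: usize) -> Self {
        Self { max_destinations, live: HashMap::new(), rejected: [0; 5] }
    }

    fn admit(&mut self, stream: &str, topic: &str) -> Result<(), RejectReason> {
        let key = (stream.to_string(), topic.to_string());
        if let Some(count) = self.live.get_mut(&key) {
            *count += 1;
            return Ok(());
        }
        if self.live.len() >= self.max_destinations {
            // Hard cap, no eviction: the new destination goes to on_admission_failure.
            self.reject(RejectReason::Cap);
            return Err(RejectReason::Cap);
        }
        self.live.insert(key, 1);
        Ok(())
    }

    fn reject(&mut self, reason: RejectReason) {
        self.rejected[reason as usize] += 1;
    }

    /// Value for iggy_connector_destinations_active{connector_key}.
    fn destinations_active(&self) -> usize {
        self.live.len()
    }

    /// Value for iggy_connector_destinations_rejected_total{connector_key, reason}.
    fn rejected_total(&self, reason: RejectReason) -> u64 {
        self.rejected[reason as usize]
    }
}
```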
hot-path counters stay low-cardinality (connector_key only, no stream/topic labels). per-destination detail is exposed via a pull admin endpoint instead of fanned out across Prometheus labels, so a runaway routing config can't blow up the metrics backend.
iggy_connector_messages_routed_total{connector_key}
iggy_connector_destinations_active{connector_key} (gauge, bounded by max_destinations)
iggy_connector_destinations_rejected_total{connector_key, reason} - reason in {cap, denylist, unknown, circuit_open, partition_id_out_of_range}
iggy_connector_routing_unmatched_total{connector_key, action} - action in {default, drop, error}
iggy_connector_destination_create_latency_seconds (histogram, no destination labels)
iggy_connector_destination_circuit_open{connector_key} (gauge, count of open breakers)
GET /connectors/{key}/destinations returns the live admission table: stream, topic, breaker state, last error, message counts. ops who need per-destination detail look there instead of in Prometheus.
What this design is not
Route is the only routing-aware transform.
today the CDC slot is created with 'pgoutput' (binary), but the parser expects INSERT: / UPDATE: / DELETE: prefixes that only test_decoding emits, so it never matches. the routing release switches the slot to 'test_decoding' (one-line change) so CDC users can exercise routing. proper binary pgoutput decode lands behind cdc_pg_replicate later.
no hot-reload of the Route mapping; restart-connector is the v1 reload mechanism.
no ordering across destinations: concurrent send_to calls ack out of order. operators who need cross-topic causality opt in with max_concurrent_destinations = 1.
Future, not in v1
deliberately out of scope:
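pipelined poll -> commit (max_in_flight_batches)
hot-reload of the Route mapping, with an epoch tag on routed messages
transient-vs-permanent error classification (IggyError::is_retryable)
binary pgoutput decode for the CDC backend
sink-side routing (Route for sinks)
Verification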
end-to-end checks before merging:
Postgres source with topic_column: populate with three event types, assert three topics receive correctly.
CDC on test_decoding: insert into two tables, assert by_table routing lands rows and confirmed_flush_lsn advances only on commit.
Route on payloads shaped {"event": {"type": "..."}}: mapping resolves to three topics, default catches unknown.
crash after send, before commit, restart, assert no data loss (duplicates expected for CDC and destructive polling).
crash inside FileStateProvider::save (between truncate and fsync), restart, assert state is the previous full version (tmp+rename leaves either old or new, never empty).
load a .so built with the old SDK, assert loading fails with a typed IncompatibleAbi error before any FFI call - no postcard decode attempt.
drive CDC into Degraded, hit /cdc/abandon-batch, assert the slot advances to PendingBatch.last_lsn, status returns to Running, last_abandon_at is set, and the next poll proceeds.
partition_id encoding + partitions_count = 4: feed a row with id = 99, assert admission rejects with partition_id_out_of_range, the breaker count is unchanged, and other rows in the batch still send.
on_admission_failure drives drop-vs-error correctly.
unbound producer with SendMode::Background: assert the builder errors at build time.
routing.* and a Route transform: assert init fails before start.
status reaches Degraded after commit_failure_threshold and never silently sits as Running.
max_destinations = 2: feed three distinct destinations, assert the third routes through on_admission_failure regardless of admission mode.
state version mismatch: Source::init returns the typed error and status reports Error with version-seen-vs-expected - not a silent reset.
partition_key_column with a NULL value: assert partition_key = None (balanced) and no error counter increment.
fail commit past commit_failure_threshold, assert the status API returns Degraded (not Running, not Error) and the alertable metric reflects it.
Reference discussion:
https://discord.com/channels/1144142576266530928/1423706109402808504/1507212593260134530