Status: Canon
Version: 1.0.0
Context: The Physics of the Engine
In Wire, the atomic unit of processing is the Event.
An Event is:
- Immutable: Once created, it cannot be changed.
- Timestamped: Every event carries an explicit EventTime(int64).
- Keyed (Optional): Events may have a Partition Key which determines their routing.
- Ordered: Events within a specific Key Range are guaranteed to be processed in order.
- Bounded or Unbounded: The engine treats all data as potentially infinite. Bounded data (files) is just a special case of a stream that closes.
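The properties above can be sketched as a Go struct. Field names here are illustrative, not Wire's actual API:

```go
package main

import "fmt"

// Event sketches the atomic unit of processing described above.
// Once constructed, the struct is treated as immutable by convention.
type Event struct {
	EventTime int64  // explicit event timestamp (e.g., ms since epoch)
	Key       string // optional partition key; empty means unkeyed
	Payload   []byte // the record itself
}

func main() {
	e := Event{EventTime: 1_700_000_000_000, Key: "user-42", Payload: []byte("click")}
	fmt.Println(e.Key, e.EventTime)
}
```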
Wire strictly separates the "When it happened" from "When we saw it".
- Event Time: The timestamp embedded in the record (e.g., click_time). This determines results.
- Processing Time: The wall clock of the worker. Used only for timeouts or metrics, never for correctness.
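A minimal sketch of the separation, with an assumed record type (`click` and its `ClickTime` field are illustrative): results read only the embedded event time; the wall clock is consulted solely for operational concerns.

```go
package main

import (
	"fmt"
	"time"
)

// click is a hypothetical record type carrying its own timestamp.
type click struct {
	ClickTime int64 // event time, set by the producer
}

// eventTime reads the timestamp embedded in the record. This is the
// only time value allowed to influence results.
func eventTime(c click) int64 { return c.ClickTime }

// processingTime is the worker's wall clock, used for timeouts/metrics.
func processingTime() int64 { return time.Now().UnixMilli() }

func main() {
	c := click{ClickTime: 1_700_000_000_000}
	fmt.Println(eventTime(c))
	fmt.Println(processingTime() >= eventTime(c)) // wall clock is typically later
}
```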
A Watermark(T) is a control packet flowing through the stream that declares:
"No more events with timestamp < T will arrive."
- Generation: Source Connectors generate watermarks based on observed data (monotonically increasing).
- Propagation:
- Operators forward the minimum watermark received from all upstream inputs.
OutputWatermark = Min(InputWatermark_1, InputWatermark_2, ...)
- Function: Watermarks trigger Window Calculations and expire timers.
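The propagation rule above reduces to a minimum over inputs. A sketch:

```go
package main

import "fmt"

// outputWatermark implements the propagation rule: an operator may only
// advance its output watermark to the minimum watermark seen across all
// upstream inputs, because any input could still deliver older events.
func outputWatermark(inputWatermarks []int64) int64 {
	min := inputWatermarks[0]
	for _, w := range inputWatermarks[1:] {
		if w < min {
			min = w
		}
	}
	return min
}

func main() {
	// Input 2 lags at t=95, so the operator cannot declare t=120 complete.
	fmt.Println(outputWatermark([]int64{120, 95, 110}))
}
```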
If an event arrives with Timestamp < CurrentWatermark:
- Default: The event is dropped (or sent to a side-output "Dead Letter Queue").
- Allowed Lateness: Users can configure a grace period where late events trigger a window re-computation/update.
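The two rules above can be sketched as a three-way classification (function and type names are illustrative):

```go
package main

import "fmt"

type lateDecision int

const (
	accept lateDecision = iota // on time: at or ahead of the watermark
	update                     // within allowed lateness: re-fire the window
	drop                       // beyond the grace period: drop or side-output
)

// classify applies the late-event rules: events behind the watermark
// are late; a grace period (allowedLateness) turns a drop into a
// window re-computation instead.
func classify(eventTime, watermark, allowedLateness int64) lateDecision {
	switch {
	case eventTime >= watermark:
		return accept
	case eventTime >= watermark-allowedLateness:
		return update
	default:
		return drop
	}
}

func main() {
	fmt.Println(classify(90, 100, 15) == update) // 10 late, within 15 grace
	fmt.Println(classify(80, 100, 15) == drop)   // 20 late, beyond grace
}
```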
Windowing assigns events to finite temporal buckets.
- Tumbling Windows: Fixed size, non-overlapping (e.g., "Every 5 minutes").
- Sliding Windows: Fixed size, overlapping (e.g., "Every 1 minute, look back 5 minutes").
- Session Windows: Dynamic size, gap-based (e.g., "User activity until 30m idle").
- Window State: State is scoped to (Key, WindowID).
- Cleanup: When Watermark > Window_End + Lateness, the window state is automatically purged from Pebble.
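Tumbling-window assignment and the cleanup condition above can be sketched as follows (assuming non-negative millisecond timestamps):

```go
package main

import "fmt"

// windowFor assigns an event to its tumbling window by truncating the
// event time to the window size: non-overlapping, fixed-size buckets.
func windowFor(eventTime, size int64) (start, end int64) {
	start = eventTime - (eventTime % size)
	return start, start + size
}

// canPurge mirrors the cleanup rule: state for (Key, WindowID) is
// deletable once Watermark > Window_End + Lateness.
func canPurge(watermark, windowEnd, lateness int64) bool {
	return watermark > windowEnd+lateness
}

func main() {
	s, e := windowFor(1_000_123, 60_000) // "every 1 minute" tumbling windows
	fmt.Println(s, e)                    // 960000 1020000
	fmt.Println(canPurge(1_100_000, e, 30_000))
}
```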
Wire utilizes a combination of Yamux Flow Control and Go Channels to manage load.
- Yamux Streams: Inter-node communication happens over Yamux streams. If the receiver is slow, the Yamux window closes, pausing the sender.
- Bounded Channels: Intra-node communication uses bounded Go channels.
- Propagation: If a Sink is slow:
- The Sink's input buffer fills.
- The upstream Operator blocks on write (channel full).
- Yamux stops reading from the network for that stream.
- ... This propagates recursively to the Source.
- Source Behavior: When blocked, the Source stops reading from external systems (e.g., stops pulling from the upstream broker), so the backpressure reaches the producer.
Guarantee: No unlimited buffering. No silent data drops under load.
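The intra-node half of this chain is just a bounded Go channel. A minimal sketch (`trySend` is illustrative; the real sender blocks rather than polling):

```go
package main

import "fmt"

// trySend models a sender facing a bounded input buffer: it succeeds
// only if the buffer has room. Where trySend returns false, the real
// operator would block on the send, which is exactly how backpressure
// halts the upstream stage without unbounded buffering.
func trySend(buf chan int, v int) bool {
	select {
	case buf <- v:
		return true
	default:
		return false // buffer full: sender must wait
	}
}

func main() {
	buf := make(chan int, 2) // bounded input buffer of a slow sink
	fmt.Println(trySend(buf, 1))
	fmt.Println(trySend(buf, 2))
	fmt.Println(trySend(buf, 3)) // false: sink hasn't drained; upstream stalls
}
```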
Wire guarantees consistency using the Asynchronous Barrier Snapshot (ABS) algorithm (a variant of Chandy-Lamport).
A Checkpoint Barrier (ID=N) is a control record injected by the Coordinator into all Source streams.
- It flows in line with the data and never overtakes records.
- It divides the stream into "Epoch N" (pre-barrier) and "Epoch N+1" (post-barrier).
For operators with multiple inputs (e.g., CoProcess, Join, KeyBy):
- Wait: When Barrier N arrives on Input A, the operator stops processing Input A.
- Buffer: Records arriving on Input A (belonging to Epoch N+1) are buffered.
- Process: The operator continues processing Input B until Barrier N arrives on Input B.
- Snapshot: Once ALL inputs have received Barrier N, the operator triggers a state snapshot.
- Forward: The operator emits Barrier N downstream and unblocks Input A.
This alignment ensures the snapshot captures exactly the state of "All events <= Barrier N".
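The alignment steps can be sketched for a two-input operator. This simplification processes the inputs sequentially rather than interleaved, and the names are illustrative, but it shows the invariant: post-barrier records on the blocked input are buffered, not processed, until both barriers arrive.

```go
package main

import "fmt"

// record is either a data record or a checkpoint barrier.
type record struct {
	barrier bool
	value   string
}

// align sketches barrier alignment: once Barrier N arrives on input A,
// A's subsequent records (Epoch N+1) are buffered; B keeps processing
// until its own Barrier N, at which point the snapshot would fire and
// A would be unblocked.
func align(a, b []record) (processed, buffered []string) {
	blockedA := false
	for _, r := range a {
		if r.barrier {
			blockedA = true // stop processing input A
			continue
		}
		if blockedA {
			buffered = append(buffered, r.value) // Epoch N+1, held back
		} else {
			processed = append(processed, r.value)
		}
	}
	for _, r := range b {
		if r.barrier {
			break // both barriers seen: snapshot, forward barrier, unblock A
		}
		processed = append(processed, r.value)
	}
	return processed, buffered
}

func main() {
	a := []record{{value: "a1"}, {barrier: true}, {value: "a2"}}
	b := []record{{value: "b1"}, {barrier: true}}
	p, buf := align(a, b)
	fmt.Println(p, buf) // [a1 b1] [a2]
}
```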
- Trigger: Coordinator sends TriggerCheckpoint(N) to Sources.
- Local Snapshot: Each operator creates an async Pebble Checkpoint (hard link) of its local state.
- Persist: Background workers replicate the checkpoint data to the durable store (replicated PebbleDB on peer nodes).
- Acknowledge: Workers notify the Coordinator.
- Complete: When all tasks ACK Checkpoint N, it is marked "Global Complete".
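The Coordinator's completion bookkeeping is a simple pending set. A sketch (names are illustrative):

```go
package main

import "fmt"

// ackTracker sketches the Coordinator's rule: Checkpoint N becomes
// "Global Complete" only when every task has acknowledged it.
type ackTracker struct {
	pending map[string]bool // task ID -> still awaiting ACK
}

func newAckTracker(tasks []string) *ackTracker {
	p := make(map[string]bool, len(tasks))
	for _, t := range tasks {
		p[t] = true
	}
	return &ackTracker{pending: p}
}

// ack records a task's ACK and reports whether the checkpoint is now
// globally complete.
func (a *ackTracker) ack(task string) bool {
	delete(a.pending, task)
	return len(a.pending) == 0
}

func main() {
	tr := newAckTracker([]string{"source-0", "map-0", "sink-0"})
	fmt.Println(tr.ack("source-0")) // false
	fmt.Println(tr.ack("map-0"))    // false
	fmt.Println(tr.ack("sink-0"))   // true: Checkpoint N is Global Complete
}
```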
- Coordinator monitors heartbeats from Workers.
- If a worker is lost, the Job is marked FAILING.
- Cancel: All running tasks are cancelled.
- Restore: The Coordinator selects the latest Completed Checkpoint (N).
- Reschedule: Tasks are redeployed.
- State Load: Tasks download their specific state shard for Checkpoint N from storage into Pebble.
- Rewind: Sources reset their read offsets to those recorded in Checkpoint N.
- Resume: Processing restarts from Epoch N+1.
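The Restore and Rewind steps can be sketched as selecting the latest completed checkpoint and reading back the offsets it recorded (the `checkpoint` type and partition names are illustrative):

```go
package main

import "fmt"

// checkpoint sketches what the Coordinator needs for recovery: an ID
// and, per source partition, the read offset recorded at the barrier.
type checkpoint struct {
	id      int
	offsets map[string]int64 // source partition -> offset at Barrier N
}

// latest implements the Restore step: pick the most recent checkpoint
// that reached "Global Complete".
func latest(completed []checkpoint) checkpoint {
	best := completed[0]
	for _, c := range completed[1:] {
		if c.id > best.id {
			best = c
		}
	}
	return best
}

func main() {
	cps := []checkpoint{
		{id: 3, offsets: map[string]int64{"events-0": 1200}},
		{id: 4, offsets: map[string]int64{"events-0": 1550}},
	}
	c := latest(cps)
	// Rewind: sources reset to the recorded offsets; Epoch N+1 resumes.
	fmt.Println(c.id, c.offsets["events-0"])
}
```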
- Internal State: Always Exactly-Once (due to rollback).
- Sink Output:
- Idempotent Sinks (KV Store): Naturally Exactly-Once.
- Transactional Sinks: Require "Two-Phase Commit" tied to the Checkpoint completion mechanism.
- Standard Sinks: At-Least-Once (may see duplicates after replay).
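Why an idempotent KV sink is naturally Exactly-Once can be shown in a few lines: keyed writes are upserts, so replaying Epoch N+1 after a rollback overwrites rather than duplicates.

```go
package main

import "fmt"

// applyWrite sketches an idempotent KV sink: a keyed upsert. Applying
// the same write twice (once before a failure, once during replay)
// leaves the store in the same final state.
func applyWrite(kv map[string]int, key string, value int) {
	kv[key] = value
}

func main() {
	kv := map[string]int{}
	applyWrite(kv, "count:user-42", 7)
	// Failure, rollback to Checkpoint N, replay: the same write arrives again.
	applyWrite(kv, "count:user-42", 7)
	fmt.Println(len(kv), kv["count:user-42"]) // one key, same value: no duplicate
}
```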