Skip to content

Conversation

@lutter
Copy link
Collaborator

@lutter lutter commented Jan 24, 2026

No idea if this is any good. This is based on the spec in docs/specs/runner-refactor.md. It took 7 loop iterations to arrive at this code.

lutter and others added 16 commits January 23, 2026 16:42
Introduces architecture spec for simplifying core/src/subgraph/runner.rs:
- Enum-based FSM for full runner lifecycle
- Pipeline stages for block processing
- TriggerRunner component to eliminate code duplication
- Checkpoint-based state management for rollback

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add a separate timeout for database setup operations (migrations, schema
creation, FDW configuration) which can legitimately take longer than
normal runtime operations. This fixes timeout issues during runner tests
when using the default 5s GRAPH_STORE_CONNECTION_TIMEOUT.

The new GRAPH_STORE_SETUP_TIMEOUT defaults to 30s and is used only during
database initialization via the new get_for_setup() method, preserving
the fast-fail behavior (5s) for production runtime operations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add OutputConfig module to capture verbose runner test output to
tests/runner-tests.log when running locally, while keeping console
output in CI (detected via GITHUB_ACTIONS env var).

- Add slog-async and slog-term workspace dependencies
- Create tests/src/output.rs with OutputConfig for CI-aware output
- Update run_cmd() to write to log file instead of stdout
- Update test_logger() to use file-based slog drain locally
- Add runner-tests.log to .gitignore

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
… loops

This commit introduces TriggerRunner, a new component that encapsulates the
common trigger execution logic that was previously duplicated in process_block.

Changes:
- Create runner/ directory module structure (runner/mod.rs, runner/trigger_runner.rs)
- Add TriggerRunner struct that handles executing pre-matched triggers
- Replace the first trigger processing loop (initial block triggers) with
  TriggerRunner::execute()
- Replace the second trigger processing loop (dynamic data source triggers)
  with TriggerRunner::execute()

This is Phase 1 of the runner refactor plan, which aims to simplify the
subgraph runner's complexity by extracting reusable components.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Introduce the state machine types for the runner's lifecycle:

- RunnerState enum with variants for each lifecycle phase:
  Initializing, AwaitingBlock, ProcessingBlock, Reverting,
  Restarting, and Stopped
- RestartReason enum capturing why block stream needs restart
- StopReason enum capturing terminal states

This is Phase 2 of the runner refactor. The types are defined but not
yet used - Phase 3 will refactor run_inner to use the state machine.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Extract helper methods from run_inner to simplify the control flow:
- initialize(): Handles unfailing deterministic errors and max end block check
- await_block(): Gets next stream event and processes it
- check_cancellation(): Handles the cancel/unassign check logic
- restart(): Restarts the store and block stream
- finalize(): Flushes store on shutdown

The main run_inner loop is now flat with a clear structure:
1. Initialize (may stop early)
2. Start block stream
3. Event loop: await_block -> match action -> continue/stop/restart

This eliminates the nested loops that made the original code hard to follow.

Add StopBlockReached and BreakOnRestart variants to StopReason for
completeness.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Break the monolithic process_block method into explicit pipeline stages:

1. match_triggers: Matches block triggers to handlers
2. execute_triggers: Processes matched triggers via TriggerRunner
3. process_dynamic_data_sources: Handles newly created data sources
4. process_offchain_triggers: Handles offchain data source triggers
5. persist_block: Commits all changes to the store

This refactor:
- Eliminates the duplicated trigger processing loop (now consolidated
  in process_dynamic_data_sources using execute_triggers)
- Makes the block processing flow explicit and sequential
- Adds pipeline.rs with scaffolding types for future phases
- Preserves all existing behavior

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add unified error classification system to replace scattered error handling:

- Add ProcessingErrorKind enum with Deterministic, NonDeterministic, and
  PossibleReorg variants
- Add PossibleReorg variant to ProcessingError for clean reorg handling
- Add kind(), should_restart(), should_stop_processing(), is_retryable()
  helper methods for consistent error classification
- Add MappingErrorHelper trait for converting MappingError to ProcessingError
- Add demote_possible_reorg() helper for cases where clean restart is not
  possible (after create_dynamic_data_sources)

Error handling invariants are now documented in error.rs:
- Deterministic: Stop processing, persist PoI only, fail subgraph
- NonDeterministic: Retry with exponential backoff
- PossibleReorg: Restart block stream cleanly (don't persist)

Refactored process_block, process_dynamic_data_sources, and
handle_offchain_triggers to use the new error classification system
while preserving existing behavior.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add BlockStateCheckpoint struct and checkpoint()/restore() methods to
BlockState to enable rollback scenarios during block processing.

The checkpoint captures the lengths of data source vectors so that
state can be restored to a previous point (e.g., before dynamic data
source processing). This lays the groundwork for properly handling
PossibleReorg errors during dynamic DS creation, though the actual
rollback logic is deferred for future improvements.

Currently, a checkpoint is taken before dynamic DS processing but not
yet used for actual rollback - this documents the checkpoint point and
prepares for future enhancements (see b21fa73b-6453-4340-99fb-1a78ec62efb1).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Phase 7 (Module Organization) was already completed during the natural
evolution of earlier phases - the runner/ directory structure with
state.rs, pipeline.rs, and trigger_runner.rs was established as part
of Phase 1-4 work.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants