Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ serde_json = { version = "1.0", features = ["arbitrary_precision"] }
serde_regex = "1.1.0"
serde_yaml = "0.9.21"
slog = { version = "2.8.2", features = ["release_max_level_trace", "max_level_trace"] }
slog-async = "2.5.0"
slog-term = "2.7.0"
sqlparser = { version = "0.60.0", features = ["visitor"] }
strum = { version = "0.27", features = ["derive"] }
syn = { version = "2.0.114", features = ["full"] }
Expand Down
116 changes: 116 additions & 0 deletions core/src/subgraph/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
//! Error types for subgraph processing.
//!
//! This module provides error classification for block processing, enabling
//! consistent error handling across the runner. See [`ProcessingErrorKind`]
//! for the error classification system and its invariants.
//!
//! # Error Handling Invariants
//!
//! These invariants MUST be preserved throughout the codebase:
//!
//! | Error Kind | Response |
//! |-----------------|---------------------------------------------------|
//! | Deterministic | Stop processing, persist PoI only, fail subgraph |
//! | NonDeterministic| Retry with exponential backoff |
//! | PossibleReorg | Restart block stream cleanly (don't persist) |

use graph::components::subgraph::MappingError;
use graph::data::subgraph::schema::SubgraphError;
use graph::env::ENV_VARS;
use graph::prelude::{anyhow, thiserror, Error, StoreError};
Expand Down Expand Up @@ -25,13 +42,87 @@ pub enum ProcessingError {

#[error("subgraph stopped while processing triggers")]
Canceled,

/// A possible reorg was detected during processing.
///
/// This indicates that the data being processed may be inconsistent due to a
/// blockchain reorganization. The correct response is to restart the block stream
/// cleanly without persisting any changes, allowing it to detect and handle the reorg.
#[error("possible reorg detected: {0:#}")]
PossibleReorg(Error),
}

/// Classification of processing errors for determining the appropriate response.
///
/// # Error Handling Invariants
///
/// The following invariants MUST be preserved when handling errors:
///
/// - **Deterministic**: Stop processing the current block, persist only the PoI entity,
/// and fail the subgraph. These errors are reproducible and indicate a bug in the
/// subgraph mapping or an invalid state.
///
/// - **NonDeterministic**: Retry with exponential backoff. These errors are transient
/// (network issues, temporary unavailability) and may succeed on retry.
///
/// - **PossibleReorg**: Restart the block stream cleanly without persisting any changes.
/// The block stream will detect the reorg and provide the correct blocks to process.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessingErrorKind {
/// Stop processing, persist PoI only, fail the subgraph
Deterministic,
/// Retry with backoff, attempt to unfail
NonDeterministic,
/// Restart block stream cleanly (don't persist)
PossibleReorg,
}

impl ProcessingError {
pub fn is_deterministic(&self) -> bool {
matches!(self, ProcessingError::Deterministic(_))
}

/// Classify this error to determine the appropriate response.
///
/// See [`ProcessingErrorKind`] for the semantics of each classification.
pub fn kind(&self) -> ProcessingErrorKind {
match self {
ProcessingError::Deterministic(_) => ProcessingErrorKind::Deterministic,
ProcessingError::Unknown(_) => ProcessingErrorKind::NonDeterministic,
ProcessingError::PossibleReorg(_) => ProcessingErrorKind::PossibleReorg,
// Canceled is treated as non-deterministic for classification purposes,
// but it's typically handled specially (clean shutdown).
ProcessingError::Canceled => ProcessingErrorKind::NonDeterministic,
}
}

/// Whether this error should stop processing the current block.
///
/// Returns `true` for deterministic errors, which indicate a bug in the
/// subgraph mapping that will reproduce on retry.
#[allow(dead_code)] // Part of public error classification API
pub fn should_stop_processing(&self) -> bool {
matches!(self.kind(), ProcessingErrorKind::Deterministic)
}

/// Whether this error requires a clean restart of the block stream.
///
/// Returns `true` for possible reorg errors, where we need to restart
/// without persisting any changes so the block stream can detect and
/// handle the reorganization.
pub fn should_restart(&self) -> bool {
matches!(self.kind(), ProcessingErrorKind::PossibleReorg)
}

/// Whether this error is retryable with backoff.
///
/// Returns `true` for non-deterministic errors, which are transient
/// and may succeed on retry (network issues, temporary unavailability).
#[allow(dead_code)] // Part of public error classification API
pub fn is_retryable(&self) -> bool {
matches!(self.kind(), ProcessingErrorKind::NonDeterministic)
}

pub fn detail(self, ctx: &str) -> ProcessingError {
match self {
ProcessingError::Unknown(e) => {
Expand All @@ -41,6 +132,9 @@ impl ProcessingError {
ProcessingError::Deterministic(e) => {
ProcessingError::Deterministic(Box::new(anyhow!("{e}").context(ctx.to_string())))
}
ProcessingError::PossibleReorg(e) => {
ProcessingError::PossibleReorg(e.context(ctx.to_string()))
}
ProcessingError::Canceled => ProcessingError::Canceled,
}
}
Expand Down Expand Up @@ -96,3 +190,25 @@ impl<T> ClassifyErrorHelper<T, StoreError> for Result<T, StoreError> {
})
}
}

impl From<MappingError> for ProcessingError {
fn from(e: MappingError) -> Self {
match e {
MappingError::PossibleReorg(e) => ProcessingError::PossibleReorg(e),
MappingError::Unknown(e) => ProcessingError::Unknown(e),
}
}
}

/// Helper trait for converting `MappingError` results to `ProcessingError` results.
///
/// This preserves the `PossibleReorg` variant for proper error handling.
pub(crate) trait MappingErrorHelper<T> {
fn into_processing_error(self) -> Result<T, ProcessingError>;
}

impl<T> MappingErrorHelper<T> for Result<T, MappingError> {
fn into_processing_error(self) -> Result<T, ProcessingError> {
self.map_err(ProcessingError::from)
}
}
Loading