Migrate services from polling to event-driven communication #732

@vfusco

Description

This issue covers migrating the existing services from the current polling-based approach to an event-driven architecture, using the library implemented in issue #731.

Current State

Services currently run polling loops that periodically query the database (or, in the case of EvmReader, L1):

  • EvmReader: Polls L1 for inputs and epoch events, writes to DB
  • Advancer: Polls for CLOSED epochs and unprocessed inputs
  • Validator: Polls for INPUTS_PROCESSED epochs
  • Claimer: Polls for CLAIM_CALCULATED epochs
  • PRT: Polls for CLAIM_CALCULATED epochs and dispute events

Target State

Services will:

  1. Subscribe to relevant events on startup
  2. React to events as they arrive
  3. Perform periodic sync as fallback (hybrid pattern)
  4. Publish events when they cause state transitions

Tasks

EvmReader

  • Publish epoch.opened when creating a new epoch
  • Publish epoch.closed when closing an epoch
  • Publish input.received when storing new inputs
  • Publish app.registered when registering new applications

Advancer

  • Subscribe to epoch.closed and input.received events
  • Replace main polling loop with event-driven processing
  • Keep periodic sync for catch-up on startup and missed events
  • Publish epoch.inputs_processed when completing epoch processing

Validator

  • Subscribe to epoch.inputs_processed events
  • Replace polling loop with event-driven processing
  • Keep periodic sync for resilience
  • Publish epoch.claim_calculated when proof is ready

Claimer

  • Subscribe to epoch.claim_calculated events
  • Replace polling loop with event-driven processing
  • Keep periodic sync for resilience
  • Publish epoch.claim_submitted when submitting to L1
  • Publish epoch.claim_accepted or epoch.claim_rejected based on L1 result
  • Publish app.inoperable when proof is rejected (quorum)

PRT

  • Subscribe to epoch.claim_calculated events
  • Replace polling loop with event-driven processing
  • Keep periodic sync for resilience
  • Publish epoch.claim_submitted, epoch.claim_accepted, epoch.claim_rejected
  • Publish app.inoperable when proof is rejected
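
The per-service tasks above imply a fixed set of event names and one subscription set per consumer. A minimal sketch of how these could be declared follows. The event names come from this issue; the EventType type, the constant names, the Subscriptions map, and the Wants helper are hypothetical and are not the API of the library from #731.

```go
package main

import "fmt"

// EventType is a hypothetical string type for event names.
type EventType string

// Event names as listed in the tasks above.
const (
	EpochOpened          EventType = "epoch.opened"
	EpochClosed          EventType = "epoch.closed"
	EpochInputsProcessed EventType = "epoch.inputs_processed"
	EpochClaimCalculated EventType = "epoch.claim_calculated"
	EpochClaimSubmitted  EventType = "epoch.claim_submitted"
	EpochClaimAccepted   EventType = "epoch.claim_accepted"
	EpochClaimRejected   EventType = "epoch.claim_rejected"
	InputReceived        EventType = "input.received"
	AppRegistered        EventType = "app.registered"
	AppInoperable        EventType = "app.inoperable"
)

// Subscriptions maps each consuming service to the events it subscribes to,
// following the task lists. EvmReader only publishes, so it has no entry.
var Subscriptions = map[string][]EventType{
	"advancer":  {EpochClosed, InputReceived},
	"validator": {EpochInputsProcessed},
	"claimer":   {EpochClaimCalculated},
	"prt":       {EpochClaimCalculated},
}

// Wants reports whether the named service subscribes to the given event.
func Wants(service string, ev EventType) bool {
	for _, s := range Subscriptions[service] {
		if s == ev {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(Wants("advancer", EpochClosed))  // true
	fmt.Println(Wants("validator", EpochClosed)) // false
}
```

Keeping the names in one place would let each service build its event filter from the same constants the publishers use.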

General

  • Update service initialization to create event bus
  • Ensure graceful shutdown closes event subscriptions
  • Add metrics for event processing latency
  • Update existing tests to account for event publishing

Hybrid Pattern

Each service should implement the following pattern for resilience:

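func (s *Service) Run(ctx context.Context) error {
    // 1. Initial sync on startup (catch up on anything missed while offline)
    if err := s.syncPendingWork(ctx); err != nil {
        return err
    }

    // 2. Subscribe to relevant events. Anything published between the sync
    //    above and this call is picked up by the periodic sync below.
    ch, err := s.bus.Subscribe(ctx, s.eventFilter())
    if err != nil {
        return err
    }

    // 3. Periodic sync ticker (catch any missed events)
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    // 4. Main loop
    for {
        select {
        case event, ok := <-ch:
            if !ok {
                // If the bus closes the channel, stop selecting on it (a nil
                // channel never becomes ready) and rely on the periodic sync.
                ch = nil
                continue
            }
            s.handleEvent(ctx, event)
        case <-ticker.C:
            // A failed periodic sync is not fatal; the next tick retries.
            _ = s.syncPendingWork(ctx)
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}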

This ensures:

  • No work is missed during service restarts
  • Events that arrive while processing are not lost
  • System remains functional even if event delivery fails

Note on event ordering

Go's select statement chooses pseudo-randomly among the cases that are ready, so events and sync ticks are not handled in any guaranteed order. This is acceptable for our design because:

  1. Events are hints, not commands — they signal that work may be available
  2. The authoritative state is always in the database
  3. Business invariants (epoch ordering, sequential processing) are enforced by database queries, not event order
  4. Services must be idempotent — receiving the same event twice or out of order should not cause incorrect behavior

If stricter ordering guarantees are needed in the future, the library can be extended to use a priority queue or sequence numbers.
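
The "events are hints" idea can be illustrated with a handler that uses an event only as a wake-up signal and re-reads pending work from the database every time. A minimal sketch, assuming a hypothetical in-memory Store in place of the real database and status strings modeled on the ones named in this issue:

```go
package main

import (
	"fmt"
	"sort"
)

// Store is a hypothetical stand-in for the database: epoch index -> status.
type Store struct {
	status map[uint64]string
}

// closedEpochs returns CLOSED epochs in ascending order, so ordering is
// enforced by the query rather than by event arrival order.
func (s *Store) closedEpochs() []uint64 {
	var out []uint64
	for idx, st := range s.status {
		if st == "CLOSED" {
			out = append(out, idx)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

// HandleHint treats any event as a signal that work may exist. It processes
// whatever the store reports as pending and returns the epochs it advanced.
// Duplicate or out-of-order events simply find nothing left to do.
func HandleHint(s *Store) []uint64 {
	pending := s.closedEpochs()
	for _, idx := range pending {
		s.status[idx] = "INPUTS_PROCESSED"
	}
	return pending
}

func main() {
	s := &Store{status: map[uint64]string{2: "CLOSED", 1: "CLOSED", 3: "OPEN"}}
	fmt.Println(HandleHint(s)) // [1 2]
	fmt.Println(HandleHint(s)) // []
}
```

Because the handler never trusts the event payload for state, the same function can serve as both the event handler and the periodic sync.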


Acceptance Criteria

  • All services use event-driven communication for inter-service coordination
  • Polling interval can be significantly increased (e.g., 30s+) as it's now only a fallback
  • System behavior remains correct when events are missed (hybrid pattern works)
  • No regression in existing functionality
  • Integration tests pass with event-driven architecture
  • Observability: logs/metrics show event processing

Dependencies

  • #731 (the event library this migration uses)


Migration Strategy

The migration can be done incrementally per service:

  1. Phase 1: Add event publishing to EvmReader (no consumers yet)
  2. Phase 2: Migrate Advancer to consume events + publish its own
  3. Phase 3: Migrate Validator
  4. Phase 4: Migrate Claimer and PRT

Each phase can be merged independently, and the system will continue working during migration since the polling fallback remains active.


Notes

  • Keep polling as fallback, don't remove it entirely
  • The periodic sync interval can be tuned based on operational experience
  • Monitor event queue depths to detect backpressure issues
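
If subscriptions are backed by buffered Go channels, queue depth can be read with the built-in len and cap functions, and a publisher can avoid blocking on a slow subscriber with a non-blocking send. A minimal sketch under that assumption; TryPublish and Depth are hypothetical helpers, and the library from #731 may deliver events differently:

```go
package main

import "fmt"

// TryPublish attempts a non-blocking send. It returns false when the
// subscriber's buffer is full, in which case the event is dropped and the
// subscriber's periodic sync is expected to pick up the work.
func TryPublish(ch chan<- string, ev string) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

// Depth returns the number of buffered events and the buffer capacity.
func Depth(ch chan string) (queued, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	ch := make(chan string, 2)
	fmt.Println(TryPublish(ch, "epoch.closed"))   // true
	fmt.Println(TryPublish(ch, "input.received")) // true
	fmt.Println(TryPublish(ch, "input.received")) // false: buffer full
	queued, capacity := Depth(ch)
	fmt.Println(queued, capacity) // 2 2
}
```

A depth that stays close to capacity, or a rising count of failed TryPublish calls, would be a reasonable signal of backpressure to export as a metric.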
