Implement event-driven communication library using PostgreSQL LISTEN/NOTIFY #731

@vfusco

Description

Currently, services communicate through the database using a polling strategy. Each service periodically queries tables looking for state changes (e.g., epochs in a specific status, new inputs). This approach adds latency of up to one polling interval to every state transition and loads the database with queries that usually find nothing new.

This issue proposes implementing an internal library that enables event-driven communication between services using PostgreSQL's LISTEN/NOTIFY mechanism. This will reduce latency for state transitions and decrease database polling overhead.

Context

The system has 5 services (EvmReader, Advancer, Validator, Claimer, PRT) that currently poll the database for:

  • Active applications
  • Epochs in specific states (OPEN, CLOSED, INPUTS_PROCESSED, CLAIM_CALCULATED, etc.)
  • New inputs for processing

With LISTEN/NOTIFY, producers can notify consumers immediately when state changes occur, enabling near real-time reactions.
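
On the producer side, the notification can be a single pg_notify() call. The sketch below is one possible shape, not the proposed implementation: the Event fields, the execer interface, the "events" channel name, and the recordingDB stub are all assumptions made for illustration. It relies on two documented PostgreSQL behaviors: in the default configuration a NOTIFY payload must be shorter than 8000 bytes, and a notification issued inside a transaction is delivered only when that transaction commits.

```go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// maxPayloadBytes reflects PostgreSQL's default limit: a NOTIFY payload must
// be shorter than 8000 bytes.
const maxPayloadBytes = 7999

// ErrPayloadTooLarge is returned before the database is contacted.
var ErrPayloadTooLarge = errors.New("events: payload exceeds NOTIFY limit")

// Event is a hypothetical wire format; field names and types are assumptions.
type Event struct {
	Type      string          `json:"type"`
	AppID     int64           `json:"app_id"`
	Payload   json.RawMessage `json:"payload,omitempty"`
	Timestamp time.Time       `json:"timestamp"`
}

// execer is the subset of *sql.DB and *sql.Tx that publishing needs.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// encodeEvent serializes ev to JSON and enforces the payload limit.
func encodeEvent(ev Event) (string, error) {
	data, err := json.Marshal(ev)
	if err != nil {
		return "", fmt.Errorf("events: marshal: %w", err)
	}
	if len(data) > maxPayloadBytes {
		return "", ErrPayloadTooLarge
	}
	return string(data), nil
}

// Publish sends ev on channel. When db is a *sql.Tx, PostgreSQL delivers the
// notification only if and when the transaction commits.
func Publish(ctx context.Context, db execer, channel string, ev Event) error {
	payload, err := encodeEvent(ev)
	if err != nil {
		return err
	}
	_, err = db.ExecContext(ctx, "SELECT pg_notify($1, $2)", channel, payload)
	return err
}

// recordingDB stands in for a real connection so the sketch runs without
// PostgreSQL; it only remembers the last statement it was given.
type recordingDB struct {
	query string
	args  []any
}

func (r *recordingDB) ExecContext(_ context.Context, query string, args ...any) (sql.Result, error) {
	r.query, r.args = query, args
	return nil, nil
}

func main() {
	db := &recordingDB{}
	ev := Event{Type: "epoch.closed", AppID: 1, Timestamp: time.Now().UTC()}
	if err := Publish(context.Background(), db, "events", ev); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(db.query, db.args[0])
}
```

Publishing through the same transaction that performs the state change would keep the event and the row update consistent: a rolled-back transaction emits nothing.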

Design Considerations

  1. Fire-and-forget nature: LISTEN/NOTIFY does not persist events. A consumer that is offline or disconnected when an event is published never receives it. The library should be designed with this in mind, and consumers should implement a hybrid approach (events + periodic sync).

  2. Interface-based design: Define clean interfaces (Publisher, Subscriber) to allow future alternative implementations (e.g., in-memory for single-process mode).

  3. Channel strategy: Decide between per-app channels and per-event-type channels so that each consumer listens only to the notifications it needs and filters less in application code.
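
The hybrid approach from item 1 can be sketched as a loop that treats each notification as a wake-up hint and still reconciles against the database on a timer. This is a minimal illustration under assumptions: RunHybrid, the wake channel, and the reconcile callback are hypothetical names, and reconcile stands for whatever query a service already runs when it polls today.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// RunHybrid calls reconcile once at startup, then again whenever a wake-up
// arrives or the interval elapses. Missed notifications cost at most one
// interval of latency because the timer still fires.
func RunHybrid(ctx context.Context, wake <-chan struct{}, interval time.Duration,
	reconcile func(context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Catch up on anything that happened while this consumer was offline.
	if err := reconcile(ctx); err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case _, ok := <-wake:
			if !ok {
				// Notification source is gone: degrade to pure polling.
				wake = nil
				continue
			}
		case <-ticker.C:
		}
		if err := reconcile(ctx); err != nil {
			return err
		}
	}
}

// demo runs the loop briefly with one pending wake-up and a long interval,
// returning how many times reconcile ran and why the loop stopped.
func demo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	wake := make(chan struct{}, 1)
	wake <- struct{}{}

	runs := 0
	err := RunHybrid(ctx, wake, time.Hour, func(context.Context) error {
		runs++
		return nil
	})
	return runs, err
}

func main() {
	runs, err := demo()
	fmt.Println("reconcile runs:", runs, "stopped by:", err)
}
```

Because the notification only triggers a reconcile and carries no state the consumer depends on, the periodic interval can be much longer than today's polling interval without risking missed work.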


Tasks

  • Define core interfaces in internal/events package:

    • Event struct with Type, AppID, Payload, Timestamp
    • EventType constants for all domain events
    • Publisher interface with Publish(ctx, event) method
    • Subscriber interface with Subscribe(ctx, filter) method returning event channel
    • SubscriptionFilter struct for filtering by event types and app IDs
  • Implement PostgreSQL backend in internal/events/postgres:

    • Use pq.Listener for receiving notifications
    • Implement automatic reconnection on connection loss
    • JSON serialization for event payloads
    • Dispatch loop to route events to matching subscribers
  • Implement event publishing:

    • pg_notify() call on event publication
    • Consider batching for high-frequency events
  • Add unit tests:

    • Interface contract tests
    • Serialization/deserialization tests
    • Filter matching logic tests
  • Add integration tests:

    • Publish/subscribe round-trip with real PostgreSQL
    • Multiple subscribers receiving same event
    • Subscriber reconnection after connection loss
    • High-volume event handling
  • Document the library:

    • Usage examples
    • Event type catalog
    • Best practices for consumers (hybrid pattern)
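
To make the first tasks more concrete, here is one possible shape for the core types, the filter matching logic, and a dispatch step. It is a sketch, not the agreed design: the int64 AppID, the JSON field names, the rule that an empty filter list matches everything, and the drop-when-full delivery policy are all assumptions. The subscription type and dispatch function are hypothetical helpers, and the PostgreSQL listener that would feed payloads into dispatch is left out.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"slices"
	"time"
)

type EventType string

// Event carries the fields listed in the task; concrete types are assumptions.
type Event struct {
	Type      EventType       `json:"type"`
	AppID     int64           `json:"app_id"`
	Payload   json.RawMessage `json:"payload,omitempty"`
	Timestamp time.Time       `json:"timestamp"`
}

// SubscriptionFilter selects events; an empty list matches every value.
type SubscriptionFilter struct {
	Types  []EventType
	AppIDs []int64
}

// Matches reports whether ev passes both the type and the app ID filter.
func (f SubscriptionFilter) Matches(ev Event) bool {
	typeOK := len(f.Types) == 0 || slices.Contains(f.Types, ev.Type)
	appOK := len(f.AppIDs) == 0 || slices.Contains(f.AppIDs, ev.AppID)
	return typeOK && appOK
}

type Publisher interface {
	Publish(ctx context.Context, ev Event) error
}

type Subscriber interface {
	Subscribe(ctx context.Context, filter SubscriptionFilter) (<-chan Event, error)
}

// subscription pairs a filter with the channel handed to one subscriber.
type subscription struct {
	filter SubscriptionFilter
	ch     chan Event
}

// dispatch decodes one notification payload and offers it to every matching
// subscription. A subscriber whose buffer is full is skipped rather than
// blocking the loop; its periodic sync is expected to recover the state.
func dispatch(payload string, subs []subscription) (int, error) {
	var ev Event
	if err := json.Unmarshal([]byte(payload), &ev); err != nil {
		return 0, fmt.Errorf("events: decode notification: %w", err)
	}
	delivered := 0
	for _, s := range subs {
		if !s.filter.Matches(ev) {
			continue
		}
		select {
		case s.ch <- ev:
			delivered++
		default:
		}
	}
	return delivered, nil
}

func main() {
	epochs := subscription{
		filter: SubscriptionFilter{Types: []EventType{"epoch.closed"}},
		ch:     make(chan Event, 1),
	}
	otherApp := subscription{
		filter: SubscriptionFilter{AppIDs: []int64{99}},
		ch:     make(chan Event, 1),
	}
	payload := `{"type":"epoch.closed","app_id":1,"timestamp":"2024-01-01T00:00:00Z"}`
	n, err := dispatch(payload, []subscription{epochs, otherApp})
	fmt.Println("delivered:", n, "error:", err)
	fmt.Println("received:", (<-epochs.ch).Type)
}
```

Dropping on a full buffer keeps one slow subscriber from stalling all others, which is only acceptable because consumers are expected to follow the hybrid pattern.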

Proposed Event Types

// EventType identifies a domain event.
type EventType string

const (
    // Application events
    EventAppRegistered   EventType = "app.registered"
    EventAppDeactivated  EventType = "app.deactivated"
    EventAppReactivated  EventType = "app.reactivated"
    EventAppInoperable   EventType = "app.inoperable"

    // Epoch events
    EventEpochOpened          EventType = "epoch.opened"
    EventEpochClosed          EventType = "epoch.closed"
    EventEpochInputsProcessed EventType = "epoch.inputs_processed"
    EventEpochClaimCalculated EventType = "epoch.claim_calculated"
    EventEpochClaimSubmitted  EventType = "epoch.claim_submitted"
    EventEpochClaimAccepted   EventType = "epoch.claim_accepted"
    EventEpochClaimRejected   EventType = "epoch.claim_rejected"

    // Input events
    EventInputReceived EventType = "input.received"
)
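
The dotted naming above lends itself to an event-type channel strategy: the part before the dot can select the channel. The sketch below assumes one channel per domain (app, epoch, input) and a hypothetical "events_" prefix; Domain and ChannelFor are illustrative helpers, not part of the proposal. Underscores keep the channel name a plain identifier that needs no quoting in a LISTEN statement.

```go
package main

import (
	"fmt"
	"strings"
)

type EventType string

// channelPrefix is a hypothetical namespace for the library's channels.
const channelPrefix = "events_"

// Domain returns the part of the event type before the first dot,
// or the whole string when there is no dot.
func (t EventType) Domain() string {
	domain, _, _ := strings.Cut(string(t), ".")
	return domain
}

// ChannelFor maps an event type to the channel for its domain.
func ChannelFor(t EventType) string {
	return channelPrefix + t.Domain()
}

func main() {
	for _, t := range []EventType{"app.registered", "epoch.closed", "input.received"} {
		fmt.Println(t, "->", ChannelFor(t))
	}
	// Prints:
	// app.registered -> events_app
	// epoch.closed -> events_epoch
	// input.received -> events_input
}
```

With this split, a service that only reacts to epoch transitions listens on one channel and never decodes input notifications; filtering by app ID would still happen in the subscriber.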

Acceptance Criteria

  • Library compiles and passes all tests
  • Integration tests demonstrate end-to-end publish/subscribe flow
  • Documentation explains usage and the hybrid pattern for resilience
  • No breaking changes to existing service code (library is additive)

Notes

  • This issue focuses only on the library implementation, not service migration
  • Service migration will be handled in a separate follow-up issue
  • The library should be designed to support future backends (e.g., in-memory) but only PostgreSQL needs to be implemented now
