
feat: implement core storage, fetcher, and sync engine (Phase 1) #27

Merged
tac0turtle merged 2 commits into main from phase1/core-storage-sync-engine on Feb 19, 2026

Conversation

Contributor

@tac0turtle tac0turtle commented Feb 19, 2026

Add SQLite store (pkg/store/sqlite.go) with WAL mode, PRAGMA user_version migration, and idempotent writes. Add Celestia node fetcher (pkg/fetch/celestia_node.go) using go-jsonrpc with slim response types for CometBFT JSON encoding. Implement sync engine with Coordinator state machine (init -> backfill -> stream), concurrent Backfiller with batched worker pool, and SubscriptionManager with gap detection and re-backfill recovery. Wire everything in cmd/apex/main.go with signal handling.

Closes #10, #11, #12
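
For reference, a minimal sketch of the store pattern described above (WAL mode, a PRAGMA user_version-gated migration, and idempotent writes), assuming the modernc.org/sqlite driver added in go.mod; the function names and the tiny schema here are illustrative, not the PR's actual code.

package store

import (
	"database/sql"
	"fmt"

	_ "modernc.org/sqlite" // pure-Go SQLite driver listed in go.mod
)

// openStore shows the WAL + user_version migration pattern, not the PR's SQLiteStore.
func openStore(path string) (*sql.DB, error) {
	db, err := sql.Open("sqlite", path)
	if err != nil {
		return nil, err
	}
	// journal_mode and busy_timeout both return a row, so read them back.
	var mode string
	if err := db.QueryRow("PRAGMA journal_mode=WAL").Scan(&mode); err != nil {
		return nil, fmt.Errorf("enable WAL: %w", err)
	}
	var timeout int
	if err := db.QueryRow("PRAGMA busy_timeout=5000").Scan(&timeout); err != nil {
		return nil, err
	}
	// user_version records the applied schema revision.
	var version int
	if err := db.QueryRow("PRAGMA user_version").Scan(&version); err != nil {
		return nil, err
	}
	if version < 1 {
		if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS headers (
			height INTEGER PRIMARY KEY,
			hash   BLOB NOT NULL
		)`); err != nil {
			return nil, err
		}
		if _, err := db.Exec("PRAGMA user_version = 1"); err != nil {
			return nil, err
		}
	}
	return db, nil
}

// putHeader is idempotent: re-inserting an already stored height is a no-op.
func putHeader(db *sql.DB, height uint64, hash []byte) error {
	_, err := db.Exec(
		"INSERT INTO headers (height, hash) VALUES (?, ?) ON CONFLICT(height) DO NOTHING",
		int64(height), hash,
	)
	return err
}

The idempotency comes from the ON CONFLICT clause: replaying a height that is already stored changes nothing, which is what makes re-backfilling after a gap safe.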

Overview

Summary by CodeRabbit

Release Notes

  • New Features

    • Added data fetching from Celestia nodes via JSON-RPC
    • Introduced persistent local data storage
    • Implemented batched historical data backfilling
    • Added live header stream subscription and processing
    • Established sync coordination with state checkpointing
  • Tests

    • Added comprehensive test coverage for synchronization components
  • Chores

    • Added required dependencies for new functionality
    • Updated project configuration files

Add SQLite store (pkg/store/sqlite.go) with WAL mode, PRAGMA
user_version migration, and idempotent writes. Add Celestia node
fetcher (pkg/fetch/celestia_node.go) using go-jsonrpc with slim
response types for CometBFT JSON encoding. Implement sync engine
with Coordinator state machine (init -> backfill -> stream),
concurrent Backfiller with batched worker pool, and
SubscriptionManager with gap detection and re-backfill recovery.
Wire everything in cmd/apex/main.go with signal handling.

Closes #10, #11, #12

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@gemini-code-assist

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request delivers the foundational components for the Apex indexer, establishing a robust data ingestion pipeline. It integrates a persistent SQLite store, a dedicated fetcher for Celestia node data, and a sophisticated sync engine to manage the entire data synchronization process from historical backfilling to real-time streaming. This phase lays the groundwork for reliable and efficient data indexing.

Highlights

  • Core Storage Implementation: Implemented a SQLite-based data store (pkg/store/sqlite.go) featuring Write-Ahead Logging (WAL) mode, PRAGMA user_version for migrations, and idempotent write operations for headers, blobs, and namespaces.
  • Celestia Node Data Fetcher: Developed a Celestia node fetcher (pkg/fetch/celestia_node.go) utilizing the go-jsonrpc library. This fetcher includes slim response types to efficiently handle CometBFT JSON encoding for headers and blobs, and supports live header subscriptions (the encoding idea is sketched just after this list).
  • Robust Sync Engine: Introduced a comprehensive sync engine (pkg/sync) orchestrated by a Coordinator state machine. This engine manages the full data synchronization lifecycle: initialization, historical backfilling, and live streaming.
  • Concurrent Backfiller: The sync engine includes a concurrent Backfiller with a batched worker pool to efficiently fetch and store historical block data, ensuring data integrity and performance during initial synchronization.
  • Subscription Manager with Gap Detection: A SubscriptionManager component was added to handle live header streams, incorporating gap detection logic and automatic re-backfill recovery to maintain data consistency.
  • Application Wiring and Signal Handling: The core components (store, fetcher, sync coordinator) have been wired into cmd/apex/main.go, complete with robust signal handling for graceful shutdown.
  • Dependency Updates: Added new Go module dependencies for JSON-RPC communication (github.com/filecoin-project/go-jsonrpc) and SQLite database access (modernc.org/sqlite).
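
On the fetcher highlight above: CometBFT's JSON encoding quotes 64-bit integers and renders hashes as hex strings, so "slim response types" typically lean on small wrapper types with custom UnmarshalJSON. A hedged sketch, reusing the jsonInt64 and hexBytes names from the changelog; the bodies here are illustrative, not the PR's code.

package fetch

import (
	"bytes"
	"encoding/hex"
	"strconv"
)

type jsonInt64 int64

func (v *jsonInt64) UnmarshalJSON(data []byte) error {
	// CometBFT encodes 64-bit integers as quoted strings; accept both forms.
	n, err := strconv.ParseInt(string(bytes.Trim(data, `"`)), 10, 64)
	if err != nil {
		return err
	}
	*v = jsonInt64(n)
	return nil
}

type hexBytes []byte

func (v *hexBytes) UnmarshalJSON(data []byte) error {
	// Hashes arrive as quoted hex strings; decode them into raw bytes.
	b, err := hex.DecodeString(string(bytes.Trim(data, `"`)))
	if err != nil {
		return err
	}
	*v = b
	return nil
}

A response struct field typed as jsonInt64 or hexBytes then decodes directly with encoding/json, while every field the indexer does not need can simply be omitted from the struct.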


Changelog
  • .gitignore
    • Added 'ISSUES.md' to the ignore list for project-related files.
  • cmd/apex/main.go
    • Imported necessary packages for context, error handling, OS signals, Celestia data fetching, SQLite storage, and synchronization logic.
    • Replaced the placeholder 'TODO' comment with a call to a new 'runIndexer' function to initiate the core indexing process.
    • Implemented the 'runIndexer' function to parse namespaces, open the SQLite store, persist configured namespaces, connect to the Celestia node fetcher, and build and run the sync coordinator with signal handling.
  • go.mod
    • Added 'github.com/filecoin-project/go-jsonrpc' for JSON-RPC communication.
    • Added 'modernc.org/sqlite' for SQLite database access.
  • go.sum
    • Updated module checksums to reflect the addition of new dependencies.
  • pkg/fetch/celestia_node.go
    • Added a new file implementing 'CelestiaNodeFetcher' to interact with a Celestia node's JSON-RPC API for fetching headers and blobs.
    • Defined 'headerAPI' and 'blobAPI' structs for JSON-RPC stubs.
    • Included helper types 'jsonInt64' and 'hexBytes' to handle specific CometBFT JSON encoding formats.
  • pkg/fetch/celestia_node_test.go
    • Added unit tests for 'mapHeader' and 'mapBlobs' functions to ensure correct parsing of Celestia node responses.
    • Included tests for 'isNotFoundErr', 'namespacesToBytes', 'jsonInt64', and 'hexBytes' utility functions.
  • pkg/store/migrations/001_init.sql
    • Added SQL schema for 'sync_state', 'headers', 'blobs', and 'namespaces' tables, including primary keys, unique constraints, and an index.
  • pkg/store/sqlite.go
    • Added a new file implementing 'SQLiteStore' for persistent data storage.
    • Provided 'Open' function to initialize the SQLite database with WAL mode, busy timeout, and foreign key constraints.
    • Implemented 'migrate' function to apply schema migrations using embedded SQL files.
    • Implemented CRUD operations for headers, blobs, namespaces, and sync state, ensuring idempotent writes.
  • pkg/store/sqlite_test.go
    • Added comprehensive unit tests for the 'SQLiteStore' to verify header, blob, namespace, and sync state operations, including idempotency and migration behavior.
  • pkg/sync/backfill.go
    • Added a new file defining the 'Backfiller' component responsible for fetching and storing historical blocks in batched, concurrent chunks.
    • Implemented the 'Run' method to manage the backfilling process, including checkpointing sync state.
  • pkg/sync/backfill_test.go
    • Added unit tests for the 'Backfiller' to verify batch processing, context cancellation, and error handling during historical data fetching.
  • pkg/sync/coordinator.go
    • Modified the 'Coordinator' struct to include a mutex for state management and a logger.
    • Added 'WithLogger' option for configuring the coordinator's logger.
    • Implemented 'Status' method for concurrent-safe retrieval of sync status.
    • Implemented the main 'Run' method to orchestrate the sync lifecycle (initialize -> backfill -> stream), including re-backfill on gap detection.
    • Implemented 'initialize' method to determine starting height and network head, supporting resumption from checkpoints.
  • pkg/sync/coordinator_test.go
    • Added unit tests for the 'Coordinator' to cover the full sync cycle, resuming from checkpoints, handling gap detection and re-backfilling, and context cancellation.
  • pkg/sync/mock_test.go
    • Added a new file containing mock implementations of 'Store' and 'DataFetcher' interfaces for isolated testing of sync components.
  • pkg/sync/subscription.go
    • Added a new file defining the 'SubscriptionManager' component to handle live header subscriptions.
    • Implemented the 'Run' method to process new headers, check for contiguity, detect gaps, and update sync state.
    • Implemented 'processHeader' to store new headers and associated blobs.
  • pkg/types/types.go
    • Modified the 'Blob' struct to include a 'Height' field, associating blobs with their respective block heights.

@coderabbitai

coderabbitai bot commented Feb 19, 2026

📝 Walkthrough


This PR implements the core indexing infrastructure for Apex: a SQLite persistence layer with schema migrations, a Celestia RPC-based data fetcher, a batch backfiller for historical blocks, a live subscription handler for streaming headers, and integrates them into the Coordinator orchestration layer. The CLI entry point is wired to execute the complete sync workflow.
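
As a rough sketch of that orchestration (the interface shapes and the re-backfill loop below are assumptions drawn from this description, not the PR's code), the coordinator's lifecycle reduces to:

package sync

import (
	"context"
	"errors"
)

// ErrGapDetected mirrors the sentinel the PR's sync package uses to ask for a re-backfill.
var ErrGapDetected = errors.New("gap detected in header stream")

type backfiller interface {
	Run(ctx context.Context, from, to uint64) error
}

type subscriber interface {
	Run(ctx context.Context) error
}

// run sketches init -> backfill -> stream, looping back to backfill on gaps.
func run(ctx context.Context, bf backfiller, sub subscriber, from, head uint64) error {
	for {
		// Backfill from the last checkpoint up to the known network head.
		if err := bf.Run(ctx, from, head); err != nil {
			return err
		}
		// Stream live headers until a gap or a real error is reported.
		err := sub.Run(ctx)
		switch {
		case err == nil:
			return nil // subscription ended cleanly (e.g. context cancelled)
		case errors.Is(err, ErrGapDetected):
			// In the real coordinator the new from/head come from the store
			// before backfilling the missed range and resuming the stream.
			continue
		default:
			return err
		}
	}
}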

Changes

Cohort / File(s) Summary
Configuration & Dependencies
.gitignore, go.mod, cmd/apex/main.go
Adds ISSUES.md to gitignore; introduces SQLite, JSON-RPC, and logging dependencies; wires runIndexer function to orchestrate lifecycle (namespaces, store, fetcher, sync coordinator) with proper signal handling and cleanup.
Storage Layer
pkg/store/sqlite.go, pkg/store/migrations/001_init.sql, pkg/store/sqlite_test.go
Implements SQLite-backed Store with WAL mode, embedded schema migrations (sync_state, headers, blobs, namespaces tables), and CRUD operations for headers, blobs, namespaces, and sync state; comprehensive test suite validates persistence, range queries, idempotency, and edge cases.
Data Fetching Layer
pkg/fetch/celestia_node.go, pkg/fetch/celestia_node_test.go
Introduces CelestiaNodeFetcher communicating with Celestia node via JSON-RPC/WebSocket; provides methods to fetch headers/blobs by height, subscribe to header updates, and parse responses; includes robust mapping logic with custom JSON unmarshalling for numeric and hex-encoded fields.
Sync Engine — Backfiller
pkg/sync/backfill.go, pkg/sync/backfill_test.go
Implements Backfiller for batched, concurrent historical block retrieval; spawns worker goroutines per batch, fetches headers and blobs, updates sync checkpoints, and propagates errors; tests validate batch processing, cancellation, and error handling.
Sync Engine — Coordinator & Subscription
pkg/sync/coordinator.go, pkg/sync/subscription.go, pkg/sync/coordinator_test.go
Expands Coordinator with Run orchestration (initialize → backfill → streaming), state management (RWMutex, Status API), logger configuration, and gap detection (ErrGapDetected); introduces SubscriptionManager to process live headers, enforce height contiguity, and emit gaps; comprehensive tests cover full sync cycles, resume from checkpoint, gap recovery, and cancellation.
Testing Infrastructure
pkg/sync/mock_test.go
Provides in-memory mockStore and mockFetcher implementations with mutex-protected maps, optional error injection, and deterministic header/blob serving for isolated sync engine testing.
Domain Types
pkg/types/types.go
Adds Height field to Blob struct to track the block height associated with each blob.

Sequence Diagram(s)

sequenceDiagram
    actor User
    participant CLI as cmd/apex
    participant Coordinator
    participant Store
    participant Backfiller
    participant Fetcher as CelestiaNodeFetcher
    participant SubMgr as SubscriptionManager

    User->>CLI: Start indexer
    CLI->>Coordinator: Run(ctx)
    
    Coordinator->>Store: GetSyncState()
    Store-->>Coordinator: fromHeight, networkHeight
    
    Coordinator->>Backfiller: Initialize with heights
    activate Backfiller
    loop Each Batch
        Backfiller->>Backfiller: Create height batch
        par Worker Goroutines
            Backfiller->>Fetcher: GetHeader(height)
            Fetcher-->>Backfiller: header
            Backfiller->>Store: PutHeader(header)
            
            Backfiller->>Store: GetNamespaces()
            Store-->>Backfiller: namespaces
            Backfiller->>Fetcher: GetBlobs(height, namespaces)
            Fetcher-->>Backfiller: blobs
            Backfiller->>Store: PutBlobs(blobs)
        end
        Backfiller->>Store: SetSyncState(LatestHeight)
    end
    deactivate Backfiller
    
    Coordinator->>SubMgr: Initialize & Run
    activate SubMgr
    SubMgr->>Fetcher: SubscribeHeaders(ctx)
    
    loop Live Stream
        Fetcher-->>SubMgr: header (from subscription)
        SubMgr->>SubMgr: Validate contiguity
        alt Gap Detected
            SubMgr-->>Coordinator: ErrGapDetected
            Coordinator->>Backfiller: Re-run for gap range
            Note over Coordinator: Loop back to backfill
        else No Gap
            SubMgr->>Store: PutHeader(header)
            SubMgr->>Fetcher: GetBlobs(height, namespaces)
            SubMgr->>Store: PutBlobs(blobs)
            SubMgr->>Store: SetSyncState(Streaming, LatestHeight)
        end
    end
    deactivate SubMgr
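
The "Gap Detected" branch in the diagram boils down to a contiguity check on incoming heights. A minimal sketch, with the field semantics assumed and ErrGapDetected being the sync package's sentinel mentioned in the changes table above:

// checkContiguity sketches the gap check; lastHeight is the highest persisted
// height, and zero means nothing has been stored yet.
func checkContiguity(lastHeight, newHeight uint64) error {
	if lastHeight != 0 && newHeight != lastHeight+1 {
		// Surfacing the sentinel sends the coordinator back to backfill
		// for the missed range (lastHeight, newHeight).
		return fmt.Errorf("%w: expected height %d, got %d", ErrGapDetected, lastHeight+1, newHeight)
	}
	return nil
}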

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

  • SQLite storage layer implementation #10: SQLite storage layer implementation — This PR directly implements the SQLite-backed storage layer with schema migrations, CRUD operations, and WAL mode as specified in the issue requirements.
  • Sync engine: backfill, streaming, and gap recovery #11: Sync engine (Backfill/Streaming/Gap recovery) — This PR implements the Coordinator orchestration, Backfiller for historical batched retrieval, SubscriptionManager for live headers, and CelestiaNodeFetcher data source matching the sync engine architecture.


Poem

🐰 A rabbit hops through blockchains so tall,
SQLite stores headers, one and all!
Backfill and stream, gap-proof we stand,
Celestia's blobs now fill the land! 📦✨

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 11.11% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Linked Issues check ❓ Inconclusive The PR references closing issues #10, #11, and #12, but only #10 objectives are provided. Schema deviations and missing interface definition cannot be fully assessed without access to all linked issue requirements. Verify that all linked issue (#10, #11, #12) requirements are met, including the Store interface definition and all required method signatures.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: implement core storage, fetcher, and sync engine (Phase 1)' accurately summarizes the main addition of three core components (storage, fetcher, and sync engine).
Out of Scope Changes check ✅ Passed All changes are scoped to implementing the storage layer, Celestia node fetcher, and sync engine components; no unrelated modifications detected.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (7)
pkg/store/migrations/001_init.sql (1)

17-27: AUTOINCREMENT on blobs.id is unnecessary — consider dropping it.

In SQLite, INTEGER PRIMARY KEY AUTOINCREMENT incurs extra overhead: it maintains the sqlite_sequence table and performs an additional write on every insert to guarantee monotonically-increasing IDs with no gaps. Since nothing in the application depends on gap-free or non-reused IDs (the UNIQUE(height, namespace, commitment) constraint is the meaningful uniqueness guarantee), plain INTEGER PRIMARY KEY is sufficient and faster.

♻️ Proposed change
-    id            INTEGER PRIMARY KEY AUTOINCREMENT,
+    id            INTEGER PRIMARY KEY,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/migrations/001_init.sql` around lines 17 - 27, The blobs table
currently defines id as INTEGER PRIMARY KEY AUTOINCREMENT which forces SQLite to
maintain sqlite_sequence and causes extra writes; change the id column
definition in the CREATE TABLE for blobs from using AUTOINCREMENT to a plain
INTEGER PRIMARY KEY (i.e., remove the AUTOINCREMENT keyword) so inserts use the
rowid without the extra overhead—update the blobs CREATE TABLE statement
(referencing id, AUTOINCREMENT, and table name blobs) accordingly and ensure any
surrounding schema or migrations remain consistent with the plain INTEGER
PRIMARY KEY behavior.
pkg/store/sqlite_test.go (1)

30-282: Tests don't follow the required table-driven pattern.

TestPutGetHeader, TestGetHeaderNotFound, and TestPutHeaderIdempotent all exercise the same PutHeader/GetHeader surface; the same applies to the blob and sync-state groups. These should be unified into TestHeader, TestBlobs, and TestSyncState functions using t.Run subtests. As per coding guidelines, "Use table-driven tests pattern for test implementation."

♻️ Sketch for header tests
func TestHeader(t *testing.T) {
    tests := []struct {
        name    string
        store   func() *SQLiteStore
        height  uint64
        wantErr error
    }{
        {name: "round-trip", /* ... */},
        {name: "not found", height: 999, wantErr: ErrNotFound},
        {name: "idempotent insert", /* ... */},
    }
    for _, tc := range tests {
        t.Run(tc.name, func(t *testing.T) {
            // ...
        })
    }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/sqlite_test.go` around lines 30 - 282, Several tests duplicate the
same Put/Get logic instead of using table-driven subtests; consolidate the
header, blobs, and sync-state groups into table-driven tests that call t.Run for
each scenario. Replace
TestPutGetHeader/TestGetHeaderNotFound/TestPutHeaderIdempotent with a single
TestHeader using a []struct{ name string; setup func() *SQLiteStore; input ...;
wantErr error } and inside the loop call openTestDB, then exercise s.PutHeader
and s.GetHeader (and idempotent re-PutHeader) per case. Do the same for blobs in
TestBlobs covering single GetBlob, range GetBlobs, not-found, idempotent
PutBlobs, empty inputs (use s.PutBlobs, s.GetBlob, s.GetBlobs), and for sync
state in TestSyncState covering fresh ErrNotFound, SetSyncState/GetSyncState,
and upsert (use s.SetSyncState and s.GetSyncState); implement assertions inside
each t.Run and keep helper calls (openTestDB, testNamespace) inside the
table-driven cases.
pkg/sync/coordinator.go (1)

80-87: Status() returns only State, leaving LatestHeight and NetworkHeight zero.

Callers (and future API consumers) may expect Status() to return a fully populated SyncStatus. Currently only State is set. Consider reading LatestHeight from the store or caching it in the coordinator alongside state so the status endpoint is more useful.
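
A hedged sketch of a fully populated Status(), caching the heights on the coordinator; latestHeight and networkHeight are assumed fields that the coordinator would update wherever heights change, not ones the PR currently defines:

// Status returns a consistent snapshot of sync progress.
func (c *Coordinator) Status() types.SyncStatus {
	c.stateMu.RLock()
	defer c.stateMu.RUnlock()
	return types.SyncStatus{
		State:         c.state,
		LatestHeight:  c.latestHeight,
		NetworkHeight: c.networkHeight,
	}
}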

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/coordinator.go` around lines 80 - 87, Status() currently only
returns State and leaves LatestHeight and NetworkHeight zero; update
Coordinator.Status() to also populate LatestHeight (and NetworkHeight if
available) by reading the latest synced height from the persistent store or from
a cached coordinator field protected by c.stateMu (e.g., add/maintain
Coordinator.latestHeight and optionally Coordinator.networkHeight when heights
change). In practice, inside Status() while holding c.stateMu.RLock(), read
c.latestHeight (or call the store accessor such as
c.store.LatestHeight()/similar), and set types.SyncStatus{State: c.state,
LatestHeight: <value>, NetworkHeight: <value>} so callers receive a fully
populated SyncStatus.
pkg/sync/coordinator_test.go (2)

44-52: Polling loops without timeout can hang tests indefinitely.

The for { time.Sleep(10ms); if state == Streaming { break } } pattern appears in every test but lacks a deadline. If the coordinator never reaches Streaming (e.g., due to a bug), the test hangs forever instead of failing — particularly problematic in CI.

Consider adding a timeout, e.g.:

♻️ Suggested helper
+func waitForState(t *testing.T, coord *Coordinator, want types.SyncState, timeout time.Duration) {
+	t.Helper()
+	deadline := time.After(timeout)
+	for {
+		select {
+		case <-deadline:
+			t.Fatalf("timed out waiting for state %v, current: %v", want, coord.Status().State)
+		default:
+		}
+		if coord.Status().State == want {
+			return
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}

Then replace each bare polling loop (lines 46-52, 110-116, 156-161, 190-195) with waitForState(t, coord, types.Streaming, 5*time.Second).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/coordinator_test.go` around lines 44 - 52, The polling loops in
coordinator_test.go use an infinite for-sleep loop to wait for coord.Status() to
reach types.Streaming, which can hang tests; add a helper function
waitForState(t *testing.T, coord *CoordinatorType?, want types.State, timeout
time.Duration) that polls coord.Status() with a small tick and fails the test
with t.Fatalf on timeout, then replace each bare loop (the blocks that check
coord.Status() and break on types.Streaming) with calls like waitForState(t,
coord, types.Streaming, 5*time.Second); ensure the helper references
coord.Status() and types.Streaming so it’s easy to locate and reuse across the
test file.

143-144: Dead code: origGetNetworkHead is unused.

origGetNetworkHead is assigned at line 144 but only referenced via a blank identifier at line 201 (_ = origGetNetworkHead). This looks like a leftover from refactoring. Remove both lines to reduce confusion.

🧹 Proposed cleanup
-	callCount := 0
-	origGetNetworkHead := ft.headH
 	ctx, cancel := context.WithCancel(context.Background())
...
-	_ = origGetNetworkHead
 	err := coord.Run(ctx)

If callCount is still needed for the gap-injection guard on line 163, keep it; otherwise remove it too.

Also applies to: 201-201

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/coordinator_test.go` around lines 143 - 144, Remove the
dead-variable assignment origGetNetworkHead := ft.headH and the later dummy use
`_ = origGetNetworkHead`; these are leftovers from refactoring and unused. Also
check whether callCount (used as the gap-injection guard) is actually needed; if
it's not referenced elsewhere keep neither callCount nor its guard, otherwise
keep callCount and just remove origGetNetworkHead and its blank-assignment.
Locate symbols ft.headH, origGetNetworkHead, callCount to make the edits.
pkg/store/sqlite.go (1)

26-53: SetMaxOpenConns(1) serializes all DB access through a single connection.

This is intentional per the comment, and works fine for correctness with SQLite. However, the concurrent backfill workers in backfill.go will contend on this single connection. If network fetching is the bottleneck this won't matter, but if the store becomes the bottleneck (e.g., large blobs), consider a separate read-only connection pool or increasing the limit with WAL mode.
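
If the store rather than the network ever becomes the bottleneck, one option is a second, read-only handle next to the serialized writer. A sketch only, assuming the driver accepts standard SQLite URI filenames; the PR's SQLiteStore keeps a single *sql.DB today:

package store

import (
	"database/sql"

	_ "modernc.org/sqlite"
)

// pools is illustrative only: a writer pinned to one connection plus readers.
type pools struct {
	writer *sql.DB // SetMaxOpenConns(1), as Open does today
	reader *sql.DB // WAL readers don't block the writer, so a few are safe
}

func openPools(path string) (*pools, error) {
	w, err := sql.Open("sqlite", path)
	if err != nil {
		return nil, err
	}
	w.SetMaxOpenConns(1)

	// mode=ro is the standard SQLite URI flag for a read-only handle.
	r, err := sql.Open("sqlite", "file:"+path+"?mode=ro")
	if err != nil {
		w.Close()
		return nil, err
	}
	r.SetMaxOpenConns(4)

	return &pools{writer: w, reader: r}, nil
}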

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/sqlite.go` around lines 26 - 53, Open currently calls
db.SetMaxOpenConns(1) which forces all SQLite access through a single connection
and causes contention (notably for concurrent backfill workers in backfill.go);
update the Open function (or SQLiteStore initialization) to avoid serializing
all access by either increasing db.SetMaxOpenConns to a higher value when WAL is
enabled or by creating and exposing a separate read-only connection pool (e.g.,
a second *sql.DB managed by SQLiteStore) that backfill workers can use for
heavy/readonly reads; ensure any new pool is opened with the same PRAGMA
settings and properly closed in SQLiteStore.Close.
pkg/fetch/celestia_node.go (1)

225-232: isNotFoundErr relies on string matching — fragile but pragmatic.

This will break silently if Celestia changes error message wording. Consider adding a comment noting the dependency on these specific error strings, or checking for structured error codes if the RPC library supports them.
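
A hedged sketch of the structured-check-first shape; the sentinel variables below are placeholders, since whether the Celestia client actually exposes structured not-found errors needs checking against the real dependency:

package fetch

import (
	"errors"
	"strings"
)

// Placeholder sentinels standing in for whatever structured "not found"
// errors the node API exposes, if any.
var (
	errHeaderNotFound = errors.New("header: not found")
	errBlobNotFound   = errors.New("blob: not found")
)

// isNotFoundErr prefers structured checks and only then falls back to the
// message matching the current code uses. The fallback depends on Celestia
// keeping the exact wording "header: not found" / "blob: not found".
func isNotFoundErr(err error) bool {
	if err == nil {
		return false
	}
	if errors.Is(err, errHeaderNotFound) || errors.Is(err, errBlobNotFound) {
		return true
	}
	msg := err.Error()
	return strings.Contains(msg, "header: not found") ||
		strings.Contains(msg, "blob: not found")
}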

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_node.go` around lines 225 - 232, The isNotFoundErr
function currently relies solely on fragile string matching of err.Error();
update it to first try structured checks (e.g., type-assert the error to the
RPC/grpc error type or use errors.Is against any known Celestia "not found"
sentinel/error value) and only fall back to the existing strings.Contains checks
for "blob: not found" and "header: not found"; also add a concise comment above
isNotFoundErr explaining the dependency on those specific strings and why the
structured-check-first fallback-to-string approach is used so future changes to
the RPC library can be handled safely.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/apex/main.go`:
- Around line 120-135: The function runIndexer currently discards the caller
context by using `_ context.Context` and creates a fresh context with
signal.NotifyContext(context.Background(), ...); change the signature to accept
a real context parameter (e.g., runIndexer(ctx context.Context, cfg
*config.Config) error) and call signal.NotifyContext(ctx, syscall.SIGINT,
syscall.SIGTERM) so the returned ctx inherits the caller's
cancellation/deadline; ensure the new ctx is used for downstream operations and
call stop() appropriately to cancel the signal watcher.

In `@pkg/fetch/celestia_node.go`:
- Around line 96-120: The goroutine in SubscribeHeaders can leak because "for
raw := range rawCh" blocks and won't observe ctx cancellation; replace the range
loop with an explicit select-based receive like "for { select { case raw, ok :=
<-rawCh: if !ok { break } ... case <-ctx.Done(): return } }" so the goroutine
exits when ctx is canceled, keep the existing mapHeader handling and logging,
and preserve deferred close(out); use f.header.Subscribe call and rawCh name to
locate the subscription code.

In `@pkg/store/sqlite.go`:
- Around line 159-173: Replace direct equality checks against sql.ErrNoRows with
errors.Is(...) in this file: locate the comparison inside GetHeader (and the two
other occurrences in this file where code does `err == sql.ErrNoRows`) and
change each to `if errors.Is(err, sql.ErrNoRows) { ... }`; also add the standard
"errors" import to the import block so the code compiles. Ensure the error
handling branches and wrapped fmt.Errorf usages remain unchanged except for
using errors.Is for the sentinel check.

In `@pkg/sync/backfill_test.go`:
- Around line 11-128: Combine the four tests TestBackfillerBatchProcessing,
TestBackfillerContextCancellation, TestBackfillerFetchError, and
TestBackfillerSingleHeight into a single table-driven test (e.g.,
TestBackfiller) that iterates over a slice of cases and uses t.Run with
t.Parallel for each case; for each case call a setup func that prepares new
mockStore and mockFetcher (use makeHeader, mockFetcher.addHeader,
mockFetcher.addBlobs as appropriate), then construct a Backfiller and call
Backfiller.Run with the case's start/end and assert errors, stored
headers/blobs, and sync state (LatestHeight) based on case.wantErr, wantBlobs,
and wantLatest — this consolidates test setup and allows parallel subtests per
the coding guideline.

In `@pkg/sync/mock_test.go`:
- Around line 153-164: In GetHeader, reading f.getHdrFn happens before acquiring
f.mu which can race with concurrent writers; change to read getHdrFn under the
mutex (e.g., f.mu.Lock(); fn := f.getHdrFn; f.mu.Unlock()), then if fn != nil
call fn(ctx, height); keep the existing locked access for f.headers (use the
same mu) so all shared fields (getHdrFn and headers) are consistently
synchronized in mockFetcher.GetHeader.

In `@pkg/sync/subscription.go`:
- Around line 94-99: SetSyncState is writing a SyncStatus with NetworkHeight
left at zero, overwriting stored backfill progress; update
SubscriptionManager.Run so before calling sm.store.SetSyncState you populate
types.SyncStatus.NetworkHeight (either by fetching the current network head once
at startup and storing it in a local variable, or by using hdr.Height when the
subscription is at tip) and pass that value into the struct you pass to
sm.store.SetSyncState (keep State and LatestHeight as-is but ensure
NetworkHeight is non-zero) to avoid corrupting the sync_state row.


Comment on lines 96 to 120
func (f *CelestiaNodeFetcher) SubscribeHeaders(ctx context.Context) (<-chan *types.Header, error) {
	rawCh, err := f.header.Subscribe(ctx)
	if err != nil {
		return nil, fmt.Errorf("header.Subscribe: %w", err)
	}

	out := make(chan *types.Header, 64)
	go func() {
		defer close(out)
		for raw := range rawCh {
			hdr, err := mapHeader(raw)
			if err != nil {
				f.log.Error().Err(err).Msg("failed to parse subscribed header")
				continue
			}
			select {
			case out <- hdr:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat go.mod | grep -i jsonrpc

Repository: evstack/apex

Length of output: 103


🏁 Script executed:

rg -n "Subscribe" pkg/fetch/celestia_node.go -B 2 -A 2

Repository: evstack/apex

Length of output: 533


🏁 Script executed:

# Search for import paths to understand the go-jsonrpc dependency
rg -n "go-jsonrpc\|jsonrpc" --type go -A 1 | head -50

Repository: evstack/apex

Length of output: 38


🌐 Web query:

go-jsonrpc Subscribe context cancellation channel closure behavior

💡 Result:

In filecoin-project/go-jsonrpc, “subscriptions” are typically implemented as RPC methods that return a Go channel (websocket mode only), not as a special Subscribe API.

Behavior is:

  • Context cancellation propagation (client → server): if the client-side ctx for an in-flight JSON-RPC call is canceled, go-jsonrpc propagates that cancellation to the server handler; in websocket mode it does so by sending an xrpc.cancel message with the canceled request ID. [1][2]
  • How/when the subscription channel closes:
    • When the server closes the returned channel, the client is notified via xrpc.ch.close and the client-side channel is closed accordingly. [1][2]
    • If the websocket connection breaks, the client-side channel is closed; on the server side, further writes are discarded. [1][2]
  • Important implication: canceling the context used to initiate the call does not, by itself, guarantee the returned channel will be closed on the client. The intended pattern is that the server handler must watch ctx.Done() and stop producing/close the channel itself (i.e., treat context cancellation as the signal to end the stream). [1][2]

Sources:
[1] go-jsonrpc README / pkg docs describing context cancel (xrpc.cancel) and channel close (xrpc.ch.close) + websocket-break behavior. (github.com)
[2] pkg.go.dev for github.com/filecoin-project/go-jsonrpc (same content as README excerpt). (pkg.go.dev)



Potential goroutine leak if the upstream rawCh doesn't close on context cancellation.

The for raw := range rawCh (line 105) blocks waiting for the next message. The ctx.Done() check on line 113 only executes after a message is received. If ctx is cancelled but rawCh is neither closed nor producing items, this goroutine will leak.

Per go-jsonrpc's documented behavior (v0.10.1), canceling the context does not automatically close the returned subscription channel. The channel closes only when: (1) the server closes it, or (2) the websocket connection breaks. This means safety depends entirely on whether the Celestia node's server-side Subscribe implementation watches ctx.Done() and closes the channel accordingly. If the server doesn't do this, the goroutine will be stuck at the range statement indefinitely.

Consider adding a separate goroutine that monitors ctx.Done() to explicitly cancel or drain the subscription, or refactoring to handle context cancellation more explicitly.
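
One hedged way to make the receive loop observe cancellation directly, using the same names as the snippet above (rawCh, out, mapHeader):

go func() {
	defer close(out)
	for {
		select {
		case <-ctx.Done():
			// Exit even if the server never closes rawCh.
			return
		case raw, ok := <-rawCh:
			if !ok {
				return
			}
			hdr, err := mapHeader(raw)
			if err != nil {
				f.log.Error().Err(err).Msg("failed to parse subscribed header")
				continue
			}
			select {
			case out <- hdr:
			case <-ctx.Done():
				return
			}
		}
	}
}()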

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_node.go` around lines 96 - 120, The goroutine in
SubscribeHeaders can leak because "for raw := range rawCh" blocks and won't
observe ctx cancellation; replace the range loop with an explicit select-based
receive like "for { select { case raw, ok := <-rawCh: if !ok { break } ... case
<-ctx.Done(): return } }" so the goroutine exits when ctx is canceled, keep the
existing mapHeader handling and logging, and preserve deferred close(out); use
f.header.Subscribe call and rawCh name to locate the subscription code.

Comment on lines 153 to 164
func (f *mockFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
	if f.getHdrFn != nil {
		return f.getHdrFn(ctx, height)
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	h, ok := f.headers[height]
	if !ok {
		return nil, store.ErrNotFound
	}
	return h, nil
}

⚠️ Potential issue | 🟡 Minor

getHdrFn read without holding the lock — potential data race under -race.

f.getHdrFn is accessed at line 154 before f.mu.Lock() is acquired. If any test assigns getHdrFn after goroutines that call GetHeader are already running (e.g., for dynamic injection mid-test), the race detector will flag the concurrent write/read. Consistent locking avoids this:

🔒 Proposed fix
 func (f *mockFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
-	if f.getHdrFn != nil {
-		return f.getHdrFn(ctx, height)
-	}
 	f.mu.Lock()
-	defer f.mu.Unlock()
+	fn := f.getHdrFn
 	h, ok := f.headers[height]
+	f.mu.Unlock()
+	if fn != nil {
+		return fn(ctx, height)
+	}
 	if !ok {
 		return nil, store.ErrNotFound
 	}
 	return h, nil
 }

Based on learnings, all tests must be run with the -race flag; unprotected shared field access defeats that goal.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Before:

func (f *mockFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
	if f.getHdrFn != nil {
		return f.getHdrFn(ctx, height)
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	h, ok := f.headers[height]
	if !ok {
		return nil, store.ErrNotFound
	}
	return h, nil
}

After:

func (f *mockFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
	f.mu.Lock()
	fn := f.getHdrFn
	h, ok := f.headers[height]
	f.mu.Unlock()
	if fn != nil {
		return fn(ctx, height)
	}
	if !ok {
		return nil, store.ErrNotFound
	}
	return h, nil
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/mock_test.go` around lines 153 - 164, In GetHeader, reading
f.getHdrFn happens before acquiring f.mu which can race with concurrent writers;
change to read getHdrFn under the mutex (e.g., f.mu.Lock(); fn := f.getHdrFn;
f.mu.Unlock()), then if fn != nil call fn(ctx, height); keep the existing locked
access for f.headers (use the same mu) so all shared fields (getHdrFn and
headers) are consistently synchronized in mockFetcher.GetHeader.

Comment on lines 94 to 99
if err := sm.store.SetSyncState(ctx, types.SyncStatus{
	State:        types.Streaming,
	LatestHeight: hdr.Height,
}); err != nil {
	return fmt.Errorf("set sync state: %w", err)
}

⚠️ Potential issue | 🟠 Major

SetSyncState always writes NetworkHeight: 0, corrupting previously stored progress.

types.SyncStatus{State: types.Streaming, LatestHeight: hdr.Height} leaves NetworkHeight as the zero value. Since SetSyncState performs a full upsert of the singleton sync_state row, every processed header overwrites whatever NetworkHeight the backfill phase had persisted with 0.

The simplest fix is to track the network height in SubscriptionManager.Run — it can be fetched once at startup and refreshed periodically, or simply passed in from the coordinator:

🐛 Proposed minimal fix (fetch network head once in Run)
 func (sm *SubscriptionManager) Run(ctx context.Context) error {
 	ch, err := sm.fetcher.SubscribeHeaders(ctx)
 	if err != nil {
 		return fmt.Errorf("subscribe headers: %w", err)
 	}

 	namespaces, err := sm.store.GetNamespaces(ctx)
 	if err != nil {
 		return fmt.Errorf("get namespaces: %w", err)
 	}

 	var lastHeight uint64
 	ss, err := sm.store.GetSyncState(ctx)
 	if err != nil && !errors.Is(err, store.ErrNotFound) {
 		return fmt.Errorf("get sync state: %w", err)
 	}
+	var networkHeight uint64
 	if ss != nil {
 		lastHeight = ss.LatestHeight
+		networkHeight = ss.NetworkHeight
 	}
 	if err := sm.store.SetSyncState(ctx, types.SyncStatus{
 		State:        types.Streaming,
 		LatestHeight: hdr.Height,
+		NetworkHeight: networkHeight,
 	}); err != nil {

For a live view, networkHeight could also be updated from the incoming hdr.Height (the subscription is at the network tip, so hdr.Height is the network height) — either approach beats overwriting with 0.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/subscription.go` around lines 94 - 99, SetSyncState is writing a
SyncStatus with NetworkHeight left at zero, overwriting stored backfill
progress; update SubscriptionManager.Run so before calling sm.store.SetSyncState
you populate types.SyncStatus.NetworkHeight (either by fetching the current
network head once at startup and storing it in a local variable, or by using
hdr.Height when the subscription is at tip) and pass that value into the struct
you pass to sm.store.SetSyncState (keep State and LatestHeight as-is but ensure
NetworkHeight is non-zero) to avoid corrupting the sync_state row.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces the core components for the indexer: a SQLite-based store, a Celestia node fetcher, and a sync engine. The overall architecture is robust, with a clear separation of concerns, good use of interfaces, and a resilient sync coordinator that handles backfilling, streaming, and gap detection. The code is well-tested, including unit tests for new components and integration-style tests for the sync logic using mocks. My feedback includes a few suggestions for improvement, mainly around context handling and opportunities for code simplification.

Comment on lines +59 to +113
func (b *Backfiller) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error {
	heights := make(chan uint64, to-from+1)
	for h := from; h <= to; h++ {
		heights <- h
	}
	close(heights)

	workers := b.concurrency
	if int(to-from+1) < workers {
		workers = int(to - from + 1)
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)

	for range workers {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for height := range heights {
				// Check for prior error or context cancellation.
				mu.Lock()
				failed := firstErr != nil
				mu.Unlock()
				if failed {
					return
				}

				if err := ctx.Err(); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					return
				}

				if err := b.processHeight(ctx, height, namespaces); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = fmt.Errorf("height %d: %w", height, err)
					}
					mu.Unlock()
					return
				}
			}
		}()
	}

	wg.Wait()
	return firstErr
}


medium

The concurrent batch processing logic can be significantly simplified by using golang.org/x/sync/errgroup. An errgroup.Group automatically handles context cancellation across goroutines and returns the first error, which mirrors your current implementation but in a more idiomatic and maintainable way.

Here's an example of how it could be refactored:

import "golang.org/x/sync/errgroup"

func (b *Backfiller) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error {
	g, gCtx := errgroup.WithContext(ctx)
	heights := make(chan uint64)

	// Feeder goroutine
	g.Go(func() error {
		defer close(heights)
		for h := from; h <= to; h++ {
			select {
			case heights <- h:
			case <-gCtx.Done():
				return gCtx.Err()
			}
		}
		return nil
	})

	// Worker goroutines
	workers := b.concurrency
	if int(to-from+1) < workers {
		workers = int(to - from + 1)
	}
	for i := 0; i < workers; i++ {
		g.Go(func() error {
			for height := range heights {
				if err := b.processHeight(gCtx, height, namespaces); err != nil {
					return fmt.Errorf("height %d: %w", height, err)
				}
			}
			return nil
		})
	}

	return g.Wait()
}

- Use caller context instead of context.Background() in runIndexer
- Fix goroutine leak in SubscribeHeaders by select-based receive
- Preserve NetworkHeight in subscription sync state updates
- Use errors.Is for sql.ErrNoRows comparisons
- Fix data race on mockFetcher.getHdrFn read
- Remove redundant "[]" check in mapBlobs
- Convert backfill tests to table-driven subtests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
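
The first fix listed above ("Use caller context instead of context.Background() in runIndexer") follows this pattern; a sketch with the config parameter omitted for brevity:

package main

import (
	"context"
	"os/signal"
	"syscall"
)

// runIndexer derives its shutdown context from the caller's ctx so upstream
// cancellation and deadlines propagate into the signal-aware context.
func runIndexer(ctx context.Context) error {
	ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// ... open the store, connect the fetcher, run the coordinator with ctx ...
	<-ctx.Done()
	return ctx.Err()
}
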
@tac0turtle tac0turtle merged commit 1107536 into main Feb 19, 2026
3 checks passed
@tac0turtle tac0turtle deleted the phase1/core-storage-sync-engine branch February 19, 2026 13:00