feat: implement core storage, fetcher, and sync engine (Phase 1) #27
tac0turtle merged 2 commits into main
Conversation
Add SQLite store (pkg/store/sqlite.go) with WAL mode, PRAGMA user_version migration, and idempotent writes. Add Celestia node fetcher (pkg/fetch/celestia_node.go) using go-jsonrpc with slim response types for CometBFT JSON encoding. Implement sync engine with Coordinator state machine (init -> backfill -> stream), concurrent Backfiller with batched worker pool, and SubscriptionManager with gap detection and re-backfill recovery. Wire everything in cmd/apex/main.go with signal handling. Closes #10, #11, #12 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
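For orientation, here is a minimal sketch of the WAL + `PRAGMA user_version` migration approach described above. It is not the PR's code: the driver (`modernc.org/sqlite`), the helper name, and the example table are assumptions.

```go
package store

import (
	"database/sql"
	"fmt"

	_ "modernc.org/sqlite" // assumed pure-Go driver; the PR's actual driver is not shown here
)

// openSQLite is a hypothetical helper: enable WAL, then apply migrations
// gated on PRAGMA user_version so re-running the function is a no-op.
func openSQLite(path string) (*sql.DB, error) {
	db, err := sql.Open("sqlite", path)
	if err != nil {
		return nil, fmt.Errorf("open sqlite: %w", err)
	}
	// A single writer connection sidesteps SQLITE_BUSY; WAL still lets readers proceed.
	db.SetMaxOpenConns(1)

	var mode string
	if err := db.QueryRow(`PRAGMA journal_mode=WAL`).Scan(&mode); err != nil {
		return nil, fmt.Errorf("enable WAL: %w", err)
	}

	var version int
	if err := db.QueryRow(`PRAGMA user_version`).Scan(&version); err != nil {
		return nil, fmt.Errorf("read user_version: %w", err)
	}
	if version < 1 {
		// Illustrative migration 001; the real schema lives in pkg/store/migrations.
		if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS headers (
			height INTEGER PRIMARY KEY,
			data   BLOB NOT NULL
		)`); err != nil {
			return nil, fmt.Errorf("apply migration 001: %w", err)
		}
		if _, err := db.Exec(`PRAGMA user_version = 1`); err != nil {
			return nil, fmt.Errorf("bump user_version: %w", err)
		}
	}
	return db, nil
}
```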
Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request delivers the foundational components for the Apex indexer, establishing a robust data ingestion pipeline. It integrates a persistent SQLite store, a dedicated fetcher for Celestia node data, and a sophisticated sync engine to manage the entire data synchronization process from historical backfilling to real-time streaming. This phase lays the groundwork for reliable and efficient data indexing.
📝 Walkthrough

This PR implements the core indexing infrastructure for Apex: a SQLite persistence layer with schema migrations, a Celestia RPC-based data fetcher, a batch backfiller for historical blocks, and a live subscription handler for streaming headers, all integrated into the Coordinator orchestration layer. The CLI entry point is wired to execute the complete sync workflow.
Sequence Diagram(s)

```mermaid
sequenceDiagram
actor User
participant CLI as cmd/apex
participant Coordinator
participant Store
participant Backfiller
participant Fetcher as CelestiaNodeFetcher
participant SubMgr as SubscriptionManager
User->>CLI: Start indexer
CLI->>Coordinator: Run(ctx)
Coordinator->>Store: GetSyncState()
Store-->>Coordinator: fromHeight, networkHeight
Coordinator->>Backfiller: Initialize with heights
activate Backfiller
loop Each Batch
Backfiller->>Backfiller: Create height batch
par Worker Goroutines
Backfiller->>Fetcher: GetHeader(height)
Fetcher-->>Backfiller: header
Backfiller->>Store: PutHeader(header)
Backfiller->>Store: GetNamespaces()
Store-->>Backfiller: namespaces
Backfiller->>Fetcher: GetBlobs(height, namespaces)
Fetcher-->>Backfiller: blobs
Backfiller->>Store: PutBlobs(blobs)
end
Backfiller->>Store: SetSyncState(LatestHeight)
end
deactivate Backfiller
Coordinator->>SubMgr: Initialize & Run
activate SubMgr
SubMgr->>Fetcher: SubscribeHeaders(ctx)
loop Live Stream
Fetcher-->>SubMgr: header (from subscription)
SubMgr->>SubMgr: Validate contiguity
alt Gap Detected
SubMgr-->>Coordinator: ErrGapDetected
Coordinator->>Backfiller: Re-run for gap range
Note over Coordinator: Loop back to backfill
else No Gap
SubMgr->>Store: PutHeader(header)
SubMgr->>Fetcher: GetBlobs(height, namespaces)
SubMgr->>Store: PutBlobs(blobs)
SubMgr->>Store: SetSyncState(Streaming, LatestHeight)
end
end
    deactivate SubMgr
```
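To complement the diagram, here is a rough sketch of the backfill → stream → gap-recovery loop in Go. The interface names, `ErrGapDetected`, and the `Gap()` accessor are illustrative assumptions, not the PR's exact API.

```go
package sync // mirrors pkg/sync; the names below are stand-ins for the PR's types

import (
	"context"
	"errors"
	"fmt"
)

var ErrGapDetected = errors.New("gap detected in header stream")

type backfiller interface {
	Run(ctx context.Context, from, to uint64) error
}

type subscriber interface {
	Run(ctx context.Context) error
	Gap() (from, to uint64) // range of heights missed by the live stream
}

type Coordinator struct {
	backfill backfiller
	subs     subscriber
}

// Run sketches init -> backfill -> stream, including the re-backfill loop
// triggered by gap detection.
func (c *Coordinator) Run(ctx context.Context, from, networkHeight uint64) error {
	if err := c.backfill.Run(ctx, from, networkHeight); err != nil {
		return fmt.Errorf("backfill: %w", err)
	}
	for {
		err := c.subs.Run(ctx) // blocks while streaming live headers
		if errors.Is(err, ErrGapDetected) {
			gapFrom, gapTo := c.subs.Gap()
			if err := c.backfill.Run(ctx, gapFrom, gapTo); err != nil {
				return fmt.Errorf("re-backfill gap: %w", err)
			}
			continue // resume streaming once the gap is filled
		}
		return err
	}
}
```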
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Actionable comments posted: 6
🧹 Nitpick comments (7)
pkg/store/migrations/001_init.sql (1)
17-27: `AUTOINCREMENT` on `blobs.id` is unnecessary — consider dropping it.

In SQLite, `INTEGER PRIMARY KEY AUTOINCREMENT` incurs extra overhead: it maintains the `sqlite_sequence` table and performs an additional write on every insert to guarantee monotonically-increasing IDs with no gaps. Since nothing in the application depends on gap-free or non-reused IDs (the `UNIQUE(height, namespace, commitment)` constraint is the meaningful uniqueness guarantee), plain `INTEGER PRIMARY KEY` is sufficient and faster.

♻️ Proposed change

```diff
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    id INTEGER PRIMARY KEY,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/migrations/001_init.sql` around lines 17 - 27, The blobs table currently defines id as INTEGER PRIMARY KEY AUTOINCREMENT which forces SQLite to maintain sqlite_sequence and causes extra writes; change the id column definition in the CREATE TABLE for blobs from using AUTOINCREMENT to a plain INTEGER PRIMARY KEY (i.e., remove the AUTOINCREMENT keyword) so inserts use the rowid without the extra overhead—update the blobs CREATE TABLE statement (referencing id, AUTOINCREMENT, and table name blobs) accordingly and ensure any surrounding schema or migrations remain consistent with the plain INTEGER PRIMARY KEY behavior.
pkg/store/sqlite_test.go (1)

30-282: Tests don't follow the required table-driven pattern.
`TestPutGetHeader`, `TestGetHeaderNotFound`, and `TestPutHeaderIdempotent` all exercise the same `PutHeader`/`GetHeader` surface; the same applies to the blob and sync-state groups. These should be unified into `TestHeader`, `TestBlobs`, and `TestSyncState` functions using `t.Run` subtests. As per coding guidelines, "Use table-driven tests pattern for test implementation."

♻️ Sketch for header tests

```go
func TestHeader(t *testing.T) {
	tests := []struct {
		name    string
		store   func() *SQLiteStore
		height  uint64
		wantErr error
	}{
		{name: "round-trip" /* ... */},
		{name: "not found", height: 999, wantErr: ErrNotFound},
		{name: "idempotent insert" /* ... */},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// ...
		})
	}
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/sqlite_test.go` around lines 30 - 282, Several tests duplicate the same Put/Get logic instead of using table-driven subtests; consolidate the header, blobs, and sync-state groups into table-driven tests that call t.Run for each scenario. Replace TestPutGetHeader/TestGetHeaderNotFound/TestPutHeaderIdempotent with a single TestHeader using a []struct{ name string; setup func() *SQLiteStore; input ...; wantErr error } and inside the loop call openTestDB, then exercise s.PutHeader and s.GetHeader (and idempotent re-PutHeader) per case. Do the same for blobs in TestBlobs covering single GetBlob, range GetBlobs, not-found, idempotent PutBlobs, empty inputs (use s.PutBlobs, s.GetBlob, s.GetBlobs), and for sync state in TestSyncState covering fresh ErrNotFound, SetSyncState/GetSyncState, and upsert (use s.SetSyncState and s.GetSyncState); implement assertions inside each t.Run and keep helper calls (openTestDB, testNamespace) inside the table-driven cases.
pkg/sync/coordinator.go (1)

80-87: `Status()` returns only `State`, leaving `LatestHeight` and `NetworkHeight` zero.

Callers (and future API consumers) may expect `Status()` to return a fully populated `SyncStatus`. Currently only `State` is set. Consider reading `LatestHeight` from the store or caching it in the coordinator alongside `state` so the status endpoint is more useful.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sync/coordinator.go` around lines 80 - 87, Status() currently only returns State and leaves LatestHeight and NetworkHeight zero; update Coordinator.Status() to also populate LatestHeight (and NetworkHeight if available) by reading the latest synced height from the persistent store or from a cached coordinator field protected by c.stateMu (e.g., add/maintain Coordinator.latestHeight and optionally Coordinator.networkHeight when heights change). In practice, inside Status() while holding c.stateMu.RLock(), read c.latestHeight (or call the store accessor such as c.store.LatestHeight()/similar), and set types.SyncStatus{State: c.state, LatestHeight: <value>, NetworkHeight: <value>} so callers receive a fully populated SyncStatus.
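A small sketch of the suggested shape, assuming the repo's `types.SyncStatus`/`types.SyncState` and hypothetical cached `latestHeight`/`networkHeight` fields (fragment only, not a complete file):

```go
type Coordinator struct {
	stateMu       sync.RWMutex
	state         types.SyncState
	latestHeight  uint64 // updated as backfill/stream progress
	networkHeight uint64 // refreshed from the fetcher's network head
}

func (c *Coordinator) Status() types.SyncStatus {
	c.stateMu.RLock()
	defer c.stateMu.RUnlock()
	return types.SyncStatus{
		State:         c.state,
		LatestHeight:  c.latestHeight,
		NetworkHeight: c.networkHeight,
	}
}
```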
pkg/sync/coordinator_test.go (2)

44-52: Polling loops without timeout can hang tests indefinitely.

The `for { time.Sleep(10ms); if state == Streaming { break } }` pattern appears in every test but lacks a deadline. If the coordinator never reaches `Streaming` (e.g., due to a bug), the test hangs forever instead of failing — particularly problematic in CI. Consider adding a timeout, e.g.:
♻️ Suggested helper
```diff
+func waitForState(t *testing.T, coord *Coordinator, want types.SyncState, timeout time.Duration) {
+	t.Helper()
+	deadline := time.After(timeout)
+	for {
+		select {
+		case <-deadline:
+			t.Fatalf("timed out waiting for state %v, current: %v", want, coord.Status().State)
+		default:
+		}
+		if coord.Status().State == want {
+			return
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}
```

Then replace each bare polling loop (lines 46-52, 110-116, 156-161, 190-195) with `waitForState(t, coord, types.Streaming, 5*time.Second)`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sync/coordinator_test.go` around lines 44 - 52, The polling loops in coordinator_test.go use an infinite for-sleep loop to wait for coord.Status() to reach types.Streaming, which can hang tests; add a helper function waitForState(t *testing.T, coord *CoordinatorType?, want types.State, timeout time.Duration) that polls coord.Status() with a small tick and fails the test with t.Fatalf on timeout, then replace each bare loop (the blocks that check coord.Status() and break on types.Streaming) with calls like waitForState(t, coord, types.Streaming, 5*time.Second); ensure the helper references coord.Status() and types.Streaming so it’s easy to locate and reuse across the test file.
143-144: Dead code: `origGetNetworkHead` is unused.

`origGetNetworkHead` is assigned at line 144 but only referenced via a blank identifier at line 201 (`_ = origGetNetworkHead`). This looks like a leftover from refactoring. Remove both lines to reduce confusion.

🧹 Proposed cleanup

```diff
-	callCount := 0
-	origGetNetworkHead := ft.headH
 	ctx, cancel := context.WithCancel(context.Background())
 	...
-	_ = origGetNetworkHead
 	err := coord.Run(ctx)
```

If `callCount` is still needed for the gap-injection guard on line 163, keep it; otherwise remove it too.

Also applies to: 201-201
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sync/coordinator_test.go` around lines 143 - 144, Remove the dead-variable assignment origGetNetworkHead := ft.headH and the later dummy use `_ = origGetNetworkHead`; these are leftovers from refactoring and unused. Also check whether callCount (used as the gap-injection guard) is actually needed; if it's not referenced elsewhere keep neither callCount nor its guard, otherwise keep callCount and just remove origGetNetworkHead and its blank-assignment. Locate symbols ft.headH, origGetNetworkHead, callCount to make the edits.
pkg/store/sqlite.go (1)

26-53: `SetMaxOpenConns(1)` serializes all DB access through a single connection.

This is intentional per the comment, and works fine for correctness with SQLite. However, the concurrent backfill workers in `backfill.go` will contend on this single connection. If network fetching is the bottleneck this won't matter, but if the store becomes the bottleneck (e.g., large blobs), consider a separate read-only connection pool or increasing the limit with WAL mode.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/sqlite.go` around lines 26 - 53, Open currently calls db.SetMaxOpenConns(1) which forces all SQLite access through a single connection and causes contention (notably for concurrent backfill workers in backfill.go); update the Open function (or SQLiteStore initialization) to avoid serializing all access by either increasing db.SetMaxOpenConns to a higher value when WAL is enabled or by creating and exposing a separate read-only connection pool (e.g., a second *sql.DB managed by SQLiteStore) that backfill workers can use for heavy/readonly reads; ensure any new pool is opened with the same PRAGMA settings and properly closed in SQLiteStore.Close.
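If the store does become the bottleneck, one possible shape (an assumption, not the PR's code) is to keep the single-connection writer and add a read-only pool that WAL readers share:

```go
package store

import (
	"database/sql"
	"errors"

	_ "modernc.org/sqlite" // assumed driver, as in the earlier sketch
)

type SQLiteStore struct {
	writeDB *sql.DB // SetMaxOpenConns(1): serializes writes
	readDB  *sql.DB // WAL readers don't block each other
}

func Open(path string) (*SQLiteStore, error) {
	writeDB, err := sql.Open("sqlite", path)
	if err != nil {
		return nil, err
	}
	writeDB.SetMaxOpenConns(1)

	readDB, err := sql.Open("sqlite", path) // same file, separate pool
	if err != nil {
		writeDB.Close()
		return nil, err
	}
	readDB.SetMaxOpenConns(8) // tune to backfill concurrency
	// In real code, apply the same PRAGMAs (WAL, foreign_keys, ...) to both pools.

	return &SQLiteStore{writeDB: writeDB, readDB: readDB}, nil
}

func (s *SQLiteStore) Close() error {
	return errors.Join(s.writeDB.Close(), s.readDB.Close())
}
```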
pkg/fetch/celestia_node.go (1)

225-232: `isNotFoundErr` relies on string matching — fragile but pragmatic.

This will break silently if Celestia changes error message wording. Consider adding a comment noting the dependency on these specific error strings, or checking for structured error codes if the RPC library supports them.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_node.go` around lines 225 - 232, The isNotFoundErr function currently relies solely on fragile string matching of err.Error(); update it to first try structured checks (e.g., type-assert the error to the RPC/grpc error type or use errors.Is against any known Celestia "not found" sentinel/error value) and only fall back to the existing strings.Contains checks for "blob: not found" and "header: not found"; also add a concise comment above isNotFoundErr explaining the dependency on those specific strings and why the structured-check-first fallback-to-string approach is used so future changes to the RPC library can be handled safely.
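A hedged sketch of the structured-check-first pattern the prompt describes; `notFoundSentinels` is a hypothetical placeholder, and whether the celestia-node RPC client actually exposes typed not-found errors is not assumed here:

```go
package fetch

import (
	"errors"
	"strings"
)

// notFoundSentinels would hold typed errors if the RPC client exposes them;
// left empty here as a placeholder.
var notFoundSentinels []error

// isNotFoundErr prefers structured error checks and only then falls back to
// matching message fragments. NOTE: the fallback depends on the exact wording
// emitted by celestia-node ("blob: not found", "header: not found"); if that
// wording changes, the fallback silently stops matching.
func isNotFoundErr(err error) bool {
	if err == nil {
		return false
	}
	for _, sentinel := range notFoundSentinels {
		if errors.Is(err, sentinel) {
			return true
		}
	}
	msg := err.Error()
	return strings.Contains(msg, "blob: not found") ||
		strings.Contains(msg, "header: not found")
}
```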
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/apex/main.go`:
- Around line 120-135: The function runIndexer currently discards the caller
context by using `_ context.Context` and creates a fresh context with
signal.NotifyContext(context.Background(), ...); change the signature to accept
a real context parameter (e.g., runIndexer(ctx context.Context, cfg
*config.Config) error) and call signal.NotifyContext(ctx, syscall.SIGINT,
syscall.SIGTERM) so the returned ctx inherits the caller's
cancellation/deadline; ensure the new ctx is used for downstream operations and
call stop() appropriately to cancel the signal watcher.
In `@pkg/fetch/celestia_node.go`:
- Around line 96-120: The goroutine in SubscribeHeaders can leak because "for
raw := range rawCh" blocks and won't observe ctx cancellation; replace the range
loop with an explicit select-based receive like "for { select { case raw, ok :=
<-rawCh: if !ok { break } ... case <-ctx.Done(): return } }" so the goroutine
exits when ctx is canceled, keep the existing mapHeader handling and logging,
and preserve deferred close(out); use f.header.Subscribe call and rawCh name to
locate the subscription code.
In `@pkg/store/sqlite.go`:
- Around line 159-173: Replace direct equality checks against sql.ErrNoRows with
errors.Is(...) in this file: locate the comparison inside GetHeader (and the two
other occurrences in this file where code does `err == sql.ErrNoRows`) and
change each to `if errors.Is(err, sql.ErrNoRows) { ... }`; also add the standard
"errors" import to the import block so the code compiles. Ensure the error
handling branches and wrapped fmt.Errorf usages remain unchanged except for
using errors.Is for the sentinel check.
In `@pkg/sync/backfill_test.go`:
- Around line 11-128: Combine the four tests TestBackfillerBatchProcessing,
TestBackfillerContextCancellation, TestBackfillerFetchError, and
TestBackfillerSingleHeight into a single table-driven test (e.g.,
TestBackfiller) that iterates over a slice of cases and uses t.Run with
t.Parallel for each case; for each case call a setup func that prepares new
mockStore and mockFetcher (use makeHeader, mockFetcher.addHeader,
mockFetcher.addBlobs as appropriate), then construct a Backfiller and call
Backfiller.Run with the case's start/end and assert errors, stored
headers/blobs, and sync state (LatestHeight) based on case.wantErr, wantBlobs,
and wantLatest — this consolidates test setup and allows parallel subtests per
the coding guideline.
In `@pkg/sync/mock_test.go`:
- Around line 153-164: In GetHeader, reading f.getHdrFn happens before acquiring
f.mu which can race with concurrent writers; change to read getHdrFn under the
mutex (e.g., f.mu.Lock(); fn := f.getHdrFn; f.mu.Unlock()), then if fn != nil
call fn(ctx, height); keep the existing locked access for f.headers (use the
same mu) so all shared fields (getHdrFn and headers) are consistently
synchronized in mockFetcher.GetHeader.
In `@pkg/sync/subscription.go`:
- Around line 94-99: SetSyncState is writing a SyncStatus with NetworkHeight
left at zero, overwriting stored backfill progress; update
SubscriptionManager.Run so before calling sm.store.SetSyncState you populate
types.SyncStatus.NetworkHeight (either by fetching the current network head once
at startup and storing it in a local variable, or by using hdr.Height when the
subscription is at tip) and pass that value into the struct you pass to
sm.store.SetSyncState (keep State and LatestHeight as-is but ensure
NetworkHeight is non-zero) to avoid corrupting the sync_state row.
---
Nitpick comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 225-232: The isNotFoundErr function currently relies solely on
fragile string matching of err.Error(); update it to first try structured checks
(e.g., type-assert the error to the RPC/grpc error type or use errors.Is against
any known Celestia "not found" sentinel/error value) and only fall back to the
existing strings.Contains checks for "blob: not found" and "header: not found";
also add a concise comment above isNotFoundErr explaining the dependency on
those specific strings and why the structured-check-first fallback-to-string
approach is used so future changes to the RPC library can be handled safely.
In `@pkg/store/migrations/001_init.sql`:
- Around line 17-27: The blobs table currently defines id as INTEGER PRIMARY KEY
AUTOINCREMENT which forces SQLite to maintain sqlite_sequence and causes extra
writes; change the id column definition in the CREATE TABLE for blobs from using
AUTOINCREMENT to a plain INTEGER PRIMARY KEY (i.e., remove the AUTOINCREMENT
keyword) so inserts use the rowid without the extra overhead—update the blobs
CREATE TABLE statement (referencing id, AUTOINCREMENT, and table name blobs)
accordingly and ensure any surrounding schema or migrations remain consistent
with the plain INTEGER PRIMARY KEY behavior.
In `@pkg/store/sqlite_test.go`:
- Around line 30-282: Several tests duplicate the same Put/Get logic instead of
using table-driven subtests; consolidate the header, blobs, and sync-state
groups into table-driven tests that call t.Run for each scenario. Replace
TestPutGetHeader/TestGetHeaderNotFound/TestPutHeaderIdempotent with a single
TestHeader using a []struct{ name string; setup func() *SQLiteStore; input ...;
wantErr error } and inside the loop call openTestDB, then exercise s.PutHeader
and s.GetHeader (and idempotent re-PutHeader) per case. Do the same for blobs in
TestBlobs covering single GetBlob, range GetBlobs, not-found, idempotent
PutBlobs, empty inputs (use s.PutBlobs, s.GetBlob, s.GetBlobs), and for sync
state in TestSyncState covering fresh ErrNotFound, SetSyncState/GetSyncState,
and upsert (use s.SetSyncState and s.GetSyncState); implement assertions inside
each t.Run and keep helper calls (openTestDB, testNamespace) inside the
table-driven cases.
In `@pkg/store/sqlite.go`:
- Around line 26-53: Open currently calls db.SetMaxOpenConns(1) which forces all
SQLite access through a single connection and causes contention (notably for
concurrent backfill workers in backfill.go); update the Open function (or
SQLiteStore initialization) to avoid serializing all access by either increasing
db.SetMaxOpenConns to a higher value when WAL is enabled or by creating and
exposing a separate read-only connection pool (e.g., a second *sql.DB managed by
SQLiteStore) that backfill workers can use for heavy/readonly reads; ensure any
new pool is opened with the same PRAGMA settings and properly closed in
SQLiteStore.Close.
In `@pkg/sync/coordinator_test.go`:
- Around line 44-52: The polling loops in coordinator_test.go use an infinite
for-sleep loop to wait for coord.Status() to reach types.Streaming, which can
hang tests; add a helper function waitForState(t *testing.T, coord
*CoordinatorType?, want types.State, timeout time.Duration) that polls
coord.Status() with a small tick and fails the test with t.Fatalf on timeout,
then replace each bare loop (the blocks that check coord.Status() and break on
types.Streaming) with calls like waitForState(t, coord, types.Streaming,
5*time.Second); ensure the helper references coord.Status() and types.Streaming
so it’s easy to locate and reuse across the test file.
- Around line 143-144: Remove the dead-variable assignment origGetNetworkHead :=
ft.headH and the later dummy use `_ = origGetNetworkHead`; these are leftovers
from refactoring and unused. Also check whether callCount (used as the
gap-injection guard) is actually needed; if it's not referenced elsewhere keep
neither callCount nor its guard, otherwise keep callCount and just remove
origGetNetworkHead and its blank-assignment. Locate symbols ft.headH,
origGetNetworkHead, callCount to make the edits.
In `@pkg/sync/coordinator.go`:
- Around line 80-87: Status() currently only returns State and leaves
LatestHeight and NetworkHeight zero; update Coordinator.Status() to also
populate LatestHeight (and NetworkHeight if available) by reading the latest
synced height from the persistent store or from a cached coordinator field
protected by c.stateMu (e.g., add/maintain Coordinator.latestHeight and
optionally Coordinator.networkHeight when heights change). In practice, inside
Status() while holding c.stateMu.RLock(), read c.latestHeight (or call the store
accessor such as c.store.LatestHeight()/similar), and set
types.SyncStatus{State: c.state, LatestHeight: <value>, NetworkHeight: <value>}
so callers receive a fully populated SyncStatus.
```go
func (f *CelestiaNodeFetcher) SubscribeHeaders(ctx context.Context) (<-chan *types.Header, error) {
	rawCh, err := f.header.Subscribe(ctx)
	if err != nil {
		return nil, fmt.Errorf("header.Subscribe: %w", err)
	}

	out := make(chan *types.Header, 64)
	go func() {
		defer close(out)
		for raw := range rawCh {
			hdr, err := mapHeader(raw)
			if err != nil {
				f.log.Error().Err(err).Msg("failed to parse subscribed header")
				continue
			}
			select {
			case out <- hdr:
			case <-ctx.Done():
				return
			}
		}
	}()

	return out, nil
}
```
🧩 Analysis chain
🏁 Script executed:
```sh
cat go.mod | grep -i jsonrpc
```

Repository: evstack/apex
Length of output: 103
🏁 Script executed:
rg -n "Subscribe" pkg/fetch/celestia_node.go -B 2 -A 2Repository: evstack/apex
Length of output: 533
🏁 Script executed:
```sh
# Search for import paths to understand the go-jsonrpc dependency
rg -n "go-jsonrpc\|jsonrpc" --type go -A 1 | head -50
```

Repository: evstack/apex
Length of output: 38
🌐 Web query:
go-jsonrpc Subscribe context cancellation channel closure behavior
💡 Result:
In filecoin-project/go-jsonrpc, “subscriptions” are typically implemented as RPC methods that return a Go channel (websocket mode only), not as a special Subscribe API.
Behavior is:
- Context cancellation propagation (client → server): if the client-side `ctx` for an in-flight JSON-RPC call is canceled, go-jsonrpc propagates that cancellation to the server handler; in websocket mode it does so by sending an `xrpc.cancel` message with the canceled request ID. [1][2]
- How/when the subscription channel closes:
  - When the server closes the returned channel, the client is notified via `xrpc.ch.close` and the client-side channel is closed accordingly. [1][2]
  - If the websocket connection breaks, the client-side channel is closed; on the server side, further writes are discarded. [1][2]
- Important implication: canceling the context used to initiate the call does not, by itself, guarantee the returned channel will be closed on the client. The intended pattern is that the server handler must watch `ctx.Done()` and stop producing/close the channel itself (i.e., treat context cancellation as the signal to end the stream). [1][2]
Sources:

- [1] go-jsonrpc README / pkg docs describing context cancel (`xrpc.cancel`), channel close (`xrpc.ch.close`), and websocket-break behavior: https://github.com/filecoin-project/go-jsonrpc?utm_source=openai
- [2] pkg.go.dev for github.com/filecoin-project/go-jsonrpc (same content as README excerpt): https://pkg.go.dev/github.com/filecoin-project/go-jsonrpc?utm_source=openai
Potential goroutine leak if the upstream rawCh doesn't close on context cancellation.
The for raw := range rawCh (line 105) blocks waiting for the next message. The ctx.Done() check on line 113 only executes after a message is received. If ctx is cancelled but rawCh is neither closed nor producing items, this goroutine will leak.
Per go-jsonrpc's documented behavior (v0.10.1), canceling the context does not automatically close the returned subscription channel. The channel closes only when: (1) the server closes it, or (2) the websocket connection breaks. This means safety depends entirely on whether the Celestia node's server-side Subscribe implementation watches ctx.Done() and closes the channel accordingly. If the server doesn't do this, the goroutine will be stuck at the range statement indefinitely.
Consider adding a separate goroutine that monitors ctx.Done() to explicitly cancel or drain the subscription, or refactoring to handle context cancellation more explicitly.
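A sketch of that select-based receive, written as a drop-in replacement for the goroutine body in the quoted `SubscribeHeaders` — this is the reviewer's suggested direction, not code from the PR:

```go
go func() {
	defer close(out)
	for {
		select {
		case <-ctx.Done():
			return // exit even if rawCh never closes
		case raw, ok := <-rawCh:
			if !ok {
				return // upstream subscription closed
			}
			hdr, err := mapHeader(raw)
			if err != nil {
				f.log.Error().Err(err).Msg("failed to parse subscribed header")
				continue
			}
			select {
			case out <- hdr:
			case <-ctx.Done():
				return
			}
		}
	}
}()
```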
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/fetch/celestia_node.go` around lines 96 - 120, The goroutine in
SubscribeHeaders can leak because "for raw := range rawCh" blocks and won't
observe ctx cancellation; replace the range loop with an explicit select-based
receive like "for { select { case raw, ok := <-rawCh: if !ok { break } ... case
<-ctx.Done(): return } }" so the goroutine exits when ctx is canceled, keep the
existing mapHeader handling and logging, and preserve deferred close(out); use
f.header.Subscribe call and rawCh name to locate the subscription code.
```go
func (f *mockFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
	if f.getHdrFn != nil {
		return f.getHdrFn(ctx, height)
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	h, ok := f.headers[height]
	if !ok {
		return nil, store.ErrNotFound
	}
	return h, nil
}
```
getHdrFn read without holding the lock — potential data race under -race.
f.getHdrFn is accessed at line 154 before f.mu.Lock() is acquired. If any test assigns getHdrFn after goroutines that call GetHeader are already running (e.g., for dynamic injection mid-test), the race detector will flag the concurrent write/read. Consistent locking avoids this:
🔒 Proposed fix
```diff
 func (f *mockFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
-	if f.getHdrFn != nil {
-		return f.getHdrFn(ctx, height)
-	}
 	f.mu.Lock()
-	defer f.mu.Unlock()
+	fn := f.getHdrFn
 	h, ok := f.headers[height]
+	f.mu.Unlock()
+	if fn != nil {
+		return fn(ctx, height)
+	}
 	if !ok {
 		return nil, store.ErrNotFound
 	}
 	return h, nil
 }
```

Based on learnings, all tests must be run with the -race flag; unprotected shared field access defeats that goal.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func (f *mockFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) {
	f.mu.Lock()
	fn := f.getHdrFn
	h, ok := f.headers[height]
	f.mu.Unlock()
	if fn != nil {
		return fn(ctx, height)
	}
	if !ok {
		return nil, store.ErrNotFound
	}
	return h, nil
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/sync/mock_test.go` around lines 153 - 164, In GetHeader, reading
f.getHdrFn happens before acquiring f.mu which can race with concurrent writers;
change to read getHdrFn under the mutex (e.g., f.mu.Lock(); fn := f.getHdrFn;
f.mu.Unlock()), then if fn != nil call fn(ctx, height); keep the existing locked
access for f.headers (use the same mu) so all shared fields (getHdrFn and
headers) are consistently synchronized in mockFetcher.GetHeader.
```go
	if err := sm.store.SetSyncState(ctx, types.SyncStatus{
		State:        types.Streaming,
		LatestHeight: hdr.Height,
	}); err != nil {
		return fmt.Errorf("set sync state: %w", err)
	}
```
SetSyncState always writes NetworkHeight: 0, corrupting previously stored progress.
types.SyncStatus{State: types.Streaming, LatestHeight: hdr.Height} leaves NetworkHeight as the zero value. Since SetSyncState performs a full upsert of the singleton sync_state row, every processed header overwrites whatever NetworkHeight the backfill phase had persisted with 0.
The simplest fix is to track the network height in SubscriptionManager.Run — it can be fetched once at startup and refreshed periodically, or simply passed in from the coordinator:
🐛 Proposed minimal fix (fetch network head once in Run)
```diff
 func (sm *SubscriptionManager) Run(ctx context.Context) error {
 	ch, err := sm.fetcher.SubscribeHeaders(ctx)
 	if err != nil {
 		return fmt.Errorf("subscribe headers: %w", err)
 	}
 	namespaces, err := sm.store.GetNamespaces(ctx)
 	if err != nil {
 		return fmt.Errorf("get namespaces: %w", err)
 	}
 	var lastHeight uint64
 	ss, err := sm.store.GetSyncState(ctx)
 	if err != nil && !errors.Is(err, store.ErrNotFound) {
 		return fmt.Errorf("get sync state: %w", err)
 	}
+	var networkHeight uint64
 	if ss != nil {
 		lastHeight = ss.LatestHeight
+		networkHeight = ss.NetworkHeight
 	}

 	if err := sm.store.SetSyncState(ctx, types.SyncStatus{
 		State:         types.Streaming,
 		LatestHeight:  hdr.Height,
+		NetworkHeight: networkHeight,
 	}); err != nil {
```

For a live view, networkHeight could also be updated from the incoming hdr.Height (the subscription is at the network tip, so hdr.Height is the network height) — either approach beats overwriting with 0.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/sync/subscription.go` around lines 94 - 99, SetSyncState is writing a
SyncStatus with NetworkHeight left at zero, overwriting stored backfill
progress; update SubscriptionManager.Run so before calling sm.store.SetSyncState
you populate types.SyncStatus.NetworkHeight (either by fetching the current
network head once at startup and storing it in a local variable, or by using
hdr.Height when the subscription is at tip) and pass that value into the struct
you pass to sm.store.SetSyncState (keep State and LatestHeight as-is but ensure
NetworkHeight is non-zero) to avoid corrupting the sync_state row.
Code Review
This pull request introduces the core components for the indexer: a SQLite-based store, a Celestia node fetcher, and a sync engine. The overall architecture is robust, with a clear separation of concerns, good use of interfaces, and a resilient sync coordinator that handles backfilling, streaming, and gap detection. The code is well-tested, including unit tests for new components and integration-style tests for the sync logic using mocks. My feedback includes a few suggestions for improvement, mainly around context handling and opportunities for code simplification.
```go
func (b *Backfiller) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error {
	heights := make(chan uint64, to-from+1)
	for h := from; h <= to; h++ {
		heights <- h
	}
	close(heights)

	workers := b.concurrency
	if int(to-from+1) < workers {
		workers = int(to - from + 1)
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)

	for range workers {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for height := range heights {
				// Check for prior error or context cancellation.
				mu.Lock()
				failed := firstErr != nil
				mu.Unlock()
				if failed {
					return
				}

				if err := ctx.Err(); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					return
				}

				if err := b.processHeight(ctx, height, namespaces); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = fmt.Errorf("height %d: %w", height, err)
					}
					mu.Unlock()
					return
				}
			}
		}()
	}

	wg.Wait()
	return firstErr
}
```
The concurrent batch processing logic can be significantly simplified by using golang.org/x/sync/errgroup. An errgroup.Group automatically handles context cancellation across goroutines and returns the first error, which mirrors your current implementation but in a more idiomatic and maintainable way.
Here's an example of how it could be refactored:
import "golang.org/x/sync/errgroup"
func (b *Backfiller) processBatch(ctx context.Context, from, to uint64, namespaces []types.Namespace) error {
g, gCtx := errgroup.WithContext(ctx)
heights := make(chan uint64)
// Feeder goroutine
g.Go(func() error {
defer close(heights)
for h := from; h <= to; h++ {
select {
case heights <- h:
case <-gCtx.Done():
return gCtx.Err()
}
}
return nil
})
// Worker goroutines
workers := b.concurrency
if int(to-from+1) < workers {
workers = int(to - from + 1)
}
for i := 0; i < workers; i++ {
g.Go(func() error {
for height := range heights {
if err := b.processHeight(gCtx, height, namespaces); err != nil {
return fmt.Errorf("height %d: %w", height, err)
}
}
return nil
})
}
return g.Wait()
}- Use caller context instead of context.Background() in runIndexer - Fix goroutine leak in SubscribeHeaders by select-based receive - Preserve NetworkHeight in subscription sync state updates - Use errors.Is for sql.ErrNoRows comparisons - Fix data race on mockFetcher.getHdrFn read - Remove redundant "[]" check in mapBlobs - Convert backfill tests to table-driven subtests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>