Conversation
…ucture (Phase 2) Add the complete API layer so rollup clients can connect to Apex as a drop-in replacement for celestia-node. This implements issues #14, #2, and #3. Subscription infrastructure (#14): - Event fan-out notifier with configurable buffer, namespace filtering, contiguity tracking, and non-blocking publish with capacity warnings - SQLite read/write pool split for concurrent API reads - Observer hook on sync coordinator for publishing height events JSON-RPC compatibility layer (#2): - Shared service layer with blob/header operations - celestia-node compatible JSON-RPC handlers (blob, header modules) - Proof forwarding to upstream node (GetProof, Included) - Stub modules for share, fraud, blobstream - WebSocket subscription support for blob and header streams gRPC API (#3): - Protobuf service definitions (BlobService, HeaderService) - Server-streaming Subscribe RPCs for real-time events - Buf configuration for proto generation - Separate configurable port (default :9090) Both API servers wire through the shared Service layer with graceful shutdown on SIGINT/SIGTERM. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 WalkthroughWalkthroughAdds protobuf definitions and Buf config, implements JSON‑RPC and gRPC servers with tests, introduces an API Service and Notifier, adds proof‑forwarding interfaces, splits SQLite reader/writer pools with pagination, and wires a HeightObserver into sync coordinator; main starts servers and handles graceful shutdown. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client (HTTP/WS)
participant JSONRPC as JSON‑RPC Server
participant GRPC as gRPC Server
participant Service as API Service
participant Store as SQLite Store
participant Notifier as Notifier
participant Sync as Sync Coordinator
rect rgba(100,200,150,0.5)
Note over Client,Service: Request/Fetch Flow
Client->>JSONRPC: blob.Get(height, namespace)
JSONRPC->>Service: BlobGet(height, namespace)
Service->>Store: GetBlob(height, namespace, commitment)
Store-->>Service: Blob data
Service-->>JSONRPC: JSON blob
JSONRPC-->>Client: response
end
rect rgba(150,200,100,0.5)
Note over Client,Notifier: Subscribe Flow
Client->>GRPC: Subscribe(namespace)
GRPC->>Service: BlobSubscribe(namespace)
Service->>Notifier: Subscribe([namespace])
Notifier-->>Service: subscription channel
Service-->>GRPC: streamer
GRPC->>Client: stream BlobEvents
end
rect rgba(200,150,100,0.5)
Note over Sync,Notifier: Publish Flow
Sync->>Store: store header & blobs
Sync->>Notifier: Publish(HeightEvent)
Notifier->>Notifier: filter by namespace, non-blocking send
Notifier-->>Client: events to subscribers
end
sequenceDiagram
participant Main as cmd/apex
participant Config as Config Loader
participant Store as SQLite Store
participant Notifier as Notifier
participant Service as API Service
participant JSONRPC as JSON‑RPC Server
participant GRPC as gRPC Server
participant Sync as Coordinator
rect rgba(100,150,200,0.5)
Note over Main,Sync: Startup
Main->>Config: Load config
Config-->>Main: config (subscription, grpc_listen_addr, readPoolSize)
Main->>Store: Open(dbPath, readPoolSize)
Store-->>Main: reader/writer DBs
Main->>Notifier: NewNotifier(bufferSize)
Notifier-->>Main: notifier
Main->>Service: NewService(store, fetcher, proofForwarder, notifier)
Service-->>Main: service
Main->>JSONRPC: NewServer(service)
Main->>GRPC: NewServer(service)
Main->>Sync: NewCoordinator(WithObserver(notifier.Publish))
Main->>JSONRPC: Start (goroutine)
Main->>GRPC: Start (goroutine)
Main->>Sync: Run()
end
rect rgba(200,100,150,0.5)
Note over Main,Sync: Shutdown
Sync-->>Main: Run() returns
Main->>GRPC: GracefulStop()
Main->>JSONRPC: Shutdown(ctx)
Main->>Store: Close()
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @tac0turtle, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive API layer for the Apex indexer, including both gRPC and JSON-RPC interfaces, a pub/sub notifier for real-time events, protobuf definitions, and extensive test coverage. It also improves database access to support concurrent reads. However, several Denial of Service (DoS) vulnerabilities were identified in the API handlers due to a lack of input validation and resource limits on requests that accept lists of namespaces. These could be exploited to cause excessive database load or memory exhaustion, and implementing sensible limits on input sizes is recommended before deploying to production. Additionally, there are suggestions for performance and error handling improvements.
There was a problem hiding this comment.
Actionable comments posted: 8
🧹 Nitpick comments (11)
justfile (1)
39-41:checktarget skipsbuf lint— proto violations won't be caught in CI.The
checktarget (described as "CI equivalent") runs golangci-lint for Go but never runsbuf lint. Sincebuf.yamlenablesSTANDARDlint,buf lintwould currently fail onheader.proto(see that file's review). Additionally, there is no check to ensure generated files inpkg/api/grpc/genare up to date.🔧 Suggested additions to `check`
+# Lint proto files +proto-lint: + buf lint + +# Verify generated proto files are up-to-date (for CI) +proto-check: proto + git diff --exit-code pkg/api/grpc/gen + # Run all checks (CI equivalent) -check: tidy-check lint test build +check: tidy-check proto-check proto-lint lint test buildAlso applies to: 43-44
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@justfile` around lines 39 - 41, The CI-equivalent check target currently only runs golangci-lint and misses protobuf linting and generated-code checks; update the check target to run buf lint (to catch STANDARD rules like the violation in header.proto) and add a verification step that runs the proto target (or buf generate) and fails if generated files in pkg/api/grpc/gen are out of date (e.g., run just proto/buf generate then ensure no diffs). Modify the existing check target to invoke "buf lint" and a generation-and-compare step so proto lint errors and stale generated code cause the check to fail.pkg/api/jsonrpc/stubs.go (2)
14-15: Minor naming inconsistency:BlobStubs(plural) vsShareStub,FraudStub,BlobstreamStub(singular).Pick one convention — singular is more idiomatic for a Go type name.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/jsonrpc/stubs.go` around lines 14 - 15, The type name BlobStubs is inconsistent with the other singular stub types (ShareStub, FraudStub, BlobstreamStub); rename BlobStubs to BlobStub and update all references/usages accordingly (constructors, variable declarations, tests, and any method receivers tied to BlobStubs) so the type name follows the singular convention like ShareStub, FraudStub, and BlobstreamStub.
9-12: Prefererrors.Newfor static sentinel errors.
fmt.Errorfis unnecessary when there are no format verbs.errors.Newis more idiomatic and avoids the implicit dependency onfmt.Suggested fix
+ "errors" - "fmt" var ( - errNotSupported = fmt.Errorf("method not supported by apex indexer") - errReadOnly = fmt.Errorf("apex is a read-only indexer, blob submission not supported") + errNotSupported = errors.New("method not supported by apex indexer") + errReadOnly = errors.New("apex is a read-only indexer, blob submission not supported") )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/jsonrpc/stubs.go` around lines 9 - 12, Replace the sentinel errors errNotSupported and errReadOnly in pkg/api/jsonrpc/stubs.go to use errors.New instead of fmt.Errorf (i.e., errNotSupported = errors.New("method not supported by apex indexer") and errReadOnly = errors.New("apex is a read-only indexer, blob submission not supported")), and update imports to remove "fmt" and add "errors" (or keep "fmt" only if used elsewhere).pkg/sync/coordinator.go (1)
19-21:heightparameter is redundant withheader.Height.
HeightObserverreceives bothheight uint64andheader *types.Header—header.Heightalready carries the same value. This isn't a bug, but it widens the API surface unnecessarily and could lead to inconsistency if callers pass mismatched values.Consider simplifying to
func(header *types.Header, blobs []types.Blob).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sync/coordinator.go` around lines 19 - 21, HeightObserver currently takes a redundant height uint64 plus header *types.Header and blobs []types.Blob, which can cause inconsistency; change the type signature of HeightObserver to accept only (header *types.Header, blobs []types.Blob), update all callers/registerers to stop passing the separate height value and instead use header.Height where needed, and adjust any implementations of HeightObserver to the new two-argument form (search for the HeightObserver type definition and all references to it).pkg/api/grpc/server.go (1)
12-19: Consider adding gRPC server options for production readiness.The server is created with
grpc.NewServer()using no options. A few things to consider for follow-up:
- Max message size: Default 4 MB receive limit may be too small for large blobs. Use
grpc.MaxRecvMsgSize(...).- Recovery interceptor: An unhandled panic in a handler will crash the process. A recovery interceptor (e.g.,
grpc-ecosystem/go-grpc-middleware) gracefully returns an Internal error instead.- Reflection: Registering
reflection.Register(srv)enables debugging withgrpcurl.Not blocking, but worth tracking.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/server.go` around lines 12 - 19, The gRPC server is created without production-ready options in NewServer; update NewServer to configure sensible server options: set MaxRecvMsgSize and MaxSendMsgSize via grpc.MaxRecvMsgSize(...) / grpc.MaxSendMsgSize(...) to allow larger blobs, add recovery middleware as a UnaryInterceptor/StreamInterceptor (e.g., using grpc-ecosystem/go-grpc-middleware's recovery) to convert panics into Internal errors, and register reflection (reflection.Register(srv)) so tools like grpcurl can inspect services (ensure these options are applied before registering BlobServiceServer and HeaderServiceServer).proto/apex/v1/blob.proto (1)
10-19: Buf lint violations: RPC request/response naming and missing response wrapper forGet.Buf reports
RPC_REQUEST_STANDARD_NAMEandRPC_RESPONSE_STANDARD_NAMEerrors on all three RPCs. Beyond naming,GetreturnsBlobdirectly — this prevents adding response-level metadata (e.g., proof status, pagination) later without a breaking change.Consider wrapping the response and adopting Buf's naming convention:
Proposed fix
service BlobService { - rpc Get(GetBlobRequest) returns (Blob); - rpc GetAll(GetAllBlobsRequest) returns (GetAllBlobsResponse); - rpc Subscribe(SubscribeBlobsRequest) returns (stream BlobEvent); + rpc Get(BlobServiceGetRequest) returns (BlobServiceGetResponse); + rpc GetAll(BlobServiceGetAllRequest) returns (BlobServiceGetAllResponse); + rpc Subscribe(BlobServiceSubscribeRequest) returns (stream BlobServiceSubscribeResponse); } -message GetBlobRequest { +message BlobServiceGetRequest { uint64 height = 1; bytes namespace = 2; bytes commitment = 3; } -message GetAllBlobsRequest { +message BlobServiceGetResponse { + Blob blob = 1; +} + +message BlobServiceGetAllRequest { uint64 height = 1; repeated bytes namespaces = 2; } -message GetAllBlobsResponse { +message BlobServiceGetAllResponse { repeated Blob blobs = 1; } -message SubscribeBlobsRequest { +message BlobServiceSubscribeRequest { repeated bytes namespaces = 1; } -message BlobEvent { +message BlobServiceSubscribeResponse { uint64 height = 1; repeated Blob blobs = 2; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proto/apex/v1/blob.proto` around lines 10 - 19, Buf lint requires RPC request/response names to follow the RPCNameRequest/RPCNameResponse pattern and the Get RPC must return a response wrapper instead of a raw Blob; rename and adjust messages accordingly: rename GetBlobRequest -> GetRequest and replace Get's response Blob with a wrapper message GetResponse that contains a Blob field; rename GetAllBlobsRequest -> GetAllRequest and GetAllBlobsResponse -> GetAllResponse (preserve fields), and rename SubscribeBlobsRequest -> SubscribeRequest and the streamed response type to SubscribeResponse (or SubscribeStreamingResponse) to follow RPCNameResponse naming; update the service RPC signatures (Get, GetAll, Subscribe) to reference the new message names and update all code/refs that use the old message names.pkg/api/grpc/server_test.go (1)
124-164:startTestServerandstartTestHeaderServerare nearly identical; consider unifying.Both helpers create the same gRPC server (with both services registered) but return different client types. A single helper returning both clients (or accepting a callback) would reduce duplication.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/server_test.go` around lines 124 - 164, startTestServer and startTestHeaderServer duplicate setup code for creating the gRPC server and connection; consolidate them into a single helper (e.g., startTestClients or startTestServerWithClients) that calls NewServer and reuses the same listener/connection setup, then either returns both clients (pb.NewBlobServiceClient and pb.NewHeaderServiceClient) or accepts a callback to receive the connection and construct clients there; ensure t.Cleanup still calls srv.GracefulStop and conn.Close and replace usages of startTestServer/startTestHeaderServer with the unified helper.pkg/api/grpc/blob_service.go (2)
23-42:GetByHeighterror from store mapped uniformly toInternal; consider distinguishingNotFound.Line 32 maps all
GetBlobserrors tocodes.Internal. If the store returns a "not found"-style error for the height (unlikely forGetBlobswhich returns empty slices, but possible for future store implementations), it would be more appropriate to map it tocodes.NotFound. Current behavior is acceptable given the store contract, but worth a note for robustness.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/blob_service.go` around lines 23 - 42, The Get handler currently maps every error from s.svc.Store().GetBlobs to codes.Internal; update Get to detect a not-found-style error (compare err to store.ErrNotFound or use errors.Is(err, store.ErrNotFound)) and return status.Error(codes.NotFound, err.Error()) in that case, otherwise keep returning status.Errorf(codes.Internal, "get blobs: %v", err); touch the Get function and its s.svc.Store().GetBlobs error handling and use errors.Is/store.ErrNotFound when available.
107-117: Narrowing conversionint→int32onIndex(Line 115).
types.Blob.Indexisint(platform-dependent, 64-bit on most targets) while the proto field isint32. If an index ever exceedsmath.MaxInt32, this silently truncates. Blob indices are practically small, but a bounds check or usingint64in the proto would be more defensive.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/blob_service.go` around lines 107 - 117, The conversion in blobToProto silently narrows types by casting types.Blob.Index (int) to int32 for pb.Blob.Index; add an explicit bounds check against math.MaxInt32 and math.MinInt32 and handle overflow instead of truncating: update blobToProto to return (*pb.Blob, error), import math, check if b.Index is outside int32 range and return a clear error when it is, otherwise safely cast to int32 and return the proto; alternatively, if changing the proto is preferred, change pb.Blob.Index to int64 and remove the cast—reference the blobToProto function and the Index field when making the change.pkg/api/service_test.go (1)
16-114: Duplicated mock implementations across three test packages.
mockStore,mockFetcher, andtestNamespaceare nearly identical inpkg/api/service_test.go,pkg/api/grpc/server_test.go, andpkg/api/jsonrpc/server_test.go. Consider extracting them into a shared internal test helper package (e.g.,pkg/api/apitestorpkg/internal/testutil) to reduce maintenance burden and keep the mocks in sync as theStore/DataFetcherinterfaces evolve.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/service_test.go` around lines 16 - 114, The three duplicated mocks (mockStore, mockFetcher, and testNamespace) should be moved into a single internal test helper package (e.g., apitest or testutil) and imported by the three test packages: create a new package that exposes the constructors and types (e.g., NewMockStore, NewMockFetcher, TestNamespace) and move the implementations of mockStore, mockFetcher, Get/Put methods, and testNamespace there, update existing tests to import and use those constructors instead of local duplicates, and ensure exported names match usage (or use package-level helpers) so the Store/DataFetcher interface implementations remain identical and compile without changing production code.cmd/apex/main.go (1)
173-200: JSON-RPC and gRPC server startup failures are silently swallowed.If
httpSrv.ListenAndServe()(line 183) orgrpcSrv.Serve(lis)(line 197) fails immediately (e.g., address already in use), the error is only logged and the indexer continues running without that API. This creates an operational risk: since Apex is positioned as a "drop-in replacement for celestia-node," a silently unavailable API layer could break client expectations without clear indication to operators.Note:
net.Listen()at line 190 is synchronous and correctly returns early on bind failure, but the subsequentServe()call lacks equivalent protection.Consider using error channels to propagate early startup failures back to the main goroutine so
runIndexercan abort if critical services fail to start.Sketch: propagate early startup errors
+ rpcErrCh := make(chan error, 1) go func() { log.Info().Str("addr", cfg.RPC.ListenAddr).Msg("JSON-RPC server listening") if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Error().Err(err).Msg("JSON-RPC server error") + rpcErrCh <- err } }() + grpcErrCh := make(chan error, 1) go func() { log.Info().Str("addr", cfg.RPC.GRPCListenAddr).Msg("gRPC server listening") if err := grpcSrv.Serve(lis); err != nil { - log.Error().Err(err).Msg("gRPC server error") + grpcErrCh <- err } }()Then add early error detection (e.g.,
selecton channels with short timeout before coordinator starts).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/apex/main.go` around lines 173 - 200, The HTTP and gRPC servers (httpSrv.ListenAndServe and grpcSrv.Serve) currently only log errors in their goroutines so immediate startup failures are swallowed; update runIndexer to create an error channel (e.g., startupErrCh) and have the goroutines send any returned error to that channel if ListenAndServe or Serve returns non-nil immediately; in the main runIndexer flow, select on startupErrCh (with optional short timeout) before proceeding to the rest of initialization and return the error if one is received so the process aborts on early bind/serve failures; ensure you still treat http.ErrServerClosed as non-fatal when deciding what to send on the channel.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/apex/main.go`:
- Around line 208-216: GracefulStop() on grpcSrv can hang forever for active
streaming RPCs; wrap the graceful shutdown in a goroutine and use a timeout
fallback that calls grpcSrv.Stop() if GracefulStop doesn't finish within the
same 5s window used by the HTTP shutdown. Specifically, start GracefulStop() in
a goroutine, wait on a done channel or select with time.After(5*time.Second) (or
reuse shutdownCtx) and call grpcSrv.Stop() on timeout to force termination;
reference grpcSrv.GracefulStop and grpcSrv.Stop in your changes.
In `@config/load.go`:
- Around line 115-117: Add the same presence validation for the JSON-RPC listen
address as for gRPC: check cfg.RPC.ListenAddr and return an error if empty
(e.g., fmt.Errorf("rpc.listen_addr is required")). Update the load/config
validation near the existing check for cfg.RPC.GRPCListenAddr so both
cfg.RPC.ListenAddr and cfg.RPC.GRPCListenAddr are validated before proceeding
(referencing symbols cfg.RPC.ListenAddr and cfg.RPC.GRPCListenAddr).
In `@pkg/api/grpc/header_service.go`:
- Around line 23-29: GetByHeight (and similarly LocalHead) currently maps
store.ErrNotFound to codes.Internal; update the error handling in GetByHeight
and LocalHead to check for store.ErrNotFound and return a gRPC NotFound status
(e.g., status.Errorf(codes.NotFound, "...: %v", err) or
status.Error(codes.NotFound, err)) instead of codes.Internal, preserving the
existing message format for other errors and leaving successful returns
unchanged; refer to the GetByHeight and LocalHead methods and the
store.ErrNotFound sentinel when making the change.
In `@pkg/api/grpc/server_test.go`:
- Around line 309-312: The spin-wait loop using notifier.SubscriberCount() lacks
an upper-bound and can hang tests; modify the wait in the test (around
notifier.SubscriberCount()) to use a bounded wait with a timeout—e.g., create a
deadline using time.After or a context with timeout and poll SubscriberCount()
in a loop that breaks on either SubscriberCount() > 0 or the timeout firing, and
fail the test (t.Fatalf or t.Fatalf-like assertion) if the timeout elapses
without a subscriber; ensure you update the test's waiting logic only in the
code that references notifier.SubscriberCount() and the surrounding stream
setup.
In `@pkg/api/jsonrpc/server.go`:
- Around line 16-20: The blob namespace currently registers only BlobHandler, so
BlobStubs' methods (GetCommitmentProof, Submit) that return user-friendly errors
are never exposed; either register BlobStubs alongside BlobHandler
(srv.Register("blob", &BlobStubs{}) or similar) or move/forward BlobStubs'
methods into BlobHandler (implement GetCommitmentProof and Submit on BlobHandler
to call the BlobStubs logic) so calls to blob.GetCommitmentProof and blob.Submit
return the intended errNotSupported/errReadOnly messages instead of "method not
found".
In `@pkg/api/notifier.go`:
- Around line 95-135: Publish currently takes a read lock (n.mu.RLock()) but
mutates sub.lastHeight, creating a latent data race; fix by either (A) making
lastHeight an atomic.Uint64 on the subscriber type and replace all reads/writes
of sub.lastHeight in Publish (and any other places) with atomic Load/Store
operations, or (B) change Publish to take a full write lock (replace
n.mu.RLock()/RUnlock() with n.mu.Lock()/Unlock()) so mutations to sub.lastHeight
are protected; refer to Publish, sub.lastHeight, n.mu.RLock/RUnlock and
n.mu.Lock/Unlock (or atomic.Uint64) to locate the changes.
In `@pkg/store/sqlite.go`:
- Around line 270-277: The Close method on SQLiteStore currently returns only
rErr and drops wErr when both fail; update SQLiteStore.Close to combine both
errors using errors.Join: call s.reader.Close() and s.writer.Close(), then if
both errors are non-nil return errors.Join(rErr, wErr), otherwise return
whichever is non-nil (or nil). Ensure the errors package is imported so
errors.Join can be used and keep function name SQLiteStore.Close and calls to
s.reader.Close and s.writer.Close unchanged.
In `@proto/apex/v1/header.proto`:
- Around line 10-29: The proto currently reuses Header and google.protobuf.Empty
across all RPCs which violates STANDARD lint rules (RPC_REQUEST_RESPONSE_UNIQUE
and RPC_*_STANDARD_NAME); update HeaderService by creating unique
request/response wrapper messages named after each RPC (e.g.,
GetByHeightRequest/GetByHeightResponse, LocalHeadRequest/LocalHeadResponse,
NetworkHeadRequest/NetworkHeadResponse, SubscribeRequest/SubscribeResponse) and
have each rpc use its corresponding wrapper (e.g., rpc
GetByHeight(GetByHeightRequest) returns (GetByHeightResponse); where
GetByHeightResponse contains the Header), adjust Subscribe to return stream
SubscribeResponse, stop using google.protobuf.Empty (remove its import) and
delete or rename existing GetHeaderRequest/SubscribeHeadersRequest to the new
RPC-specific types so each RPC has its own request and response type.
---
Nitpick comments:
In `@cmd/apex/main.go`:
- Around line 173-200: The HTTP and gRPC servers (httpSrv.ListenAndServe and
grpcSrv.Serve) currently only log errors in their goroutines so immediate
startup failures are swallowed; update runIndexer to create an error channel
(e.g., startupErrCh) and have the goroutines send any returned error to that
channel if ListenAndServe or Serve returns non-nil immediately; in the main
runIndexer flow, select on startupErrCh (with optional short timeout) before
proceeding to the rest of initialization and return the error if one is received
so the process aborts on early bind/serve failures; ensure you still treat
http.ErrServerClosed as non-fatal when deciding what to send on the channel.
In `@justfile`:
- Around line 39-41: The CI-equivalent check target currently only runs
golangci-lint and misses protobuf linting and generated-code checks; update the
check target to run buf lint (to catch STANDARD rules like the violation in
header.proto) and add a verification step that runs the proto target (or buf
generate) and fails if generated files in pkg/api/grpc/gen are out of date
(e.g., run just proto/buf generate then ensure no diffs). Modify the existing
check target to invoke "buf lint" and a generation-and-compare step so proto
lint errors and stale generated code cause the check to fail.
In `@pkg/api/grpc/blob_service.go`:
- Around line 23-42: The Get handler currently maps every error from
s.svc.Store().GetBlobs to codes.Internal; update Get to detect a not-found-style
error (compare err to store.ErrNotFound or use errors.Is(err,
store.ErrNotFound)) and return status.Error(codes.NotFound, err.Error()) in that
case, otherwise keep returning status.Errorf(codes.Internal, "get blobs: %v",
err); touch the Get function and its s.svc.Store().GetBlobs error handling and
use errors.Is/store.ErrNotFound when available.
- Around line 107-117: The conversion in blobToProto silently narrows types by
casting types.Blob.Index (int) to int32 for pb.Blob.Index; add an explicit
bounds check against math.MaxInt32 and math.MinInt32 and handle overflow instead
of truncating: update blobToProto to return (*pb.Blob, error), import math,
check if b.Index is outside int32 range and return a clear error when it is,
otherwise safely cast to int32 and return the proto; alternatively, if changing
the proto is preferred, change pb.Blob.Index to int64 and remove the
cast—reference the blobToProto function and the Index field when making the
change.
In `@pkg/api/grpc/server_test.go`:
- Around line 124-164: startTestServer and startTestHeaderServer duplicate setup
code for creating the gRPC server and connection; consolidate them into a single
helper (e.g., startTestClients or startTestServerWithClients) that calls
NewServer and reuses the same listener/connection setup, then either returns
both clients (pb.NewBlobServiceClient and pb.NewHeaderServiceClient) or accepts
a callback to receive the connection and construct clients there; ensure
t.Cleanup still calls srv.GracefulStop and conn.Close and replace usages of
startTestServer/startTestHeaderServer with the unified helper.
In `@pkg/api/grpc/server.go`:
- Around line 12-19: The gRPC server is created without production-ready options
in NewServer; update NewServer to configure sensible server options: set
MaxRecvMsgSize and MaxSendMsgSize via grpc.MaxRecvMsgSize(...) /
grpc.MaxSendMsgSize(...) to allow larger blobs, add recovery middleware as a
UnaryInterceptor/StreamInterceptor (e.g., using
grpc-ecosystem/go-grpc-middleware's recovery) to convert panics into Internal
errors, and register reflection (reflection.Register(srv)) so tools like grpcurl
can inspect services (ensure these options are applied before registering
BlobServiceServer and HeaderServiceServer).
In `@pkg/api/jsonrpc/stubs.go`:
- Around line 14-15: The type name BlobStubs is inconsistent with the other
singular stub types (ShareStub, FraudStub, BlobstreamStub); rename BlobStubs to
BlobStub and update all references/usages accordingly (constructors, variable
declarations, tests, and any method receivers tied to BlobStubs) so the type
name follows the singular convention like ShareStub, FraudStub, and
BlobstreamStub.
- Around line 9-12: Replace the sentinel errors errNotSupported and errReadOnly
in pkg/api/jsonrpc/stubs.go to use errors.New instead of fmt.Errorf (i.e.,
errNotSupported = errors.New("method not supported by apex indexer") and
errReadOnly = errors.New("apex is a read-only indexer, blob submission not
supported")), and update imports to remove "fmt" and add "errors" (or keep "fmt"
only if used elsewhere).
In `@pkg/api/service_test.go`:
- Around line 16-114: The three duplicated mocks (mockStore, mockFetcher, and
testNamespace) should be moved into a single internal test helper package (e.g.,
apitest or testutil) and imported by the three test packages: create a new
package that exposes the constructors and types (e.g., NewMockStore,
NewMockFetcher, TestNamespace) and move the implementations of mockStore,
mockFetcher, Get/Put methods, and testNamespace there, update existing tests to
import and use those constructors instead of local duplicates, and ensure
exported names match usage (or use package-level helpers) so the
Store/DataFetcher interface implementations remain identical and compile without
changing production code.
In `@pkg/sync/coordinator.go`:
- Around line 19-21: HeightObserver currently takes a redundant height uint64
plus header *types.Header and blobs []types.Blob, which can cause inconsistency;
change the type signature of HeightObserver to accept only (header
*types.Header, blobs []types.Blob), update all callers/registerers to stop
passing the separate height value and instead use header.Height where needed,
and adjust any implementations of HeightObserver to the new two-argument form
(search for the HeightObserver type definition and all references to it).
In `@proto/apex/v1/blob.proto`:
- Around line 10-19: Buf lint requires RPC request/response names to follow the
RPCNameRequest/RPCNameResponse pattern and the Get RPC must return a response
wrapper instead of a raw Blob; rename and adjust messages accordingly: rename
GetBlobRequest -> GetRequest and replace Get's response Blob with a wrapper
message GetResponse that contains a Blob field; rename GetAllBlobsRequest ->
GetAllRequest and GetAllBlobsResponse -> GetAllResponse (preserve fields), and
rename SubscribeBlobsRequest -> SubscribeRequest and the streamed response type
to SubscribeResponse (or SubscribeStreamingResponse) to follow RPCNameResponse
naming; update the service RPC signatures (Get, GetAll, Subscribe) to reference
the new message names and update all code/refs that use the old message names.
SQLite WAL readers don't benefit beyond ~8 connections. Hardcode the pool ceiling and derive the size from runtime.NumCPU() instead of exposing a config knob that nobody needs to tune. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…pping - Fix proto lint: unique request/response types per RPC (buf STANDARD) - Add limit/offset pagination to gRPC GetAll (store + proto) - Restrict gRPC Subscribe to single namespace - Fix Notifier data race: Publish uses write lock instead of read lock - Map store.ErrNotFound to gRPC codes.NotFound in header service - Move BlobStubs methods onto BlobHandler for proper registration - Use errors.Join in SQLiteStore.Close - Add rpc.listen_addr validation - Add timeout to gRPC GracefulStop (5s fallback to Stop) - Add deadline to spin-wait in subscribe test - Cap gRPC GetAll namespaces at 16 (JSON-RPC uncapped for compat) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (9)
config/config.go (1)
32-35: Consider making the gRPC namespace cap configurable.The PR objectives note that
GetAllcaps gRPC requests at 16 namespaces (JSON-RPC is uncapped). Hardcoding this limit in the API layer rather than exposing it throughRPCConfig(e.g.,MaxNamespaces int) makes the value invisible to operators and requires a code change to tune. Given the asymmetry with JSON-RPC, surfacing it here as a configurable field with a documented default would improve operational flexibility.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@config/config.go` around lines 32 - 35, The RPC gRPC namespace cap is hardcoded in the API layer (affecting GetAll) and should be exposed in RPCConfig so operators can tune it; add a MaxNamespaces int field to RPCConfig with a sensible default (documented in comments/README), wire that config through where GetAll enforces the 16-namespace limit (e.g., pass or read RPCConfig.MaxNamespaces in the code path that implements GetAll) and ensure any existing hardcoded constant (16) is replaced by using RPCConfig.MaxNamespaces.pkg/api/grpc/server_test.go (2)
21-121:mockStore,mockFetcher, andtestNamespaceare verbatim copies of the equivalents inpkg/api/jsonrpc/server_test.go.Extracting these into a shared internal test package (e.g.,
pkg/api/internal/testutil) would eliminate the duplication. Additionally,startTestServerandstartTestHeaderServer(Lines 123–163) are nearly identical — a single generic helper or a combined return struct could consolidate them.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/server_test.go` around lines 21 - 121, Duplicate test helpers (mockStore, mockFetcher, testNamespace) and nearly identical startTestServer/startTestHeaderServer should be extracted to a shared internal test package; create pkg/api/internal/testutil with exported helpers NewMockStore, NewMockFetcher, TestNamespace and a unified StartTestServers (or StartTestServerPair) that returns the servers/clients needed, then replace the local definitions in both pkg/api/grpc/server_test.go and pkg/api/jsonrpc/server_test.go to import and use those helpers (reference symbols: mockStore, mockFetcher, testNamespace, startTestServer, startTestHeaderServer).
165-288:TestGRPCBlobGet,TestGRPCBlobGetAll, and the three header tests can be consolidated into table-driven tests.All five tests follow the same seed-service-call-assert pattern. A table-driven approach would reduce boilerplate and is required by the project's test guidelines.
As per coding guidelines, "Use table-driven tests pattern for test implementation".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/server_test.go` around lines 165 - 288, Several tests (TestGRPCBlobGet, TestGRPCBlobGetAll, TestGRPCHeaderGetByHeight, TestGRPCHeaderLocalHead, TestGRPCHeaderNetworkHead) repeat the same seed → service → client call → assert pattern; convert them to table-driven subtests. Create a table (slice of test cases) where each case has a name, seed data (e.g., blobs or headers), the request to send, which server starter to use (startTestServer or startTestHeaderServer), and expected assertions (expected height, data, count, etc.), then loop with t.Run(c.name, func(t *testing.T){ set up newMockStore(), seed st using c.seed (refer to st.blobs and st.headers), create notifier and svc via api.NewService, start client via the appropriate helper, call the RPC (client.Get, GetAll, GetByHeight, LocalHead, NetworkHead) and run the existing checks for that case }); factor repeated setup into small helpers (e.g., makeServiceAndClient or seedStore) referenced by function names in the diff to keep tests concise and readable.pkg/api/jsonrpc/server_test.go (2)
21-121:mockStore,mockFetcher, andtestNamespaceare duplicated verbatim inpkg/api/grpc/server_test.go.Both test packages maintain identical copies of these three helpers. Consider extracting them into a shared internal test package (e.g.,
pkg/api/internal/testutil) so future changes (e.g., adding a new Store method) only need to be made once.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/jsonrpc/server_test.go` around lines 21 - 121, Extract the duplicated test helpers (mockStore type and its methods, mockFetcher type and its methods, and testNamespace) into a shared internal test package (e.g., pkg/api/internal/testutil) and replace the copies in pkg/api/jsonrpc/server_test.go and pkg/api/grpc/server_test.go with imports from that package; move all related helper functions and types into that package, keep signatures unchanged (mockStore, mockFetcher, testNamespace), update the test files to import testutil and reference testutil.mockStore/testutil.mockFetcher/testutil.testNamespace (or exported names if you choose to export), and run tests to ensure no missing methods (export any symbols needed by both packages).
179-241:TestJSONRPCHeaderGetByHeight/LocalHead/NetworkHeadcan be consolidated into a table-driven test.The three header tests follow the same structure: seed store, build service, call RPC, check result. Grouping them under a
TestJSONRPCHeaderMethodstable test would reduce boilerplate and align with the project's table-driven tests guideline (already followed byTestJSONRPCStubMethods).As per coding guidelines, "Use table-driven tests pattern for test implementation".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/jsonrpc/server_test.go` around lines 179 - 241, Consolidate the three similar tests (TestJSONRPCHeaderGetByHeight, TestJSONRPCHeaderLocalHead, TestJSONRPCHeaderNetworkHead) into a single table-driven test (e.g., TestJSONRPCHeaderMethods) that iterates cases with fields: name, setup (store headers/syncState or mockFetcher.networkHead), rpcMethod (e.g., "header.GetByHeight", "header.LocalHead", "header.NetworkHead"), rpcParams (uint64(42) only for GetByHeight), and expectedRawJSON. For each case create the store via newMockStore or mockFetcher as needed, build service with api.NewService(st, ft, ..., notifier, zerolog.Nop()) and server with NewServer, call doRPC(srv, rpcMethod, params...), and assert resp.Error is nil and string(resp.Result) equals expectedRawJSON; this removes duplicated boilerplate while keeping the same assertions and setups.pkg/store/sqlite_test.go (1)
105-105: Consider adding pagination test coverage for non-zerolimit/offset.All three updated call sites pass
0, 0(no-limit, no-offset), leaving the new pagination feature entirely untested at the SQLite level. A table-driven test verifyinglimit > 0andoffset > 0semantics would confirm correct SQL construction and results.Also applies to: 152-152, 275-275
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/sqlite_test.go` at line 105, The tests in sqlite_test.go call s.GetBlobs(ctx, ns, 10, 11, 0, 0) only with limit=0/offset=0, so pagination SQL is untested; add a table-driven subtest in the same file that calls s.GetBlobs with a variety of limit>0 and offset>0 combinations (e.g. limit=2 offset=0, limit=2 offset=1, limit larger than remaining, offset at end) and assert err==nil, the number of returned blobs matches expected, and the returned blob ordering/IDs match the expected slice for each case; reference the s.GetBlobs method and reuse existing fixture data inserted earlier in the test to compute expected results so the assertions validate that limit/offset are applied correctly.proto/apex/v1/blob.proto (1)
31-38: Consider usinguint32for pagination fields to prevent negative values at the wire level.
int32allows negativelimitandoffsetvalues from clients. While the server should validate, usinguint32would make the contract self-documenting and prevent a class of invalid requests.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proto/apex/v1/blob.proto` around lines 31 - 38, Change the pagination fields in the GetAllBlobsRequest message so they use unsigned types to prevent negative values at the wire level: update the fields limit and offset from int32 to uint32 in message GetAllBlobsRequest, regenerate protobuf artifacts, and then update any call sites, validators, and handlers that assume signed ints (e.g., request parsing in RPC handlers that reference GetAllBlobsRequest.limit or .offset) to use the new unsigned type or cast/validate as needed.pkg/api/grpc/blob_service.go (1)
120-127: DuplicatebytesToNamespacehelper across packages.This exact function also exists in
pkg/api/jsonrpc/blob.go(Line 96). Consider extracting it to a shared location (e.g.,pkg/types) to avoid drift.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/blob_service.go` around lines 120 - 127, The bytesToNamespace helper is duplicated; extract the logic from the bytesToNamespace function into a single shared utility in a common package (e.g., add a new exported function BytesToNamespace in pkg/types or an existing pkg/types file) and replace both bytesToNamespace implementations (the one in blob_service.go and the one in pkg/api/jsonrpc/blob.go) to call that shared BytesToNamespace helper; ensure the new function preserves the same signature/behavior (checking len == types.NamespaceSize, copying into types.Namespace, returning error) and update imports where needed.pkg/store/sqlite.go (1)
75-86: Configure PRAGMAs via DSN to ensure all connections inherit the settings.
database/sqlmanages a connection pool, andExec("PRAGMA ...")applies settings only to the connection that executes it. New connections from the pool won't inheritbusy_timeoutorforeign_keys(which are per-connection settings).journal_mode=WALis database-level and persists, so it's unaffected; however, the reader pool will only havebusy_timeoutandforeign_keyson the first connection used.While WAL mode mitigates practical risk (readers rarely encounter
SQLITE_BUSYandforeign_keysonly affects writes), usemodernc.org/sqlite's DSN_pragmaparameter for correctness:file:path?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(1)This ensures all connections are properly configured without needing
configureSQLite()for per-connection settings.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/store/sqlite.go` around lines 75 - 86, The configureSQLite function currently sets PRAGMAs with db.Exec which only affects the single connection used; instead add the per-connection PRAGMA settings to the SQLite DSN using modernc.org/sqlite's _pragma parameters so every pooled connection inherits them (e.g. _pragma=busy_timeout(5000)&_pragma=foreign_keys(1) and _pragma=journal_mode(WAL) if desired), and simplify or remove the db.Exec calls in configureSQLite; update the codepath that opens the DB (the caller that constructs the DSN before sql.Open) to include these _pragma entries so configureSQLite no longer relies on per-connection Exec to set busy_timeout and foreign_keys.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/apex/main.go`:
- Around line 188-200: If net.Listen for gRPC (cfg.RPC.GRPCListenAddr) fails,
the previously started HTTP server goroutine must be shut down to avoid leaking
sockets/goroutines; modify the error path after net.Listen to call the HTTP
server's shutdown routine (e.g., httpSrv.Shutdown with a context timeout) and
handle/log any shutdown error before returning the net.Listen error. Locate the
HTTP server instance started earlier (httpSrv or similar) and use
context.WithTimeout to perform a graceful shutdown, optionally using a
sync.WaitGroup or channel to wait for the HTTP goroutine to exit before
returning.
In `@pkg/api/grpc/blob_service.go`:
- Around line 59-66: The current loop calls s.svc.Store().GetBlobs per namespace
applying req.Limit and req.Offset per-namespace; change this to collect all
results first (append to allBlobs inside the loop without passing per-namespace
limit/offset), then apply global pagination using req.Offset and req.Limit on
the aggregated allBlobs slice (use safe bounds checks when slicing). Make the
same change in BlobGetAll (service.go) so both gRPC and JSON-RPC return a
globally paginated result and ensure error handling and any total-count
reporting are updated accordingly.
In `@pkg/api/service.go`:
- Around line 58-82: BlobGetAll currently passes the per-request limit and
offset into each s.store.GetBlobs call, causing pagination to be applied
per-namespace; instead fetch all matching blobs per namespace (or fetch with
no/very large paging) into allBlobs, then apply the requested offset and limit
to the aggregated allBlobs slice before marshalling and returning. Update
BlobGetAll to call s.store.GetBlobs without passing the incoming limit/offset
(or use a sentinel to request all), concatenate into allBlobs, compute start :=
min(offset, len(allBlobs)) and end := min(start+limit, len(allBlobs)) (handling
zero/unbounded limit), slice allBlobs[start:end], then marshal that sliced
result; keep the existing error handling and use MarshalBlob for each item.
In `@proto/apex/v1/header.proto`:
- Line 21: The RPC Subscribe (rpc Subscribe) currently uses wrapper message
types named SubscribeHeadersRequest and SubscribeHeadersResponse which violates
Buf's RPC_*_STANDARD_NAME rule; rename those messages to follow the RPC naming
convention (e.g., SubscribeRequest and SubscribeResponse or prefixed with the
service name like HeaderServiceSubscribeRequest/HeaderServiceSubscribeResponse),
update the rpc signature (rpc Subscribe(SubscribeRequest) returns (stream
SubscribeResponse);) and update all references/usages of SubscribeHeadersRequest
and SubscribeHeadersResponse throughout the proto so the names match the new
message identifiers.
---
Duplicate comments:
In `@pkg/store/sqlite.go`:
- Around line 281-283: Update SQLiteStore.Close to combine both pool Close
errors using errors.Join by calling errors.Join(s.reader.Close(),
s.writer.Close()) in the SQLiteStore.Close method; ensure the "errors" package
is imported and add nil checks for s.reader and s.writer (call Close only if
non-nil) to avoid nil dereferences before joining their returned errors.
---
Nitpick comments:
In `@config/config.go`:
- Around line 32-35: The RPC gRPC namespace cap is hardcoded in the API layer
(affecting GetAll) and should be exposed in RPCConfig so operators can tune it;
add a MaxNamespaces int field to RPCConfig with a sensible default (documented
in comments/README), wire that config through where GetAll enforces the
16-namespace limit (e.g., pass or read RPCConfig.MaxNamespaces in the code path
that implements GetAll) and ensure any existing hardcoded constant (16) is
replaced by using RPCConfig.MaxNamespaces.
In `@pkg/api/grpc/blob_service.go`:
- Around line 120-127: The bytesToNamespace helper is duplicated; extract the
logic from the bytesToNamespace function into a single shared utility in a
common package (e.g., add a new exported function BytesToNamespace in pkg/types
or an existing pkg/types file) and replace both bytesToNamespace implementations
(the one in blob_service.go and the one in pkg/api/jsonrpc/blob.go) to call that
shared BytesToNamespace helper; ensure the new function preserves the same
signature/behavior (checking len == types.NamespaceSize, copying into
types.Namespace, returning error) and update imports where needed.
In `@pkg/api/grpc/server_test.go`:
- Around line 21-121: Duplicate test helpers (mockStore, mockFetcher,
testNamespace) and nearly identical startTestServer/startTestHeaderServer should
be extracted to a shared internal test package; create pkg/api/internal/testutil
with exported helpers NewMockStore, NewMockFetcher, TestNamespace and a unified
StartTestServers (or StartTestServerPair) that returns the servers/clients
needed, then replace the local definitions in both pkg/api/grpc/server_test.go
and pkg/api/jsonrpc/server_test.go to import and use those helpers (reference
symbols: mockStore, mockFetcher, testNamespace, startTestServer,
startTestHeaderServer).
- Around line 165-288: Several tests (TestGRPCBlobGet, TestGRPCBlobGetAll,
TestGRPCHeaderGetByHeight, TestGRPCHeaderLocalHead, TestGRPCHeaderNetworkHead)
repeat the same seed → service → client call → assert pattern; convert them to
table-driven subtests. Create a table (slice of test cases) where each case has
a name, seed data (e.g., blobs or headers), the request to send, which server
starter to use (startTestServer or startTestHeaderServer), and expected
assertions (expected height, data, count, etc.), then loop with t.Run(c.name,
func(t *testing.T){ set up newMockStore(), seed st using c.seed (refer to
st.blobs and st.headers), create notifier and svc via api.NewService, start
client via the appropriate helper, call the RPC (client.Get, GetAll,
GetByHeight, LocalHead, NetworkHead) and run the existing checks for that case
}); factor repeated setup into small helpers (e.g., makeServiceAndClient or
seedStore) referenced by function names in the diff to keep tests concise and
readable.
In `@pkg/api/jsonrpc/server_test.go`:
- Around line 21-121: Extract the duplicated test helpers (mockStore type and
its methods, mockFetcher type and its methods, and testNamespace) into a shared
internal test package (e.g., pkg/api/internal/testutil) and replace the copies
in pkg/api/jsonrpc/server_test.go and pkg/api/grpc/server_test.go with imports
from that package; move all related helper functions and types into that
package, keep signatures unchanged (mockStore, mockFetcher, testNamespace),
update the test files to import testutil and reference
testutil.mockStore/testutil.mockFetcher/testutil.testNamespace (or exported
names if you choose to export), and run tests to ensure no missing methods
(export any symbols needed by both packages).
- Around line 179-241: Consolidate the three similar tests
(TestJSONRPCHeaderGetByHeight, TestJSONRPCHeaderLocalHead,
TestJSONRPCHeaderNetworkHead) into a single table-driven test (e.g.,
TestJSONRPCHeaderMethods) that iterates cases with fields: name, setup (store
headers/syncState or mockFetcher.networkHead), rpcMethod (e.g.,
"header.GetByHeight", "header.LocalHead", "header.NetworkHead"), rpcParams
(uint64(42) only for GetByHeight), and expectedRawJSON. For each case create the
store via newMockStore or mockFetcher as needed, build service with
api.NewService(st, ft, ..., notifier, zerolog.Nop()) and server with NewServer,
call doRPC(srv, rpcMethod, params...), and assert resp.Error is nil and
string(resp.Result) equals expectedRawJSON; this removes duplicated boilerplate
while keeping the same assertions and setups.
In `@pkg/store/sqlite_test.go`:
- Line 105: The tests in sqlite_test.go call s.GetBlobs(ctx, ns, 10, 11, 0, 0)
only with limit=0/offset=0, so pagination SQL is untested; add a table-driven
subtest in the same file that calls s.GetBlobs with a variety of limit>0 and
offset>0 combinations (e.g. limit=2 offset=0, limit=2 offset=1, limit larger
than remaining, offset at end) and assert err==nil, the number of returned blobs
matches expected, and the returned blob ordering/IDs match the expected slice
for each case; reference the s.GetBlobs method and reuse existing fixture data
inserted earlier in the test to compute expected results so the assertions
validate that limit/offset are applied correctly.
In `@pkg/store/sqlite.go`:
- Around line 75-86: The configureSQLite function currently sets PRAGMAs with
db.Exec which only affects the single connection used; instead add the
per-connection PRAGMA settings to the SQLite DSN using modernc.org/sqlite's
_pragma parameters so every pooled connection inherits them (e.g.
_pragma=busy_timeout(5000)&_pragma=foreign_keys(1) and _pragma=journal_mode(WAL)
if desired), and simplify or remove the db.Exec calls in configureSQLite; update
the codepath that opens the DB (the caller that constructs the DSN before
sql.Open) to include these _pragma entries so configureSQLite no longer relies
on per-connection Exec to set busy_timeout and foreign_keys.
In `@proto/apex/v1/blob.proto`:
- Around line 31-38: Change the pagination fields in the GetAllBlobsRequest
message so they use unsigned types to prevent negative values at the wire level:
update the fields limit and offset from int32 to uint32 in message
GetAllBlobsRequest, regenerate protobuf artifacts, and then update any call
sites, validators, and handlers that assume signed ints (e.g., request parsing
in RPC handlers that reference GetAllBlobsRequest.limit or .offset) to use the
new unsigned type or cast/validate as needed.
| // Start gRPC server. | ||
| grpcSrv := grpcapi.NewServer(svc, log.Logger) | ||
| lis, err := net.Listen("tcp", cfg.RPC.GRPCListenAddr) | ||
| if err != nil { | ||
| return fmt.Errorf("listen gRPC: %w", err) | ||
| } | ||
|
|
||
| go func() { | ||
| log.Info().Str("addr", cfg.RPC.GRPCListenAddr).Msg("gRPC server listening") | ||
| if err := grpcSrv.Serve(lis); err != nil { | ||
| log.Error().Err(err).Msg("gRPC server error") | ||
| } | ||
| }() |
There was a problem hiding this comment.
HTTP server left running if gRPC listener setup fails.
If net.Listen on Line 190 fails, the function returns an error immediately, but the HTTP server started at Line 181 is never shut down. This leaks the goroutine and the listening socket.
Proposed fix
lis, err := net.Listen("tcp", cfg.RPC.GRPCListenAddr)
if err != nil {
+ // Shut down the already-started HTTP server before returning.
+ _ = httpSrv.Close()
return fmt.Errorf("listen gRPC: %w", err)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Start gRPC server. | |
| grpcSrv := grpcapi.NewServer(svc, log.Logger) | |
| lis, err := net.Listen("tcp", cfg.RPC.GRPCListenAddr) | |
| if err != nil { | |
| return fmt.Errorf("listen gRPC: %w", err) | |
| } | |
| go func() { | |
| log.Info().Str("addr", cfg.RPC.GRPCListenAddr).Msg("gRPC server listening") | |
| if err := grpcSrv.Serve(lis); err != nil { | |
| log.Error().Err(err).Msg("gRPC server error") | |
| } | |
| }() | |
| // Start gRPC server. | |
| grpcSrv := grpcapi.NewServer(svc, log.Logger) | |
| lis, err := net.Listen("tcp", cfg.RPC.GRPCListenAddr) | |
| if err != nil { | |
| // Shut down the already-started HTTP server before returning. | |
| _ = httpSrv.Close() | |
| return fmt.Errorf("listen gRPC: %w", err) | |
| } | |
| go func() { | |
| log.Info().Str("addr", cfg.RPC.GRPCListenAddr).Msg("gRPC server listening") | |
| if err := grpcSrv.Serve(lis); err != nil { | |
| log.Error().Err(err).Msg("gRPC server error") | |
| } | |
| }() |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/apex/main.go` around lines 188 - 200, If net.Listen for gRPC
(cfg.RPC.GRPCListenAddr) fails, the previously started HTTP server goroutine
must be shut down to avoid leaking sockets/goroutines; modify the error path
after net.Listen to call the HTTP server's shutdown routine (e.g.,
httpSrv.Shutdown with a context timeout) and handle/log any shutdown error
before returning the net.Listen error. Locate the HTTP server instance started
earlier (httpSrv or similar) and use context.WithTimeout to perform a graceful
shutdown, optionally using a sync.WaitGroup or channel to wait for the HTTP
goroutine to exit before returning.
- Close HTTP server if gRPC listener setup fails (prevents goroutine leak) - Apply limit/offset pagination to aggregate result across all namespaces instead of per-namespace (fixes incorrect multi-namespace pagination) - Rename proto request/response types to pass buf DEFAULT lint: GetBlobRequest→GetRequest, GetAllBlobsRequest→GetAllRequest, SubscribeBlobsRequest→BlobServiceSubscribeRequest, SubscribeHeadersRequest→HeaderServiceSubscribeRequest (and responses) - Fix buf.yaml lint category from STANDARD (invalid) to DEFAULT Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (5)
pkg/api/grpc/server_test.go (2)
123-163: Consider consolidating the two nearly identical test helpers.
startTestServerandstartTestHeaderServershare all server-setup/teardown logic and differ only in the returned client type. A single generic helper (or one that returns*grpc.ClientConn) would eliminate the duplication.♻️ Example: single helper returning the connection
+func startTestGRPCServer(t *testing.T, svc *api.Service) *grpc.ClientConn { + t.Helper() + srv := NewServer(svc, zerolog.Nop()) + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + go func() { _ = srv.Serve(lis) }() + t.Cleanup(func() { srv.GracefulStop() }) + conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("dial: %v", err) + } + t.Cleanup(func() { _ = conn.Close() }) + return conn +}Then in each test:
conn := startTestGRPCServer(t, svc) client := pb.NewBlobServiceClient(conn) // or pb.NewHeaderServiceClient(conn)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/server_test.go` around lines 123 - 163, Consolidate duplicated test setup in startTestServer and startTestHeaderServer by creating a single helper (e.g., startTestGRPCServer) that performs NewServer(svc,...), listens, serves, registers t.Cleanup for srv.GracefulStop and conn.Close, and returns the grpc.ClientConn (or *grpc.ClientConn) instead of a specific typed client; then update callers to call pb.NewBlobServiceClient(conn) or pb.NewHeaderServiceClient(conn) as needed (replace references to startTestServer and startTestHeaderServer with the new startTestGRPCServer).
165-217: Tests cover only the happy path — consider adding error-path scenarios.
TestGRPCBlobGetandTestGRPCBlobGetAllexercise only the success case. Table-driven sub-tests for "not found", "invalid namespace", and "empty namespaces" would improve confidence and align with the project's table-driven test guideline.As per coding guidelines:
**/*_test.go: "Use table-driven tests pattern for test implementation".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/server_test.go` around lines 165 - 217, Add table-driven subtests to TestGRPCBlobGet and TestGRPCBlobGetAll to cover error paths: for TestGRPCBlobGet add cases for "not found" (no blob at height/commitment), "invalid namespace" (bad length), and "bad commitment" and assert returned error; for TestGRPCBlobGetAll add cases for "empty namespaces" (expect error or empty result per API), "no matching blobs" and "invalid namespace" and assert proper error/empty response. Use newMockStore to set up st.blobs per case, reuse testNamespace() for valid namespaces, and call client.Get / client.GetAll inside subtests to verify responses and errors instead of only the happy path.proto/apex/v1/blob.proto (1)
31-38: Consider usinguint32(orint32with explicit validation) for pagination fields.
int32allows negative values forlimitandoffset. The server treats negatives the same as zero (no-op), which is safe but potentially confusing. If negative values are never meaningful,uint32would express the intent more clearly at the schema level. Low priority — current server-side handling is safe.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proto/apex/v1/blob.proto` around lines 31 - 38, The pagination fields in message GetAllRequest (fields limit and offset) are currently int32 and thus allow negative values; change their types to uint32 to express non-negativity at the schema level (or keep int32 but add explicit validation in the server-side handlers that reject negative values) — update the proto definition for GetAllRequest by replacing "int32 limit" and "int32 offset" with "uint32 limit" and "uint32 offset" (and regenerate stubs), or alternatively add validation logic in the RPC handling code that checks GetAllRequest.limit and GetAllRequest.offset and treats negatives as errors/normalizes them to 0.pkg/api/service.go (1)
159-167: ExposingStore()andFetcher()accessors enables the logic duplication flagged inblob_service.go.The gRPC handler bypasses
Servicemethods and callsStore().GetBlobs(...)directly because theServicemethods returnjson.RawMessage. Consider adding typed (non-JSON) methods onServicefor blob/header retrieval, so gRPC handlers can share the same business logic without accessing the store directly. This would let you remove or narrow theStore()/Fetcher()accessors.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/service.go` around lines 159 - 167, The public Store() and Fetcher() accessors let callers bypass Service business logic (see blob_service.go) — add typed, non-JSON methods on Service such as GetBlobs(ctx context.Context, ids []string) ([]Blob, error) and GetHeaders(ctx context.Context, filter HeaderFilter) ([]Header, error) that encapsulate the existing Store().GetBlobs/... and Fetcher usage and return concrete types; update the gRPC handlers to call these new Service methods instead of Store().GetBlobs(...) and Fetcher(), then remove or make Service.Store() and Service.Fetcher() private to prevent direct store/fetcher access.pkg/api/grpc/blob_service.go (1)
24-42: Extract shared blob-matching logic to reduce duplication.The gRPC handler re-implements business logic that also exists in the service layer:
- BlobGet duplication (lines 30-39): The commit matching loop (
fetch blobs → iterate → match commitment) is identical toService.BlobGetin service.go, differing only in return type.- BlobGetAll pagination duplication (lines 69-78): The pagination logic (apply offset/limit to aggregate results) is identical to
Service.BlobGetAllin service.go.If the matching algorithm or pagination logic evolves, both code paths require updates. Consider extracting a shared method on
Servicethat accepts a callback or returns the typed blob directly, allowing both the JSON-RPC and gRPC handlers to delegate to a single source of truth.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/api/grpc/blob_service.go` around lines 24 - 42, The Get handler duplicates matching logic present in Service.BlobGet; instead of re-implementing the fetch/iterate/match loop in BlobServiceServer.Get, delegate to the service layer by adding or using a shared Service method (e.g., Service.BlobGet / Service.FindBlobByCommitment) that accepts context, namespace, commitment and height and returns the matching typed blob (or nil/error); then have BlobServiceServer.Get call that method and convert the returned blob with blobToProto. Do the same for pagination: replace the inline offset/limit logic with a call to Service.BlobGetAll (or a new Service.PaginateBlobs helper) so both JSON-RPC and gRPC use the single implementation. Ensure the new/used Service method signatures match existing symbols (Service.BlobGet, Service.BlobGetAll) so handlers can delegate without duplicating logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/api/service.go`:
- Around line 170-179: MarshalBlob currently omits the Blob.Height and
Blob.Signer fields, causing a mismatch with blobToProto and types.Blob; update
the map in MarshalBlob (function MarshalBlob) to include "height": b.Height and
"signer": b.Signer so the JSON mirrors blobToProto/types.Blob (or, if omission
is intentional, add a clear comment in MarshalBlob explaining why those fields
are excluded). Ensure the keys match naming style ("height", "signer") and use
the raw b.Height and b.Signer values as in the types.Blob definition.
---
Nitpick comments:
In `@pkg/api/grpc/blob_service.go`:
- Around line 24-42: The Get handler duplicates matching logic present in
Service.BlobGet; instead of re-implementing the fetch/iterate/match loop in
BlobServiceServer.Get, delegate to the service layer by adding or using a shared
Service method (e.g., Service.BlobGet / Service.FindBlobByCommitment) that
accepts context, namespace, commitment and height and returns the matching typed
blob (or nil/error); then have BlobServiceServer.Get call that method and
convert the returned blob with blobToProto. Do the same for pagination: replace
the inline offset/limit logic with a call to Service.BlobGetAll (or a new
Service.PaginateBlobs helper) so both JSON-RPC and gRPC use the single
implementation. Ensure the new/used Service method signatures match existing
symbols (Service.BlobGet, Service.BlobGetAll) so handlers can delegate without
duplicating logic.
In `@pkg/api/grpc/server_test.go`:
- Around line 123-163: Consolidate duplicated test setup in startTestServer and
startTestHeaderServer by creating a single helper (e.g., startTestGRPCServer)
that performs NewServer(svc,...), listens, serves, registers t.Cleanup for
srv.GracefulStop and conn.Close, and returns the grpc.ClientConn (or
*grpc.ClientConn) instead of a specific typed client; then update callers to
call pb.NewBlobServiceClient(conn) or pb.NewHeaderServiceClient(conn) as needed
(replace references to startTestServer and startTestHeaderServer with the new
startTestGRPCServer).
- Around line 165-217: Add table-driven subtests to TestGRPCBlobGet and
TestGRPCBlobGetAll to cover error paths: for TestGRPCBlobGet add cases for "not
found" (no blob at height/commitment), "invalid namespace" (bad length), and
"bad commitment" and assert returned error; for TestGRPCBlobGetAll add cases for
"empty namespaces" (expect error or empty result per API), "no matching blobs"
and "invalid namespace" and assert proper error/empty response. Use newMockStore
to set up st.blobs per case, reuse testNamespace() for valid namespaces, and
call client.Get / client.GetAll inside subtests to verify responses and errors
instead of only the happy path.
In `@pkg/api/service.go`:
- Around line 159-167: The public Store() and Fetcher() accessors let callers
bypass Service business logic (see blob_service.go) — add typed, non-JSON
methods on Service such as GetBlobs(ctx context.Context, ids []string) ([]Blob,
error) and GetHeaders(ctx context.Context, filter HeaderFilter) ([]Header,
error) that encapsulate the existing Store().GetBlobs/... and Fetcher usage and
return concrete types; update the gRPC handlers to call these new Service
methods instead of Store().GetBlobs(...) and Fetcher(), then remove or make
Service.Store() and Service.Fetcher() private to prevent direct store/fetcher
access.
In `@proto/apex/v1/blob.proto`:
- Around line 31-38: The pagination fields in message GetAllRequest (fields
limit and offset) are currently int32 and thus allow negative values; change
their types to uint32 to express non-negativity at the schema level (or keep
int32 but add explicit validation in the server-side handlers that reject
negative values) — update the proto definition for GetAllRequest by replacing
"int32 limit" and "int32 offset" with "uint32 limit" and "uint32 offset" (and
regenerate stubs), or alternatively add validation logic in the RPC handling
code that checks GetAllRequest.limit and GetAllRequest.offset and treats
negatives as errors/normalizes them to 0.
| func MarshalBlob(b *types.Blob) json.RawMessage { | ||
| m := map[string]any{ | ||
| "namespace": b.Namespace[:], | ||
| "data": b.Data, | ||
| "share_version": b.ShareVersion, | ||
| "commitment": b.Commitment, | ||
| "index": b.Index, | ||
| } | ||
| raw, _ := json.Marshal(m) //nolint:errcheck | ||
| return raw |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check types.Blob struct fields to understand what's available
ast-grep --pattern $'type Blob struct {
$$$
}'
# Also search for Blob definition in a more flexible way
fd -e go . | xargs grep -l "type Blob struct" | head -5Repository: evstack/apex
Length of output: 1896
🏁 Script executed:
#!/bin/bash
# Check if Signer is used/referenced elsewhere in JSON context
rg -n 'signer' --type=go -g '!**/gen/**' -i | head -20
# Check the blobToProto function mentioned in the comment
rg -n 'blobToProto' --type=go -A 20Repository: evstack/apex
Length of output: 5305
🏁 Script executed:
#!/bin/bash
# Let's look at the actual service.go file around the MarshalBlob function
cat -n pkg/api/service.go | head -200 | tail -80Repository: evstack/apex
Length of output: 3216
Include height and signer fields in MarshalBlob to match blobToProto and types.Blob definition.
The types.Blob struct has Height and Signer fields (visible in pkg/types/types.go), and blobToProto includes both when converting to the proto message. However, MarshalBlob omits these fields. This inconsistency may cause issues for upstream clients expecting a complete blob representation. Either add these fields to the marshaled JSON or add a comment explaining why they're intentionally excluded.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/api/service.go` around lines 170 - 179, MarshalBlob currently omits the
Blob.Height and Blob.Signer fields, causing a mismatch with blobToProto and
types.Blob; update the map in MarshalBlob (function MarshalBlob) to include
"height": b.Height and "signer": b.Signer so the JSON mirrors
blobToProto/types.Blob (or, if omission is intentional, add a clear comment in
MarshalBlob explaining why those fields are excluded). Ensure the keys match
naming style ("height", "signer") and use the raw b.Height and b.Signer values
as in the types.Blob definition.
Overview
Summary by CodeRabbit
New Features
Refactor
Tests
Chores