Merged
93 changes: 83 additions & 10 deletions CLAUDE.md
@@ -1,5 +1,7 @@
# Apex — Celestia Namespace Indexer

Lightweight indexer that watches Celestia namespaces, stores blobs/headers in SQLite, and exposes them via JSON-RPC, gRPC, and REST health endpoints. Includes Prometheus observability, a CLI client, and multi-stage Docker build.

## Build Commands

```bash
@@ -15,26 +17,97 @@ just tidy # go mod tidy

## Architecture

Apex is a lightweight indexer that watches Celestia namespaces, stores blobs/headers in SQLite, and exposes them via an HTTP API.
### Data Flow

```
cmd/apex/ CLI entrypoint (cobra)
config/ YAML config loading and validation
pkg/types/ Domain types (Namespace, Blob, Header, SyncState)
pkg/store/ Storage interface (SQLite impl in Phase 1)
pkg/fetch/ Data fetcher interface (Celestia node client in Phase 1)
pkg/sync/ Sync coordinator (backfill + streaming)
pkg/api/ HTTP API server (Phase 2)
Celestia Node → Fetcher → Sync Coordinator → Store (SQLite)
                                           → Notifier → Subscribers
API (JSON-RPC + gRPC + Health)
```

The sync coordinator runs in two phases: **backfill** (historical blocks in batches) then **streaming** (live via header subscription). Height observers publish events to the notifier which fans out to API subscribers.
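The fan-out step described above can be sketched as follows. This is a minimal illustration, not the code in `pkg/api/notifier.go`: the `Event` type, the `Notifier` method names, and the drop-when-full policy are all assumptions, since the diff shown here does not include the notifier's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical payload; the real event type is not shown in this diff.
type Event struct {
	Height uint64
}

// Notifier fans events out to subscribers over bounded (buffered) channels.
type Notifier struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]chan Event
	bufLen int
}

// NewNotifier creates a notifier whose subscriber channels hold bufLen events.
func NewNotifier(bufLen int) *Notifier {
	return &Notifier{subs: make(map[int]chan Event), bufLen: bufLen}
}

// Subscribe registers a subscriber and returns its channel plus a cancel func
// that unregisters the subscriber and closes the channel.
func (n *Notifier) Subscribe() (<-chan Event, func()) {
	n.mu.Lock()
	defer n.mu.Unlock()
	id := n.nextID
	n.nextID++
	ch := make(chan Event, n.bufLen)
	n.subs[id] = ch
	return ch, func() {
		n.mu.Lock()
		defer n.mu.Unlock()
		if c, ok := n.subs[id]; ok {
			delete(n.subs, id)
			close(c)
		}
	}
}

// Publish offers ev to every subscriber without blocking and returns how many
// subscribers had a full buffer (assumed policy: drop the event for them).
func (n *Notifier) Publish(ev Event) (dropped int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, ch := range n.subs {
		select {
		case ch <- ev:
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	n := NewNotifier(2)
	ch, cancel := n.Subscribe()
	defer cancel()
	for h := uint64(1); h <= 3; h++ {
		if d := n.Publish(Event{Height: h}); d > 0 {
			fmt.Printf("height %d dropped for %d subscriber(s)\n", h, d)
		}
	}
	fmt.Println("buffered:", len(ch))
}
```

With a buffer of 2 and no reader, the third publish is dropped, so a slow subscriber cannot stall the sync coordinator. Whether the real notifier drops events, disconnects the subscriber, or does something else on overflow is not visible in this diff.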

### File Structure

```
cmd/apex/
  main.go           CLI entrypoint, server wiring, graceful shutdown
  client.go         Thin HTTP JSON-RPC client for CLI commands
  status.go         `apex status` command (health endpoint)
  blob_cmd.go       `apex blob get|list` commands
  config_cmd.go     `apex config validate|show` commands

config/
  config.go         Config structs (DataSource, Storage, RPC, Sync, Metrics, Log)
  load.go           YAML loading, validation, env var override, template generation

pkg/types/
  types.go          Domain types: Namespace, Blob, Header, SyncState, SyncStatus

pkg/store/
  store.go          Store interface (PutBlobs, GetBlobs, PutHeader, GetHeader, sync state)
  sqlite.go         SQLite implementation with metrics instrumentation
  migrations/       SQL migration files

pkg/fetch/
  fetcher.go        DataFetcher + ProofForwarder interfaces
  celestia_node.go  Celestia node-api client (headers, blobs, subscriptions, proofs)

pkg/sync/
  coordinator.go    Sync lifecycle: initialize → backfill → stream, tracks heights
  backfill.go       Concurrent batch backfill with configurable batch size/concurrency
  subscription.go   Header subscription manager for live streaming

pkg/api/
  service.go        API service layer (blob/header queries, proof forwarding, subscriptions)
  notifier.go       Event fan-out to subscribers with bounded buffers
  health.go         /health and /health/ready HTTP endpoints, HealthStatus JSON
  jsonrpc/          JSON-RPC server (go-jsonrpc), blob/header/subscription handlers
  grpc/             gRPC server, protobuf service implementations
  gen/apex/v1/      Generated protobuf Go code

pkg/metrics/
  metrics.go        Recorder interface (nil-safe), nopRecorder, PromRecorder (Prometheus)
  server.go         HTTP server for /metrics endpoint

proto/apex/v1/      Protobuf definitions (blob, header, types)

Dockerfile          Multi-stage build (golang builder + distroless runtime)
```
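The "concurrent batch backfill with configurable batch size/concurrency" listed for `pkg/sync/backfill.go` might look roughly like the sketch below. The names `heightRange`, `splitBatches`, and `backfill` are hypothetical, and the semaphore-based worker limit is one common approach rather than the one the PR necessarily uses.

```go
package main

import (
	"fmt"
	"sync"
)

// heightRange is an inclusive range of block heights (hypothetical type).
type heightRange struct{ From, To uint64 }

// splitBatches cuts [from, to] into consecutive ranges of at most batchSize heights.
func splitBatches(from, to, batchSize uint64) []heightRange {
	if batchSize == 0 || from > to {
		return nil
	}
	var out []heightRange
	for start := from; ; {
		end := start + batchSize - 1
		if end > to || end < start { // clamp, also guards against uint64 overflow
			end = to
		}
		out = append(out, heightRange{start, end})
		if end == to {
			return out
		}
		start = end + 1
	}
}

// backfill runs fetch over every batch with at most `concurrency` batches in
// flight, and returns the first error seen after all batches finish.
func backfill(from, to, batchSize uint64, concurrency int, fetch func(heightRange) error) error {
	if concurrency < 1 {
		concurrency = 1
	}
	sem := make(chan struct{}, concurrency) // counting semaphore
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, r := range splitBatches(from, to, batchSize) {
		wg.Add(1)
		sem <- struct{}{} // blocks while `concurrency` batches are running
		go func(r heightRange) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := fetch(r); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}(r)
	}
	wg.Wait()
	return firstErr
}

func main() {
	fmt.Println(splitBatches(1, 10, 4)) // [{1 4} {5 8} {9 10}]
	err := backfill(1, 10, 4, 2, func(r heightRange) error {
		// A real fetcher would pull headers and blobs for r here.
		return nil
	})
	fmt.Println("backfill error:", err) // backfill error: <nil>
}
```

This sketch keeps going after a failed batch and only reports the first error. A production backfill would more likely cancel outstanding work through a `context.Context` and persist progress so it can resume.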

### Key Interfaces

- **`store.Store`** — persistence (SQLite impl, instrumented with metrics)
- **`fetch.DataFetcher`** — block data retrieval (Celestia node client)
- **`fetch.ProofForwarder`** — proof/inclusion forwarding to upstream node
- **`metrics.Recorder`** — nil-safe metrics abstraction (Prometheus or no-op)
- **`api.StatusProvider`** — sync status for health endpoints (implemented by coordinator)
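As an illustration of what "nil-safe" can mean for `metrics.Recorder`, the sketch below normalizes a nil recorder to a no-op at construction time, so call sites never need a nil check. The method name `IncBlobsStored`, the `orNop` helper, and the `countingRecorder` stand-in are assumptions. The actual interface methods are not shown in this diff.

```go
package main

import "fmt"

// Recorder is a hypothetical, one-method stand-in for metrics.Recorder.
type Recorder interface {
	IncBlobsStored(n int)
}

// nopRecorder discards every observation.
type nopRecorder struct{}

func (nopRecorder) IncBlobsStored(int) {}

// countingRecorder stands in for a Prometheus-backed recorder.
type countingRecorder struct{ blobs int }

func (c *countingRecorder) IncBlobsStored(n int) { c.blobs += n }

// orNop returns r, or a no-op recorder when r is a nil interface value.
func orNop(r Recorder) Recorder {
	if r == nil {
		return nopRecorder{}
	}
	return r
}

// Store is a toy consumer that records a metric on every write.
type Store struct{ rec Recorder }

// NewStore accepts a nil recorder and substitutes the no-op.
func NewStore(rec Recorder) *Store { return &Store{rec: orNop(rec)} }

// PutBlobs pretends to persist n blobs and records the count.
func (s *Store) PutBlobs(n int) { s.rec.IncBlobsStored(n) }

func main() {
	NewStore(nil).PutBlobs(3) // safe: metrics disabled, no nil dereference

	c := &countingRecorder{}
	NewStore(c).PutBlobs(3)
	fmt.Println(c.blobs) // 3
}
```

Note that `orNop` only catches a nil interface value. A typed nil pointer such as `(*countingRecorder)(nil)` would pass the check and panic on use.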

### Ports (defaults)

| Port  | Protocol | Purpose             |
|-------|----------|---------------------|
| :8080 | HTTP     | JSON-RPC + health   |
| :9090 | TCP      | gRPC                |
| :9091 | HTTP     | Prometheus /metrics |

### Config

YAML with strict unknown-field rejection. Auth token via `APEX_AUTH_TOKEN` env var only (not in config file). See `config/config.go` for all fields and `DefaultConfig()` for defaults.

## Conventions

- Go 1.23 minimum (slog, range-over-func available)
- Go 1.25+ (`go.mod` specifies 1.25.0)
- SQLite via `modernc.org/sqlite` (CGo-free)
- Config: YAML (`gopkg.in/yaml.v3`), strict unknown-field rejection
- Logging: `rs/zerolog`
- CLI: `spf13/cobra`
- Linter: golangci-lint v2 (.golangci.yml v2 format)
- Metrics: `prometheus/client_golang` behind nil-safe `Recorder` interface
- JSON-RPC: `filecoin-project/go-jsonrpc`
- gRPC: `google.golang.org/grpc` + `google.golang.org/protobuf`
- Protobuf codegen: `buf` (`buf.yaml` + `buf.gen.yaml`)
- Linter: golangci-lint v2 (.golangci.yml v2 format), gocyclo max 15
- Formatter: gofumpt
- Build runner: just (justfile)

22 changes: 22 additions & 0 deletions Dockerfile
@@ -0,0 +1,22 @@
FROM golang:1.25-bookworm AS builder

WORKDIR /src

COPY go.mod go.sum ./
RUN go mod download

COPY . .

RUN CGO_ENABLED=0 go build -trimpath \
    -ldflags="-s -w -X main.version=$(git describe --tags --always --dirty 2>/dev/null || echo docker)" \
    -o /apex ./cmd/apex

FROM gcr.io/distroless/static-debian12:nonroot

COPY --from=builder /apex /apex

USER 65532:65532

EXPOSE 8080 9090 9091

ENTRYPOINT ["/apex", "start"]
106 changes: 106 additions & 0 deletions cmd/apex/blob_cmd.go
@@ -0,0 +1,106 @@
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
	"os"
	"strconv"

	"github.com/spf13/cobra"
)

func blobCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "blob",
		Short: "Query blobs from the indexer",
	}
	cmd.AddCommand(blobGetCmd())
	cmd.AddCommand(blobListCmd())
	return cmd
}

func blobGetCmd() *cobra.Command {
	return &cobra.Command{
		Use:   "get <height> <namespace-hex> <commitment-hex>",
		Short: "Get a single blob by height, namespace, and commitment",
		Args:  cobra.ExactArgs(3),
		RunE: func(cmd *cobra.Command, args []string) error {
			addr, _ := cmd.Flags().GetString("rpc-addr")

			height, err := strconv.ParseUint(args[0], 10, 64)
			if err != nil {
				return fmt.Errorf("invalid height: %w", err)
			}

			ns, err := hex.DecodeString(args[1])
			if err != nil {
				return fmt.Errorf("invalid namespace hex: %w", err)
			}

			commitment, err := hex.DecodeString(args[2])
			if err != nil {
				return fmt.Errorf("invalid commitment hex: %w", err)
			}

			client := newRPCClient(addr)
			result, err := client.call(cmd.Context(), "blob.Get", height, ns, commitment)
			if err != nil {
				return err
			}

			return printJSON(cmd, result)
		},
	}
}

func blobListCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "list <height>",
		Short: "List all blobs at a given height",
		Args:  cobra.ExactArgs(1),
		RunE: func(cmd *cobra.Command, args []string) error {
			addr, _ := cmd.Flags().GetString("rpc-addr")
			nsHex, _ := cmd.Flags().GetString("namespace")

			height, err := strconv.ParseUint(args[0], 10, 64)
			if err != nil {
				return fmt.Errorf("invalid height: %w", err)
			}

			var namespaces [][]byte
			if nsHex != "" {
				ns, err := hex.DecodeString(nsHex)
				if err != nil {
					return fmt.Errorf("invalid namespace hex: %w", err)
				}
				namespaces = [][]byte{ns}
			}

			client := newRPCClient(addr)
			result, err := client.call(cmd.Context(), "blob.GetAll", height, namespaces)
			if err != nil {
				return err
			}

			return printJSON(cmd, result)
		},
	}

	cmd.Flags().String("namespace", "", "filter by namespace (hex-encoded)")
	return cmd
}

func printJSON(_ *cobra.Command, raw json.RawMessage) error {
	return prettyPrintJSON(raw)
}

func prettyPrintJSON(raw json.RawMessage) error {
	var out any
	if err := json.Unmarshal(raw, &out); err != nil {
		return err
	}
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", " ")
	return enc.Encode(out)
}
108 changes: 108 additions & 0 deletions cmd/apex/client.go
@@ -0,0 +1,108 @@
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// rpcClient is a thin JSON-RPC client over HTTP for CLI commands.
type rpcClient struct {
	url    string
	client *http.Client
}

func newRPCClient(addr string) *rpcClient {
	return &rpcClient{
		url: "http://" + addr,
		client: &http.Client{
			Timeout: 10 * time.Second,
		},
	}
}

type jsonRPCRequest struct {
	Jsonrpc string `json:"jsonrpc"`
	Method  string `json:"method"`
	Params  []any  `json:"params"`
	ID      int    `json:"id"`
}

type jsonRPCResponse struct {
	Result json.RawMessage `json:"result"`
	Error  *jsonRPCError   `json:"error,omitempty"`
}

type jsonRPCError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

func (c *rpcClient) call(ctx context.Context, method string, params ...any) (json.RawMessage, error) {
	if params == nil {
		params = []any{}
	}

	body, err := json.Marshal(jsonRPCRequest{
		Jsonrpc: "2.0",
		Method:  method,
		Params:  params,
		ID:      1,
	})
	if err != nil {
		return nil, fmt.Errorf("marshal request: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("send request: %w", err)
	}
	defer resp.Body.Close() //nolint:errcheck

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read response: %w", err)
	}

	var rpcResp jsonRPCResponse
	if err := json.Unmarshal(respBody, &rpcResp); err != nil {
		return nil, fmt.Errorf("unmarshal response: %w", err)
	}

	if rpcResp.Error != nil {
		return nil, fmt.Errorf("rpc error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message)
	}

	return rpcResp.Result, nil
}

// fetchHealth fetches the health endpoint directly over HTTP.
func (c *rpcClient) fetchHealth(ctx context.Context) (json.RawMessage, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url+"/health", nil)
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("send request: %w", err)
	}
	defer resp.Body.Close() //nolint:errcheck

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read response: %w", err)
	}

	return body, nil
}
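To see what `call` puts on the wire, the standalone sketch below reproduces only its marshalling step. `buildRequest` is a hypothetical helper extracted for illustration, and the namespace value is made up. The `jsonRPCRequest` struct is copied from the file above. One detail worth noticing is that `encoding/json` encodes `[]byte` parameters as base64 strings, so the hex-decoded namespace and commitment arguments are sent as base64, not hex.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// jsonRPCRequest mirrors the request envelope defined in cmd/apex/client.go.
type jsonRPCRequest struct {
	Jsonrpc string `json:"jsonrpc"`
	Method  string `json:"method"`
	Params  []any  `json:"params"`
	ID      int    `json:"id"`
}

// buildRequest is a hypothetical helper that performs the same marshalling
// as rpcClient.call, without sending anything over HTTP.
func buildRequest(method string, params ...any) ([]byte, error) {
	if params == nil {
		params = []any{} // encode as [] rather than null
	}
	return json.Marshal(jsonRPCRequest{
		Jsonrpc: "2.0",
		Method:  method,
		Params:  params,
		ID:      1,
	})
}

func main() {
	// Made-up 4-byte value, shorter than a real namespace, for readability.
	ns, err := hex.DecodeString("deadbeef")
	if err != nil {
		panic(err)
	}
	body, err := buildRequest("blob.Get", uint64(42), ns)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	// {"jsonrpc":"2.0","method":"blob.Get","params":[42,"3q2+7w=="],"id":1}
}
```

The explicit `params == nil` check matters because a nil slice would marshal as `null`, which some JSON-RPC servers reject for positional parameters.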