Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions cmd/apex/blob_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func blobCmd() *cobra.Command {
Short: "Query blobs from the indexer",
}
cmd.AddCommand(blobGetCmd())
cmd.AddCommand(blobGetByCommitmentCmd())
cmd.AddCommand(blobListCmd())
return cmd
}
Expand Down Expand Up @@ -54,6 +55,30 @@ func blobGetCmd() *cobra.Command {
}
}

func blobGetByCommitmentCmd() *cobra.Command {
return &cobra.Command{
Use: "get-by-commitment <commitment-hex>",
Short: "Get a blob by commitment alone (no height required)",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
addr, _ := cmd.Flags().GetString("rpc-addr")

commitment, err := hex.DecodeString(args[0])
if err != nil {
return fmt.Errorf("invalid commitment hex: %w", err)
}

client := newRPCClient(addr)
result, err := client.call(cmd.Context(), "blob.GetByCommitment", commitment)
if err != nil {
return err
}

return printJSON(cmd, result)
},
}
}

func blobListCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "list <height>",
Expand Down
45 changes: 34 additions & 11 deletions cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,16 @@ func startCmd() *cobra.Command {

setupLogger(cfg.Log)

log.Info().
startLog := log.Info().
Str("version", version).
Str("node_url", cfg.DataSource.CelestiaNodeURL).
Int("namespaces", len(cfg.DataSource.Namespaces)).
Msg("starting apex indexer")
Str("datasource_type", cfg.DataSource.Type).
Int("namespaces", len(cfg.DataSource.Namespaces))
if cfg.DataSource.Type == "app" {
startLog = startLog.Str("app_url", cfg.DataSource.CelestiaAppURL)
} else {
startLog = startLog.Str("node_url", cfg.DataSource.CelestiaNodeURL)
}
startLog.Msg("starting apex indexer")

return runIndexer(cmd.Context(), cfg)
},
Expand Down Expand Up @@ -173,20 +178,38 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}
}

// Connect to Celestia node.
fetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger)
if err != nil {
return fmt.Errorf("connect to celestia node: %w", err)
// Connect to data source.
var (
dataFetcher fetch.DataFetcher
proofFwd fetch.ProofForwarder
)
switch cfg.DataSource.Type {
case "app":
appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppURL, cfg.DataSource.AuthToken, log.Logger)
if err != nil {
return fmt.Errorf("create celestia-app fetcher: %w", err)
}
dataFetcher = appFetcher
// celestia-app does not serve blob proofs; proofFwd stays nil.
case "node", "":
nodeFetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger)
if err != nil {
return fmt.Errorf("connect to celestia node: %w", err)
}
dataFetcher = nodeFetcher
proofFwd = nodeFetcher
default:
return fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type)
}
defer fetcher.Close() //nolint:errcheck
defer dataFetcher.Close() //nolint:errcheck

// Set up API layer.
notifier := api.NewNotifier(cfg.Subscription.BufferSize, log.Logger)
notifier.SetMetrics(rec)
svc := api.NewService(db, fetcher, fetcher, notifier, log.Logger)
svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger)

// Build and run the sync coordinator with observer hook.
coord := syncer.New(db, fetcher,
coord := syncer.New(db, dataFetcher,
syncer.WithStartHeight(cfg.Sync.StartHeight),
syncer.WithBatchSize(cfg.Sync.BatchSize),
syncer.WithConcurrency(cfg.Sync.Concurrency),
Expand Down
7 changes: 6 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ type Config struct {
Log LogConfig `yaml:"log"`
}

// DataSourceConfig configures the Celestia node connection.
// DataSourceConfig configures the Celestia data source.
// Type selects the backend: "node" (default) uses a Celestia DA node,
// "app" uses a celestia-app consensus node via CometBFT RPC.
type DataSourceConfig struct {
Type string `yaml:"type"` // "node" (default) or "app"
CelestiaNodeURL string `yaml:"celestia_node_url"`
CelestiaAppURL string `yaml:"celestia_app_url"`
AuthToken string `yaml:"-"` // populated only via APEX_AUTH_TOKEN env var
Namespaces []string `yaml:"namespaces"`
}
Expand Down Expand Up @@ -63,6 +67,7 @@ type LogConfig struct {
func DefaultConfig() Config {
return Config{
DataSource: DataSourceConfig{
Type: "node",
CelestiaNodeURL: "http://localhost:26658",
},
Storage: StorageConfig{
Expand Down
40 changes: 30 additions & 10 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@ const defaultConfigYAML = `# Apex configuration
# Generated by: apex init

data_source:
# Celestia node RPC endpoint
# Data source type: "node" (Celestia DA node) or "app" (celestia-app CometBFT RPC)
type: "node"

# Celestia DA node RPC endpoint (required when type: "node")
celestia_node_url: "http://localhost:26658"

# Celestia-app CometBFT RPC endpoint (required when type: "app")
# celestia_app_url: "http://localhost:26657"

# Auth token: set via APEX_AUTH_TOKEN env var (not read from this file).

# Namespaces to index (hex-encoded, 29 bytes = 58 hex chars each).
Expand Down Expand Up @@ -106,9 +112,30 @@ func Load(path string) (*Config, error) {
return &cfg, nil
}

func validateDataSource(ds *DataSourceConfig) error {
switch ds.Type {
case "node", "":
if ds.CelestiaNodeURL == "" {
return fmt.Errorf("data_source.celestia_node_url is required for type \"node\"")
}
case "app":
if ds.CelestiaAppURL == "" {
return fmt.Errorf("data_source.celestia_app_url is required for type \"app\"")
}
default:
return fmt.Errorf("data_source.type %q is invalid; must be \"node\" or \"app\"", ds.Type)
}
for _, ns := range ds.Namespaces {
if _, err := types.NamespaceFromHex(ns); err != nil {
return fmt.Errorf("invalid namespace %q: %w", ns, err)
}
}
return nil
}

func validate(cfg *Config) error {
if cfg.DataSource.CelestiaNodeURL == "" {
return fmt.Errorf("data_source.celestia_node_url is required")
if err := validateDataSource(&cfg.DataSource); err != nil {
return err
}
if cfg.Storage.DBPath == "" {
return fmt.Errorf("storage.db_path is required")
Expand Down Expand Up @@ -138,12 +165,5 @@ func validate(cfg *Config) error {
return fmt.Errorf("log.format %q is invalid; must be json or console", cfg.Log.Format)
}

// Validate namespace hex strings.
for _, ns := range cfg.DataSource.Namespaces {
if _, err := types.NamespaceFromHex(ns); err != nil {
return fmt.Errorf("invalid namespace %q: %w", ns, err)
}
}

return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.0

require (
github.com/filecoin-project/go-jsonrpc v0.10.1
github.com/gorilla/websocket v1.4.2
github.com/prometheus/client_golang v1.23.2
github.com/rs/zerolog v1.34.0
github.com/spf13/cobra v1.10.2
Expand All @@ -19,7 +20,6 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/go-log/v2 v2.0.8 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down
17 changes: 17 additions & 0 deletions pkg/api/grpc/blob_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpcapi
import (
"bytes"
"context"
"errors"
"fmt"

"google.golang.org/grpc"
Expand Down Expand Up @@ -41,6 +42,22 @@ func (s *BlobServiceServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.Ge
return nil, status.Error(codes.NotFound, store.ErrNotFound.Error())
}

func (s *BlobServiceServer) GetByCommitment(ctx context.Context, req *pb.GetByCommitmentRequest) (*pb.GetByCommitmentResponse, error) {
if len(req.Commitment) == 0 {
return nil, status.Error(codes.InvalidArgument, "commitment is required")
}

b, err := s.svc.Store().GetBlobByCommitment(ctx, req.Commitment)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil, status.Error(codes.NotFound, store.ErrNotFound.Error())
}
return nil, status.Errorf(codes.Internal, "get blob by commitment: %v", err)
}

return &pb.GetByCommitmentResponse{Blob: blobToProto(b)}, nil
}

func (s *BlobServiceServer) GetAll(ctx context.Context, req *pb.GetAllRequest) (*pb.GetAllResponse, error) {
const maxNamespaces = 16
if len(req.Namespaces) > maxNamespaces {
Expand Down
Loading