9 changes: 5 additions & 4 deletions cmd/apex/blob_cmd.go
@@ -7,6 +7,7 @@ import (
"os"
"strconv"

"github.com/evstack/apex/pkg/types"
"github.com/spf13/cobra"
)

@@ -34,12 +35,12 @@ func blobGetCmd() *cobra.Command {
return fmt.Errorf("invalid height: %w", err)
}

ns, err := hex.DecodeString(args[1])
ns, err := hex.DecodeString(types.StripHexPrefix(args[1]))

medium

The types.StripHexPrefix function is now applied to args[1] for ns decoding. This is a good improvement for robustness, as it handles cases where the input hex string might include a 0x or 0X prefix.
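
For context, a minimal sketch of what such a helper typically does (illustrative only; the actual types.StripHexPrefix implementation may differ):

// Hypothetical sketch of a 0x/0X prefix stripper, not the repo's actual code.
func StripHexPrefix(s string) string {
	if len(s) >= 2 && s[0] == '0' && (s[1] == 'x' || s[1] == 'X') {
		return s[2:]
	}
	return s
}

With this applied before hex.DecodeString, inputs like 0xdeadbeef and deadbeef decode identically.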

if err != nil {
return fmt.Errorf("invalid namespace hex: %w", err)
}

commitment, err := hex.DecodeString(args[2])
commitment, err := hex.DecodeString(types.StripHexPrefix(args[2]))

medium

Similar to the ns decoding, types.StripHexPrefix is correctly applied to args[2] for commitment decoding, ensuring consistent handling of hex string inputs.

if err != nil {
return fmt.Errorf("invalid commitment hex: %w", err)
}
@@ -63,7 +64,7 @@ func blobGetByCommitmentCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
addr, _ := cmd.Flags().GetString("rpc-addr")

commitment, err := hex.DecodeString(args[0])
commitment, err := hex.DecodeString(types.StripHexPrefix(args[0]))

medium

The types.StripHexPrefix function is applied to args[0] for commitment decoding. This ensures that the commitment hex string is correctly parsed, even if it contains a 0x or 0X prefix.

if err != nil {
return fmt.Errorf("invalid commitment hex: %w", err)
}
@@ -95,7 +96,7 @@ func blobListCmd() *cobra.Command {

var namespaces [][]byte
if nsHex != "" {
ns, err := hex.DecodeString(nsHex)
ns, err := hex.DecodeString(types.StripHexPrefix(nsHex))

medium

The types.StripHexPrefix function is applied to nsHex before decoding. This is a good practice to handle various input formats for hex strings, making the command-line interface more user-friendly.

if err != nil {
return fmt.Errorf("invalid namespace hex: %w", err)
}
89 changes: 64 additions & 25 deletions cmd/apex/main.go
@@ -22,6 +22,7 @@ import (
jsonrpcapi "github.com/evstack/apex/pkg/api/jsonrpc"
"github.com/evstack/apex/pkg/fetch"
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/profile"

medium

The profile package is imported to support the new profiling functionality; it is required for the pprof endpoints enabled below (see the sketch after setupProfiling).

"github.com/evstack/apex/pkg/store"
syncer "github.com/evstack/apex/pkg/sync"
"github.com/evstack/apex/pkg/types"
@@ -151,6 +152,49 @@ func setupMetrics(cfg *config.Config) (metrics.Recorder, *metrics.Server) {
return rec, srv
}

func setupProfiling(cfg *config.Config) *profile.Server {
if !cfg.Profiling.Enabled {
return nil
}
srv := profile.NewServer(cfg.Profiling.ListenAddr, log.Logger)
go func() {
if err := srv.Start(); err != nil {
log.Error().Err(err).Msg("profiling server error")
}
}()
return srv
}
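
setupProfiling starts the server on a background goroutine and returns it so shutdown can be wired up later. Below is a minimal sketch of what pkg/profile could look like, assuming it wraps net/http/pprof; the NewServer/Start/Shutdown signatures match their use in this file, but the internals are illustrative:

package profile

import (
	"context"
	"errors"
	"net/http"
	"net/http/pprof"

	"github.com/rs/zerolog"
)

// Server exposes /debug/pprof/* on a dedicated listener.
type Server struct {
	srv *http.Server
	log zerolog.Logger
}

func NewServer(addr string, log zerolog.Logger) *Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
	return &Server{srv: &http.Server{Addr: addr, Handler: mux}, log: log}
}

// Start blocks until the listener stops; a clean Shutdown is not reported as an error.
func (s *Server) Start() error {
	if err := s.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

func (s *Server) Shutdown(ctx context.Context) error {
	return s.srv.Shutdown(ctx)
}

Once enabled, profiles can be pulled with standard tooling, e.g. go tool pprof http://127.0.0.1:6061/debug/pprof/heap (matching the default listen address in config.go).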

func openDataSource(ctx context.Context, cfg *config.Config) (fetch.DataFetcher, fetch.ProofForwarder, error) {
switch cfg.DataSource.Type {
case "app":
appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppURL, cfg.DataSource.AuthToken, log.Logger)
if err != nil {
return nil, nil, fmt.Errorf("create celestia-app fetcher: %w", err)
}
return appFetcher, nil, nil
case "node", "":
nodeFetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger)
if err != nil {
return nil, nil, fmt.Errorf("connect to celestia node: %w", err)
}
return nodeFetcher, nodeFetcher, nil
default:
return nil, nil, fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type)
}
}

func openStore(ctx context.Context, cfg *config.Config) (store.Store, error) {
switch cfg.Storage.Type {
case "s3":
return store.NewS3Store(ctx, cfg.Storage.S3)
case "sqlite", "":
return store.Open(cfg.Storage.DBPath)
default:
return nil, fmt.Errorf("unsupported storage type: %q", cfg.Storage.Type)
}
}

func runIndexer(ctx context.Context, cfg *config.Config) error {
// Parse namespaces from config.
namespaces, err := cfg.ParsedNamespaces()
@@ -159,14 +203,22 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}

rec, metricsSrv := setupMetrics(cfg)
profileSrv := setupProfiling(cfg)

medium

The profileSrv variable is introduced to hold the profiling server instance. This is part of integrating the new profiling feature.

// Open store.
db, err := store.Open(cfg.Storage.DBPath)
db, err := openStore(ctx, cfg)

medium

The openStore function is now used to initialize the database, replacing the direct call to store.Open. This aligns with the new modular approach for handling storage backends.

if err != nil {
return fmt.Errorf("open store: %w", err)
}
defer db.Close() //nolint:errcheck
db.SetMetrics(rec)

// Wire metrics into the store.
switch s := db.(type) {
case *store.SQLiteStore:
s.SetMetrics(rec)
case *store.S3Store:
s.SetMetrics(rec)
}
Comment on lines +215 to +221

medium

The logic for wiring metrics into the store is updated to handle both SQLiteStore and S3Store types. This ensures that the new S3 store also reports metrics correctly.

// Persist configured namespaces.
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
@@ -179,27 +231,9 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}

// Connect to data source.
var (
dataFetcher fetch.DataFetcher
proofFwd fetch.ProofForwarder
)
switch cfg.DataSource.Type {
case "app":
appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppURL, cfg.DataSource.AuthToken, log.Logger)
if err != nil {
return fmt.Errorf("create celestia-app fetcher: %w", err)
}
dataFetcher = appFetcher
// celestia-app does not serve blob proofs; proofFwd stays nil.
case "node", "":
nodeFetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger)
if err != nil {
return fmt.Errorf("connect to celestia node: %w", err)
}
dataFetcher = nodeFetcher
proofFwd = nodeFetcher
default:
return fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type)
dataFetcher, proofFwd, err := openDataSource(ctx, cfg)
if err != nil {
return err
}
Comment on lines +234 to 237

medium

The data source initialization is refactored to use the new openDataSource function, simplifying the runIndexer logic and making it more readable.

defer dataFetcher.Close() //nolint:errcheck

@@ -263,7 +297,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {

err = coord.Run(ctx)

gracefulShutdown(httpSrv, grpcSrv, metricsSrv)
gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv)

medium

The profileSrv is added to the gracefulShutdown function call, ensuring that the profiling server is also gracefully shut down when the indexer stops.

if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("coordinator: %w", err)
@@ -273,7 +307,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
return nil
}

func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server) {
func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
stopped := make(chan struct{})
go func() {
grpcSrv.GracefulStop()
@@ -299,4 +333,9 @@ func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *me
log.Error().Err(err).Msg("metrics server shutdown error")
}
}
if profileSrv != nil {
if err := profileSrv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("profiling server shutdown error")
}
}
Comment on lines +336 to +340

medium

This block adds logic to gracefully shut down the profiling server if it's enabled. This is important for proper resource management and clean exits.

}
31 changes: 29 additions & 2 deletions config/config.go
@@ -14,6 +14,7 @@ type Config struct {
Sync SyncConfig `yaml:"sync"`
Subscription SubscriptionConfig `yaml:"subscription"`
Metrics MetricsConfig `yaml:"metrics"`
Profiling ProfilingConfig `yaml:"profiling"`

medium

The ProfilingConfig field is added to the main Config struct. This enables the application to load profiling-related settings from the configuration file.

Log LogConfig `yaml:"log"`
}

@@ -28,9 +29,24 @@ type DataSourceConfig struct {
Namespaces []string `yaml:"namespaces"`
}

// StorageConfig configures the SQLite database.
// StorageConfig configures the persistence backend.
// Type selects the backend: "sqlite" (default) uses a local SQLite file,
// "s3" uses an S3-compatible object store.
type StorageConfig struct {
DBPath string `yaml:"db_path"`
Type string `yaml:"type"` // "sqlite" (default) or "s3"
DBPath string `yaml:"db_path"` // SQLite path (used when type=sqlite)
S3 *S3Config `yaml:"s3"` // S3 settings (used when type=s3)
Comment on lines +36 to +38

medium

The Type and S3 fields are added to StorageConfig to allow selection between SQLite and S3 storage, and to hold S3-specific configuration.

}

// S3Config configures an S3-compatible object store backend.
// Credentials are resolved via standard AWS SDK mechanisms
// (env vars, IAM role, shared credentials file).
type S3Config struct {
Bucket string `yaml:"bucket"`
Prefix string `yaml:"prefix"`
Region string `yaml:"region"`
Endpoint string `yaml:"endpoint"` // custom endpoint for MinIO, R2, Spaces
ChunkSize int `yaml:"chunk_size"` // heights per S3 object, default 64
}
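
Combined with the validation added in load.go, a hypothetical s3 configuration could look like the following (bucket and endpoint values are illustrative; region may be omitted when a custom endpoint is set):

storage:
  type: "s3"
  s3:
    bucket: "apex-data"
    prefix: "indexer"
    endpoint: "http://minio.local:9000"
    chunk_size: 64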

// RPCConfig configures the API servers.
@@ -57,6 +73,12 @@ type MetricsConfig struct {
ListenAddr string `yaml:"listen_addr"`
}

// ProfilingConfig configures pprof profiling endpoints.
type ProfilingConfig struct {
Enabled bool `yaml:"enabled"`
ListenAddr string `yaml:"listen_addr"`
}
Comment on lines +77 to +80

medium

The ProfilingConfig struct is added to define configuration options for pprof profiling endpoints, including Enabled and ListenAddr.

// LogConfig configures logging.
type LogConfig struct {
Level string `yaml:"level"`
@@ -71,6 +93,7 @@ func DefaultConfig() Config {
CelestiaNodeURL: "http://localhost:26658",
},
Storage: StorageConfig{
Type: "sqlite",

medium

The Type field is added to the default StorageConfig to explicitly set it to "sqlite". This ensures that SQLite remains the default storage backend when no type is specified in the configuration.

DBPath: "apex.db",
},
RPC: RPCConfig{
@@ -88,6 +111,10 @@ func DefaultConfig() Config {
Enabled: true,
ListenAddr: ":9091",
},
Profiling: ProfilingConfig{
Enabled: false,
ListenAddr: "127.0.0.1:6061",
},
Comment on lines +114 to +117

medium

Default values for ProfilingConfig are added, with profiling disabled by default and a loopback address for ListenAddr. This provides a sensible default configuration for the new profiling feature.

Log: LogConfig{
Level: "info",
Format: "json",
54 changes: 51 additions & 3 deletions config/load.go
@@ -45,9 +45,20 @@ data_source:
namespaces: []

storage:
# Path to the SQLite database file
# Storage backend: "sqlite" (default) or "s3"
type: "sqlite"

# Path to the SQLite database file (used when type: "sqlite")
db_path: "apex.db"

# S3-compatible object store settings (used when type: "s3")
# s3:
# bucket: "my-apex-bucket"
# prefix: "indexer"
# region: "us-east-1"
# endpoint: "" # custom endpoint for MinIO, R2, etc.
# chunk_size: 64 # heights per S3 object

rpc:
# Address for the JSON-RPC API server (HTTP/WebSocket)
listen_addr: ":8080"
@@ -72,6 +83,12 @@ metrics:
# Address for the metrics server
listen_addr: ":9091"

profiling:
# Enable pprof endpoints (/debug/pprof/*)
enabled: false
# Bind address for profiling HTTP server (prefer loopback)
listen_addr: "127.0.0.1:6061"

log:
# Log level: trace, debug, info, warn, error, fatal, panic
level: "info"
@@ -133,12 +150,40 @@ func validateDataSource(ds *DataSourceConfig) error {
return nil
}

func validateStorage(s *StorageConfig) error {
switch s.Type {
case "s3":
if s.S3 == nil {
return fmt.Errorf("storage.s3 is required when storage.type is \"s3\"")
}
if s.S3.Bucket == "" {
return fmt.Errorf("storage.s3.bucket is required")
}
if s.S3.Region == "" && s.S3.Endpoint == "" {
return fmt.Errorf("storage.s3.region is required (unless endpoint is set)")
}
if s.S3.ChunkSize == 0 {
s.S3.ChunkSize = 64
}
if s.S3.ChunkSize < 0 {
return fmt.Errorf("storage.s3.chunk_size must be positive")
}
case "sqlite", "":
if s.DBPath == "" {
return fmt.Errorf("storage.db_path is required")
}
default:
return fmt.Errorf("storage.type %q is invalid; must be \"sqlite\" or \"s3\"", s.Type)
}
return nil
}
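
A quick sanity check of the new rules (an illustrative same-package test sketch, not part of this diff):

func TestValidateStorage(t *testing.T) {
	// Endpoint set with empty region is allowed, and chunk_size defaults to 64.
	cfg := &StorageConfig{Type: "s3", S3: &S3Config{Bucket: "b", Endpoint: "http://minio:9000"}}
	if err := validateStorage(cfg); err != nil {
		t.Fatalf("endpoint without region should pass: %v", err)
	}
	if cfg.S3.ChunkSize != 64 {
		t.Fatalf("chunk_size should default to 64, got %d", cfg.S3.ChunkSize)
	}
	// An s3 type without an s3 block must fail fast.
	if err := validateStorage(&StorageConfig{Type: "s3"}); err == nil {
		t.Fatal("missing s3 block should fail")
	}
}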

func validate(cfg *Config) error {
if err := validateDataSource(&cfg.DataSource); err != nil {
return err
}
if cfg.Storage.DBPath == "" {
return fmt.Errorf("storage.db_path is required")
if err := validateStorage(&cfg.Storage); err != nil {
return err
Comment on lines +185 to +186

medium

The validateStorage function is now called within the main validate function to ensure that the storage configuration is valid. This replaces the previous direct check for cfg.Storage.DBPath.

}
if cfg.RPC.ListenAddr == "" {
return fmt.Errorf("rpc.listen_addr is required")
@@ -158,6 +203,9 @@ func validate(cfg *Config) error {
if cfg.Metrics.Enabled && cfg.Metrics.ListenAddr == "" {
return fmt.Errorf("metrics.listen_addr is required when metrics are enabled")
}
if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" {
return fmt.Errorf("profiling.listen_addr is required when profiling is enabled")
}
Comment on lines +206 to +208

high

A new validation check is added for ProfilingConfig, ensuring that profiling.listen_addr is provided when profiling is enabled. This fails fast at config load; otherwise the empty address would reach the HTTP server, whose ListenAndServe silently falls back to ":http" rather than returning an error.

if !validLogLevels[cfg.Log.Level] {
return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level)
}