-
Notifications
You must be signed in to change notification settings - Fork 0
feat: add S3-compatible storage backend #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,7 @@ import ( | |||||
| "os" | ||||||
| "strconv" | ||||||
|
|
||||||
| "github.com/evstack/apex/pkg/types" | ||||||
| "github.com/spf13/cobra" | ||||||
| ) | ||||||
|
|
||||||
|
|
@@ -34,12 +35,12 @@ func blobGetCmd() *cobra.Command { | |||||
| return fmt.Errorf("invalid height: %w", err) | ||||||
| } | ||||||
|
|
||||||
| ns, err := hex.DecodeString(args[1]) | ||||||
| ns, err := hex.DecodeString(types.StripHexPrefix(args[1])) | ||||||
| if err != nil { | ||||||
| return fmt.Errorf("invalid namespace hex: %w", err) | ||||||
| } | ||||||
|
|
||||||
| commitment, err := hex.DecodeString(args[2]) | ||||||
| commitment, err := hex.DecodeString(types.StripHexPrefix(args[2])) | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the
Suggested change
|
||||||
| if err != nil { | ||||||
| return fmt.Errorf("invalid commitment hex: %w", err) | ||||||
| } | ||||||
|
|
@@ -63,7 +64,7 @@ func blobGetByCommitmentCmd() *cobra.Command { | |||||
| RunE: func(cmd *cobra.Command, args []string) error { | ||||||
| addr, _ := cmd.Flags().GetString("rpc-addr") | ||||||
|
|
||||||
| commitment, err := hex.DecodeString(args[0]) | ||||||
| commitment, err := hex.DecodeString(types.StripHexPrefix(args[0])) | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||
| if err != nil { | ||||||
| return fmt.Errorf("invalid commitment hex: %w", err) | ||||||
| } | ||||||
|
|
@@ -95,7 +96,7 @@ func blobListCmd() *cobra.Command { | |||||
|
|
||||||
| var namespaces [][]byte | ||||||
| if nsHex != "" { | ||||||
| ns, err := hex.DecodeString(nsHex) | ||||||
| ns, err := hex.DecodeString(types.StripHexPrefix(nsHex)) | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||
| if err != nil { | ||||||
| return fmt.Errorf("invalid namespace hex: %w", err) | ||||||
| } | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ import ( | |
| jsonrpcapi "github.com/evstack/apex/pkg/api/jsonrpc" | ||
| "github.com/evstack/apex/pkg/fetch" | ||
| "github.com/evstack/apex/pkg/metrics" | ||
| "github.com/evstack/apex/pkg/profile" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| "github.com/evstack/apex/pkg/store" | ||
| syncer "github.com/evstack/apex/pkg/sync" | ||
| "github.com/evstack/apex/pkg/types" | ||
|
|
@@ -151,6 +152,49 @@ func setupMetrics(cfg *config.Config) (metrics.Recorder, *metrics.Server) { | |
| return rec, srv | ||
| } | ||
|
|
||
| func setupProfiling(cfg *config.Config) *profile.Server { | ||
| if !cfg.Profiling.Enabled { | ||
| return nil | ||
| } | ||
| srv := profile.NewServer(cfg.Profiling.ListenAddr, log.Logger) | ||
| go func() { | ||
| if err := srv.Start(); err != nil { | ||
| log.Error().Err(err).Msg("profiling server error") | ||
| } | ||
| }() | ||
| return srv | ||
| } | ||
|
|
||
| func openDataSource(ctx context.Context, cfg *config.Config) (fetch.DataFetcher, fetch.ProofForwarder, error) { | ||
| switch cfg.DataSource.Type { | ||
| case "app": | ||
| appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppURL, cfg.DataSource.AuthToken, log.Logger) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("create celestia-app fetcher: %w", err) | ||
| } | ||
| return appFetcher, nil, nil | ||
| case "node", "": | ||
| nodeFetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("connect to celestia node: %w", err) | ||
| } | ||
| return nodeFetcher, nodeFetcher, nil | ||
| default: | ||
| return nil, nil, fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type) | ||
| } | ||
| } | ||
|
|
||
| func openStore(ctx context.Context, cfg *config.Config) (store.Store, error) { | ||
| switch cfg.Storage.Type { | ||
| case "s3": | ||
| return store.NewS3Store(ctx, cfg.Storage.S3) | ||
| case "sqlite", "": | ||
| return store.Open(cfg.Storage.DBPath) | ||
| default: | ||
| return nil, fmt.Errorf("unsupported storage type: %q", cfg.Storage.Type) | ||
| } | ||
| } | ||
|
|
||
| func runIndexer(ctx context.Context, cfg *config.Config) error { | ||
| // Parse namespaces from config. | ||
| namespaces, err := cfg.ParsedNamespaces() | ||
|
|
@@ -159,14 +203,22 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { | |
| } | ||
|
|
||
| rec, metricsSrv := setupMetrics(cfg) | ||
| profileSrv := setupProfiling(cfg) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| // Open store. | ||
| db, err := store.Open(cfg.Storage.DBPath) | ||
| db, err := openStore(ctx, cfg) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| if err != nil { | ||
| return fmt.Errorf("open store: %w", err) | ||
| } | ||
| defer db.Close() //nolint:errcheck | ||
| db.SetMetrics(rec) | ||
|
|
||
| // Wire metrics into the store. | ||
| switch s := db.(type) { | ||
| case *store.SQLiteStore: | ||
| s.SetMetrics(rec) | ||
| case *store.S3Store: | ||
| s.SetMetrics(rec) | ||
| } | ||
|
Comment on lines
+215
to
+221
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| // Persist configured namespaces. | ||
| ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) | ||
|
|
@@ -179,27 +231,9 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { | |
| } | ||
|
|
||
| // Connect to data source. | ||
| var ( | ||
| dataFetcher fetch.DataFetcher | ||
| proofFwd fetch.ProofForwarder | ||
| ) | ||
| switch cfg.DataSource.Type { | ||
| case "app": | ||
| appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppURL, cfg.DataSource.AuthToken, log.Logger) | ||
| if err != nil { | ||
| return fmt.Errorf("create celestia-app fetcher: %w", err) | ||
| } | ||
| dataFetcher = appFetcher | ||
| // celestia-app does not serve blob proofs; proofFwd stays nil. | ||
| case "node", "": | ||
| nodeFetcher, err := fetch.NewCelestiaNodeFetcher(ctx, cfg.DataSource.CelestiaNodeURL, cfg.DataSource.AuthToken, log.Logger) | ||
| if err != nil { | ||
| return fmt.Errorf("connect to celestia node: %w", err) | ||
| } | ||
| dataFetcher = nodeFetcher | ||
| proofFwd = nodeFetcher | ||
| default: | ||
| return fmt.Errorf("unsupported data source type: %q", cfg.DataSource.Type) | ||
| dataFetcher, proofFwd, err := openDataSource(ctx, cfg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
Comment on lines
+234
to
237
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| defer dataFetcher.Close() //nolint:errcheck | ||
|
|
||
|
|
@@ -263,7 +297,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { | |
|
|
||
| err = coord.Run(ctx) | ||
|
|
||
| gracefulShutdown(httpSrv, grpcSrv, metricsSrv) | ||
| gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| if err != nil && !errors.Is(err, context.Canceled) { | ||
| return fmt.Errorf("coordinator: %w", err) | ||
|
|
@@ -273,7 +307,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { | |
| return nil | ||
| } | ||
|
|
||
| func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server) { | ||
| func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) { | ||
| stopped := make(chan struct{}) | ||
| go func() { | ||
| grpcSrv.GracefulStop() | ||
|
|
@@ -299,4 +333,9 @@ func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *me | |
| log.Error().Err(err).Msg("metrics server shutdown error") | ||
| } | ||
| } | ||
| if profileSrv != nil { | ||
| if err := profileSrv.Shutdown(shutdownCtx); err != nil { | ||
| log.Error().Err(err).Msg("profiling server shutdown error") | ||
| } | ||
| } | ||
|
Comment on lines
+336
to
+340
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,6 +14,7 @@ type Config struct { | |||||||||||||||||||
| Sync SyncConfig `yaml:"sync"` | ||||||||||||||||||||
| Subscription SubscriptionConfig `yaml:"subscription"` | ||||||||||||||||||||
| Metrics MetricsConfig `yaml:"metrics"` | ||||||||||||||||||||
| Profiling ProfilingConfig `yaml:"profiling"` | ||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||
| Log LogConfig `yaml:"log"` | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
@@ -28,9 +29,24 @@ type DataSourceConfig struct { | |||||||||||||||||||
| Namespaces []string `yaml:"namespaces"` | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // StorageConfig configures the SQLite database. | ||||||||||||||||||||
| // StorageConfig configures the persistence backend. | ||||||||||||||||||||
| // Type selects the backend: "sqlite" (default) uses a local SQLite file, | ||||||||||||||||||||
| // "s3" uses an S3-compatible object store. | ||||||||||||||||||||
| type StorageConfig struct { | ||||||||||||||||||||
| DBPath string `yaml:"db_path"` | ||||||||||||||||||||
| Type string `yaml:"type"` // "sqlite" (default) or "s3" | ||||||||||||||||||||
| DBPath string `yaml:"db_path"` // SQLite path (used when type=sqlite) | ||||||||||||||||||||
| S3 *S3Config `yaml:"s3"` // S3 settings (used when type=s3) | ||||||||||||||||||||
|
Comment on lines
+36
to
+38
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // S3Config configures an S3-compatible object store backend. | ||||||||||||||||||||
| // Credentials are resolved via standard AWS SDK mechanisms | ||||||||||||||||||||
| // (env vars, IAM role, shared credentials file). | ||||||||||||||||||||
| type S3Config struct { | ||||||||||||||||||||
| Bucket string `yaml:"bucket"` | ||||||||||||||||||||
| Prefix string `yaml:"prefix"` | ||||||||||||||||||||
| Region string `yaml:"region"` | ||||||||||||||||||||
| Endpoint string `yaml:"endpoint"` // custom endpoint for MinIO, R2, Spaces | ||||||||||||||||||||
| ChunkSize int `yaml:"chunk_size"` // heights per S3 object, default 64 | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // RPCConfig configures the API servers. | ||||||||||||||||||||
|
|
@@ -57,6 +73,12 @@ type MetricsConfig struct { | |||||||||||||||||||
| ListenAddr string `yaml:"listen_addr"` | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // ProfilingConfig configures pprof profiling endpoints. | ||||||||||||||||||||
| type ProfilingConfig struct { | ||||||||||||||||||||
| Enabled bool `yaml:"enabled"` | ||||||||||||||||||||
| ListenAddr string `yaml:"listen_addr"` | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
Comment on lines
+77
to
+80
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| // LogConfig configures logging. | ||||||||||||||||||||
| type LogConfig struct { | ||||||||||||||||||||
| Level string `yaml:"level"` | ||||||||||||||||||||
|
|
@@ -71,6 +93,7 @@ func DefaultConfig() Config { | |||||||||||||||||||
| CelestiaNodeURL: "http://localhost:26658", | ||||||||||||||||||||
| }, | ||||||||||||||||||||
| Storage: StorageConfig{ | ||||||||||||||||||||
| Type: "sqlite", | ||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||
| DBPath: "apex.db", | ||||||||||||||||||||
| }, | ||||||||||||||||||||
| RPC: RPCConfig{ | ||||||||||||||||||||
|
|
@@ -88,6 +111,10 @@ func DefaultConfig() Config { | |||||||||||||||||||
| Enabled: true, | ||||||||||||||||||||
| ListenAddr: ":9091", | ||||||||||||||||||||
| }, | ||||||||||||||||||||
| Profiling: ProfilingConfig{ | ||||||||||||||||||||
| Enabled: false, | ||||||||||||||||||||
| ListenAddr: "127.0.0.1:6061", | ||||||||||||||||||||
| }, | ||||||||||||||||||||
|
Comment on lines
+114
to
+117
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Default values for
Suggested change
|
||||||||||||||||||||
| Log: LogConfig{ | ||||||||||||||||||||
| Level: "info", | ||||||||||||||||||||
| Format: "json", | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -45,9 +45,20 @@ data_source: | |||||||||||||
| namespaces: [] | ||||||||||||||
|
|
||||||||||||||
| storage: | ||||||||||||||
| # Path to the SQLite database file | ||||||||||||||
| # Storage backend: "sqlite" (default) or "s3" | ||||||||||||||
| type: "sqlite" | ||||||||||||||
|
|
||||||||||||||
| # Path to the SQLite database file (used when type: "sqlite") | ||||||||||||||
| db_path: "apex.db" | ||||||||||||||
|
|
||||||||||||||
| # S3-compatible object store settings (used when type: "s3") | ||||||||||||||
| # s3: | ||||||||||||||
| # bucket: "my-apex-bucket" | ||||||||||||||
| # prefix: "indexer" | ||||||||||||||
| # region: "us-east-1" | ||||||||||||||
| # endpoint: "" # custom endpoint for MinIO, R2, etc. | ||||||||||||||
| # chunk_size: 64 # heights per S3 object | ||||||||||||||
|
|
||||||||||||||
| rpc: | ||||||||||||||
| # Address for the JSON-RPC API server (HTTP/WebSocket) | ||||||||||||||
| listen_addr: ":8080" | ||||||||||||||
|
|
@@ -72,6 +83,12 @@ metrics: | |||||||||||||
| # Address for the metrics server | ||||||||||||||
| listen_addr: ":9091" | ||||||||||||||
|
|
||||||||||||||
| profiling: | ||||||||||||||
| # Enable pprof endpoints (/debug/pprof/*) | ||||||||||||||
| enabled: false | ||||||||||||||
| # Bind address for profiling HTTP server (prefer loopback) | ||||||||||||||
| listen_addr: "127.0.0.1:6061" | ||||||||||||||
|
|
||||||||||||||
| log: | ||||||||||||||
| # Log level: trace, debug, info, warn, error, fatal, panic | ||||||||||||||
| level: "info" | ||||||||||||||
|
|
@@ -133,12 +150,40 @@ func validateDataSource(ds *DataSourceConfig) error { | |||||||||||||
| return nil | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func validateStorage(s *StorageConfig) error { | ||||||||||||||
| switch s.Type { | ||||||||||||||
| case "s3": | ||||||||||||||
| if s.S3 == nil { | ||||||||||||||
| return fmt.Errorf("storage.s3 is required when storage.type is \"s3\"") | ||||||||||||||
| } | ||||||||||||||
| if s.S3.Bucket == "" { | ||||||||||||||
| return fmt.Errorf("storage.s3.bucket is required") | ||||||||||||||
| } | ||||||||||||||
| if s.S3.Region == "" && s.S3.Endpoint == "" { | ||||||||||||||
| return fmt.Errorf("storage.s3.region is required (unless endpoint is set)") | ||||||||||||||
| } | ||||||||||||||
| if s.S3.ChunkSize == 0 { | ||||||||||||||
| s.S3.ChunkSize = 64 | ||||||||||||||
| } | ||||||||||||||
| if s.S3.ChunkSize < 0 { | ||||||||||||||
| return fmt.Errorf("storage.s3.chunk_size must be positive") | ||||||||||||||
| } | ||||||||||||||
| case "sqlite", "": | ||||||||||||||
| if s.DBPath == "" { | ||||||||||||||
| return fmt.Errorf("storage.db_path is required") | ||||||||||||||
| } | ||||||||||||||
| default: | ||||||||||||||
| return fmt.Errorf("storage.type %q is invalid; must be \"sqlite\" or \"s3\"", s.Type) | ||||||||||||||
| } | ||||||||||||||
| return nil | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func validate(cfg *Config) error { | ||||||||||||||
| if err := validateDataSource(&cfg.DataSource); err != nil { | ||||||||||||||
| return err | ||||||||||||||
| } | ||||||||||||||
| if cfg.Storage.DBPath == "" { | ||||||||||||||
| return fmt.Errorf("storage.db_path is required") | ||||||||||||||
| if err := validateStorage(&cfg.Storage); err != nil { | ||||||||||||||
| return err | ||||||||||||||
|
Comment on lines
+185
to
+186
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||
| } | ||||||||||||||
| if cfg.RPC.ListenAddr == "" { | ||||||||||||||
| return fmt.Errorf("rpc.listen_addr is required") | ||||||||||||||
|
|
@@ -158,6 +203,9 @@ func validate(cfg *Config) error { | |||||||||||||
| if cfg.Metrics.Enabled && cfg.Metrics.ListenAddr == "" { | ||||||||||||||
| return fmt.Errorf("metrics.listen_addr is required when metrics are enabled") | ||||||||||||||
| } | ||||||||||||||
| if cfg.Profiling.Enabled && cfg.Profiling.ListenAddr == "" { | ||||||||||||||
| return fmt.Errorf("profiling.listen_addr is required when profiling is enabled") | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+206
to
+208
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A new validation check is added for
Suggested change
|
||||||||||||||
| if !validLogLevels[cfg.Log.Level] { | ||||||||||||||
| return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
types.StripHexPrefixfunction is now applied toargs[1]fornsdecoding. This is a good improvement for robustness, as it handles cases where the input hex string might include a0xor0Xprefix.