Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fa8bcf6
feat(s3): add e2e coverage for blob-backed API
pthmas Apr 28, 2026
d1eec81
fix(lint): simplify apex service setup wiring
pthmas Apr 29, 2026
d9453ce
Merge remote-tracking branch 'origin/main' into pthmas/s3-blob-export
pthmas Apr 29, 2026
f618715
feat(s3): add configurable sigv4 auth
pthmas Apr 29, 2026
8d1fb3e
warn on unauthenticated s3 startup
pthmas Apr 29, 2026
2e6da25
fix(s3): enforce read-only mode when submission is not configured
pthmas Apr 29, 2026
2867ed0
fix(s3): harden server, auth, and store correctness
pthmas Apr 29, 2026
6e7281b
docs(s3): clarify PutObject behavior for empty objects
pthmas Apr 29, 2026
d97437e
fix(s3): address critical routing/race bugs and medium hardening
pthmas Apr 29, 2026
e70b560
fix(s3): address critical routing/race bugs and medium hardening
pthmas Apr 30, 2026
529218f
feat(s3): submit commitment envelope to Celestia instead of raw data
pthmas Apr 30, 2026
a96ce16
fix(s3): validate payload hash and check bucket before Celestia submit
pthmas Apr 30, 2026
41fe108
fix(s3): address pagination, env-var credentials, and SQL limit
pthmas May 4, 2026
c868668
fix(e2e): verify envelope commitment on Celestia, not raw object data
pthmas May 4, 2026
0e2f36d
refactor(s3): remove unused fields, fix double ETag, clean dead code
pthmas May 4, 2026
3e61846
chore(s3): drop unused columns from migration
pthmas May 4, 2026
1222861
fix(s3): reject empty objects instead of storing them without Celesti…
pthmas May 4, 2026
ac46e3d
fix(e2e): remove pkg/s3 import to avoid proto conflict
pthmas May 4, 2026
5a3d2e0
fix(lint): fix gofumpt, perfsprint, unparam violations
pthmas May 4, 2026
c3bd226
fix(s3): address PR review comments
pthmas May 5, 2026
1a25d38
fix(s3): thread ctx into setupS3Server, close on startup failure
pthmas May 5, 2026
83ecea4
refactor(s3): extract startRPCServers to reduce gocyclo
pthmas May 5, 2026
e14fea8
chore(justfile): add e2e-s3 target for S3 lifecycle test
pthmas May 5, 2026
9451b9a
fix s3 empty upload and submitter wiring
pthmas May 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 138 additions & 33 deletions cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/evstack/apex/pkg/fetch"
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/profile"
apexs3 "github.com/evstack/apex/pkg/s3"
"github.com/evstack/apex/pkg/store"
"github.com/evstack/apex/pkg/submit"
syncer "github.com/evstack/apex/pkg/sync"
Expand Down Expand Up @@ -214,6 +216,83 @@ func setStoreMetrics(db store.Store, rec metrics.Recorder) {
}
}

func setupS3Server(ctx context.Context, cfg *config.Config, db store.Store, blobSubmitter submit.Submitter, log zerolog.Logger) (*http.Server, error) {
if !cfg.S3.Enabled {
return nil, nil
}

if cfg.S3.AccessKeyID == "" && cfg.S3.SecretAccessKey == "" {
warnLog := log.Warn().Str("addr", cfg.S3.ListenAddr)
if isLoopbackBindAddr(cfg.S3.ListenAddr) {
warnLog.Msg("S3 API authentication is disabled; restrict access to trusted local clients or set APEX_S3_ACCESS_KEY_ID and APEX_S3_SECRET_ACCESS_KEY")
} else {
warnLog.Msg("S3 API authentication is disabled on a non-loopback bind; set APEX_S3_ACCESS_KEY_ID and APEX_S3_SECRET_ACCESS_KEY or place Apex behind a trusted authenticated proxy")
}
}

var ns types.Namespace
if cfg.S3.Namespace != "" {
var err error
ns, err = types.NamespaceFromHex(cfg.S3.Namespace)
if err != nil {
return nil, fmt.Errorf("parse S3 namespace: %w", err)
}
}

sqliteDB, ok := db.(*store.SQLiteStore)
if !ok {
return nil, fmt.Errorf("S3 API requires SQLite store, got %T", db)
}

objStore := store.NewObjectStore(sqliteDB)
s3Svc := apexs3.NewService(objStore, blobSubmitter, ns)
s3Srv := apexs3.NewServer(s3Svc, cfg.S3.Region, cfg.S3.AccessKeyID, cfg.S3.SecretAccessKey, log)

lis, err := (&net.ListenConfig{}).Listen(ctx, "tcp", cfg.S3.ListenAddr)
if err != nil {
return nil, fmt.Errorf("S3 API: listen %s: %w", cfg.S3.ListenAddr, err)
}

httpSrv := &http.Server{
Handler: s3Srv,
ReadHeaderTimeout: 10 * time.Second,
}

go func() {
log.Info().Str("addr", cfg.S3.ListenAddr).Msg("S3 API server listening")
if err := httpSrv.Serve(lis); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("S3 API server error")
}
}()
Comment thread
coderabbitai[bot] marked this conversation as resolved.

return httpSrv, nil
}

func isLoopbackBindAddr(addr string) bool {
addr = strings.TrimSpace(addr)
if addr == "" {
return false
}
// Unix sockets are always local.
if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "unix:") {
return true
}
host := addr
if h, _, err := net.SplitHostPort(addr); err == nil {
host = h
}
host = strings.Trim(host, "[]") // strip IPv6 brackets
if host == "" {
return false // bare ":port" binds all interfaces
}
lower := strings.ToLower(host)
if lower == "localhost" || strings.HasSuffix(lower, ".localhost") {
return true
}
ip := net.ParseIP(host)
return ip != nil && ip.IsLoopback()
}

func persistNamespaces(ctx context.Context, db store.Store, namespaces []types.Namespace) error {
for _, ns := range namespaces {
if err := db.PutNamespace(ctx, ns); err != nil {
Expand Down Expand Up @@ -282,22 +361,59 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}
defer dataFetcher.Close() //nolint:errcheck

svc, notifier, closeSubmitter, err := setupAPIService(cfg, db, dataFetcher, proofFwd, rec)
directSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return err
}
defer closeSubmitter()

blobSubmitter := normalizeBlobSubmitter(directSubmitter)
if directSubmitter != nil {
defer directSubmitter.Close() //nolint:errcheck
}

// Setup S3 API server if enabled.
s3Srv, err := setupS3Server(ctx, cfg, db, blobSubmitter, log.Logger)
if err != nil {
return fmt.Errorf("setup S3 server: %w", err)
}

svc, notifier := setupAPIService(cfg, db, dataFetcher, proofFwd, rec, blobSubmitter)

// Build and run the sync coordinator with observer hook.
coordOpts, closeBackfill, err := buildCoordinatorOptions(cfg, notifier, rec)
if err != nil {
if s3Srv != nil {
_ = s3Srv.Close()
}
return err
}
defer closeBackfill()

coord := syncer.New(db, dataFetcher, coordOpts...)

// Build HTTP mux: mount health endpoints alongside JSON-RPC.
httpSrv, grpcSrv, err := startRPCServers(ctx, cfg, svc, coord, db, notifier, s3Srv)
if err != nil {
return err
}

log.Info().
Int("namespaces", len(namespaces)).
Uint64("start_height", cfg.Sync.StartHeight).
Msg("sync coordinator starting")

err = coord.Run(ctx)

gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv, s3Srv)

if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("coordinator: %w", err)
}

log.Info().Msg("apex indexer stopped")
return nil
}

func startRPCServers(ctx context.Context, cfg *config.Config, svc *api.Service, coord *syncer.Coordinator, db store.Store, notifier *api.Notifier, s3Srv *http.Server) (*http.Server, *grpc.Server, error) {
rpcServer := jsonrpcapi.NewServer(svc, log.Logger)
healthHandler := api.NewHealthHandler(coord, db, notifier, version)

Expand All @@ -320,12 +436,14 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}
}()

// Start gRPC server.
grpcSrv := grpcapi.NewServer(svc, log.Logger)
lis, err := (&net.ListenConfig{}).Listen(ctx, "tcp", cfg.RPC.GRPCListenAddr)
if err != nil {
_ = httpSrv.Close()
return fmt.Errorf("listen gRPC: %w", err)
if s3Srv != nil {
_ = s3Srv.Close()
}
return nil, nil, fmt.Errorf("listen gRPC: %w", err)
}

go func() {
Expand All @@ -335,21 +453,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}
}()

log.Info().
Int("namespaces", len(namespaces)).
Uint64("start_height", cfg.Sync.StartHeight).
Msg("sync coordinator starting")

err = coord.Run(ctx)

gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv)

if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("coordinator: %w", err)
}

log.Info().Msg("apex indexer stopped")
return nil
return httpSrv, grpcSrv, nil
}

func openBlobSubmitter(cfg *config.Config) (*submit.DirectSubmitter, error) {
Expand Down Expand Up @@ -385,19 +489,14 @@ func openBlobSubmitter(cfg *config.Config) (*submit.DirectSubmitter, error) {
return blobSubmitter, nil
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder) (*api.Service, *api.Notifier, func(), error) {
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return nil, nil, nil, err
}

closeSubmitter := func() {}
if blobSubmitter != nil {
closeSubmitter = func() {
_ = blobSubmitter.Close()
}
func normalizeBlobSubmitter(directSubmitter *submit.DirectSubmitter) submit.Submitter {
if directSubmitter == nil {
return nil
}
return directSubmitter
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder, blobSubmitter submit.Submitter) (*api.Service, *api.Notifier) {
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)

Expand All @@ -407,7 +506,7 @@ func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataF
}

svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger, svcOpts...)
return svc, notifier, closeSubmitter, nil
return svc, notifier
}

func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec metrics.Recorder) ([]syncer.Option, func(), error) {
Expand Down Expand Up @@ -436,7 +535,7 @@ func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec met
return coordOpts, closeBackfill, nil
}

func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server, s3Srv *http.Server) {
stopped := make(chan struct{})
go func() {
grpcSrv.GracefulStop()
Expand All @@ -457,6 +556,12 @@ func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *me
log.Error().Err(err).Msg("JSON-RPC server shutdown error")
}

if s3Srv != nil {
if err := s3Srv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("S3 API server shutdown error")
}
}

if metricsSrv != nil {
if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("metrics server shutdown error")
Expand Down
47 changes: 47 additions & 0 deletions cmd/apex/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"testing"

"github.com/evstack/apex/pkg/submit"
)

func TestIsLoopbackBindAddr(t *testing.T) {
t.Parallel()

tests := []struct {
addr string
want bool
}{
{addr: "127.0.0.1:8333", want: true},
{addr: "[::1]:8333", want: true},
{addr: "localhost:8333", want: true},
{addr: "api.localhost:8333", want: true},
{addr: "unix:///tmp/apex.sock", want: true},
{addr: "/tmp/apex.sock", want: true},
{addr: ":8333", want: false},
{addr: "0.0.0.0:8333", want: false},
{addr: "[::]:8333", want: false},
{addr: "apex.example.com:8333", want: false},
{addr: "", want: false},
}

for _, tt := range tests {
t.Run(tt.addr, func(t *testing.T) {
t.Parallel()

if got := isLoopbackBindAddr(tt.addr); got != tt.want {
t.Fatalf("isLoopbackBindAddr(%q) = %v, want %v", tt.addr, got, tt.want)
}
})
}
}

func TestNormalizeBlobSubmitter(t *testing.T) {
t.Parallel()

var directSubmitter *submit.DirectSubmitter
if got := normalizeBlobSubmitter(directSubmitter); got != nil {
t.Fatalf("normalizeBlobSubmitter(nil) = %#v, want nil", got)
}
}
18 changes: 18 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
Profiling ProfilingConfig `yaml:"profiling"`
Log LogConfig `yaml:"log"`
Submission SubmissionConfig `yaml:"submission"`
S3 S3APIConfig `yaml:"s3"`
}

// DataSourceConfig configures the Celestia data source.
Expand Down Expand Up @@ -94,6 +95,18 @@ type LogConfig struct {
Format string `yaml:"format"`
}

// S3APIConfig configures the S3-compatible API server.
type S3APIConfig struct {
Enabled bool `yaml:"enabled"`
ListenAddr string `yaml:"listen_addr"`
Region string `yaml:"region"`
Namespace string `yaml:"namespace"` // Celestia namespace for S3 objects (hex)
// AccessKeyID and SecretAccessKey are not read from YAML; set via
// APEX_S3_ACCESS_KEY_ID and APEX_S3_SECRET_ACCESS_KEY env vars.
AccessKeyID string `yaml:"-"`
SecretAccessKey string `yaml:"-"`
}

// SubmissionConfig contains settings for the future blob submission pipeline.
type SubmissionConfig struct {
Enabled bool `yaml:"enabled"`
Expand Down Expand Up @@ -155,6 +168,11 @@ func DefaultConfig() Config {
Level: "info",
Format: "json",
},
S3: S3APIConfig{
Enabled: false,
ListenAddr: ":8333",
Region: "us-east-1",
},
}
}

Expand Down
Loading
Loading