Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func main() {
}
messageDisablementRulesDB = db
sqlxDB := sqlx.NewDb(db, "postgres")
store := postgres.NewDatabaseStorage(sqlxDB, cfg.Storage.PageSize, cfg.Storage.QueryTimeout, sugaredLggr)
store := postgres.NewDatabaseStorage(sqlxDB, cfg.Storage.PageSize, time.Duration(cfg.Storage.QueryTimeout), sugaredLggr)
messageDisablementRulesDeps = messagedisablementcli.Deps{
Logger: lggr,
Store: store,
Expand Down
23 changes: 12 additions & 11 deletions aggregator/pkg/model/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/auth"
"github.com/smartcontractkit/chainlink-ccv/common"
"github.com/smartcontractkit/chainlink-ccv/pkg/monitoring"
"github.com/smartcontractkit/chainlink-ccv/protocol"
hmacutil "github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac"
Expand Down Expand Up @@ -108,14 +109,14 @@ const (

// StorageConfig represents the configuration for the storage backend.
type StorageConfig struct {
StorageType StorageType `toml:"type"`
ConnectionURL string `toml:"-"`
PageSize int `toml:"pageSize"`
MaxOpenConns int `toml:"maxOpenConns"`
MaxIdleConns int `toml:"maxIdleConns"`
ConnMaxLifetime time.Duration `toml:"connMaxLifetime"`
ConnMaxIdleTime time.Duration `toml:"connMaxIdleTime"`
QueryTimeout time.Duration `toml:"queryTimeout"`
StorageType StorageType `toml:"type"`
ConnectionURL string `toml:"-"`
PageSize int `toml:"pageSize"`
MaxOpenConns int `toml:"maxOpenConns"`
MaxIdleConns int `toml:"maxIdleConns"`
ConnMaxLifetime common.Duration `toml:"connMaxLifetime"`
ConnMaxIdleTime common.Duration `toml:"connMaxIdleTime"`
QueryTimeout common.Duration `toml:"queryTimeout"`
}

// ServerConfig represents the configuration for the server.
Expand Down Expand Up @@ -534,14 +535,14 @@ func (c *AggregatorConfig) SetDefaults() {
c.Storage.MaxIdleConns = 5
}
if c.Storage.ConnMaxLifetime == 0 {
c.Storage.ConnMaxLifetime = time.Hour
c.Storage.ConnMaxLifetime = common.Duration(time.Hour)
}
if c.Storage.ConnMaxIdleTime == 0 {
c.Storage.ConnMaxIdleTime = 5 * time.Minute
c.Storage.ConnMaxIdleTime = common.Duration(5 * time.Minute)
}
// Default query timeout: 10 seconds
if c.Storage.QueryTimeout == 0 {
c.Storage.QueryTimeout = 10 * time.Second
c.Storage.QueryTimeout = common.Duration(10 * time.Second)
}

// Default message-disablement registry refresh: 30 seconds
Expand Down
13 changes: 7 additions & 6 deletions aggregator/pkg/model/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-ccv/aggregator/pkg/auth"
"github.com/smartcontractkit/chainlink-ccv/common"
hmacutil "github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac"
)

Expand Down Expand Up @@ -71,16 +72,16 @@ func TestSetDefaults(t *testing.T) {
assert.Equal(t, 100, cfg.Storage.PageSize)
assert.Equal(t, 25, cfg.Storage.MaxOpenConns)
assert.Equal(t, 5, cfg.Storage.MaxIdleConns)
assert.Equal(t, time.Hour, cfg.Storage.ConnMaxLifetime)
assert.Equal(t, 5*time.Minute, cfg.Storage.ConnMaxIdleTime)
assert.Equal(t, common.Duration(time.Hour), cfg.Storage.ConnMaxLifetime)
assert.Equal(t, common.Duration(5*time.Minute), cfg.Storage.ConnMaxIdleTime)
assert.Equal(t, 5*time.Minute, cfg.OrphanRecovery.Interval)
assert.Equal(t, 168*time.Hour, cfg.OrphanRecovery.MaxAge)
assert.Equal(t, "8080", cfg.HealthCheck.Port)
assert.Equal(t, 10*time.Second, cfg.Server.RequestTimeout)
assert.Equal(t, 5*time.Second, cfg.Aggregation.CheckAggregationTimeout)
assert.Equal(t, 10*time.Second, cfg.Aggregation.DrainTimeout)
assert.Equal(t, 5*time.Second, cfg.OrphanRecovery.CheckAggregationTimeout)
assert.Equal(t, 10*time.Second, cfg.Storage.QueryTimeout)
assert.Equal(t, common.Duration(10*time.Second), cfg.Storage.QueryTimeout)
assert.Equal(t, uint(3), cfg.OrphanRecovery.MaxConsecutiveErrors)
assert.Equal(t, 4*time.Minute, cfg.OrphanRecovery.ScanTimeout)
assert.Equal(t, 10000, cfg.OrphanRecovery.MaxOrphansPerScan)
Expand Down Expand Up @@ -394,19 +395,19 @@ func TestValidateStorageConfig(t *testing.T) {
},
{
name: "negative conn max lifetime fails",
config: &StorageConfig{PageSize: 100, MaxOpenConns: 25, MaxIdleConns: 5, ConnMaxLifetime: -1 * time.Second},
config: &StorageConfig{PageSize: 100, MaxOpenConns: 25, MaxIdleConns: 5, ConnMaxLifetime: common.Duration(-1 * time.Second)},
expectError: true,
errorMsg: "connMaxLifetime cannot be negative",
},
{
name: "negative conn max idle time fails",
config: &StorageConfig{PageSize: 100, MaxOpenConns: 25, MaxIdleConns: 5, ConnMaxIdleTime: -1 * time.Second},
config: &StorageConfig{PageSize: 100, MaxOpenConns: 25, MaxIdleConns: 5, ConnMaxIdleTime: common.Duration(-1 * time.Second)},
expectError: true,
errorMsg: "connMaxIdleTime cannot be negative",
},
{
name: "negative query timeout fails",
config: &StorageConfig{PageSize: 100, MaxOpenConns: 25, MaxIdleConns: 5, QueryTimeout: -1 * time.Second},
config: &StorageConfig{PageSize: 100, MaxOpenConns: 25, MaxIdleConns: 5, QueryTimeout: common.Duration(-1 * time.Second)},
expectError: true,
errorMsg: "queryTimeout cannot be negative",
},
Expand Down
6 changes: 3 additions & 3 deletions aggregator/pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ func (f *Factory) createPostgreSQLStorage(config *model.StorageConfig) (CommitVe
}
db.SetMaxIdleConns(maxIdleConns)

connMaxLifetime := config.ConnMaxLifetime
connMaxLifetime := time.Duration(config.ConnMaxLifetime)
if connMaxLifetime <= 0 {
connMaxLifetime = defaultConnMaxLifetime
}
db.SetConnMaxLifetime(connMaxLifetime)

connMaxIdleTime := config.ConnMaxIdleTime
connMaxIdleTime := time.Duration(config.ConnMaxIdleTime)
if connMaxIdleTime <= 0 {
connMaxIdleTime = defaultConnMaxIdleTime
}
Expand All @@ -109,5 +109,5 @@ func (f *Factory) createPostgreSQLStorage(config *model.StorageConfig) (CommitVe
return nil, fmt.Errorf("failed to run PostgreSQL migrations: %w", err)
}

return postgres.NewDatabaseStorage(sqlxDB, config.PageSize, config.QueryTimeout, f.logger), nil
return postgres.NewDatabaseStorage(sqlxDB, config.PageSize, time.Duration(config.QueryTimeout), f.logger), nil
}
30 changes: 30 additions & 0 deletions common/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,36 @@ import (
"github.com/smartcontractkit/chainlink-ccv/protocol"
)

// Duration is a TOML-aware time.Duration that rejects bare integer values.
// In TOML config files, durations must be quoted strings (e.g. "30m", "5s").
// This prevents silent misinterpretation: a bare TOML integer would be treated
// as nanoseconds by the standard time.Duration unmarshaler.
type Duration time.Duration

func (d *Duration) UnmarshalTOML(val any) error {
s, ok := val.(string)
if !ok {
return fmt.Errorf("duration must be a quoted string (e.g. \"30m\", \"5s\"), got %T, %v", val, val)
}
dur, err := time.ParseDuration(s)
if err != nil {
return fmt.Errorf("invalid duration %q: %w", s, err)
}
*d = Duration(dur)
return nil
}

// MarshalTOML implements toml.Marshaler so round-trip marshal/unmarshal
// produces a quoted string (e.g. "1h0m0s") rather than a bare int64.
func (d Duration) MarshalTOML() ([]byte, error) {
return []byte(strconv.Quote(time.Duration(d).String())), nil
}

// String implements fmt.Stringer.
func (d Duration) String() string {
return time.Duration(d).String()
}

func ParseIntOrDefault(val any, defaultVal int) (int, error) {
switch v := val.(type) {
case int:
Expand Down
2 changes: 1 addition & 1 deletion indexer/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func createPostgresStorage(ctx context.Context, lggr logger.Logger, cfg *config.
}

// Create a new postgres storage
dbStore, err := storage.NewPostgresStorage(ctx, lggr, indexerMonitoring, cfg.URI, pg.DriverPostgres, dbConfig)
dbStore, err := storage.NewPostgresStorage(ctx, lggr, indexerMonitoring, cfg.URI, pg.DriverPostgres, dbConfig, time.Duration(cfg.ConnMaxLifetime), time.Duration(cfg.ConnMaxIdleTime))
if err != nil {
lggr.Fatalf("Failed to create postgres storage: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions indexer/cmd/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ func mustBuildEngine(ctx context.Context, needsDiscoveryReader bool) (*replay.En
lggr.Warnf("Error closing migration database connection: %v", err)
}

replayStore, err := replay.NewStoreFromConfig(ctx, lggr, pgCfg.URI, dbConfig)
replayStore, err := replay.NewStoreFromConfig(ctx, lggr, pgCfg.URI, dbConfig, time.Duration(pgCfg.ConnMaxLifetime), time.Duration(pgCfg.ConnMaxIdleTime))
if err != nil {
lggr.Fatalf("Failed to create replay store: %v", err)
}

indexerStorage, err := storage.NewPostgresStorage(ctx, lggr, indexerMonitoring, pgCfg.URI, pg.DriverPostgres, dbConfig)
indexerStorage, err := storage.NewPostgresStorage(ctx, lggr, indexerMonitoring, pgCfg.URI, pg.DriverPostgres, dbConfig, time.Duration(pgCfg.ConnMaxLifetime), time.Duration(pgCfg.ConnMaxIdleTime))
if err != nil {
lggr.Fatalf("Failed to create indexer storage: %v", err)
}
Expand Down Expand Up @@ -391,7 +391,7 @@ func mustBuildStore(ctx context.Context) *replay.Store {
LockTimeout: time.Duration(pgCfg.LockTimeout) * time.Second,
}

store, err := replay.NewStoreFromConfig(ctx, lggr, pgCfg.URI, dbConfig)
store, err := replay.NewStoreFromConfig(ctx, lggr, pgCfg.URI, dbConfig, time.Duration(pgCfg.ConnMaxLifetime), time.Duration(pgCfg.ConnMaxIdleTime))
if err != nil {
lggr.Fatalf("Failed to create store: %v", err)
}
Expand Down
20 changes: 20 additions & 0 deletions indexer/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/BurntSushi/toml"

"github.com/smartcontractkit/chainlink-ccv/common"
hmacutil "github.com/smartcontractkit/chainlink-ccv/protocol/common/hmac"
)

Expand Down Expand Up @@ -110,6 +112,10 @@ type PostgresConfig struct {
MaxOpenConnections int `toml:"MaxOpenConnections"`
// MaxIdleConnections is the maximum number of idle connections to the database.
MaxIdleConnections int `toml:"MaxIdleConnections"`
// ConnMaxLifetime is the maximum lifetime of a connection. 0 defaults to 30 minutes.
ConnMaxLifetime common.Duration `toml:"ConnMaxLifetime"`
// ConnMaxIdleTime is the maximum idle time of a connection. 0 defaults to 5 minutes.
ConnMaxIdleTime common.Duration `toml:"ConnMaxIdleTime"`
// IdleInTxSessionTimeout is the idle_in_transaction_session_timeout in seconds.
IdleInTxSessionTimeout int64 `toml:"IdleInTxSessionTimeout"`
// LockTimeout is the lock_timeout in seconds.
Expand Down Expand Up @@ -459,6 +465,20 @@ func (p *PostgresConfig) Validate() error {
return fmt.Errorf("postgres max_idle_connections (%d) cannot be greater than max_open_connections (%d)", p.MaxIdleConnections, p.MaxOpenConnections)
}

if p.ConnMaxLifetime < 0 {
return fmt.Errorf("postgres conn_max_lifetime must be non-negative, got %s", p.ConnMaxLifetime)
}
if p.ConnMaxLifetime == 0 {
p.ConnMaxLifetime = common.Duration(30 * time.Minute)
}

if p.ConnMaxIdleTime < 0 {
return fmt.Errorf("postgres conn_max_idle_time must be non-negative, got %s", p.ConnMaxIdleTime)
}
if p.ConnMaxIdleTime == 0 {
p.ConnMaxIdleTime = common.Duration(5 * time.Minute)
}
Comment thread
bukata-sa marked this conversation as resolved.

if p.IdleInTxSessionTimeout < 0 {
return fmt.Errorf("postgres idle_in_tx_session_timeout must be non-negative, got %d", p.IdleInTxSessionTimeout)
}
Expand Down
83 changes: 83 additions & 0 deletions indexer/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package config

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-ccv/common"
)

func TestValidateMaxResponseBytes(t *testing.T) {
Expand Down Expand Up @@ -242,6 +245,86 @@ func validPostgresConfig() *PostgresConfig {
}
}

func TestPostgresConfigValidate_ConnMaxLifetime(t *testing.T) {
tests := []struct {
name string
connMaxLifetime common.Duration
wantErr string
wantDefault common.Duration
}{
{
name: "zero defaults to 30min",
wantDefault: common.Duration(30 * time.Minute),
},
{
name: "positive value is accepted",
connMaxLifetime: common.Duration(time.Hour),
},
{
name: "negative value is rejected",
connMaxLifetime: common.Duration(-1 * time.Second),
wantErr: "postgres conn_max_lifetime must be non-negative, got -1s",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := validPostgresConfig()
cfg.ConnMaxLifetime = tt.connMaxLifetime
err := cfg.Validate()
if tt.wantErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.wantErr)
return
}
require.NoError(t, err)
if tt.wantDefault != 0 {
assert.Equal(t, tt.wantDefault, cfg.ConnMaxLifetime)
}
})
}
}

func TestPostgresConfigValidate_ConnMaxIdleTime(t *testing.T) {
tests := []struct {
name string
connMaxIdleTime common.Duration
wantErr string
wantDefault common.Duration
}{
{
name: "zero defaults to 5min",
wantDefault: common.Duration(5 * time.Minute),
},
{
name: "positive value is accepted",
connMaxIdleTime: common.Duration(time.Hour),
},
{
name: "negative value is rejected",
connMaxIdleTime: common.Duration(-1 * time.Second),
wantErr: "postgres conn_max_idle_time must be non-negative, got -1s",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := validPostgresConfig()
cfg.ConnMaxIdleTime = tt.connMaxIdleTime
err := cfg.Validate()
if tt.wantErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.wantErr)
return
}
require.NoError(t, err)
if tt.wantDefault != 0 {
assert.Equal(t, tt.wantDefault, cfg.ConnMaxIdleTime)
}
})
}
}

func TestStorageConfigValidate(t *testing.T) {
tests := []struct {
name string
Expand Down
8 changes: 5 additions & 3 deletions indexer/pkg/replay/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ func NewStore(ds sqlutil.DataSource, lggr logger.Logger) *Store {
}

// NewStoreFromConfig creates a Store with its own Postgres connection pool.
func NewStoreFromConfig(ctx context.Context, lggr logger.Logger, uri string, dbConfig pg.DBConfig) (*Store, error) {
ds, err := dbConfig.New(ctx, uri, pg.DriverPostgres)
func NewStoreFromConfig(ctx context.Context, lggr logger.Logger, uri string, dbConfig pg.DBConfig, connMaxLifetime, connMaxIdleTime time.Duration) (*Store, error) {
db, err := dbConfig.New(ctx, uri, pg.DriverPostgres)
if err != nil {
return nil, fmt.Errorf("failed to open replay store connection: %w", err)
}
return &Store{ds: ds, lggr: lggr}, nil
db.SetConnMaxLifetime(connMaxLifetime)
db.SetConnMaxIdleTime(connMaxIdleTime)
return &Store{ds: db, lggr: lggr}, nil
}

// DataSource returns the underlying DataSource so the engine can pass it
Expand Down
8 changes: 5 additions & 3 deletions indexer/pkg/storage/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type PostgresStorage struct {
monitoring common.IndexerMonitoring
}

func NewPostgresStorage(ctx context.Context, lggr logger.Logger, monitoring common.IndexerMonitoring, uri, driverName string, config pg.DBConfig) (*PostgresStorage, error) {
func NewPostgresStorage(ctx context.Context, lggr logger.Logger, monitoring common.IndexerMonitoring, uri, driverName string, config pg.DBConfig, connMaxLifetime, connMaxIdleTime time.Duration) (*PostgresStorage, error) {
if lggr == nil {
return nil, fmt.Errorf("logger is required")
}
Expand All @@ -76,13 +76,15 @@ func NewPostgresStorage(ctx context.Context, lggr logger.Logger, monitoring comm
if driverName == "" {
return nil, fmt.Errorf("database driver name is required")
}
ds, err := config.New(ctx, uri, driverName)
db, err := config.New(ctx, uri, driverName)
if err != nil {
return nil, fmt.Errorf("failed to open database connection: %w", err)
}
db.SetConnMaxLifetime(connMaxLifetime)
db.SetConnMaxIdleTime(connMaxIdleTime)

return &PostgresStorage{
ds: ds,
ds: db,
lggr: lggr,
monitoring: monitoring,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion indexer/pkg/storage/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestNewPostgresStorage_Errors(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
_, err := NewPostgresStorage(context.Background(), tc.lggr, tc.monitoring, tc.uri, tc.driverName, pg.DBConfig{})
_, err := NewPostgresStorage(context.Background(), tc.lggr, tc.monitoring, tc.uri, tc.driverName, pg.DBConfig{}, 0, 0)
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErrSubstr)
})
Expand Down
Loading