Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions cmd/pgwatch/main_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,88 @@ presets:
<-mainCtx.Done()
assert.Equal(t, cmdopts.ExitCodeOK, gotExit)
}

// TestMain_Integration_PartitionPrecreation tests that pgwatch does not create
// excessive partitions when restarted multiple times if they are not needed.
// TestMain_Integration_PartitionPrecreation tests that pgwatch does not create
// excessive partitions when restarted multiple times if they are not needed.
// It runs main() in a loop with the same sources/metrics configuration and
// then verifies that only the expected number of subpartitions exists in the
// sink database and that measurements were actually written.
func TestMain_Integration_PartitionPrecreation(t *testing.T) {
	tempDir := t.TempDir()

	pg, tearDown, err := testutil.SetupPostgresContainer()
	require.NoError(t, err)
	defer tearDown()

	connStr, err := pg.ConnectionString(testutil.TestContext, "sslmode=disable")
	require.NoError(t, err)

	// Capture the exit code instead of terminating the test process.
	var gotExit int32
	Exit = func(code int) { gotExit = int32(code) }
	defer func() { Exit = os.Exit }()

	metricsYaml := filepath.Join(tempDir, "metrics.yaml")
	sourcesYaml := filepath.Join(tempDir, "sources.yaml")

	require.NoError(t, os.WriteFile(metricsYaml, []byte(`
metrics:
  partition_test_metric:
    sqls:
      11: select (extract(epoch from now()) * 1e9)::int8 as epoch_ns, 1 as value
`), 0644))

	require.NoError(t, os.WriteFile(sourcesYaml, []byte(`
- name: partition_test_source
  conn_str: `+connStr+`
  kind: postgres
  is_enabled: true
  custom_metrics:
    partition_test_metric: 300
`), 0644))

	sinkConn, err := pgx.Connect(context.Background(), connStr)
	require.NoError(t, err)
	defer sinkConn.Close(context.Background())

	os.Args = []string{
		"pgwatch",
		"--metrics", metricsYaml,
		"--sources", sourcesYaml,
		"--sink", connStr,
		"--web-disable",
	}

	// Restart pgwatch multiple times to trigger partition precreation
	for range 5 {
		mainCtx, cancel = context.WithCancel(context.Background())
		// Close done only after main() has fully returned: the channel close
		// establishes a happens-before edge, making the read of gotExit below
		// race-free. Waiting on mainCtx.Done() alone would only confirm the
		// cancellation, not that main() finished and Exit was invoked.
		done := make(chan struct{})
		go func() {
			main()
			close(done)
		}()

		// Wait for partitions to be created
		time.Sleep(2 * time.Second)

		cancel()
		<-done
		assert.Equal(t, cmdopts.ExitCodeOK, gotExit, "expected exit code 0")
	}

	// Query the number of partitions created for our specific metric.
	// pgwatch tags generated partitions via pg_class comments, so we count
	// only relations carrying those markers under the subpartitions schema.
	var partitionCount int
	err = sinkConn.QueryRow(context.Background(), `
		SELECT count(*)
		FROM pg_class c
		JOIN pg_inherits i ON c.oid = i.inhrelid
		WHERE c.relkind IN ('r', 'p')
		AND c.relnamespace = 'subpartitions'::regnamespace
		AND pg_catalog.obj_description(c.oid, 'pg_class') IN ('pgwatch-generated-metric-time-lvl', 'pgwatch-generated-metric-dbname-time-lvl')
		AND c.oid::regclass::text LIKE '%partition_test_metric_partition_test_source%'
	`).Scan(&partitionCount)
	require.NoError(t, err)

	// Should not have more than 4 partitions despite multiple restarts
	// (1 requested + 3 precreated; see the partition precreation logic in sinks).
	assert.Equal(t, 4, partitionCount,
		"expected 4 partitions for partition_test_metric after multiple consecutive restarts, but found %d", partitionCount)

	// Also verify that metrics were actually collected
	var metricCount int
	err = sinkConn.QueryRow(context.Background(),
		`SELECT count(*) FROM public.partition_test_metric WHERE dbname = 'partition_test_source'`).Scan(&metricCount)
	require.NoError(t, err)
	assert.Greater(t, metricCount, 0, "expected some metrics to be collected")
}
68 changes: 34 additions & 34 deletions internal/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,17 @@ var (
// However, one is able to use any Postgres-compatible database as a storage backend,
// e.g. PGEE, Citus, Greenplum, CockroachDB, etc.
type PostgresWriter struct {
ctx context.Context
sinkDb db.PgxPoolIface
metricSchema DbStorageSchemaType
opts *CmdOpts
retentionInterval time.Duration
maintenanceInterval time.Duration
input chan metrics.MeasurementEnvelope
lastError chan error
ctx context.Context
sinkDb db.PgxPoolIface
metricSchema DbStorageSchemaType
opts *CmdOpts
retentionInterval time.Duration
maintenanceInterval time.Duration
input chan metrics.MeasurementEnvelope
lastError chan error
forceRecreatePartitions bool // to signal override PG metrics storage cache
partitionMapMetric map[string]ExistingPartitionInfo // metric = min/max bounds
partitionMapMetricDbname map[string]map[string]ExistingPartitionInfo // metric[dbname = min/max bounds]
}

func NewPostgresWriter(ctx context.Context, connstr string, opts *CmdOpts) (pgw *PostgresWriter, err error) {
Expand All @@ -84,11 +87,14 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
l := log.GetLogger(ctx).WithField("sink", "postgres").WithField("db", conn.Config().ConnConfig.Database)
ctx = log.WithLogger(ctx, l)
pgw = &PostgresWriter{
ctx: ctx,
opts: opts,
input: make(chan metrics.MeasurementEnvelope, cacheLimit),
lastError: make(chan error),
sinkDb: conn,
ctx: ctx,
opts: opts,
input: make(chan metrics.MeasurementEnvelope, cacheLimit),
lastError: make(chan error),
sinkDb: conn,
forceRecreatePartitions: false,
partitionMapMetric: make(map[string]ExistingPartitionInfo),
partitionMapMetricDbname: make(map[string]map[string]ExistingPartitionInfo),
}
l.Info("initialising measurements database...")
if err = pgw.init(); err != nil {
Expand Down Expand Up @@ -196,12 +202,6 @@ func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
return
}

var (
forceRecreatePartitions = false // to signal override PG metrics storage cache
partitionMapMetric = make(map[string]ExistingPartitionInfo) // metric = min/max bounds
partitionMapMetricDbname = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds]
)

// SyncMetric ensures that tables exist for newly added metrics and/or sources
func (pgw *PostgresWriter) SyncMetric(sourceName, metricName string, op SyncOp) error {
if op == AddOp {
Expand Down Expand Up @@ -415,13 +415,13 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {

switch pgw.metricSchema {
case DbStorageSchemaPostgres:
err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePartitions)
err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName)
case DbStorageSchemaTimescale:
err = pgw.EnsureMetricTimescale(pgPartBounds)
default:
logger.Fatal("unknown storage schema...")
}
forceRecreatePartitions = false
pgw.forceRecreatePartitions = false
if err != nil {
pgw.lastError <- err
}
Expand All @@ -435,9 +435,9 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
if err != nil {
logger.Error(err)
if PgError, ok := err.(*pgconn.PgError); ok {
forceRecreatePartitions = PgError.Code == "23514"
pgw.forceRecreatePartitions = PgError.Code == "23514"
}
if forceRecreatePartitions {
if pgw.forceRecreatePartitions {
logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
}
}
Expand All @@ -454,48 +454,48 @@ func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]Existin
logger := log.GetLogger(pgw.ctx)
sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
for metric := range pgPartBounds {
if _, ok := partitionMapMetric[metric]; !ok {
if _, ok := pgw.partitionMapMetric[metric]; !ok {
if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
return err
}
partitionMapMetric[metric] = ExistingPartitionInfo{}
pgw.partitionMapMetric[metric] = ExistingPartitionInfo{}
}
}
return
}

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo) (err error) {
var rows pgx.Rows
sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3, $4)`
for metric, dbnameTimestampMap := range metricDbnamePartBounds {
_, ok := partitionMapMetricDbname[metric]
_, ok := pgw.partitionMapMetricDbname[metric]
if !ok {
partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
pgw.partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
}

for dbname, pb := range dbnameTimestampMap {
if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
}
partInfo, ok := partitionMapMetricDbname[metric][dbname]
if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
partInfo, ok := pgw.partitionMapMetricDbname[metric][dbname]
if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || pgw.forceRecreatePartitions {
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
return
}
if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
return err
}
partitionMapMetricDbname[metric][dbname] = partInfo
pgw.partitionMapMetricDbname[metric][dbname] = partInfo
}
if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || pgw.forceRecreatePartitions {
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime, pgw.opts.PartitionInterval); err != nil {
return
}
if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
return err
}
partitionMapMetricDbname[metric][dbname] = partInfo
pgw.partitionMapMetricDbname[metric][dbname] = partInfo
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/sinks/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func TestPartitionInterval(t *testing.T) {
},
},
}
err = pgw.EnsureMetricDbnameTime(m, false)
err = pgw.EnsureMetricDbnameTime(m)
r.NoError(err)

var partitionsNum int
Expand All @@ -704,7 +704,7 @@ func TestPartitionInterval(t *testing.T) {
// + 4 time partitions (1 we asked for + 3 precreated)
a.Equal(6, partitionsNum)

part := partitionMapMetricDbname["test_metric"]["test_db"]
part := pgw.partitionMapMetricDbname["test_metric"]["test_db"]
// partition bounds should have a difference of 3 weeks
a.Equal(part.StartTime.Add(3*7*24*time.Hour), part.EndTime)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/sinks/sql/ensure_partition_postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ BEGIN
IF metric_timestamp >= l_part_start AND metric_timestamp < l_part_end THEN
part_available_from := l_part_start;
part_available_to := l_part_end;
ELSEIF metric_timestamp < l_part_start THEN
EXIT; -- No need to create further partitions
END IF;
Comment thread
0xgouda marked this conversation as resolved.
END IF;

Expand Down
Loading