Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions pkg/backup/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,23 +1081,34 @@ func (b *Backuper) AddTableToLocalBackup(ctx context.Context, backupName string,
return disksToPartsMap, realSize, objectDiskSize, checksums, hashOfAllFiles, nil
}

// fetchHashOfAllFiles returns name → hash_of_all_files for the given parts on
// disk `diskName`. The value is whatever ClickHouse prints in
// system.parts.hash_of_all_files (already lowercased); the storage and
// compare paths are version-agnostic because both ends use the server's
// formatting.
// fetchHashOfAllFiles returns name → hash_of_all_files for the given parts of
// `database`.`table` (frozen from disk `diskName`). The value is whatever
// ClickHouse prints in system.parts.hash_of_all_files (already lowercased); the
// storage and compare paths are version-agnostic because both ends use the
// server's formatting.
//
// We deliberately do NOT filter by disk_name: when a cache disk wraps an object
// disk, both share the same metadata path, and system.parts reports the part
// under the cache disk name (the disk named in the storage policy), not the
// underlying disk that clickhouse-backup walks the shadow tree from. Filtering
// by diskName would miss those parts (see issue #1396). Part names are unique
// within a table's active set, and inactive copies left by a move carry
// identical content (hence identical hash_of_all_files), so matching by
// database+table+name is unambiguous.
//
// Called right after FREEZE so the active-set delta is essentially zero —
// inactive parts also remain visible in system.parts for ~480s, which makes
// the race window practically impossible to hit. If a frozen part is missing
// from the result we surface a hard error rather than silently dropping it.
// from the result we surface a hard error rather than silently dropping it,
// unless the table itself was dropped/detached concurrently (the documented
// IgnoreNotExistsErrorDuringFreeze race), in which case we skip the part.
func (b *Backuper) fetchHashOfAllFiles(ctx context.Context, database, table, diskName string, partNames []string) (map[string]string, error) {
var rows []struct {
Name string `ch:"name"`
Hash string `ch:"hash_of_all_files"`
}
q := "SELECT name, lower(hash_of_all_files) AS hash_of_all_files FROM system.parts WHERE database=? AND `table`=? AND disk_name=? AND has(?, name)"
if err := b.ch.SelectContext(ctx, &rows, q, database, table, diskName, partNames); err != nil {
q := "SELECT name, lower(hash_of_all_files) AS hash_of_all_files FROM system.parts WHERE database=? AND `table`=? AND has(?, name)"
if err := b.ch.SelectContext(ctx, &rows, q, database, table, partNames); err != nil {
return nil, errors.Wrap(err, "SELECT hash_of_all_files FROM system.parts")
}
hashByName := make(map[string]string, len(rows))
Expand All @@ -1106,6 +1117,21 @@ func (b *Backuper) fetchHashOfAllFiles(ctx context.Context, database, table, dis
}
for _, name := range partNames {
if _, ok := hashByName[name]; !ok {
// A part can legitimately vanish from system.parts when the table is
// dropped/detached concurrently right after FREEZE (the frozen files
// are already safe in shadow). Tolerate that race the same way the
// FREEZE path does; otherwise the part is genuinely missing while the
// table still exists, which is a real anomaly worth surfacing.
if b.cfg.ClickHouse.IgnoreNotExistsErrorDuringFreeze {
var exists []struct {
Cnt uint64 `ch:"cnt"`
}
existsErr := b.ch.SelectContext(ctx, &exists, "SELECT count() AS cnt FROM system.tables WHERE database=? AND name=?", database, table)
if existsErr == nil && (len(exists) == 0 || exists[0].Cnt == 0) {
log.Warn().Msgf("part %q not found in system.parts (database=%s, table=%s) after FREEZE, table no longer exists, skip hash_of_all_files", name, database, table)
continue
}
}
return nil, errors.Errorf("part %q not found in system.parts (database=%s, table=%s, disk_name=%s) after FREEZE", name, database, table, diskName)
}
}
Expand Down
21 changes: 18 additions & 3 deletions pkg/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,21 +314,36 @@ func (ch *ClickHouse) getDisksFromSystemDisks(ctx context.Context) ([]Disk, erro
ObjectStorageTypePresent uint64 `ch:"is_object_storage_type_present"`
FreeSpacePresent uint64 `ch:"is_free_space_present"`
StoragePolicyPresent uint64 `ch:"is_storage_policy_present"`
CachePathPresent uint64 `ch:"is_cache_path_present"`
}
diskFields := make([]DiskFields, 0)
if err := ch.SelectContext(ctx, &diskFields,
"SELECT countIf(name='type') AS is_disk_type_present, "+
"countIf(name='object_storage_type') AS is_object_storage_type_present, "+
"countIf(name='free_space') AS is_free_space_present, "+
"countIf(name='disks') AS is_storage_policy_present "+
"countIf(name='disks') AS is_storage_policy_present, "+
"countIf(name='cache_path') AS is_cache_path_present "+
"FROM system.columns WHERE database='system' AND table IN ('disks','storage_policies') ",
); err != nil {
return nil, errors.WithMessage(err, "getDisksFromSystemDisks: query disk fields")
}
diskTypeSQL := "'local'"
// diskNameSQL: prefer the underlying (non-cache) disk name when cache
// disks are present. Cache disks share the same metadata_path as the
// disk they wrap, so GROUP BY d.path collapses them into one row.
// system.parts always reports the underlying disk name, not the cache
// wrapper, so picking the cache name here causes fetchHashOfAllFiles
// to fail with "part not found after FREEZE".
// We use the cache_path column (non-empty for cache disks) to detect
// cache wrappers, since both cache and underlying disks may report
// type='ObjectStorage' in modern ClickHouse versions.
diskNameSQL := "any(d.name)"
if len(diskFields) > 0 && diskFields[0].DiskTypePresent > 0 {
diskTypeSQL = "any(d.type)"
}
if len(diskFields) > 0 && diskFields[0].CachePathPresent > 0 {
diskNameSQL = "argMin(d.name, d.cache_path != '')"
}
if len(diskFields) > 0 && diskFields[0].ObjectStorageTypePresent > 0 {
diskTypeSQL = "any(lower(if(d.type='ObjectStorage',d.object_storage_type,d.type)))"
}
Expand All @@ -347,9 +362,9 @@ func (ch *ClickHouse) getDisksFromSystemDisks(ctx context.Context) ([]Disk, erro
}
var result []Disk
query := fmt.Sprintf(
"SELECT d.path AS path, any(d.name) AS name, %s AS type, %s AS free_space, %s AS storage_policies "+
"SELECT d.path AS path, %s AS name, %s AS type, %s AS free_space, %s AS storage_policies "+
"FROM system.disks AS d %s GROUP BY d.path",
diskTypeSQL, diskFreeSpaceSQL, storagePoliciesSQL, joinStoragePoliciesSQL,
diskNameSQL, diskTypeSQL, diskFreeSpaceSQL, storagePoliciesSQL, joinStoragePoliciesSQL,
)
if err := ch.SelectContext(ctx, &result, query); err != nil {
return nil, errors.WithMessage(err, "getDisksFromSystemDisks: select disks")
Expand Down
45 changes: 45 additions & 0 deletions pkg/clickhouse/disk_name_sql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package clickhouse

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestDiskNameSQLPreferNonCacheDisk verifies that when the cache_path column
// is present in system.disks, the generated disk name SQL uses argMin to
// prefer the underlying (non-cache) disk name over the cache wrapper.
//
// Background: cache disks share the same metadata path as the disk they wrap,
// so GROUP BY d.path collapses them into one row. system.parts always reports
// the underlying disk name, not the cache wrapper, so picking the cache name
// causes fetchHashOfAllFiles to fail with "part not found after FREEZE".
func TestDiskNameSQLPreferNonCacheDisk(t *testing.T) {
testCases := []struct {
Name string
CachePathPresent uint64
ExpectedSQL string
}{
{
Name: "No cache_path column - fallback to any(d.name)",
CachePathPresent: 0,
ExpectedSQL: "any(d.name)",
},
{
Name: "cache_path column present - use argMin to prefer non-cache disk",
CachePathPresent: 1,
ExpectedSQL: "argMin(d.name, d.cache_path != '')",
},
}

for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
// Mirrors the logic in getDisksFromSystemDisks
diskNameSQL := "any(d.name)"
if tc.CachePathPresent > 0 {
diskNameSQL = "argMin(d.name, d.cache_path != '')"
}
assert.Equal(t, tc.ExpectedSQL, diskNameSQL)
})
}
}
175 changes: 175 additions & 0 deletions test/integration/cacheDiskBackup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
//go:build integration

package main

import (
"fmt"
"math/rand"
"os"
"testing"
"time"

"github.com/rs/zerolog/log"
)

// TestCacheDiskBackup verifies that backup and restore work correctly when a
// ClickHouse cache disk wraps an S3 object disk. Both disks share the same
// metadata path in system.disks, so GROUP BY d.path collapses them into one
// row. Before the fix, any(d.name) could pick the cache wrapper name (e.g.
// "s3_cache") instead of the underlying disk name ("s3_disk"). Since
// system.parts always reports the underlying disk name, this mismatch caused
// fetchHashOfAllFiles to fail with "part not found after FREEZE".
//
// The fix uses argMin(d.name, d.cache_path != ”) to always prefer the
// underlying disk (empty cache_path) over the cache wrapper (non-empty
// cache_path).
//
// See: https://github.com/Altinity/clickhouse-backup/issues/1396
func TestCacheDiskBackup(t *testing.T) {
version := os.Getenv("CLICKHOUSE_VERSION")
if compareVersion(version, "22.8") < 0 {
t.Skipf("Test requires ClickHouse >= 22.8 for `type: cache` disk support, current version %s", version)
}

env, r := NewTestEnvironment(t)
env.connectWithWait(t, r, 0*time.Second, 1*time.Second, 1*time.Minute)

backupName := fmt.Sprintf("test_cache_disk_%d", rand.Int())
dbName := "test_cache_disk_" + t.Name()
tableName := "data"

// Step 1: Configure ClickHouse with an S3 object disk wrapped by a cache disk.
// Both disks will share the same metadata path, reproducing the bug scenario.
env.DockerExecNoError(r, "clickhouse", "bash", "-xc", `
cat > /etc/clickhouse-server/config.d/cache_disk_test.xml <<'XML'
<clickhouse>
<storage_configuration>
<disks>
<s3_disk>
<type>s3</type>
<endpoint>https://minio:9000/clickhouse/cache_disk_test/</endpoint>
<access_key_id>access_key</access_key_id>
<secret_access_key>it_is_my_super_secret_key</secret_access_key>
<skip_access_check>true</skip_access_check>
</s3_disk>
<s3_cache>
<type>cache</type>
<disk>s3_disk</disk>
<path>/var/lib/clickhouse/filesystem_caches/s3_cache_test/</path>
<max_size>1073741824</max_size>
</s3_cache>
</disks>
<policies>
<s3_tiered>
<volumes>
<main>
<disk>default</disk>
</main>
<cold>
<disk>s3_cache</disk>
</cold>
</volumes>
</s3_tiered>
</policies>
</storage_configuration>
</clickhouse>
XML
`)
env.ch.Close()
r.NoError(env.tc.RestartContainer(t, "clickhouse"))
env.connectWithWait(t, r, 3*time.Second, 1500*time.Millisecond, 3*time.Minute)

// Verify both disks are visible and share the same path
var diskCount uint64
r.NoError(env.ch.SelectSingleRowNoCtx(&diskCount,
"SELECT count() FROM system.disks WHERE name IN ('s3_disk','s3_cache')"))
r.Equal(uint64(2), diskCount, "expected both s3_disk and s3_cache in system.disks")

// Step 2: Create a table using the s3_tiered policy and insert data.
// Use TTL to move some data to the cold (S3) volume, and keep some on default (local).
env.queryWithNoError(r, "CREATE DATABASE IF NOT EXISTS "+dbName)
env.queryWithNoError(r, fmt.Sprintf(
"CREATE TABLE %s.%s (id UInt64, dt Date, value String) "+
"ENGINE=MergeTree() PARTITION BY toYYYYMM(dt) ORDER BY id "+
"TTL dt + INTERVAL 1 DAY TO VOLUME 'cold' "+
"SETTINGS storage_policy='s3_tiered'",
dbName, tableName,
))

// Insert old data (will move to S3 via TTL) and recent data (stays on local)
env.queryWithNoError(r, fmt.Sprintf(
"INSERT INTO %s.%s SELECT number, toDate('2020-01-01'), toString(number) FROM numbers(1000)",
dbName, tableName,
))
env.queryWithNoError(r, fmt.Sprintf(
"INSERT INTO %s.%s SELECT number+1000, today(), toString(number+1000) FROM numbers(500)",
dbName, tableName,
))

// Force TTL to move old data to S3
env.queryWithNoError(r, fmt.Sprintf(
"OPTIMIZE TABLE %s.%s FINAL", dbName, tableName,
))
// Wait for TTL moves to complete
time.Sleep(5 * time.Second)
env.queryWithNoError(r, fmt.Sprintf(
"ALTER TABLE %s.%s MATERIALIZE TTL", dbName, tableName,
))
time.Sleep(5 * time.Second)

// Verify data exists on both disks
var localParts, s3Parts uint64
r.NoError(env.ch.SelectSingleRowNoCtx(&localParts,
fmt.Sprintf("SELECT count() FROM system.parts WHERE database='%s' AND `table`='%s' AND active AND disk_name='default'", dbName, tableName)))
r.NoError(env.ch.SelectSingleRowNoCtx(&s3Parts,
fmt.Sprintf("SELECT count() FROM system.parts WHERE database='%s' AND `table`='%s' AND active AND disk_name='s3_disk'", dbName, tableName)))
log.Debug().Msgf("Parts distribution: default=%d, s3_disk=%d", localParts, s3Parts)
r.True(s3Parts > 0 || localParts > 0, "expected at least some parts to exist")

totalRows := uint64(1500)
env.checkCount(r, 1, totalRows, fmt.Sprintf("SELECT count() FROM %s.%s", dbName, tableName))

// Step 3: Create backup — this is where the bug manifested.
// Before the fix, any(d.name) could pick "s3_cache" instead of "s3_disk",
// causing "part not found in system.parts after FREEZE".
log.Debug().Msg("Creating backup with cache disk present")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c",
"/etc/clickhouse-backup/config-s3.yml",
"create", "--tables="+dbName+".*", backupName)

// Step 4: Upload backup
log.Debug().Msg("Uploading backup")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c",
"/etc/clickhouse-backup/config-s3.yml",
"upload", backupName)

// Step 5: Delete local backup and drop the table
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c",
"/etc/clickhouse-backup/config-s3.yml",
"delete", "local", backupName)
env.queryWithNoError(r, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s SYNC", dbName, tableName))

// Step 6: Download and restore
log.Debug().Msg("Downloading backup")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c",
"/etc/clickhouse-backup/config-s3.yml",
"download", backupName)
log.Debug().Msg("Restoring backup")
env.DockerExecNoError(r, "clickhouse-backup", "clickhouse-backup", "-c",
"/etc/clickhouse-backup/config-s3.yml",
"restore", "--tables="+dbName+".*", backupName)

// Step 7: Verify data integrity after restore
env.checkCount(r, 1, totalRows, fmt.Sprintf("SELECT count() FROM %s.%s", dbName, tableName))
log.Debug().Msg("Data integrity verified after restore")

// Step 8: Cleanup
fullCleanup(t, r, env, []string{backupName}, []string{"remote", "local"},
[]string{"test_cache_disk"}, true, true, true, "config-s3.yml")
env.DockerExecNoError(r, "clickhouse", "rm", "-f", "/etc/clickhouse-server/config.d/cache_disk_test.xml")
env.DockerExecNoError(r, "minio", "rm", "-rf", "/minio/data/clickhouse/cache_disk_test")
env.ch.Close()
r.NoError(env.tc.RestartContainer(t, "clickhouse"))
env.connectWithWait(t, r, 3*time.Second, 1500*time.Millisecond, 3*time.Minute)
env.Cleanup(t, r)
}
21 changes: 19 additions & 2 deletions test/integration/serverAPI_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,16 @@ func testAPIBackupClean(r *require.Assertions, env *TestEnvironment) {
func testAPIBackupActionsSkipCommands(r *require.Assertions, env *TestEnvironment) {
log.Debug().Msg("Check api.backup_actions_skip_commands excludes 'list' from system.backup_actions")

// Restart deterministically: kill the previous `server --watch`, wait until it
// actually exits (so it releases :7171 before the new process binds), then start
// the skip-enabled server and poll until it serves requests. Fixed sleeps here
// flaked in CI because the old server hadn't released the port / the new one
// wasn't ready before commands were issued.
env.DockerExecNoError(r, "clickhouse-backup", "pkill", "-n", "-f", "clickhouse-backup")
time.Sleep(2 * time.Second)
// `[c]lickhouse-backup` so pgrep does not match its own `bash -ce` command line.
env.DockerExecNoError(r, "clickhouse-backup", "bash", "-ce", "for i in $(seq 1 30); do pgrep -f '[c]lickhouse-backup server' >/dev/null 2>&1 || exit 0; sleep 1; done; echo 'previous backup server did not exit'; exit 1")
env.DockerExecBackgroundNoError(r, "clickhouse-backup", "bash", "-ce", "API_BACKUP_ACTIONS_SKIP_COMMANDS=list clickhouse-backup server &>>/tmp/clickhouse-backup-server.log")
time.Sleep(3 * time.Second)
env.DockerExecNoError(r, "clickhouse-backup", "bash", "-ce", "for i in $(seq 1 30); do curl -sfL 'http://localhost:7171/backup/list' >/dev/null 2>&1 && exit 0; sleep 1; done; echo 'restarted clickhouse-backup server is not ready'; exit 1")

// snapshot after restart — in-memory async status is cleared on restart
var listRowsBefore uint64
Expand All @@ -276,6 +282,17 @@ func testAPIBackupActionsSkipCommands(r *require.Assertions, env *TestEnvironmen
var createRows uint64
runClickHouseClientInsertSystemBackupActions(r, env, []string{"create skip_commands_test"}, true)
r.NoError(env.ch.SelectSingleRowNoCtx(&createRows, "SELECT count() FROM system.backup_actions WHERE command='create skip_commands_test' AND status=?", status.SuccessStatus))
if createRows != 1 {
// The command was recorded but ended non-success (CI-only flake). Surface the
// actual status/error and the server log so the root cause is diagnosable.
createActions := make([]struct {
Status string `ch:"status"`
Error string `ch:"error"`
}, 0)
r.NoError(env.ch.StructSelect(&createActions, "SELECT status, error FROM system.backup_actions WHERE command='create skip_commands_test'"))
logOut, _ := env.DockerExecOut("clickhouse-backup", "bash", "-ce", "tail -n 200 /tmp/clickhouse-backup-server.log")
log.Error().Msgf("create skip_commands_test did not succeed, actions=%+v\nclickhouse-backup server log tail:\n%s", createActions, logOut)
}
r.Equal(uint64(1), createRows, "non-skipped commands must still be recorded in system.backup_actions")
runClickHouseClientInsertSystemBackupActions(r, env, []string{"delete local skip_commands_test"}, false)
}
Expand Down