Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions pkg/backup/backuper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"regexp"
"strings"
"sync"
"time"

"github.com/Altinity/clickhouse-backup/v2/pkg/common"
"github.com/Altinity/clickhouse-backup/v2/pkg/metadata"
Expand Down Expand Up @@ -51,6 +52,8 @@ type Backuper struct {
resumableState *resumable.State
shadowBackupUUIDs []string
shadowBackupUUIDsMutex sync.Mutex
fileManifest *storage.BackupManifest
fileManifestMu sync.Mutex
}

func NewBackuper(cfg *config.Config, opts ...BackuperOpt) *Backuper {
Expand Down Expand Up @@ -569,6 +572,39 @@ func (b *Backuper) addShadowBackupUUID(uuid string) {
b.shadowBackupUUIDsMutex.Unlock()
}

// recordUploadedFile adds one uploaded file to the in-memory backup manifest.
//
// remotePath is expected to be the full remote path; a single leading
// "<backupName>/" is trimmed so the entry is keyed relative to the backup
// root. The entry is stamped with the current time in UTC. When no manifest
// has been initialised the call does nothing. The manifest is only touched
// while fileManifestMu is held.
func (b *Backuper) recordUploadedFile(backupName, remotePath string, size int64) {
	if b.fileManifest != nil {
		backupPrefix := backupName + "/"
		entryPath := strings.TrimPrefix(remotePath, backupPrefix)

		b.fileManifestMu.Lock()
		defer b.fileManifestMu.Unlock()
		b.fileManifest.AddFile(entryPath, size, time.Now().UTC())
	}
}

// recordUploadedLocalFiles adds a batch of uploaded files to the in-memory
// backup manifest, taking each file's size and modification time from the
// local copy at path.Join(localBasePath, f).
//
// remotePath is the remote directory the files were uploaded to (including
// the backupName prefix). Each entry is keyed as path.Join(remotePath, f) with
// a single leading "<backupName>/" trimmed. The call does nothing when no
// manifest has been initialised or files is empty.
//
// Files that cannot be stat'ed are skipped (best effort) and will therefore be
// absent from the manifest.
// NOTE(review): a skipped file leaves the manifest incomplete for that part;
// confirm the manifest-based download path tolerates this.
func (b *Backuper) recordUploadedLocalFiles(backupName, remotePath, localBasePath string, files []string) {
	if b.fileManifest == nil || len(files) == 0 {
		return
	}

	type stattedFile struct {
		relPath string
		info    os.FileInfo
	}

	// Stat before taking the lock: filesystem I/O inside the critical section
	// would make every other goroutine recording into the manifest wait on it.
	backupPrefix := backupName + "/"
	found := make([]stattedFile, 0, len(files))
	for _, f := range files {
		info, err := os.Stat(path.Join(localBasePath, f))
		if err != nil {
			// Best effort: an unreadable local file must not fail the upload.
			continue
		}
		found = append(found, stattedFile{
			relPath: strings.TrimPrefix(path.Join(remotePath, f), backupPrefix),
			info:    info,
		})
	}
	if len(found) == 0 {
		return
	}

	b.fileManifestMu.Lock()
	defer b.fileManifestMu.Unlock()
	for _, e := range found {
		// Normalise to UTC so timestamps agree with recordUploadedFile, which
		// stamps entries with time.Now().UTC().
		b.fileManifest.AddFile(e.relPath, e.info.Size(), e.info.ModTime().UTC())
	}
}

// CheckDisksUsage - https://github.com/Altinity/clickhouse-backup/issues/878
func (b *Backuper) CheckDisksUsage(backup storage.Backup, disks []clickhouse.Disk, isResumeExists bool, tablePattern string) error {
if tablePattern != "" && tablePattern != "*.*" && tablePattern != "*" {
Expand Down
26 changes: 24 additions & 2 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
if !found {
return errors.Errorf("'%s' is not found on remote storage", backupName)
}
// Download file manifest for Walk-free restore (falls back gracefully if not present)
var backupManifest *storage.BackupManifest
backupManifest, _ = b.dst.DownloadManifest(ctx, backupName)

if len(remoteBackup.Tables) == 0 && remoteBackup.RBACSize == 0 && remoteBackup.ConfigSize == 0 && remoteBackup.NamedCollectionsSize == 0 && !b.cfg.General.AllowEmptyBackups {
return errors.Errorf("'%s' is empty backup", backupName)
}
Expand Down Expand Up @@ -266,7 +270,7 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
}).Msg("download table start")
var downloadDataErr error
var downloadDataSize uint64
downloadDataSize, downloadDataErr = b.downloadTableData(dataCtx, remoteBackup.BackupMetadata, *tableMetadataAfterDownload[idx], disks, hardlinkExistsFiles)
downloadDataSize, downloadDataErr = b.downloadTableData(dataCtx, remoteBackup.BackupMetadata, *tableMetadataAfterDownload[idx], disks, hardlinkExistsFiles, backupManifest)
if downloadDataErr != nil {
return errors.WithMessage(downloadDataErr, "downloadTableData")
}
Expand Down Expand Up @@ -720,7 +724,7 @@ func (b *Backuper) downloadBackupRelatedDir(ctx context.Context, remoteBackup st
return uint64(remoteFileInfo.Size()), nil
}

func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.BackupMetadata, table metadata.TableMetadata, disks []clickhouse.Disk, hardlinkExistsFiles bool) (uint64, error) {
func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.BackupMetadata, table metadata.TableMetadata, disks []clickhouse.Disk, hardlinkExistsFiles bool, manifest *storage.BackupManifest) (uint64, error) {
dbAndTableDir := path.Join(common.TablePathEncode(table.Database), common.TablePathEncode(table.Table))
dataGroup, dataCtx := errgroup.WithContext(ctx)
dataGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
Expand Down Expand Up @@ -882,6 +886,24 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
}
}

// Try manifest-based download to avoid Walk (ListObjectsV2)
if manifest != nil {
manifestPrefix := path.Join("shadow", dbAndTableDir, capturedDisk, capturedPart.Name)
manifestFiles := manifest.FilesUnderPrefix(manifestPrefix)
if len(manifestFiles) > 0 {
pathSize, downloadErr := b.dst.DownloadPathWithManifest(dataCtx, partRemotePath, partLocalPath, manifestFiles, manifestPrefix, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration, b.cfg.General.RetriesJitter, b, b.cfg.General.DownloadMaxBytesPerSecond)
if downloadErr != nil {
return errors.WithMessage(downloadErr, "DownloadPathWithManifest")
}
atomic.AddUint64(&downloadedSize, uint64(pathSize))
if b.resume {
b.resumableState.AppendToState(partRemotePath, pathSize)
}
log.Debug().Msgf("finish %s -> %s (manifest, %d files)", partRemotePath, partLocalPath, len(manifestFiles))
return nil
}
}
// Fall back to Walk (ListObjectsV2) when no manifest is available
pathSize, downloadErr := b.dst.DownloadPath(dataCtx, partRemotePath, partLocalPath, b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration, b.cfg.General.RetriesJitter, b, b.cfg.General.DownloadMaxBytesPerSecond)
if downloadErr != nil {
return errors.WithMessage(downloadErr, "DownloadPath")
Expand Down
17 changes: 17 additions & 0 deletions pkg/backup/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func (b *Backuper) Upload(backupName string, deleteSource bool, diffFrom, diffFr
defer b.resumableState.Close()
}

// Initialize file manifest to record all uploaded files for Walk-free restore
b.fileManifest = storage.NewBackupManifest(backupName)

compressedDataSize := int64(0)
metadataSize := int64(0)

Expand Down Expand Up @@ -272,6 +275,17 @@ func (b *Backuper) Upload(backupName string, deleteSource bool, diffFrom, diffFr
return errors.Wrapf(err, "can't upload %s", remoteBackupMetaFile)
}
}
// Record metadata.json in the manifest, then upload the manifest itself
b.recordUploadedFile(backupName, remoteBackupMetaFile, int64(len(newBackupMetadataBody)))
if b.fileManifest != nil {
if manifestErr := b.dst.UploadManifest(ctx, backupName, b.fileManifest); manifestErr != nil {
log.Warn().Err(manifestErr).Msg("failed to upload manifest.json, restore will fall back to Walk")
} else {
log.Info().Int("total_files", b.fileManifest.TotalFiles).
Int64("total_size", b.fileManifest.TotalSize).
Msg("uploaded backup manifest")
}
}
log.Info().Fields(map[string]interface{}{
"backup": backupName,
"operation": "upload",
Expand Down Expand Up @@ -604,6 +618,7 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet
}

atomic.AddInt64(&uploadedBytes, uploadPathBytes)
b.recordUploadedLocalFiles(backupName, remotePath, backupPath, partFiles)
if b.resume {
b.resumableState.AppendToState(remotePathFull, uploadPathBytes)
}
Expand Down Expand Up @@ -649,6 +664,7 @@ func (b *Backuper) uploadTableData(ctx context.Context, backupName string, delet
return errors.Wrapf(err, "can't check uploaded remoteDataFile: %s, error", remoteDataFile)
}
atomic.AddInt64(&uploadedBytes, remoteFile.Size())
b.recordUploadedFile(backupName, remoteDataFile, remoteFile.Size())
if b.resume {
b.resumableState.AppendToState(remoteDataFile, remoteFile.Size())
}
Expand Down Expand Up @@ -707,6 +723,7 @@ func (b *Backuper) uploadTableMetadataRegular(ctx context.Context, backupName st
if err != nil {
return 0, errors.Wrap(err, "can't upload")
}
b.recordUploadedFile(backupName, remoteTableMetaFile, int64(len(content)))
if b.resume {
b.resumableState.AppendToState(remoteTableMetaFile, int64(len(content)))
}
Expand Down
Loading
Loading