Skip to content

Commit dffde6b

Browse files
committed
add handler for create.state2 and restore.state2 in ResumeOperationsAfterRestart, fix #1083
1 parent 3c22661 commit dffde6b

7 files changed

Lines changed: 531 additions & 49 deletions

File tree

ReadMe.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,8 @@ api:
401401
integration_tables_host: "" # API_INTEGRATION_TABLES_HOST, allow using DNS name to connect in `system.backup_list` and `system.backup_actions`
402402
allow_parallel: false # API_ALLOW_PARALLEL, enable parallel operations, this allows for significant memory allocation and spawns go-routines, don't enable it if you are not sure
403403
create_integration_tables: false # API_CREATE_INTEGRATION_TABLES, create `system.backup_list` and `system.backup_actions`
404-
complete_resumable_after_restart: true # API_COMPLETE_RESUMABLE_AFTER_RESTART, after API server startup, if `/var/lib/clickhouse/backup/*/(upload|download).state2` present, then operation will continue in the background
404+
complete_resumable_after_restart: true # API_COMPLETE_RESUMABLE_AFTER_RESTART, after API server startup, if `/var/lib/clickhouse/backup/*/{command}.state2` is present and the command is allowed by `complete_resumable_after_restart_commands`, then the operation will continue in the background
405+
complete_resumable_after_restart_commands: [upload, download] # API_COMPLETE_RESUMABLE_AFTER_RESTART_COMMANDS, commands allowed for automatic resume after API server restart
405406
watch_is_main_process: false # WATCH_IS_MAIN_PROCESS, treats 'watch' command as a main api process, if it is stopped unexpectedly, api server is also stopped. Does not stop api server if 'watch' command canceled by the user.
406407
backup_actions_skip_commands: [] # API_BACKUP_ACTIONS_SKIP_COMMANDS, list of commands that must NOT be recorded into the in-memory async status exposed via `system.backup_actions` and `/backup/actions`. Useful to keep high-frequency monitoring calls (typically `list`) from growing the actions state and consuming RAM during long-running backups. Example: `[list]`
407408
cancel_operation_timeout: "1800s" # API_CANCEL_OPERATION_TIMEOUT, how long `/backup/kill` (and server stop/restart) waits for the underlying command goroutine to actually return after the context is canceled. If the goroutine is stuck on an IO without timeout, kill returns once this timeout elapses. See https://github.com/Altinity/clickhouse-backup/issues/1365

pkg/config/config.go

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -304,21 +304,22 @@ type ClickHouseConfig struct {
304304
}
305305

306306
type APIConfig struct {
307-
ListenAddr string `yaml:"listen" envconfig:"API_LISTEN"`
308-
EnableMetrics bool `yaml:"enable_metrics" envconfig:"API_ENABLE_METRICS"`
309-
EnablePprof bool `yaml:"enable_pprof" envconfig:"API_ENABLE_PPROF"`
310-
Username string `yaml:"username" envconfig:"API_USERNAME"`
311-
Password string `yaml:"password" envconfig:"API_PASSWORD"`
312-
Secure bool `yaml:"secure" envconfig:"API_SECURE"`
313-
CertificateFile string `yaml:"certificate_file" envconfig:"API_CERTIFICATE_FILE"`
314-
PrivateKeyFile string `yaml:"private_key_file" envconfig:"API_PRIVATE_KEY_FILE"`
315-
CAKeyFile string `yaml:"ca_cert_file" envconfig:"API_CA_KEY_FILE"`
316-
CACertFile string `yaml:"ca_key_file" envconfig:"API_CA_CERT_FILE"`
317-
CreateIntegrationTables bool `yaml:"create_integration_tables" envconfig:"API_CREATE_INTEGRATION_TABLES"`
318-
IntegrationTablesHost string `yaml:"integration_tables_host" envconfig:"API_INTEGRATION_TABLES_HOST"`
319-
AllowParallel bool `yaml:"allow_parallel" envconfig:"API_ALLOW_PARALLEL"`
320-
CompleteResumableAfterRestart bool `yaml:"complete_resumable_after_restart" envconfig:"API_COMPLETE_RESUMABLE_AFTER_RESTART"`
321-
WatchIsMainProcess bool `yaml:"watch_is_main_process" envconfig:"WATCH_IS_MAIN_PROCESS"`
307+
ListenAddr string `yaml:"listen" envconfig:"API_LISTEN"`
308+
EnableMetrics bool `yaml:"enable_metrics" envconfig:"API_ENABLE_METRICS"`
309+
EnablePprof bool `yaml:"enable_pprof" envconfig:"API_ENABLE_PPROF"`
310+
Username string `yaml:"username" envconfig:"API_USERNAME"`
311+
Password string `yaml:"password" envconfig:"API_PASSWORD"`
312+
Secure bool `yaml:"secure" envconfig:"API_SECURE"`
313+
CertificateFile string `yaml:"certificate_file" envconfig:"API_CERTIFICATE_FILE"`
314+
PrivateKeyFile string `yaml:"private_key_file" envconfig:"API_PRIVATE_KEY_FILE"`
315+
CAKeyFile string `yaml:"ca_cert_file" envconfig:"API_CA_KEY_FILE"`
316+
CACertFile string `yaml:"ca_key_file" envconfig:"API_CA_CERT_FILE"`
317+
CreateIntegrationTables bool `yaml:"create_integration_tables" envconfig:"API_CREATE_INTEGRATION_TABLES"`
318+
IntegrationTablesHost string `yaml:"integration_tables_host" envconfig:"API_INTEGRATION_TABLES_HOST"`
319+
AllowParallel bool `yaml:"allow_parallel" envconfig:"API_ALLOW_PARALLEL"`
320+
CompleteResumableAfterRestart bool `yaml:"complete_resumable_after_restart" envconfig:"API_COMPLETE_RESUMABLE_AFTER_RESTART"`
321+
CompleteResumableAfterRestartCommands []string `yaml:"complete_resumable_after_restart_commands" envconfig:"API_COMPLETE_RESUMABLE_AFTER_RESTART_COMMANDS"`
322+
WatchIsMainProcess bool `yaml:"watch_is_main_process" envconfig:"WATCH_IS_MAIN_PROCESS"`
322323
// BackupActionsSkipCommands - commands that should not be tracked in system.backup_actions
323324
// (the in-memory async status list). Useful to exclude high-frequency monitoring calls
324325
// like "list" from growing the actions state. See https://github.com/Altinity/clickhouse-backup/issues/1359
@@ -343,6 +344,17 @@ func (cfg *APIConfig) IsBackupActionsSkipCommand(command string) bool {
343344
return false
344345
}
345346

347+
// IsCompleteResumableAfterRestartCommand returns true if the given command may
348+
// be resumed automatically after API server restart.
349+
func (cfg *APIConfig) IsCompleteResumableAfterRestartCommand(command string) bool {
350+
for _, c := range cfg.CompleteResumableAfterRestartCommands {
351+
if c == command {
352+
return true
353+
}
354+
}
355+
return false
356+
}
357+
346358
// ArchiveExtensions - list of available compression formats and associated file extensions
347359
var ArchiveExtensions = map[string]string{
348360
"tar": "tar",
@@ -746,11 +758,12 @@ func DefaultConfig() *Config {
746758
DeleteConcurrency: 50,
747759
},
748760
API: APIConfig{
749-
ListenAddr: "localhost:7171",
750-
EnableMetrics: true,
751-
CompleteResumableAfterRestart: true,
752-
CancelOperationTimeout: "1800s",
753-
CancelOperationTimeoutDuration: 1800 * time.Second,
761+
ListenAddr: "localhost:7171",
762+
EnableMetrics: true,
763+
CompleteResumableAfterRestart: true,
764+
CompleteResumableAfterRestartCommands: []string{"upload", "download"},
765+
CancelOperationTimeout: "1800s",
766+
CancelOperationTimeoutDuration: 1800 * time.Second,
754767
},
755768
FTP: FTPConfig{
756769
Timeout: "2m",

pkg/config/config_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package config
2+
3+
import "testing"
4+
5+
func TestDefaultCompleteResumableAfterRestartCommands(t *testing.T) {
6+
cfg := DefaultConfig()
7+
8+
for _, command := range []string{"upload", "download"} {
9+
if !cfg.API.IsCompleteResumableAfterRestartCommand(command) {
10+
t.Fatalf("expected %q to be allowed for automatic resume after restart", command)
11+
}
12+
}
13+
14+
for _, command := range []string{"create", "restore"} {
15+
if cfg.API.IsCompleteResumableAfterRestartCommand(command) {
16+
t.Fatalf("expected %q to require explicit opt-in for automatic resume after restart", command)
17+
}
18+
}
19+
}

pkg/server/server.go

Lines changed: 120 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2426,67 +2426,160 @@ func (api *APIServer) ResumeOperationsAfterRestart() error {
24262426
stateFiles = append(stateFiles, embeddedStateFiles...)
24272427
for _, stateFile := range stateFiles {
24282428
command := strings.TrimSuffix(filepath.Base(stateFile), ".state2")
2429+
if !api.GetConfig().API.IsCompleteResumableAfterRestartCommand(command) {
2430+
log.Warn().Str("operation", "ResumeOperationsAfterRestart").Msgf("skip %s: command %q is not allowed by api.complete_resumable_after_restart_commands", stateFile, command)
2431+
continue
2432+
}
24292433
state := resumable.NewState(strings.TrimSuffix(filepath.Dir(stateFile), filepath.Join("backup", backupName)), backupName, command, nil)
24302434
params := state.GetParams()
24312435
state.Close()
24322436
if !api.GetConfig().API.AllowParallel && status.Current.InProgress() {
24332437
return errors.New("another commands in progress")
24342438
}
2439+
args := []string{command}
24352440
switch command {
2441+
case "create":
2442+
if diffFromRemote := resumableStringParam(params, "diffFromRemote"); diffFromRemote != "" {
2443+
args = append(args, fmt.Sprintf("--diff-from-remote=%s", diffFromRemote))
2444+
}
2445+
2446+
if tablePattern := resumableStringParam(params, "tablePattern"); tablePattern != "" {
2447+
args = append(args, fmt.Sprintf("--tables=%s", tablePattern))
2448+
}
2449+
2450+
if resumableBoolParam(params, "schemaOnly") {
2451+
args = append(args, "--schema=1")
2452+
}
2453+
2454+
if partitions := resumableStringSliceParam(params, "partitions"); len(partitions) > 0 {
2455+
partitionsStr := make([]string, len(partitions))
2456+
for j, v := range partitions {
2457+
partitionsStr[j] = fmt.Sprintf("--partitions=%s", v)
2458+
}
2459+
args = append(args, partitionsStr...)
2460+
}
24362461
case "download":
24372462
case "upload":
2438-
args := make([]string, 0)
2439-
args = append(args, command)
2440-
if diffFrom, ok := params["diffFrom"]; ok && diffFrom.(string) != "" {
2441-
args = append(args, fmt.Sprintf("--diff-from=\"%s\"", diffFrom))
2463+
if diffFrom := resumableStringParam(params, "diffFrom"); diffFrom != "" {
2464+
args = append(args, fmt.Sprintf("--diff-from=%s", diffFrom))
24422465
}
2443-
if diffFromRemote, ok := params["diffFromRemote"]; ok && diffFromRemote.(string) != "" {
2444-
args = append(args, fmt.Sprintf("--diff-from-remote=\"%s\"", diffFromRemote))
2466+
if diffFromRemote := resumableStringParam(params, "diffFromRemote"); diffFromRemote != "" {
2467+
args = append(args, fmt.Sprintf("--diff-from-remote=%s", diffFromRemote))
24452468
}
24462469

2447-
if tablePattern, ok := params["tablePattern"]; ok && tablePattern.(string) != "" {
2448-
args = append(args, fmt.Sprintf("--tables=\"%s\"", tablePattern))
2470+
if tablePattern := resumableStringParam(params, "tablePattern"); tablePattern != "" {
2471+
args = append(args, fmt.Sprintf("--tables=%s", tablePattern))
24492472
}
24502473

2451-
if schemaOnly, ok := params["schemaOnly"]; ok && schemaOnly.(bool) {
2474+
if resumableBoolParam(params, "schemaOnly") {
24522475
args = append(args, "--schema=1")
24532476
}
24542477

2455-
if partitions, ok := params["partitions"]; ok && len(partitions.([]interface{})) > 0 {
2456-
partitionsStr := make([]string, len(partitions.([]interface{})))
2457-
for j, v := range partitions.([]interface{}) {
2458-
partitionsStr[j] = fmt.Sprintf("--partitions=\"%s\"", v.(string))
2478+
if partitions := resumableStringSliceParam(params, "partitions"); len(partitions) > 0 {
2479+
partitionsStr := make([]string, len(partitions))
2480+
for j, v := range partitions {
2481+
partitionsStr[j] = fmt.Sprintf("--partitions=%s", v)
24592482
}
24602483
args = append(args, partitionsStr...)
24612484
}
2462-
args = append(args, "--resumable=1", backupName)
2463-
fullCommand := strings.Join(args, " ")
2464-
log.Info().Str("operation", "ResumeOperationsAfterRestart").Send()
2465-
commandId, _ := status.Current.Start(fullCommand)
2466-
err, _ = api.metrics.ExecuteWithMetrics(command, 0, func() error {
2467-
return api.cliApp.Run(append([]string{"clickhouse-backup", "-c", api.configPath, "--command-id", strconv.FormatInt(int64(commandId), 10)}, args...))
2468-
})
2469-
status.Current.Stop(commandId, err)
2470-
if err != nil {
2471-
return errors.WithStack(err)
2485+
case "restore":
2486+
if tablePattern := resumableStringParam(params, "tablePattern"); tablePattern != "" {
2487+
args = append(args, fmt.Sprintf("--tables=%s", tablePattern))
2488+
}
2489+
2490+
if resumableBoolParam(params, "schemaOnly") {
2491+
args = append(args, "--schema=1")
2492+
}
2493+
2494+
if resumableBoolParam(params, "dataOnly") {
2495+
args = append(args, "--data=1")
24722496
}
24732497

2474-
if err = os.Remove(stateFile); err != nil {
2475-
if api.GetConfig().General.BackupsToKeepLocal >= 0 {
2476-
return errors.WithStack(err)
2498+
if resumableBoolParam(params, "dropExists") {
2499+
args = append(args, "--drop=1")
2500+
}
2501+
2502+
if partitions := resumableStringSliceParam(params, "partitions"); len(partitions) > 0 {
2503+
partitionsStr := make([]string, len(partitions))
2504+
for j, v := range partitions {
2505+
partitionsStr[j] = fmt.Sprintf("--partitions=%s", v)
24772506
}
2478-
log.Warn().Str("operation", "ResumeOperationsAfterRestart").Msgf("remove %s return error: ", err)
2507+
args = append(args, partitionsStr...)
24792508
}
24802509
default:
24812510
return errors.Errorf("unkown command for state file %s", stateFile)
24822511
}
2512+
args = append(args, "--resumable=1", backupName)
2513+
fullCommand := strings.Join(args, " ")
2514+
log.Info().Str("operation", "ResumeOperationsAfterRestart").Send()
2515+
commandId, _ := status.Current.Start(fullCommand)
2516+
err, _ = api.metrics.ExecuteWithMetrics(command, 0, func() error {
2517+
return api.cliApp.Run(append([]string{"clickhouse-backup", "-c", api.configPath, "--command-id", strconv.FormatInt(int64(commandId), 10)}, args...))
2518+
})
2519+
status.Current.Stop(commandId, err)
2520+
if err != nil {
2521+
return errors.WithStack(err)
2522+
}
2523+
2524+
if err = os.Remove(stateFile); err != nil {
2525+
if api.GetConfig().General.BackupsToKeepLocal >= 0 {
2526+
return errors.WithStack(err)
2527+
}
2528+
log.Warn().Str("operation", "ResumeOperationsAfterRestart").Msgf("remove %s return error: ", err)
2529+
}
24832530
}
24842531
}
24852532
}
24862533

24872534
return nil
24882535
}
24892536

2537+
// resumableStringParam returns the value stored under key in params as a string.
//
// It returns "" when params is nil, when key is absent, or when the stored
// value is not a string.
func resumableStringParam(params map[string]interface{}, key string) string {
	// Indexing a nil map and type-asserting a missing (nil) value are both safe
	// and yield the zero value, so no explicit guards are needed.
	s, _ := params[key].(string)
	return s
}
2548+
2549+
// resumableBoolParam returns the value stored under key in params as a bool.
//
// It returns false when params is nil, when key is absent, or when the stored
// value is not a bool.
func resumableBoolParam(params map[string]interface{}, key string) bool {
	// Indexing a nil map and type-asserting a missing (nil) value are both safe
	// and yield the zero value, so no explicit guards are needed.
	b, _ := params[key].(bool)
	return b
}
2560+
2561+
// resumableStringSliceParam returns the value stored under key in params as a
// slice of strings.
//
// The stored value is expected to be a []interface{}. It returns nil when
// params is nil, when key is absent, or when the stored value has another
// type. Elements that are not strings are skipped rather than reported, so the
// result may be shorter than the stored list.
func resumableStringSliceParam(params map[string]interface{}, key string) []string {
	// Indexing a nil map and type-asserting a missing (nil) value are both safe,
	// so a single assertion covers the nil-map, missing-key and wrong-type cases.
	values, ok := params[key].([]interface{})
	if !ok {
		return nil
	}
	result := make([]string, 0, len(values))
	for _, value := range values {
		if s, isString := value.(string); isString {
			result = append(result, s)
		}
	}
	return result
}
2582+
24902583
func (api *APIServer) getQueryParameter(q url.Values, paramName string) (string, bool) {
24912584
paramNames := []string{strings.Replace(paramName, "-", "_", -1), strings.Replace(paramName, "_", "-", -1)}
24922585
for _, name := range paramNames {

test/integration/configs/dynamic_settings.sh

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,43 @@ fi
199199

200200
fi
201201

202+
# Generate a ClickHouse storage configuration with an S3-compatible disk
# `disk_cos` and a storage policy `cos_only` that uses only that disk.
# Runs when CLICKHOUSE_VERSION is "head", 21.8, 21.9, 21.10 or later 21.x,
# 22-29.x, or starts with a digit 3-9, and only when both
# QA_TENCENT_SECRET_ID and QA_TENCENT_SECRET_KEY are non-empty.
# NOTE(review): the endpoint host looks like Tencent Cloud COS - confirm.
if [[ "${CLICKHOUSE_VERSION}" == "head" || "${CLICKHOUSE_VERSION}" =~ ^21\.[8-9]|^21\.[0-9]{2} || "${CLICKHOUSE_VERSION}" =~ ^2[2-9]\.[0-9]+ || "${CLICKHOUSE_VERSION}" =~ ^[3-9] ]]; then
203+
204+
if [[ "" != "${QA_TENCENT_SECRET_ID}" && "" != "${QA_TENCENT_SECRET_KEY}" ]]; then
205+
COS_DISK_TYPE="<type>s3</type>"
206+
# For "head", 25-29.x, or versions starting with 3-9, declare the disk with the
# object_storage type and local metadata instead of the plain s3 type.
if [[ "${CLICKHOUSE_VERSION}" == "head" || "${CLICKHOUSE_VERSION}" =~ ^2[5-9]\.[0-9]+ || "${CLICKHOUSE_VERSION}" =~ ^[3-9] ]]; then
207+
COS_DISK_TYPE="<type>object_storage</type><object_storage_type>s3</object_storage_type><metadata_type>local</metadata_type>"
208+
fi
209+
210+
cat <<EOT > /etc/clickhouse-server/config.d/storage_configuration_cos.xml
211+
<yandex>
212+
<storage_configuration>
213+
<disks>
214+
<disk_cos>
215+
${COS_DISK_TYPE}
216+
<endpoint>https://clickhouse-backup-1336113806.cos.na-ashburn.myqcloud.com/disk_cos/{cluster}/{shard}/</endpoint>
217+
<access_key_id>${QA_TENCENT_SECRET_ID}</access_key_id>
218+
<secret_access_key>${QA_TENCENT_SECRET_KEY}</secret_access_key>
219+
<send_metadata>false</send_metadata>
220+
</disk_cos>
221+
</disks>
222+
<policies>
223+
<cos_only>
224+
<volumes>
225+
<cos_only>
226+
<disk>disk_cos</disk>
227+
</cos_only>
228+
</volumes>
229+
</cos_only>
230+
</policies>
231+
</storage_configuration>
232+
</yandex>
233+
EOT
234+
235+
fi
236+
237+
fi
238+
202239
if [[ "${CLICKHOUSE_VERSION}" == "head" || "${CLICKHOUSE_VERSION}" =~ ^21\.12 || "${CLICKHOUSE_VERSION}" =~ ^2[2-9]\.[0-9]+ || "${CLICKHOUSE_VERSION}" =~ ^[3-9] ]]; then
203240

204241
if [[ -f /var/lib/clickhouse/storage_configuration_encrypted_s3.xml ]]; then

0 commit comments

Comments
 (0)