Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (i *MasterInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt spec.InstanceOpt,
) error {
if err := i.BaseInstance.InitConfig(ctx, e, i.topo.GlobalOptions, deployUser, paths); err != nil {
return err
Expand Down Expand Up @@ -234,8 +235,9 @@ func (i *MasterInstance) ScaleConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt spec.InstanceOpt,
) error {
if err := i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths); err != nil {
if err := i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt); err != nil {
return err
}

Expand Down Expand Up @@ -361,6 +363,7 @@ func (i *WorkerInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt spec.InstanceOpt,
) error {
if err := i.BaseInstance.InitConfig(ctx, e, i.topo.GlobalOptions, deployUser, paths); err != nil {
return err
Expand Down Expand Up @@ -455,13 +458,14 @@ func (i *WorkerInstance) ScaleConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt spec.InstanceOpt,
) error {
s := i.topo
defer func() {
i.topo = s
}()
i.topo = topo.(*Specification)
return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths)
return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt)
}

// GetGlobalOptions returns cluster topology
Expand Down
1 change: 1 addition & 0 deletions pkg/cluster/manager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ func buildInitConfigTasks(
Log: logDir,
Cache: m.specManager.Path(name, spec.TempConfigPath),
},
spec.InstanceOpt{},
).
BuildAsStep(fmt.Sprintf(" - Generate config %s -> %s", compName, instance.ID()))
tasks = append(tasks, t)
Expand Down
18 changes: 8 additions & 10 deletions pkg/cluster/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,17 @@ import (
)

func TestVersionCompare(t *testing.T) {
var err error
res := versionCompare("v4.0.0", "v4.0.1")
assert.Equal(t, res, -1)

err = versionCompare("v4.0.0", "v4.0.1")
assert.Nil(t, err)
res = versionCompare("v4.0.1", "v4.0.0")
assert.Equal(t, res, 1)

err = versionCompare("v4.0.1", "v4.0.0")
assert.NotNil(t, err)
res = versionCompare("v4.0.0", "nightly")
assert.Equal(t, res, -1)

err = versionCompare("v4.0.0", "nightly")
assert.Nil(t, err)

err = versionCompare("nightly", "nightly")
assert.Nil(t, err)
res = versionCompare("nightly", "nightly")
assert.Equal(t, res, 0)
}

func TestValidateNewTopo(t *testing.T) {
Expand Down
33 changes: 20 additions & 13 deletions pkg/cluster/manager/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import (
"golang.org/x/mod/semver"
)

const (
cdcNewArchVersion = "v9.0.0"
)

func (m *Manager) upgradePrecheck(name string, componentVersions map[string]string, opt operator.Options, skipConfirm bool) error {
if !skipConfirm && strings.ToLower(opt.DisplayMode) != "json" {
for _, v := range componentVersions {
Expand Down Expand Up @@ -86,9 +90,9 @@ func (m *Manager) Upgrade(name string, clusterVersion string, componentVersions
uniqueComps = map[string]struct{}{}
)

if err := versionCompare(base.Version, clusterVersion); err != nil {
if versionCompare(base.Version, clusterVersion) == 1 {
if !ignoreVersionCheck {
return err
return perrs.Errorf("please specify a higher or equle version than %s", base.Version)
}
m.logger.Warnf("%s", color.RedString("There is no guarantee that the cluster can be downgraded. Be careful before you continue."))
}
Expand Down Expand Up @@ -143,8 +147,8 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s

hasImported := false
for _, comp := range components {
oldver := comp.CalculateVersion(base.Version)
version := comp.CalculateVersion(clusterVersion)

for _, inst := range comp.Instances() {
// Download component from repository
key := fmt.Sprintf("%s-%s-%s-%s", inst.ComponentSource(), version, inst.OS(), inst.Arch())
Expand Down Expand Up @@ -217,6 +221,10 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s
}
}

checkCDCNewArch := false
if versionCompare(oldver, cdcNewArchVersion) == -1 && versionCompare(cdcNewArchVersion, clusterVersion) > -1 {
checkCDCNewArch = true
}
tb.InitConfig(
name,
clusterVersion,
Expand All @@ -230,6 +238,7 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s
Log: logDir,
Cache: m.specManager.Path(name, spec.TempConfigPath),
},
spec.InstanceOpt{CheckCDCNewArch: checkCDCNewArch},
)
copyCompTasks = append(copyCompTasks, tb.Build())
}
Expand Down Expand Up @@ -372,18 +381,16 @@ This operation will upgrade %s %s cluster %s (with a concurrency of %d) to %s:%s
return nil
}

func versionCompare(curVersion, newVersion string) error {
// versionCompare returns an integer comparing two versions according to semantic version precedence.
// The result will be 0 if curVersion == newVersion, -1 if curVersion < newVersion, or +1 if curVersion > newVersion.
func versionCompare(curVersion, newVersion string) int {
if newVersion == curVersion {
return 0
}
// Can always upgrade to 'nightly' event the current version is 'nightly'
if newVersion == utils.NightlyVersionAlias {
return nil
return -1
}

switch semver.Compare(curVersion, newVersion) {
case -1, 0:
return nil
case 1:
return perrs.Errorf("please specify a higher or equle version than %s", curVersion)
default:
return perrs.Errorf("unreachable")
}
return semver.Compare(curVersion, newVersion)
}
4 changes: 3 additions & 1 deletion pkg/cluster/spec/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (i *AlertManagerInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
gOpts := *i.topo.BaseTopo().GlobalOptions
if err := i.BaseInstance.InitConfig(ctx, e, gOpts, deployUser, paths); err != nil {
Expand Down Expand Up @@ -247,11 +248,12 @@ func (i *AlertManagerInstance) ScaleConfig(
clusterVersion string,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
s := i.topo
defer func() { i.topo = s }()
i.topo = topo
return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths)
return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt)
}

// setTLSConfig set TLS Config to support enable/disable TLS
Expand Down
4 changes: 3 additions & 1 deletion pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,15 @@ func (i *CDCInstance) ScaleConfig(
clusterVersion,
user string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
s := i.topo
defer func() {
i.topo = s
}()
i.topo = mustBeClusterTopo(topo)

return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths)
return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths, opt)
}

// InitConfig implements Instance interface.
Expand All @@ -200,6 +201,7 @@ func (i *CDCInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
topo := i.topo.(*Specification)
if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/cluster/spec/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,15 @@ func (i *DashboardInstance) ScaleConfig(
clusterVersion,
user string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
s := i.topo
defer func() {
i.topo = s
}()
i.topo = mustBeClusterTopo(topo)

return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths)
return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths, opt)
}

// InitConfig implements Instance interface.
Expand All @@ -196,6 +197,7 @@ func (i *DashboardInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
topo := i.topo.(*Specification)
if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,15 @@ func (i *DrainerInstance) ScaleConfig(
clusterVersion,
user string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
s := i.topo
defer func() {
i.topo = s
}()
i.topo = mustBeClusterTopo(topo)

return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths)
return i.InitConfig(ctx, e, clusterName, clusterVersion, user, paths, opt)
}

// InitConfig implements Instance interface.
Expand All @@ -215,6 +216,7 @@ func (i *DrainerInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
topo := i.topo.(*Specification)
if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/cluster/spec/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (i *GrafanaInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
gOpts := *i.topo.BaseTopo().GlobalOptions
if err := i.BaseInstance.InitConfig(ctx, e, gOpts, deployUser, paths); err != nil {
Expand Down Expand Up @@ -453,11 +454,12 @@ func (i *GrafanaInstance) ScaleConfig(
clusterVersion string,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
s := i.topo
defer func() { i.topo = s }()
i.topo = topo.Merge(i.topo)
return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths)
return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt)
}

func mergeAdditionalGrafanaConf(source string, addition map[string]string) error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/spec/grafana_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestGrafanaDatasourceConfig(t *testing.T) {

// Test datasource configuration
clusterName := "test-cluster"
err := grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), mockExec, clusterName, "v5.4.0", "tidb", paths)
err := grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), mockExec, clusterName, "v5.4.0", "tidb", paths, InstanceOpt{})
require.NoError(t, err)

// Verify the datasource configuration file
Expand All @@ -247,7 +247,7 @@ func TestGrafanaDatasourceConfig(t *testing.T) {

// Test without VM remote write enabled
topo.Monitors[0].PromRemoteWriteToVM = false
err = grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), mockExec, clusterName, "v5.4.0", "tidb", paths)
err = grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), mockExec, clusterName, "v5.4.0", "tidb", paths, InstanceOpt{})
require.NoError(t, err)

// Verify the datasource configuration file again
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestVictoriaMetricsDefaultDatasource(t *testing.T) {
grafanaInstance := comp.Instances()[0].(*GrafanaInstance)

// Run InitConfig which will process dashboards
err = grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), origExecutor, "test-cluster", "v5.4.0", "tidb", paths)
err = grafanaInstance.InitConfig(ctxt.New(ctx, 0, logprinter.NewLogger("")), origExecutor, "test-cluster", "v5.4.0", "tidb", paths, InstanceOpt{})
require.NoError(t, err)

// Check if the dashboard file was created and datasource references were updated
Expand Down
12 changes: 9 additions & 3 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ type Instance interface {
InstanceSpec
ID() string
Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error
InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths, opt InstanceOpt) error
ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths, opt InstanceOpt) error
PrepareStart(ctx context.Context, tlsCfg *tls.Config) error
ComponentName() string
ComponentSource() string
Expand Down Expand Up @@ -149,6 +149,11 @@ func PortStopped(ctx context.Context, e ctxt.Executor, port int, timeout uint64)
return w.Execute(ctx, e)
}

// InstanceOpt can be used to store options when initializing the configuration.
type InstanceOpt struct {
CheckCDCNewArch bool
}

// BaseInstance implements some method of Instance interface..
type BaseInstance struct {
InstanceSpec
Expand All @@ -168,7 +173,8 @@ type BaseInstance struct {
StatusFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdHosts ...string) string
UptimeFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration

Component Component
Component Component
InstanceOpt InstanceOpt
}

// Ready implements Instance interface
Expand Down
4 changes: 3 additions & 1 deletion pkg/cluster/spec/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func (i *MonitorInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
gOpts := *i.topo.BaseTopo().GlobalOptions
if err := i.BaseInstance.InitConfig(ctx, e, gOpts, deployUser, paths); err != nil {
Expand Down Expand Up @@ -651,11 +652,12 @@ func (i *MonitorInstance) ScaleConfig(
clusterVersion string,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
s := i.topo
defer func() { i.topo = s }()
i.topo = topo
return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths)
return i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt)
}

func mergeAdditionalScrapeConf(source string, addition map[string]any) error {
Expand Down
4 changes: 3 additions & 1 deletion pkg/cluster/spec/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (i *PDInstance) InitConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
topo := i.topo.(*Specification)
if err := i.BaseInstance.InitConfig(ctx, e, topo.GlobalOptions, deployUser, paths); err != nil {
Expand Down Expand Up @@ -355,9 +356,10 @@ func (i *PDInstance) ScaleConfig(
clusterVersion,
deployUser string,
paths meta.DirPaths,
opt InstanceOpt,
) error {
// We need pd.toml here, but we don't need to check it
if err := i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths); err != nil &&
if err := i.InitConfig(ctx, e, clusterName, clusterVersion, deployUser, paths, opt); err != nil &&
errors.Cause(err) != ErrorCheckConfig {
return err
}
Expand Down
Loading
Loading