Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ See also: [`resuming-migrations`](resume.md)

`--checkpoint-seconds` specifies the seconds between checkpoints. Default is 300.

### chunk-concurrent-size

`--chunk-concurrent-size=1`, the number of goroutines to execute chunk-copy operations concurrently in each copy time slot. Default `1` (sequential). Minimum `1`.

When set to a value greater than 1, multiple chunks are calculated and copied in parallel within each write-function invocation. This can significantly speed up row-copy on large tables when MySQL can handle concurrent writes to the ghost table.

Each concurrent chunk calculates its own non-overlapping key range under a serialization lock, so there is no risk of duplicate or overlapping copies.

Note: concurrency multiplies write pressure per time slot. Throttling (`--max-load`, `--nice-ratio`) applies per batch, not per chunk. Start with small values (2-8) and monitor replication lag.

### conf

`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
Expand Down
109 changes: 65 additions & 44 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ import (
"github.com/go-ini/ini"
)

// IterationRangeValues holds the range boundaries for a single chunk-copy iteration.
// Used by concurrent row-copy to pass isolated range values to each worker goroutine.
type IterationRangeValues struct {
Min *sql.ColumnValues
Max *sql.ColumnValues
Size int64
IncludeMinValues bool
HasFurtherRange bool
}

// RowsEstimateMethod is the type of row number estimation
type RowsEstimateMethod string

Expand Down Expand Up @@ -130,6 +140,7 @@ type MigrationContext struct {
HeartbeatIntervalMilliseconds int64
defaultNumRetries int64
ChunkSize int64
ChunkConcurrentSize int64
niceRatio float64
MaxLagMillisecondsThrottleThreshold int64
throttleControlReplicaKeys *mysql.InstanceKeyMap
Expand Down Expand Up @@ -237,27 +248,28 @@ type MigrationContext struct {
AbortError error
abortMutex *sync.Mutex

OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
OriginalTableAutoIncrement uint64
GhostTableColumns *sql.ColumnList
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList
MigrationLastInsertSQLWarnings []string
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
InitialStreamerCoords mysql.BinlogCoordinates
ForceTmpTableName string
OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
OriginalTableAutoIncrement uint64
GhostTableColumns *sql.ColumnList
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList
MigrationLastInsertSQLWarnings []string
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
CalculateNextIterationRangeEndValuesLock *sync.Mutex
InitialStreamerCoords mysql.BinlogCoordinates
ForceTmpTableName string

IncludeTriggers bool
RemoveTriggerSuffix bool
Expand Down Expand Up @@ -307,29 +319,31 @@ type ContextConfig struct {
func NewMigrationContext() *MigrationContext {
ctx, cancelFunc := context.WithCancel(context.Background())
return &MigrationContext{
Uuid: uuid.NewString(),
defaultNumRetries: 60,
ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
throttleHTTPMutex: &sync.Mutex{},
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
ctx: ctx,
cancelFunc: cancelFunc,
abortMutex: &sync.Mutex{},
Log: NewDefaultLogger(),
Uuid: uuid.NewString(),
defaultNumRetries: 60,
ChunkSize: 1000,
ChunkConcurrentSize: 1,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
throttleHTTPMutex: &sync.Mutex{},
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
CalculateNextIterationRangeEndValuesLock: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
ctx: ctx,
cancelFunc: cancelFunc,
abortMutex: &sync.Mutex{},
Log: NewDefaultLogger(),
}
}

Expand Down Expand Up @@ -690,6 +704,13 @@ func (mctx *MigrationContext) SetChunkSize(chunkSize int64) {
atomic.StoreInt64(&mctx.ChunkSize, chunkSize)
}

func (mctx *MigrationContext) SetChunkConcurrentSize(chunkConcurrentSize int64) {
if chunkConcurrentSize < 1 {
chunkConcurrentSize = 1
}
atomic.StoreInt64(&mctx.ChunkConcurrentSize, chunkConcurrentSize)
}

func (mctx *MigrationContext) SetDMLBatchSize(batchSize int64) {
if batchSize < 1 {
batchSize = 1
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func main() {
flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
chunkConcurrentSize := flag.Int64("chunk-concurrent-size", 1, "number of goroutines to execute chunks concurrently in each copy time slot (range 1-100)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
Expand Down Expand Up @@ -355,6 +356,7 @@ func main() {
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
migrationContext.SetNiceRatio(*niceRatio)
migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetChunkConcurrentSize(*chunkConcurrentSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
migrationContext.SetThrottleQuery(*throttleQuery)
Expand Down
Loading