Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ type MigrationContext struct {
recentBinlogCoordinates mysql.BinlogCoordinates

Log Logger

IsAddUniqueKey bool
}

type Logger interface {
Expand Down
23 changes: 18 additions & 5 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
this.migrationContext.IsTransactionalTable(),
this.migrationContext.IsAddUniqueKey,
)
if err != nil {
return chunkSize, rowsAffected, duration, err
Expand Down Expand Up @@ -959,7 +960,7 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv

// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent, secondary bool) (results [](*dmlBuildResult)) {
switch dmlEvent.DML {
case binlog.DeleteDML:
{
Expand All @@ -968,16 +969,27 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
if this.migrationContext.IsAddUniqueKey && !secondary {
dmlEvent.DML = binlog.DeleteDML
dmlEvent.WhereColumnValues = dmlEvent.NewColumnValues
results = append(results, this.buildDMLEventQuery(dmlEvent, true)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent, true)...)
return results
}
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsAddUniqueKey)
// query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
}
case binlog.UpdateDML:
{
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
dmlEvent.DML = binlog.DeleteDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
// results = append(results, this.buildDMLEventQuery(dmlEvent)...)
results = append(results, this.buildDMLEventQuery(dmlEvent, true)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
// results = append(results, this.buildDMLEventQuery(dmlEvent)...)
results = append(results, this.buildDMLEventQuery(dmlEvent, true)...)
return results
}
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
Expand Down Expand Up @@ -1018,7 +1030,8 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
return rollback(err)
}
for _, dmlEvent := range dmlEvents {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
// for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent, false) {
if buildResult.err != nil {
return rollback(buildResult.err)
}
Expand Down
1 change: 1 addition & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func (this *Migrator) validateStatement() (err error) {
this.migrationContext.Log.Infof("Alter statement has column(s) renamed. gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds.", this.parser.GetNonTrivialRenames())
}
this.migrationContext.DroppedColumnsMap = this.parser.DroppedColumnsMap()
this.migrationContext.IsAddUniqueKey = this.parser.IsAddUniqueKey()
return nil
}

Expand Down
90 changes: 69 additions & 21 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,25 @@ func duplicateNames(names []string) []string {
return duplicate
}

func BuildValueComparison(column string, value string, comparisonSign ValueComparisonSign) (result string, err error) {
func BuildValueComparison(column string, value, alias string, comparisonSign ValueComparisonSign) (result string, err error) {
if column == "" {
return "", fmt.Errorf("Empty column in GetValueComparison")
}
if value == "" {
return "", fmt.Errorf("Empty value in GetValueComparison")
}
comparison := fmt.Sprintf("(%s %s %s)", EscapeName(column), string(comparisonSign), value)
// comparison := fmt.Sprintf("(%s %s %s)", EscapeName(column), string(comparisonSign), value)
var comparison string
if alias != "" {
comparison = fmt.Sprintf("(%s.%s %s %s)", alias, EscapeName(column), string(comparisonSign), value)
} else {
comparison = fmt.Sprintf("(%s %s %s)", EscapeName(column), string(comparisonSign), value)
}

return comparison, err
}

func BuildEqualsComparison(columns []string, values []string) (result string, err error) {
func BuildEqualsComparison(columns []string, values []string, alias string) (result string, err error) {
if len(columns) == 0 {
return "", fmt.Errorf("Got 0 columns in GetEqualsComparison")
}
Expand All @@ -83,7 +90,7 @@ func BuildEqualsComparison(columns []string, values []string) (result string, er
comparisons := []string{}
for i, column := range columns {
value := values[i]
comparison, err := BuildValueComparison(column, value, EqualsComparisonSign)
comparison, err := BuildValueComparison(column, value, alias, EqualsComparisonSign)
if err != nil {
return "", err
}
Expand All @@ -96,7 +103,7 @@ func BuildEqualsComparison(columns []string, values []string) (result string, er

func BuildEqualsPreparedComparison(columns []string) (result string, err error) {
values := buildPreparedValues(len(columns))
return BuildEqualsComparison(columns, values)
return BuildEqualsComparison(columns, values, "")
}

func BuildSetPreparedClause(columns *ColumnList) (result string, err error) {
Expand All @@ -118,7 +125,7 @@ func BuildSetPreparedClause(columns *ColumnList) (result string, err error) {
return strings.Join(setTokens, ", "), nil
}

func BuildRangeComparison(columns []string, values []string, args []interface{}, comparisonSign ValueComparisonSign) (result string, explodedArgs []interface{}, err error) {
func BuildRangeComparison(columns []string, values []string, args []interface{}, comparisonSign ValueComparisonSign, alias string) (result string, explodedArgs []interface{}, err error) {
if len(columns) == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 columns in GetRangeComparison")
}
Expand All @@ -141,12 +148,12 @@ func BuildRangeComparison(columns []string, values []string, args []interface{},

for i, column := range columns {
value := values[i]
rangeComparison, err := BuildValueComparison(column, value, comparisonSign)
rangeComparison, err := BuildValueComparison(column, value, alias, comparisonSign)
if err != nil {
return "", explodedArgs, err
}
if i > 0 {
equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i])
equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i], alias)
if err != nil {
return "", explodedArgs, err
}
Expand All @@ -161,7 +168,7 @@ func BuildRangeComparison(columns []string, values []string, args []interface{},
}

if includeEquals {
comparison, err := BuildEqualsComparison(columns, values)
comparison, err := BuildEqualsComparison(columns, values, alias)
if err != nil {
return "", explodedArgs, nil
}
Expand All @@ -175,10 +182,10 @@ func BuildRangeComparison(columns []string, values []string, args []interface{},

func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, comparisonSign ValueComparisonSign) (result string, explodedArgs []interface{}, err error) {
values := buildColumnsPreparedValues(columns)
return BuildRangeComparison(columns.Names(), values, args, comparisonSign)
return BuildRangeComparison(columns.Names(), values, args, comparisonSign, "")
}

func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, isAddUniqueKey bool) (result string, explodedArgs []interface{}, err error) {
if len(sharedColumns) == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
}
Expand All @@ -197,18 +204,24 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
sharedColumns[i] = EscapeName(sharedColumns[i])
}
sharedColumnsListing := strings.Join(sharedColumns, ", ")
sharedColumnsListingAlias := "a." + strings.Join(sharedColumns, ", a.")

uniqueKey = EscapeName(uniqueKey)
var minRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues {
minRangeComparisonSign = GreaterThanOrEqualsComparisonSign
}
rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign)
// rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign)
alias := ""
if isAddUniqueKey {
alias = "a"
}
rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign, alias)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign)
rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign, alias)
if err != nil {
return "", explodedArgs, err
}
Expand All @@ -217,21 +230,51 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
if transactionalTable {
transactionalClause = "lock in share mode"
}
result = fmt.Sprintf(`
// result = fmt.Sprintf(`
// insert /* gh-ost %s.%s */ ignore into %s.%s (%s)
// (select %s from %s.%s force index (%s)
// where (%s and %s) %s
// )
// `, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing,
// sharedColumnsListing, databaseName, originalTableName, uniqueKey,
// rangeStartComparison, rangeEndComparison, transactionalClause)

if !isAddUniqueKey {
result = fmt.Sprintf(`
insert /* gh-ost %s.%s */ ignore into %s.%s (%s)
(select %s from %s.%s force index (%s)
where (%s and %s) %s
)
`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing,
sharedColumnsListing, databaseName, originalTableName, uniqueKey,
rangeStartComparison, rangeEndComparison, transactionalClause)
sharedColumnsListing, databaseName, originalTableName, uniqueKey,
rangeStartComparison, rangeEndComparison, transactionalClause)
} else {
var onJoins []string
var nullStmt string
for i, column := range uniqueKeyColumns.Names() {
onJoins = append(onJoins, fmt.Sprintf("a.%s = b.%s", EscapeName(column), EscapeName(column)))
if i == 0 {
nullStmt = fmt.Sprintf(" and b.%s IS NULL", EscapeName(column))
}
}
result = fmt.Sprintf(`
insert /* gh-ost %s.%s */ into %s.%s (%s)
(select %s from %s.%s a force index (%s) left join %s.%s b on %s
where (%s and %s) %s %s
)
`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing,
sharedColumnsListingAlias, databaseName, originalTableName, uniqueKey, databaseName, ghostTableName,
strings.Join(onJoins, " and "), rangeStartComparison, rangeEndComparison, nullStmt, transactionalClause)
}

return result, explodedArgs, nil
}

func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, isAddUniqueKey bool) (result string, explodedArgs []interface{}, err error) {
rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns)
rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns)
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
// return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, isAddUniqueKey)
}

func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
Expand Down Expand Up @@ -418,7 +461,7 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
return result, uniqueKeyArgs, nil
}

func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, args []interface{}) (result string, sharedArgs []interface{}, err error) {
func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, args []interface{}, isAddUniqueKey bool) (result string, sharedArgs []interface{}, err error) {
if len(args) != tableColumns.Len() {
return result, args, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
}
Expand All @@ -443,13 +486,18 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
}
preparedValues := buildColumnsPreparedValues(mappedSharedColumns)

keyboard := "replace"
if isAddUniqueKey {
keyboard = "insert"
}

result = fmt.Sprintf(`
replace /* gh-ost %s.%s */ into
%s /* gh-ost %s.%s */ into
%s.%s
(%s)
values
(%s)
`, databaseName, tableName,
`, keyboard, databaseName, tableName,
databaseName, tableName,
strings.Join(mappedSharedColumnNames, ", "),
strings.Join(preparedValues, ", "),
Expand Down
Loading