Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 305 additions & 8 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,23 +281,34 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
sharedColumnsListing := strings.Join(sharedColumns, ", ")

uniqueKey = EscapeName(uniqueKey)
transactionalClause := ""
if transactionalTable {
if noWait {
transactionalClause = "for share nowait"
} else {
transactionalClause = "lock in share mode"
}
}
var minRangeComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues {
minRangeComparisonSign = GreaterThanOrEqualsComparisonSign
}

if uniqueKeyColumns.Len() == 2 {
return buildRangeInsertQueryTwoColumn(
databaseName, originalTableName, ghostTableName,
sharedColumnsListing, mappedSharedColumnsListing,
uniqueKey, uniqueKeyColumns,
rangeStartValues, rangeEndValues,
rangeStartArgs, rangeEndArgs,
minRangeComparisonSign, transactionalClause,
)
}
rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
transactionalClause := ""
if transactionalTable {
if noWait {
transactionalClause = "for share nowait"
} else {
transactionalClause = "lock in share mode"
}
}
rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign)
if err != nil {
return "", explodedArgs, err
Expand All @@ -323,6 +334,88 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
return result, explodedArgs, nil
}

func sameFirstColumnValue(rangeStartArgs, rangeEndArgs []interface{}) bool {
return fmt.Sprintf("%v", rangeStartArgs[0]) == fmt.Sprintf("%v", rangeEndArgs[0])
}

func buildRangeInsertQueryTwoColumn(
databaseName, originalTableName, ghostTableName string,
sharedColumnsListing, mappedSharedColumnsListing string,
uniqueKey string,
uniqueKeyColumns *ColumnList,
rangeStartValues, rangeEndValues []string,
rangeStartArgs, rangeEndArgs []interface{},
minRangeComparisonSign ValueComparisonSign,
transactionalClause string,
) (result string, explodedArgs []interface{}, err error) {
cols := uniqueKeyColumns.Columns()

if len(cols) != len(rangeStartValues) {
return "", explodedArgs, fmt.Errorf("got %d columns but %d rangeStartValues in buildRangeInsertQueryTwoColumn", len(cols), len(rangeStartValues))
}
if len(cols) != len(rangeEndValues) {
return "", explodedArgs, fmt.Errorf("got %d columns but %d rangeEndValues in buildRangeInsertQueryTwoColumn", len(cols), len(rangeEndValues))
}
if len(cols) != len(rangeStartArgs) {
return "", explodedArgs, fmt.Errorf("got %d columns but %d rangeStartArgs in buildRangeInsertQueryTwoColumn", len(cols), len(rangeStartArgs))
}
if len(cols) != len(rangeEndArgs) {
return "", explodedArgs, fmt.Errorf("got %d columns but %d rangeEndArgs in buildRangeInsertQueryTwoColumn", len(cols), len(rangeEndArgs))
}

col1Name := EscapeName(cols[0].Name)
col2Name := EscapeName(cols[1].Name)
col1StartVal := rangeStartValues[0]
col2StartVal := rangeStartValues[1]
col1EndVal := rangeEndValues[0]
col2EndVal := rangeEndValues[1]
col2StartOp := string(minRangeComparisonSign)
fromClause := fmt.Sprintf("%s.%s force index (%s)", databaseName, originalTableName, uniqueKey)

if sameFirstColumnValue(rangeStartArgs, rangeEndArgs) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sameFirstColumnValue reports whether the first column's range start and end bounds are equal. When true, the range fits within a single col1 partition, and a simpler query must replace the 3-part UNION.

It's not just a performance shortcut, it's a correctness requirement. Look at what the UNION produces when col1_start == col1_end:

  • Part 1: col1 = x AND col2 > start_col2 - no upper bound on col2, so it includes all rows, including rows above end_col2
  • Part 2: col1 > x AND col1 < x - returns nothing
  • Part 3: col1 = x AND col2 <= end_col2 - no lower bound, so it includes rows below start_col2

The UNION ALL result is essentially every row where col1 = x. The outer ORDER BY ... LIMIT 1 OFFSET chunkSize-1 then picks the Nth row, which is the wrong boundary.

The simplified query fixes this:

WHERE col1 = ? AND col2 > ? AND col2 <= ?

Both bounds are explicit, and because col1 is pinned with equality, MySQL can use the composite index directly as a seek on (col1, col2).

result = fmt.Sprintf(`
insert /* gh-ost %s.%s */ ignore
into
%s.%s
(%s)
(
select %s
from
%s.%s
force index (%s)
where (%s = %s and %s %s %s and %s <= %s)
%s
)`,
databaseName, originalTableName,
databaseName, ghostTableName, mappedSharedColumnsListing,
sharedColumnsListing,
databaseName, originalTableName, uniqueKey,
col1Name, col1StartVal, col2Name, col2StartOp, col2StartVal, col2Name, col2EndVal,
transactionalClause,
)
explodedArgs = append(explodedArgs, rangeStartArgs[0], rangeStartArgs[1], rangeEndArgs[1])
return result, explodedArgs, nil
}

part1, part2, part3, explodedArgs := buildTwoColumnUnionParts(
sharedColumnsListing, fromClause,
col1Name, col2Name,
col1StartVal, col2StartVal, col1EndVal, col2EndVal,
col2StartOp, transactionalClause,
rangeStartArgs, rangeEndArgs,
)

result = fmt.Sprintf(`
insert /* gh-ost %s.%s */ ignore
into %s.%s (%s)
%s union all %s union all %s`,
databaseName, originalTableName,
databaseName, ghostTableName, mappedSharedColumnsListing,
part1, part2, part3,
)
return result, explodedArgs, nil
}

func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) {
rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns)
rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns)
Expand All @@ -340,6 +433,11 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
if includeRangeStartValues {
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
}

if uniqueKeyColumns.Len() == 2 {
return buildUniqueKeyRangeEndTwoColumnViaOffset(databaseName, tableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, startRangeComparisonSign, hint)
}

rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
if err != nil {
return "", explodedArgs, err
Expand Down Expand Up @@ -393,6 +491,11 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
if includeRangeStartValues {
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
}

if uniqueKeyColumns.Len() == 2 {
return buildUniqueKeyRangeEndTwoColumnViaTemptable(databaseName, tableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, startRangeComparisonSign, hint)
}

rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
if err != nil {
return "", explodedArgs, err
Expand Down Expand Up @@ -442,6 +545,200 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
return result, explodedArgs, nil
}

type twoColumnRangeMeta struct {
col1Name, col2Name string
col1Val, col2Val string
orderByAsc string
orderByDesc string
}

func newTwoColumnRangeMeta(uniqueKeyColumns *ColumnList) twoColumnRangeMeta {
colVals := buildColumnsPreparedValues(uniqueKeyColumns)
cols := uniqueKeyColumns.Columns()
col1Name := EscapeName(cols[0].Name)
col2Name := EscapeName(cols[1].Name)
col1Asc := fmt.Sprintf("%s asc", col1Name)
col2Asc := fmt.Sprintf("%s asc", col2Name)
col1Desc := fmt.Sprintf("%s desc", col1Name)
col2Desc := fmt.Sprintf("%s desc", col2Name)
if cols[0].Type == EnumColumnType {
col1Asc = fmt.Sprintf("concat(%s) asc", col1Name)
col1Desc = fmt.Sprintf("concat(%s) desc", col1Name)
}
if cols[1].Type == EnumColumnType {
col2Asc = fmt.Sprintf("concat(%s) asc", col2Name)
col2Desc = fmt.Sprintf("concat(%s) desc", col2Name)
}
return twoColumnRangeMeta{
col1Name: col1Name,
col2Name: col2Name,
col1Val: colVals[0],
col2Val: colVals[1],
orderByAsc: col1Asc + ", " + col2Asc,
orderByDesc: col1Desc + ", " + col2Desc,
}
}

func buildTwoColumnUnionParts(
selectClause, fromClause string,
col1Name, col2Name string,
col1StartVal, col2StartVal, col1EndVal, col2EndVal string,
col2StartOp, partSuffix string,
rangeStartArgs, rangeEndArgs []interface{},
) (part1, part2, part3 string, explodedArgs []interface{}) {
part1 = fmt.Sprintf(
`(select %s from %s where %s = %s and %s %s %s %s)`,
selectClause, fromClause,
col1Name, col1StartVal, col2Name, col2StartOp, col2StartVal,
partSuffix,
)
explodedArgs = append(explodedArgs, rangeStartArgs[0], rangeStartArgs[1])

part2 = fmt.Sprintf(
`(select %s from %s where %s > %s and %s < %s %s)`,
selectClause, fromClause,
col1Name, col1StartVal, col1Name, col1EndVal,
partSuffix,
)
explodedArgs = append(explodedArgs, rangeStartArgs[0], rangeEndArgs[0])

part3 = fmt.Sprintf(
`(select %s from %s where %s = %s and %s <= %s %s)`,
selectClause, fromClause,
col1Name, col1EndVal, col2Name, col2EndVal,
partSuffix,
)
explodedArgs = append(explodedArgs, rangeEndArgs[0], rangeEndArgs[1])
return
}

func buildUniqueKeyRangeEndTwoColumnViaOffset(
databaseName, tableName string,
uniqueKeyColumns *ColumnList,
rangeStartArgs, rangeEndArgs []interface{},
chunkSize int64,
startRangeComparisonSign ValueComparisonSign,
hint string,
) (result string, explodedArgs []interface{}, err error) {
m := newTwoColumnRangeMeta(uniqueKeyColumns)
col2StartOp := string(startRangeComparisonSign)
selectClause := m.col1Name + ", " + m.col2Name
fromClause := databaseName + "." + tableName
partSuffix := fmt.Sprintf("order by %s limit %d", m.orderByAsc, chunkSize)

if sameFirstColumnValue(rangeStartArgs, rangeEndArgs) {
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s, %s
from
%s.%s
where
(%s = %s and %s %s %s and %s <= %s)
order by
%s
limit 1
offset %d`,
databaseName, tableName, hint,
m.col1Name, m.col2Name,
databaseName, tableName,
m.col1Name, m.col1Val, m.col2Name, col2StartOp, m.col2Val, m.col2Name, m.col2Val,
m.orderByAsc,
chunkSize-1,
)
explodedArgs = append(explodedArgs, rangeStartArgs[0], rangeStartArgs[1], rangeEndArgs[1])
return result, explodedArgs, nil
}

part1, part2, part3, explodedArgs := buildTwoColumnUnionParts(
selectClause, fromClause,
m.col1Name, m.col2Name,
m.col1Val, m.col2Val, m.col1Val, m.col2Val,
col2StartOp, partSuffix,
rangeStartArgs, rangeEndArgs,
)

result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s, %s
from
(%s union all %s union all %s) t
order by
%s
limit 1
offset %d`,
databaseName, tableName, hint,
m.col1Name, m.col2Name,
part1, part2, part3,
m.orderByAsc,
chunkSize-1,
)
return result, explodedArgs, nil
}

func buildUniqueKeyRangeEndTwoColumnViaTemptable(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UNION optimization is arguably unnecessary for the ViaTemptable code path, since it'll only be called once at the end of the migration, when the query is unlikely to be slow. However, it provides consistency with ViaOffset and doesn't add much code.

databaseName, tableName string,
uniqueKeyColumns *ColumnList,
rangeStartArgs, rangeEndArgs []interface{},
chunkSize int64,
startRangeComparisonSign ValueComparisonSign,
hint string,
) (result string, explodedArgs []interface{}, err error) {
m := newTwoColumnRangeMeta(uniqueKeyColumns)
col2StartOp := string(startRangeComparisonSign)
selectClause := m.col1Name + ", " + m.col2Name
fromClause := databaseName + "." + tableName
partSuffix := fmt.Sprintf("order by %s limit %d", m.orderByAsc, chunkSize)

if sameFirstColumnValue(rangeStartArgs, rangeEndArgs) {
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */ %s, %s
from (
select %s, %s
from %s.%s
where (%s = %s and %s %s %s and %s <= %s)
order by %s
limit %d
) select_osc_chunk
order by %s
limit 1`,
databaseName, tableName, hint, m.col1Name, m.col2Name,
m.col1Name, m.col2Name,
databaseName, tableName,
m.col1Name, m.col1Val, m.col2Name, col2StartOp, m.col2Val, m.col2Name, m.col2Val,
m.orderByAsc, chunkSize,
m.orderByDesc,
)
explodedArgs = append(explodedArgs, rangeStartArgs[0], rangeStartArgs[1], rangeEndArgs[1])
return result, explodedArgs, nil
}

part1, part2, part3, explodedArgs := buildTwoColumnUnionParts(
selectClause, fromClause,
m.col1Name, m.col2Name,
m.col1Val, m.col2Val, m.col1Val, m.col2Val,
col2StartOp, partSuffix,
rangeStartArgs, rangeEndArgs,
)

result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */ %s, %s
from (
select %s, %s
from (%s union all %s union all %s) t
order by %s
limit %d
) select_osc_chunk
order by %s
limit 1`,
databaseName, tableName, hint, m.col1Name, m.col2Name,
m.col1Name, m.col2Name,
part1, part2, part3,
m.orderByAsc, chunkSize,
m.orderByDesc,
)
return result, explodedArgs, nil
}

func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {
return buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName, uniqueKey, "asc")
}
Expand Down
Loading
Loading