diff --git a/dev.yml b/dev.yml new file mode 100644 index 000000000..4c28ff349 --- /dev/null +++ b/dev.yml @@ -0,0 +1,21 @@ +name: gh-ost + +env: + TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: /var/run/docker.sock + TESTCONTAINERS_RYUK_DISABLED: "true" + +up: + - go: + version: "1.22.12" + - podman + - custom: + name: Go Dependencies + met?: go mod download + meet: echo 'go mod failed to download dependencies'; false + +commands: + test: + desc: Run all the tests. + run: | + export DOCKER_HOST=unix://$(podman machine inspect --format '{{.ConnectionInfo.PodmanSocket.Path}}') + script/test diff --git a/go/base/context.go b/go/base/context.go index 8a09c43a8..ac077076f 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -151,6 +151,7 @@ type MigrationContext struct { HooksHintOwner string HooksHintToken string HooksStatusIntervalSec int64 + PanicOnWarnings bool DropServeSocket bool ServeSocketFile string @@ -231,6 +232,7 @@ type MigrationContext struct { ColumnRenameMap map[string]string DroppedColumnsMap map[string]bool MappedSharedColumns *sql.ColumnList + MigrationLastInsertSQLWarnings []string MigrationRangeMinValues *sql.ColumnValues MigrationRangeMaxValues *sql.ColumnValues Iteration int64 diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 225676364..a1670cdd4 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -108,6 +108,7 @@ func main() { chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)") dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") + flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, 
"Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL") niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after") diff --git a/go/logic/applier.go b/go/logic/applier.go index 6c8ac71a8..e50df7dd2 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -662,7 +662,7 @@ func (this *Applier) ReadMigrationRangeValues() error { // which will be used for copying the next chunk of rows. Ir returns "false" if there is // no further chunk to work through, i.e. we're past the last chunk and are done with // iterating the range (and this done with copying row chunks) -func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { +func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues if this.migrationContext.MigrationIterationRangeMinValues == nil { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues @@ -683,32 +683,36 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()), ) if err != nil { - return hasFurtherRange, err + return hasFurtherRange, expectedRowCount, err } rows, err := this.db.Query(query, explodedArgs...) 
if err != nil { - return hasFurtherRange, err + return hasFurtherRange, expectedRowCount, err } defer rows.Close() - iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len()) + iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1) for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { - return hasFurtherRange, err + return hasFurtherRange, expectedRowCount, err } - hasFurtherRange = true + + expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64) + iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1]) + + hasFurtherRange = expectedRowCount > 0 } if err = rows.Err(); err != nil { - return hasFurtherRange, err + return hasFurtherRange, expectedRowCount, err } if hasFurtherRange { this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues - return hasFurtherRange, nil + return hasFurtherRange, expectedRowCount, nil } } this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate") - return hasFurtherRange, nil + return hasFurtherRange, expectedRowCount, nil } // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. 
It is where @@ -753,6 +757,35 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected if err != nil { return nil, err } + + if this.migrationContext.PanicOnWarnings { + //nolint:execinquery + rows, err := tx.Query("SHOW WARNINGS") + if err != nil { + return nil, err + } + defer rows.Close() + if err = rows.Err(); err != nil { + return nil, err + } + + var sqlWarnings []string + for rows.Next() { + var level, message string + var code int + if err := rows.Scan(&level, &code, &message); err != nil { + this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") + continue + } + migrationUniqueKeySuffix := fmt.Sprintf("for key '%s.%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.Name) + if strings.HasPrefix(message, "Duplicate entry") && strings.HasSuffix(message, migrationUniqueKeySuffix) { + continue + } + sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + this.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings + } + if err := tx.Commit(); err != nil { return nil, err } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index f53e65ffb..8214499d5 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -504,6 +504,98 @@ func (suite *ApplierTestSuite) TestCreateGhostTable() { suite.Require().Equal("CREATE TABLE `_testing_gho` (\n `id` int DEFAULT NULL,\n `item_id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createDDL) } +func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySucceedsWithUniqueKeyWarningInsertedByDMLEvent() { + ctx := context.Background() + + var err error + + _, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id INT, item_id INT, UNIQUE KEY (item_id));") + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, item_id INT, UNIQUE KEY (item_id));") + 
suite.Require().NoError(err) + + connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := base.NewMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.DatabaseName = "test" + migrationContext.SkipPortValidation = true + migrationContext.OriginalTableName = "testing" + migrationContext.SetConnectionConfig("innodb") + + migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "item_id", + Columns: *sql.NewColumnList([]string{"item_id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, item_id) VALUES (123456, 42);") + suite.Require().NoError(err) + + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: "test", + TableName: "testing", + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}), + }, + } + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().NoError(err) + + err = applier.CreateChangelogTable() + suite.Require().NoError(err) + err = applier.ReadMigrationRangeValues() + suite.Require().NoError(err) + + hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues() + suite.Require().NoError(err) + suite.Require().True(hasFurtherRange) + suite.Require().Equal(int64(1), expectedRangeSize) + + _, rowsAffected, _, err := applier.ApplyIterationInsertQuery() + suite.Require().NoError(err) + suite.Require().Equal(int64(0), rowsAffected) + + // Ensure Duplicate entry '42' 
for key '_testing_gho.item_id' is ignored correctly + suite.Require().Empty(applier.migrationContext.MigrationLastInsertSQLWarnings) + + // Check that the row was inserted + rows, err := suite.db.Query("SELECT * FROM test._testing_gho") + suite.Require().NoError(err) + defer rows.Close() + + var count, id, item_id int + for rows.Next() { + err = rows.Scan(&id, &item_id) + suite.Require().NoError(err) + count += 1 + } + suite.Require().NoError(rows.Err()) + + suite.Require().Equal(1, count) + suite.Require().Equal(123456, id) + suite.Require().Equal(42, item_id) + + suite.Require(). + Equal(int64(1), migrationContext.TotalDMLEventsApplied) + suite.Require(). + Equal(int64(0), migrationContext.RowsDeltaEstimate) +} + func TestApplier(t *testing.T) { suite.Run(t, new(ApplierTestSuite)) } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 5e4e4eccf..6095d4d45 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1238,8 +1238,9 @@ func (this *Migrator) iterateChunks() error { // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever hasFurtherRange := false + expectedRangeSize := int64(0) if err := this.retryOperation(func() (e error) { - hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues() return e }); err != nil { return terminateRowIteration(err) @@ -1265,6 +1266,19 @@ func (this *Migrator) iterateChunks() error { if err != nil { return err // wrapping call will retry } + + if this.migrationContext.PanicOnWarnings { + if len(this.migrationContext.MigrationLastInsertSQLWarnings) > 0 { + for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings { + this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! 
%s", warning) + } + if expectedRangeSize != rowsAffected { + joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ") + return terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) + } + } + } + atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) atomic.AddInt64(&this.migrationContext.Iteration, 1) return nil diff --git a/go/sql/builder.go b/go/sql/builder.go index 332aef100..d6cf505ae 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -275,7 +275,7 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names()) uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames)) - uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames)) + uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames)) // TODO unused variable for i, column := range uniqueKeyColumns.Columns() { uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i]) if column.Type == EnumColumnType { @@ -286,25 +286,46 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i]) } } + joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ") result = fmt.Sprintf(` select /* gh-ost %s.%s %s */ - %s - from - %s.%s - where - %s and %s + %s, + (select count(*) from ( + select + %s + from + %s.%s + where + %s and %s + limit + %d + ) select_osc_chunk) + from ( + select + %s + from + %s.%s + where + %s and %s + limit + %d + ) select_osc_chunk order by %s limit 1 offset %d`, databaseName, tableName, hint, - strings.Join(uniqueKeyColumnNames, ", "), + joinedColumnNames, joinedColumnNames, databaseName, tableName, - rangeStartComparison, rangeEndComparison, + rangeStartComparison, rangeEndComparison, chunkSize, + joinedColumnNames, + databaseName, tableName, + rangeStartComparison,
rangeEndComparison, chunkSize, strings.Join(uniqueKeyColumnAscending, ", "), (chunkSize - 1), ) - return result, explodedArgs, nil + // 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5) + return result, append(explodedArgs, explodedArgs...), nil } func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { @@ -342,8 +363,22 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i]) } } + + joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ") result = fmt.Sprintf(` - select /* gh-ost %s.%s %s */ %s + select /* gh-ost %s.%s %s */ + %s, + (select count(*) from ( + select + %s + from + %s.%s + where + %s and %s + order by + %s + limit %d + ) select_osc_chunk) from ( select %s @@ -353,17 +388,22 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str %s and %s order by %s - limit %d) select_osc_chunk + limit %d + ) select_osc_chunk order by %s limit 1`, - databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "), - strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName, + databaseName, tableName, hint, joinedColumnNames, + joinedColumnNames, databaseName, tableName, + rangeStartComparison, rangeEndComparison, + strings.Join(uniqueKeyColumnAscending, ", "), chunkSize, + joinedColumnNames, databaseName, tableName, rangeStartComparison, rangeEndComparison, strings.Join(uniqueKeyColumnAscending, ", "), chunkSize, strings.Join(uniqueKeyColumnDescending, ", "), ) - return result, explodedArgs, nil + // 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5) + return result, append(explodedArgs, explodedArgs...), nil } func 
BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index d43f65056..1ef94da9f 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -337,7 +337,18 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) { query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test") require.NoError(t, err) expected := ` - select /* gh-ost mydb.tbl test */ name, position + select /* gh-ost mydb.tbl test */ + name, position, + (select count(*) from ( + select + name, position + from + mydb.tbl + where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?))) + order by + name asc, position asc + limit 500 + ) select_osc_chunk) from ( select name, position @@ -346,12 +357,13 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) { where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) 
and (position = ?))) order by name asc, position asc - limit 500) select_osc_chunk + limit 500 + ) select_osc_chunk order by name desc, position desc limit 1` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) - require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs) + require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117, 3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs) } } diff --git a/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/create.sql b/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/create.sql new file mode 100644 index 000000000..0bdce5fd7 --- /dev/null +++ b/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/create.sql @@ -0,0 +1,25 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + name varchar(255) not null, + primary key (id) +) auto_increment=1; + +create unique index name_index on gh_ost_test (name); + +insert into gh_ost_test (`name`) values ('John Smith'); +insert into gh_ost_test (`name`) values ('John Travolta'); + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert ignore into gh_ost_test values ('John ' || last_insert_id()); + insert ignore into gh_ost_test values ('Adam ' || last_insert_id()); +end ;; diff --git a/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/expect_failure b/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/expect_failure new file mode 100644 index 000000000..2c81a143c --- /dev/null +++ b/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/expect_failure @@ -0,0 +1,2 @@ +Warning: Duplicate entry 'John' for key +Warning: Duplicate entry 'John' for key \ No newline at end of file diff --git 
a/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/extra_args b/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/extra_args new file mode 100644 index 000000000..eb98d683a --- /dev/null +++ b/localtests/panic-on-warnings-duplicate-unique-values-on-column-type-change/extra_args @@ -0,0 +1 @@ +--panic-on-warnings --alter "modify column name varchar(4) not null" \ No newline at end of file diff --git a/localtests/panic-on-warnings-duplicate-values-for-unique-index/create.sql b/localtests/panic-on-warnings-duplicate-values-for-unique-index/create.sql new file mode 100644 index 000000000..218f57314 --- /dev/null +++ b/localtests/panic-on-warnings-duplicate-values-for-unique-index/create.sql @@ -0,0 +1,23 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + name varchar(255) not null, + primary key (id) +) auto_increment=1; + +insert into gh_ost_test (`name`) values ('John'); +insert into gh_ost_test (`name`) values ('John'); + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert ignore into gh_ost_test values ('John ' || last_insert_id()); + insert ignore into gh_ost_test values ('Adam ' || last_insert_id()); +end ;; diff --git a/localtests/panic-on-warnings-duplicate-values-for-unique-index/expect_failure b/localtests/panic-on-warnings-duplicate-values-for-unique-index/expect_failure new file mode 100644 index 000000000..0f683a9b8 --- /dev/null +++ b/localtests/panic-on-warnings-duplicate-values-for-unique-index/expect_failure @@ -0,0 +1 @@ +Warning: Duplicate entry 'John' \ No newline at end of file diff --git a/localtests/panic-on-warnings-duplicate-values-for-unique-index/extra_args b/localtests/panic-on-warnings-duplicate-values-for-unique-index/extra_args new file mode 100644 index 
000000000..2a9d95a0d --- /dev/null +++ b/localtests/panic-on-warnings-duplicate-values-for-unique-index/extra_args @@ -0,0 +1 @@ +--panic-on-warnings --alter "add unique index name_index(name)" \ No newline at end of file