Skip to content

Commit 753cf88

Browse files
ggilder and meiji163 authored
Fix data loss when inserting duplicate values during a migration (#1633)
* Fix data loss when PanicOnWarnings encounters duplicate keys on non-migration indexes

gh-ost could silently lose data when adding a unique index to a column during a migration, even with the PanicOnWarnings flag enabled. This occurred when:

1. A migration adds a unique index to a column (e.g., email)
2. Rows with duplicate values are inserted into the original table after the bulk copy phase completes (during postponed cutover)
3. These duplicate rows are applied via binlog replay to the ghost table

**Expected behavior:** Migration fails with a clear error

**Actual behavior:** Original rows with duplicate values are silently deleted, and data is lost

Original table: id PRIMARY KEY, email (no unique constraint)
Ghost table: id PRIMARY KEY, email UNIQUE (being added)

Initial state (after bulk copy):
- Ghost table: (id=1, email='bob@example.com'), (id=2, email='alice@example.com')

During postponed cutover:
- INSERT (id=3, email='bob@example.com') into original table

Binlog replay attempts:
- INSERT (id=3, email='bob@example.com') into ghost table
- Duplicate email='bob@example.com' (conflicts with id=1)
- Row with id=1 silently deleted → DATA LOSS

The DMLInsertQueryBuilder used `REPLACE` statements for binlog event replay:

```sql
REPLACE INTO ghost_table (id, email) VALUES (3, 'bob@example.com');
```

REPLACE behavior:
- If duplicate PRIMARY KEY: deletes old row, inserts new row (no warning/error)
- If duplicate on OTHER unique index: deletes conflicting row, inserts new row
- NO warnings or errors generated, so PanicOnWarnings cannot detect the issue

The original design assumed REPLACE was needed to handle timing edge cases where binlog replay might process a row before bulk copy, but this caused silent data corruption when other unique indexes had duplicates.
Changed DMLInsertQueryBuilder to use INSERT IGNORE instead of REPLACE:

```sql
INSERT IGNORE INTO ghost_table (id, email) VALUES (3, 'bob@example.com');
```

INSERT IGNORE behavior:
- If duplicate on ANY unique index: skip insert, generate WARNING
- Does not delete existing rows

Added warning detection to ApplyDMLEventQueries() when PanicOnWarnings is enabled:
- Checks SHOW WARNINGS after batch execution
- Ignores duplicates on migration unique key (expected - row already copied)
- FAILS migration for duplicates on other unique indexes
- Transaction rollback ensures no partial state

Edge Case: DELETE+INSERT Conversion

When an UPDATE modifies the migration unique key (e.g., PRIMARY KEY), gh-ost converts it to DELETE+INSERT within a single transaction:

```sql
BEGIN;
DELETE FROM ghost WHERE id=2;
INSERT IGNORE INTO ghost VALUES (3, 'bob@example.com');
COMMIT;
```

If the INSERT encounters a duplicate on a non-migration unique index:
- With PanicOnWarnings: Warning detected, transaction rolled back, both DELETE and INSERT undone → no data loss ✓
- Without PanicOnWarnings: DELETE succeeds, INSERT silently skips → data loss.

This further reinforces that PanicOnWarnings should default to on.

* Address linter issue
* Clarify test comment
* Consolidate test files
* Add execinquery linter comment

---------

Co-authored-by: meiji163 <meiji163@github.com>
1 parent c72b237 commit 753cf88

File tree

4 files changed

+300
-7
lines changed

4 files changed

+300
-7
lines changed

go/logic/applier.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1546,6 +1546,43 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15461546
if execErr != nil {
15471547
return rollback(execErr)
15481548
}
1549+
1550+
// Check for warnings when PanicOnWarnings is enabled
1551+
if this.migrationContext.PanicOnWarnings {
1552+
//nolint:execinquery
1553+
rows, err := tx.Query("SHOW WARNINGS")
1554+
if err != nil {
1555+
return rollback(err)
1556+
}
1557+
defer rows.Close()
1558+
if err = rows.Err(); err != nil {
1559+
return rollback(err)
1560+
}
1561+
1562+
var sqlWarnings []string
1563+
for rows.Next() {
1564+
var level, message string
1565+
var code int
1566+
if err := rows.Scan(&level, &code, &message); err != nil {
1567+
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
1568+
continue
1569+
}
1570+
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
1571+
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
1572+
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
1573+
if strings.Contains(message, "Duplicate entry") && matched {
1574+
// Duplicate entry on migration unique key is expected during binlog replay
1575+
// (row was already copied during bulk copy phase)
1576+
continue
1577+
}
1578+
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1579+
}
1580+
if len(sqlWarnings) > 0 {
1581+
warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings)
1582+
return rollback(errors.New(warningMsg))
1583+
}
1584+
}
1585+
15491586
if err := tx.Commit(); err != nil {
15501587
return err
15511588
}

go/logic/applier_test.go

Lines changed: 257 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
147147
require.Len(t, res, 1)
148148
require.NoError(t, res[0].err)
149149
require.Equal(t,
150-
`replace /* gh-ost `+"`test`.`_test_gho`"+` */
150+
`insert /* gh-ost `+"`test`.`_test_gho`"+` */ ignore
151151
into
152152
`+"`test`.`_test_gho`"+`
153153
`+"(`id`, `item_id`)"+`
@@ -724,6 +724,262 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
724724
suite.Require().Equal(chk.IsCutover, gotChk.IsCutover)
725725
}
726726

727+
func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() {
728+
ctx := context.Background()
729+
730+
var err error
731+
732+
// Create table with id and email columns, where id is the primary key
733+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
734+
suite.Require().NoError(err)
735+
736+
// Create ghost table with same schema plus a new unique index on email
737+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
738+
suite.Require().NoError(err)
739+
740+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
741+
suite.Require().NoError(err)
742+
743+
migrationContext := newTestMigrationContext()
744+
migrationContext.ApplierConnectionConfig = connectionConfig
745+
migrationContext.SetConnectionConfig("innodb")
746+
747+
migrationContext.PanicOnWarnings = true
748+
749+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
750+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
751+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
752+
migrationContext.UniqueKey = &sql.UniqueKey{
753+
Name: "PRIMARY",
754+
NameInGhostTable: "PRIMARY",
755+
Columns: *sql.NewColumnList([]string{"id"}),
756+
}
757+
758+
applier := NewApplier(migrationContext)
759+
suite.Require().NoError(applier.prepareQueries())
760+
defer applier.Teardown()
761+
762+
err = applier.InitDBConnections()
763+
suite.Require().NoError(err)
764+
765+
// Insert initial rows into ghost table (simulating bulk copy phase)
766+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'user1@example.com'), (2, 'user2@example.com'), (3, 'user3@example.com');", getTestGhostTableName()))
767+
suite.Require().NoError(err)
768+
769+
// Simulate binlog event: try to insert a row with duplicate email
770+
// This should fail with a warning because the ghost table has a unique index on email
771+
dmlEvents := []*binlog.BinlogDMLEvent{
772+
{
773+
DatabaseName: testMysqlDatabase,
774+
TableName: testMysqlTableName,
775+
DML: binlog.InsertDML,
776+
NewColumnValues: sql.ToColumnValues([]interface{}{4, "user2@example.com"}), // duplicate email
777+
},
778+
}
779+
780+
// This should return an error when PanicOnWarnings is enabled
781+
err = applier.ApplyDMLEventQueries(dmlEvents)
782+
suite.Require().Error(err)
783+
suite.Require().Contains(err.Error(), "Duplicate entry")
784+
785+
// Verify that the ghost table still has only 3 rows (no data loss)
786+
rows, err := suite.db.Query("SELECT * FROM " + getTestGhostTableName() + " ORDER BY id")
787+
suite.Require().NoError(err)
788+
defer rows.Close()
789+
790+
var count int
791+
for rows.Next() {
792+
var id int
793+
var email string
794+
err = rows.Scan(&id, &email)
795+
suite.Require().NoError(err)
796+
count += 1
797+
}
798+
suite.Require().NoError(rows.Err())
799+
800+
// All 3 original rows should still be present
801+
suite.Require().Equal(3, count)
802+
}
803+
804+
// TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex tests the scenario where:
805+
// 1. An UPDATE modifies the unique key (converted to DELETE+INSERT)
806+
// 2. The INSERT would create a duplicate on a NON-migration unique index
807+
// 3. Without warning detection: DELETE succeeds, INSERT IGNORE skips = DATA LOSS
808+
// 4. With PanicOnWarnings: Warning detected, transaction rolled back, no data loss
809+
// This test verifies that PanicOnWarnings correctly prevents the data loss scenario.
810+
func (suite *ApplierTestSuite) TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex() {
811+
ctx := context.Background()
812+
813+
var err error
814+
815+
// Create table with id (PRIMARY) and email (NO unique constraint yet)
816+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
817+
suite.Require().NoError(err)
818+
819+
// Create ghost table with id (PRIMARY) AND email unique index (being added)
820+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
821+
suite.Require().NoError(err)
822+
823+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
824+
suite.Require().NoError(err)
825+
826+
migrationContext := newTestMigrationContext()
827+
migrationContext.ApplierConnectionConfig = connectionConfig
828+
migrationContext.SetConnectionConfig("innodb")
829+
830+
migrationContext.PanicOnWarnings = true
831+
832+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
833+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
834+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
835+
migrationContext.UniqueKey = &sql.UniqueKey{
836+
Name: "PRIMARY",
837+
NameInGhostTable: "PRIMARY",
838+
Columns: *sql.NewColumnList([]string{"id"}),
839+
}
840+
841+
applier := NewApplier(migrationContext)
842+
suite.Require().NoError(applier.prepareQueries())
843+
defer applier.Teardown()
844+
845+
err = applier.InitDBConnections()
846+
suite.Require().NoError(err)
847+
848+
// Setup: Insert initial rows into ghost table
849+
// Row 1: id=1, email='bob@example.com'
850+
// Row 2: id=2, email='charlie@example.com'
851+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'bob@example.com'), (2, 'charlie@example.com');", getTestGhostTableName()))
852+
suite.Require().NoError(err)
853+
854+
// Simulate binlog event: UPDATE that changes BOTH PRIMARY KEY and email
855+
// From: id=2, email='charlie@example.com'
856+
// To: id=3, email='bob@example.com' (duplicate email with id=1)
857+
// This will be converted to DELETE (id=2) + INSERT (id=3, 'bob@example.com')
858+
// With INSERT IGNORE, the INSERT will skip because email='bob@example.com' already exists in id=1
859+
// Result: id=2 deleted, id=3 never inserted = DATA LOSS
860+
dmlEvents := []*binlog.BinlogDMLEvent{
861+
{
862+
DatabaseName: testMysqlDatabase,
863+
TableName: testMysqlTableName,
864+
DML: binlog.UpdateDML,
865+
NewColumnValues: sql.ToColumnValues([]interface{}{3, "bob@example.com"}), // new: id=3, email='bob@example.com'
866+
WhereColumnValues: sql.ToColumnValues([]interface{}{2, "charlie@example.com"}), // old: id=2, email='charlie@example.com'
867+
},
868+
}
869+
870+
// First verify this would be converted to DELETE+INSERT
871+
buildResults := applier.buildDMLEventQuery(dmlEvents[0])
872+
suite.Require().Len(buildResults, 2, "UPDATE modifying unique key should be converted to DELETE+INSERT")
873+
874+
// Apply the event - this should FAIL because INSERT will have duplicate email warning
875+
err = applier.ApplyDMLEventQueries(dmlEvents)
876+
suite.Require().Error(err, "Should fail when DELETE+INSERT causes duplicate on non-migration unique key")
877+
suite.Require().Contains(err.Error(), "Duplicate entry", "Error should mention duplicate entry")
878+
879+
// Verify that BOTH rows still exist (transaction rolled back)
880+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
881+
suite.Require().NoError(err)
882+
defer rows.Close()
883+
884+
var count int
885+
var ids []int
886+
var emails []string
887+
for rows.Next() {
888+
var id int
889+
var email string
890+
err = rows.Scan(&id, &email)
891+
suite.Require().NoError(err)
892+
ids = append(ids, id)
893+
emails = append(emails, email)
894+
count++
895+
}
896+
suite.Require().NoError(rows.Err())
897+
898+
// Transaction should have rolled back, so original 2 rows should still be there
899+
suite.Require().Equal(2, count, "Should still have 2 rows after failed transaction")
900+
suite.Require().Equal([]int{1, 2}, ids, "Should have original ids")
901+
suite.Require().Equal([]string{"bob@example.com", "charlie@example.com"}, emails)
902+
}
903+
904+
// TestNormalUpdateWithPanicOnWarnings tests that normal UPDATEs (not modifying unique key) work correctly
905+
func (suite *ApplierTestSuite) TestNormalUpdateWithPanicOnWarnings() {
906+
ctx := context.Background()
907+
908+
var err error
909+
910+
// Create table with id (PRIMARY) and email
911+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
912+
suite.Require().NoError(err)
913+
914+
// Create ghost table with same schema plus unique index on email
915+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
916+
suite.Require().NoError(err)
917+
918+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
919+
suite.Require().NoError(err)
920+
921+
migrationContext := newTestMigrationContext()
922+
migrationContext.ApplierConnectionConfig = connectionConfig
923+
migrationContext.SetConnectionConfig("innodb")
924+
925+
migrationContext.PanicOnWarnings = true
926+
927+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
928+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
929+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
930+
migrationContext.UniqueKey = &sql.UniqueKey{
931+
Name: "PRIMARY",
932+
NameInGhostTable: "PRIMARY",
933+
Columns: *sql.NewColumnList([]string{"id"}),
934+
}
935+
936+
applier := NewApplier(migrationContext)
937+
suite.Require().NoError(applier.prepareQueries())
938+
defer applier.Teardown()
939+
940+
err = applier.InitDBConnections()
941+
suite.Require().NoError(err)
942+
943+
// Setup: Insert initial rows into ghost table
944+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName()))
945+
suite.Require().NoError(err)
946+
947+
// Simulate binlog event: Normal UPDATE that only changes email (not PRIMARY KEY)
948+
// This should use UPDATE query, not DELETE+INSERT
949+
dmlEvents := []*binlog.BinlogDMLEvent{
950+
{
951+
DatabaseName: testMysqlDatabase,
952+
TableName: testMysqlTableName,
953+
DML: binlog.UpdateDML,
954+
NewColumnValues: sql.ToColumnValues([]interface{}{2, "robert@example.com"}), // update email only
955+
WhereColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}),
956+
},
957+
}
958+
959+
// Verify this generates a single UPDATE query (not DELETE+INSERT)
960+
buildResults := applier.buildDMLEventQuery(dmlEvents[0])
961+
suite.Require().Len(buildResults, 1, "Normal UPDATE should generate single UPDATE query")
962+
963+
// Apply the event - should succeed
964+
err = applier.ApplyDMLEventQueries(dmlEvents)
965+
suite.Require().NoError(err)
966+
967+
// Verify the update was applied correctly
968+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " WHERE id = 2")
969+
suite.Require().NoError(err)
970+
defer rows.Close()
971+
972+
var id int
973+
var email string
974+
suite.Require().True(rows.Next(), "Should find updated row")
975+
err = rows.Scan(&id, &email)
976+
suite.Require().NoError(err)
977+
suite.Require().Equal(2, id)
978+
suite.Require().Equal("robert@example.com", email)
979+
suite.Require().False(rows.Next(), "Should only have one row")
980+
suite.Require().NoError(rows.Err())
981+
}
982+
727983
func TestApplier(t *testing.T) {
728984
suite.Run(t, new(ApplierTestSuite))
729985
}

go/sql/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, shar
566566
preparedValues := buildColumnsPreparedValues(mappedSharedColumns)
567567

568568
stmt := fmt.Sprintf(`
569-
replace /* gh-ost %s.%s */
569+
insert /* gh-ost %s.%s */ ignore
570570
into
571571
%s.%s
572572
(%s)

go/sql/builder_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func TestBuildDMLInsertQuery(t *testing.T) {
538538
query, sharedArgs, err := builder.BuildQuery(args)
539539
require.NoError(t, err)
540540
expected := `
541-
replace /* gh-ost mydb.tbl */
541+
insert /* gh-ost mydb.tbl */ ignore
542542
into mydb.tbl
543543
(id, name, position, age)
544544
values
@@ -554,7 +554,7 @@ func TestBuildDMLInsertQuery(t *testing.T) {
554554
query, sharedArgs, err := builder.BuildQuery(args)
555555
require.NoError(t, err)
556556
expected := `
557-
replace /* gh-ost mydb.tbl */
557+
insert /* gh-ost mydb.tbl */ ignore
558558
into mydb.tbl
559559
(position, name, age, id)
560560
values
@@ -589,7 +589,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) {
589589
query, sharedArgs, err := builder.BuildQuery(args)
590590
require.NoError(t, err)
591591
expected := `
592-
replace /* gh-ost mydb.tbl */
592+
insert /* gh-ost mydb.tbl */ ignore
593593
into mydb.tbl
594594
(id, name, position, age)
595595
values
@@ -607,7 +607,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) {
607607
query, sharedArgs, err := builder.BuildQuery(args)
608608
require.NoError(t, err)
609609
expected := `
610-
replace /* gh-ost mydb.tbl */
610+
insert /* gh-ost mydb.tbl */ ignore
611611
into mydb.tbl
612612
(id, name, position, age)
613613
values
@@ -625,7 +625,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) {
625625
query, sharedArgs, err := builder.BuildQuery(args)
626626
require.NoError(t, err)
627627
expected := `
628-
replace /* gh-ost mydb.tbl */
628+
insert /* gh-ost mydb.tbl */ ignore
629629
into mydb.tbl
630630
(id, name, position, age)
631631
values

0 commit comments

Comments
 (0)