Skip to content

Commit 57b77b3

Browse files
committed
Fix data loss when PanicOnWarnings encounters duplicate keys on non-migration indexes
gh-ost could silently lose data when adding a unique index to a column during a migration, even with the PanicOnWarnings flag enabled. This occurred when: 1. A migration adds a unique index to a column (e.g., email) 2. Rows with duplicate values are inserted into the original table after the bulk copy phase completes (during postponed cutover) 3. These duplicate rows are applied via binlog replay to the ghost table **Expected behavior:** Migration fails with clear error **Actual behavior:** Original rows with duplicate values silently deleted, data lost Original table: id PRIMARY KEY, email (no unique constraint) Ghost table: id PRIMARY KEY, email UNIQUE (being added) Initial state (after bulk copy): - Ghost table: (id=1, email='bob@example.com') (id=2, email='alice@example.com') During postponed cutover: - INSERT (id=3, email='bob@example.com') into original table Binlog replay attempts: - INSERT (id=3, email='bob@example.com') into ghost table - Duplicate email='bob@example.com' (conflicts with id=1) - Row with id=1 silently deleted → DATA LOSS The DMLInsertQueryBuilder used `REPLACE` statements for binlog event replay: ```sql REPLACE INTO ghost_table (id, email) VALUES (3, 'bob@example.com'); REPLACE behavior: - If duplicate PRIMARY KEY: deletes old row, inserts new row (no warning/error) - If duplicate on OTHER unique index: deletes conflicting row, inserts new row - NO warnings or errors generated, so PanicOnWarnings cannot detect the issue The original design assumed REPLACE was needed to handle timing edge cases where binlog replay might process a row before bulk copy, but this caused silent data corruption when other unique indexes had duplicates. Changed DMLInsertQueryBuilder to use INSERT IGNORE instead of REPLACE: INSERT IGNORE INTO ghost_table (id, email) VALUES (3, 'bob@example.com'); INSERT IGNORE behavior: - If duplicate on ANY unique index: skip insert, generate WARNING - Does not delete existing rows Added warning detection to ApplyDMLEventQueries() when PanicOnWarnings is enabled: - Checks SHOW WARNINGS after batch execution - Ignores duplicates on migration unique key (expected - row already copied) - FAILS migration for duplicates on other unique indexes - Transaction rollback ensures no partial state Edge Case: DELETE+INSERT Conversion When an UPDATE modifies the migration unique key (e.g., PRIMARY KEY), gh-ost converts it to DELETE+INSERT within a single transaction: BEGIN; DELETE FROM ghost WHERE id=2; INSERT IGNORE INTO ghost VALUES (3, 'bob@example.com'); COMMIT; If the INSERT encounters a duplicate on a non-migration unique index: - With PanicOnWarnings: Warning detected, transaction rolled back, both DELETE and INSERT undone → no data loss ✓ - Without PanicOnWarnings: DELETE succeeds, INSERT silently skips → data loss. This further reinforces that PanicOnWarnings should default to on.
1 parent c6f95cc commit 57b77b3

File tree

5 files changed

+321
-7
lines changed

5 files changed

+321
-7
lines changed

go/logic/applier.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1557,6 +1557,42 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
15571557
if execErr != nil {
15581558
return rollback(execErr)
15591559
}
1560+
1561+
// Check for warnings when PanicOnWarnings is enabled
1562+
if this.migrationContext.PanicOnWarnings {
1563+
rows, err := tx.Query("SHOW WARNINGS")
1564+
if err != nil {
1565+
return rollback(err)
1566+
}
1567+
defer rows.Close()
1568+
if err = rows.Err(); err != nil {
1569+
return rollback(err)
1570+
}
1571+
1572+
var sqlWarnings []string
1573+
for rows.Next() {
1574+
var level, message string
1575+
var code int
1576+
if err := rows.Scan(&level, &code, &message); err != nil {
1577+
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
1578+
continue
1579+
}
1580+
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
1581+
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
1582+
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
1583+
if strings.Contains(message, "Duplicate entry") && matched {
1584+
// Duplicate entry on migration unique key is expected during binlog replay
1585+
// (row was already copied during bulk copy phase)
1586+
continue
1587+
}
1588+
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
1589+
}
1590+
if len(sqlWarnings) > 0 {
1591+
warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings)
1592+
return rollback(fmt.Errorf(warningMsg))
1593+
}
1594+
}
1595+
15601596
if err := tx.Commit(); err != nil {
15611597
return err
15621598
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
Copyright 2025 GitHub Inc.
3+
See https://github.com/github/gh-ost/blob/master/LICENSE
4+
*/
5+
6+
package logic
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"testing"
12+
13+
"github.com/github/gh-ost/go/binlog"
14+
"github.com/github/gh-ost/go/sql"
15+
"github.com/stretchr/testify/suite"
16+
)
17+
18+
type DeleteInsertTestSuite struct {
19+
ApplierTestSuite
20+
}
21+
22+
// TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex tests the scenario where:
23+
// 1. An UPDATE modifies the unique key (converted to DELETE+INSERT)
24+
// 2. The INSERT would create a duplicate on a NON-migration unique index
25+
// 3. With INSERT IGNORE, the DELETE succeeds but INSERT skips = DATA LOSS
26+
func (suite *DeleteInsertTestSuite) TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex() {
27+
ctx := context.Background()
28+
29+
var err error
30+
31+
// Create table with id (PRIMARY) and email (NO unique constraint yet)
32+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
33+
suite.Require().NoError(err)
34+
35+
// Create ghost table with id (PRIMARY) AND email unique index (being added)
36+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
37+
suite.Require().NoError(err)
38+
39+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
40+
suite.Require().NoError(err)
41+
42+
migrationContext := newTestMigrationContext()
43+
migrationContext.ApplierConnectionConfig = connectionConfig
44+
migrationContext.SetConnectionConfig("innodb")
45+
46+
migrationContext.PanicOnWarnings = true
47+
48+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
49+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
50+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
51+
migrationContext.UniqueKey = &sql.UniqueKey{
52+
Name: "PRIMARY",
53+
NameInGhostTable: "PRIMARY",
54+
Columns: *sql.NewColumnList([]string{"id"}),
55+
}
56+
57+
applier := NewApplier(migrationContext)
58+
suite.Require().NoError(applier.prepareQueries())
59+
defer applier.Teardown()
60+
61+
err = applier.InitDBConnections()
62+
suite.Require().NoError(err)
63+
64+
// Setup: Insert initial rows into ghost table
65+
// Row 1: id=1, email='bob@example.com'
66+
// Row 2: id=2, email='charlie@example.com'
67+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'bob@example.com'), (2, 'charlie@example.com');", getTestGhostTableName()))
68+
suite.Require().NoError(err)
69+
70+
// Simulate binlog event: UPDATE that changes BOTH PRIMARY KEY and email
71+
// From: id=2, email='charlie@example.com'
72+
// To: id=3, email='bob@example.com' (duplicate email with id=1)
73+
// This will be converted to DELETE (id=2) + INSERT (id=3, 'bob@example.com')
74+
// With INSERT IGNORE, the INSERT will skip because email='bob@example.com' already exists in id=1
75+
// Result: id=2 deleted, id=3 never inserted = DATA LOSS
76+
dmlEvents := []*binlog.BinlogDMLEvent{
77+
{
78+
DatabaseName: testMysqlDatabase,
79+
TableName: testMysqlTableName,
80+
DML: binlog.UpdateDML,
81+
NewColumnValues: sql.ToColumnValues([]interface{}{3, "bob@example.com"}), // new: id=3, email='bob@example.com'
82+
WhereColumnValues: sql.ToColumnValues([]interface{}{2, "charlie@example.com"}), // old: id=2, email='charlie@example.com'
83+
},
84+
}
85+
86+
// First verify this would be converted to DELETE+INSERT
87+
buildResults := applier.buildDMLEventQuery(dmlEvents[0])
88+
suite.Require().Len(buildResults, 2, "UPDATE modifying unique key should be converted to DELETE+INSERT")
89+
90+
// Apply the event - this should FAIL because INSERT will have duplicate email warning
91+
err = applier.ApplyDMLEventQueries(dmlEvents)
92+
suite.Require().Error(err, "Should fail when DELETE+INSERT causes duplicate on non-migration unique key")
93+
suite.Require().Contains(err.Error(), "Duplicate entry", "Error should mention duplicate entry")
94+
95+
// Verify that BOTH rows still exist (transaction rolled back)
96+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
97+
suite.Require().NoError(err)
98+
defer rows.Close()
99+
100+
var count int
101+
var ids []int
102+
var emails []string
103+
for rows.Next() {
104+
var id int
105+
var email string
106+
err = rows.Scan(&id, &email)
107+
suite.Require().NoError(err)
108+
ids = append(ids, id)
109+
emails = append(emails, email)
110+
count++
111+
}
112+
suite.Require().NoError(rows.Err())
113+
114+
// Transaction should have rolled back, so original 2 rows should still be there
115+
suite.Require().Equal(2, count, "Should still have 2 rows after failed transaction")
116+
suite.Require().Equal([]int{1, 2}, ids, "Should have original ids")
117+
suite.Require().Equal([]string{"bob@example.com", "charlie@example.com"}, emails)
118+
}
119+
120+
// TestNormalUpdateWithPanicOnWarnings tests that normal UPDATEs (not modifying unique key) work correctly
121+
func (suite *DeleteInsertTestSuite) TestNormalUpdateWithPanicOnWarnings() {
122+
ctx := context.Background()
123+
124+
var err error
125+
126+
// Create table with id (PRIMARY) and email
127+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
128+
suite.Require().NoError(err)
129+
130+
// Create ghost table with same schema plus unique index on email
131+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
132+
suite.Require().NoError(err)
133+
134+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
135+
suite.Require().NoError(err)
136+
137+
migrationContext := newTestMigrationContext()
138+
migrationContext.ApplierConnectionConfig = connectionConfig
139+
migrationContext.SetConnectionConfig("innodb")
140+
141+
migrationContext.PanicOnWarnings = true
142+
143+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
144+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
145+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
146+
migrationContext.UniqueKey = &sql.UniqueKey{
147+
Name: "PRIMARY",
148+
NameInGhostTable: "PRIMARY",
149+
Columns: *sql.NewColumnList([]string{"id"}),
150+
}
151+
152+
applier := NewApplier(migrationContext)
153+
suite.Require().NoError(applier.prepareQueries())
154+
defer applier.Teardown()
155+
156+
err = applier.InitDBConnections()
157+
suite.Require().NoError(err)
158+
159+
// Setup: Insert initial rows into ghost table
160+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName()))
161+
suite.Require().NoError(err)
162+
163+
// Simulate binlog event: Normal UPDATE that only changes email (not PRIMARY KEY)
164+
// This should use UPDATE query, not DELETE+INSERT
165+
dmlEvents := []*binlog.BinlogDMLEvent{
166+
{
167+
DatabaseName: testMysqlDatabase,
168+
TableName: testMysqlTableName,
169+
DML: binlog.UpdateDML,
170+
NewColumnValues: sql.ToColumnValues([]interface{}{2, "robert@example.com"}), // update email only
171+
WhereColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}),
172+
},
173+
}
174+
175+
// Verify this generates a single UPDATE query (not DELETE+INSERT)
176+
buildResults := applier.buildDMLEventQuery(dmlEvents[0])
177+
suite.Require().Len(buildResults, 1, "Normal UPDATE should generate single UPDATE query")
178+
179+
// Apply the event - should succeed
180+
err = applier.ApplyDMLEventQueries(dmlEvents)
181+
suite.Require().NoError(err)
182+
183+
// Verify the update was applied correctly
184+
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " WHERE id = 2")
185+
suite.Require().NoError(err)
186+
defer rows.Close()
187+
188+
var id int
189+
var email string
190+
suite.Require().True(rows.Next(), "Should find updated row")
191+
err = rows.Scan(&id, &email)
192+
suite.Require().NoError(err)
193+
suite.Require().Equal(2, id)
194+
suite.Require().Equal("robert@example.com", email)
195+
suite.Require().False(rows.Next(), "Should only have one row")
196+
suite.Require().NoError(rows.Err())
197+
}
198+
199+
func TestDeleteInsert(t *testing.T) {
200+
suite.Run(t, new(DeleteInsertTestSuite))
201+
}

go/logic/applier_test.go

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
147147
require.Len(t, res, 1)
148148
require.NoError(t, res[0].err)
149149
require.Equal(t,
150-
`replace /* gh-ost `+"`test`.`_test_gho`"+` */
150+
`insert /* gh-ost `+"`test`.`_test_gho`"+` */ ignore
151151
into
152152
`+"`test`.`_test_gho`"+`
153153
`+"(`id`, `item_id`)"+`
@@ -721,6 +721,83 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
721721
suite.Require().Equal(chk.IsCutover, gotChk.IsCutover)
722722
}
723723

724+
func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() {
725+
ctx := context.Background()
726+
727+
var err error
728+
729+
// Create table with id and email columns, where id is the primary key
730+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
731+
suite.Require().NoError(err)
732+
733+
// Create ghost table with same schema plus a new unique index on email
734+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
735+
suite.Require().NoError(err)
736+
737+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
738+
suite.Require().NoError(err)
739+
740+
migrationContext := newTestMigrationContext()
741+
migrationContext.ApplierConnectionConfig = connectionConfig
742+
migrationContext.SetConnectionConfig("innodb")
743+
744+
migrationContext.PanicOnWarnings = true
745+
746+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
747+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
748+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
749+
migrationContext.UniqueKey = &sql.UniqueKey{
750+
Name: "PRIMARY",
751+
NameInGhostTable: "PRIMARY",
752+
Columns: *sql.NewColumnList([]string{"id"}),
753+
}
754+
755+
applier := NewApplier(migrationContext)
756+
suite.Require().NoError(applier.prepareQueries())
757+
defer applier.Teardown()
758+
759+
err = applier.InitDBConnections()
760+
suite.Require().NoError(err)
761+
762+
// Insert initial rows into ghost table (simulating bulk copy phase)
763+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'user1@example.com'), (2, 'user2@example.com'), (3, 'user3@example.com');", getTestGhostTableName()))
764+
suite.Require().NoError(err)
765+
766+
// Simulate binlog event: try to insert a row with duplicate email
767+
// This should fail with a warning because the ghost table has a unique index on email
768+
dmlEvents := []*binlog.BinlogDMLEvent{
769+
{
770+
DatabaseName: testMysqlDatabase,
771+
TableName: testMysqlTableName,
772+
DML: binlog.InsertDML,
773+
NewColumnValues: sql.ToColumnValues([]interface{}{4, "user2@example.com"}), // duplicate email
774+
},
775+
}
776+
777+
// This should return an error when PanicOnWarnings is enabled
778+
err = applier.ApplyDMLEventQueries(dmlEvents)
779+
suite.Require().Error(err)
780+
suite.Require().Contains(err.Error(), "Duplicate entry")
781+
782+
// Verify that the ghost table still has only 3 rows (no data loss)
783+
rows, err := suite.db.Query("SELECT * FROM " + getTestGhostTableName() + " ORDER BY id")
784+
suite.Require().NoError(err)
785+
defer rows.Close()
786+
787+
var count int
788+
for rows.Next() {
789+
var id int
790+
var email string
791+
err = rows.Scan(&id, &email)
792+
suite.Require().NoError(err)
793+
count += 1
794+
}
795+
suite.Require().NoError(rows.Err())
796+
797+
// All 3 original rows should still be present
798+
suite.Require().Equal(3, count)
799+
}
800+
724801
func TestApplier(t *testing.T) {
725802
suite.Run(t, new(ApplierTestSuite))
726803
}

go/sql/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,7 @@ func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, shar
566566
preparedValues := buildColumnsPreparedValues(mappedSharedColumns)
567567

568568
stmt := fmt.Sprintf(`
569-
replace /* gh-ost %s.%s */
569+
insert /* gh-ost %s.%s */ ignore
570570
into
571571
%s.%s
572572
(%s)

go/sql/builder_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func TestBuildDMLInsertQuery(t *testing.T) {
538538
query, sharedArgs, err := builder.BuildQuery(args)
539539
require.NoError(t, err)
540540
expected := `
541-
replace /* gh-ost mydb.tbl */
541+
insert /* gh-ost mydb.tbl */ ignore
542542
into mydb.tbl
543543
(id, name, position, age)
544544
values
@@ -554,7 +554,7 @@ func TestBuildDMLInsertQuery(t *testing.T) {
554554
query, sharedArgs, err := builder.BuildQuery(args)
555555
require.NoError(t, err)
556556
expected := `
557-
replace /* gh-ost mydb.tbl */
557+
insert /* gh-ost mydb.tbl */ ignore
558558
into mydb.tbl
559559
(position, name, age, id)
560560
values
@@ -589,7 +589,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) {
589589
query, sharedArgs, err := builder.BuildQuery(args)
590590
require.NoError(t, err)
591591
expected := `
592-
replace /* gh-ost mydb.tbl */
592+
insert /* gh-ost mydb.tbl */ ignore
593593
into mydb.tbl
594594
(id, name, position, age)
595595
values
@@ -607,7 +607,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) {
607607
query, sharedArgs, err := builder.BuildQuery(args)
608608
require.NoError(t, err)
609609
expected := `
610-
replace /* gh-ost mydb.tbl */
610+
insert /* gh-ost mydb.tbl */ ignore
611611
into mydb.tbl
612612
(id, name, position, age)
613613
values
@@ -625,7 +625,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) {
625625
query, sharedArgs, err := builder.BuildQuery(args)
626626
require.NoError(t, err)
627627
expected := `
628-
replace /* gh-ost mydb.tbl */
628+
insert /* gh-ost mydb.tbl */ ignore
629629
into mydb.tbl
630630
(id, name, position, age)
631631
values

0 commit comments

Comments
 (0)