Skip to content

Commit 36eaf27

Browse files
authored
Merge pull request #5 from Shopify/grodowski/raise_on_warnings_unique_key_patch
Fix unique key logic to support dynamically selected unique keys
2 parents 0522c75 + d386786 commit 36eaf27

File tree

2 files changed

+94
-2
lines changed

2 files changed

+94
-2
lines changed

go/logic/applier.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -777,8 +777,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
777777
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
778778
continue
779779
}
780-
pkDuplicateSuffix := fmt.Sprintf("for key '%s.PRIMARY'", this.migrationContext.GetGhostTableName())
781-
if strings.HasPrefix(message, "Duplicate entry") && strings.HasSuffix(message, pkDuplicateSuffix) {
780+
migrationUniqueKeySuffix := fmt.Sprintf("for key '%s.%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.Name)
781+
if strings.HasPrefix(message, "Duplicate entry") && strings.HasSuffix(message, migrationUniqueKeySuffix) {
782782
continue
783783
}
784784
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))

go/logic/applier_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,98 @@ func (suite *ApplierTestSuite) TestCreateGhostTable() {
504504
suite.Require().Equal("CREATE TABLE `_testing_gho` (\n `id` int DEFAULT NULL,\n `item_id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createDDL)
505505
}
506506

507+
func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySucceedsWithUniqueKeyWarningInsertedByDMLEvent() {
508+
ctx := context.Background()
509+
510+
var err error
511+
512+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id INT, item_id INT, UNIQUE KEY (item_id));")
513+
suite.Require().NoError(err)
514+
515+
_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, item_id INT, UNIQUE KEY (item_id));")
516+
suite.Require().NoError(err)
517+
518+
connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
519+
suite.Require().NoError(err)
520+
521+
migrationContext := base.NewMigrationContext()
522+
migrationContext.ApplierConnectionConfig = connectionConfig
523+
migrationContext.DatabaseName = "test"
524+
migrationContext.SkipPortValidation = true
525+
migrationContext.OriginalTableName = "testing"
526+
migrationContext.SetConnectionConfig("innodb")
527+
528+
migrationContext.PanicOnWarnings = true
529+
530+
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
531+
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
532+
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
533+
migrationContext.UniqueKey = &sql.UniqueKey{
534+
Name: "item_id",
535+
Columns: *sql.NewColumnList([]string{"item_id"}),
536+
}
537+
538+
applier := NewApplier(migrationContext)
539+
suite.Require().NoError(applier.prepareQueries())
540+
defer applier.Teardown()
541+
542+
err = applier.InitDBConnections()
543+
suite.Require().NoError(err)
544+
545+
_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, item_id) VALUES (123456, 42);")
546+
suite.Require().NoError(err)
547+
548+
dmlEvents := []*binlog.BinlogDMLEvent{
549+
{
550+
DatabaseName: "test",
551+
TableName: "testing",
552+
DML: binlog.InsertDML,
553+
NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}),
554+
},
555+
}
556+
err = applier.ApplyDMLEventQueries(dmlEvents)
557+
suite.Require().NoError(err)
558+
559+
err = applier.CreateChangelogTable()
560+
suite.Require().NoError(err)
561+
err = applier.ReadMigrationRangeValues()
562+
suite.Require().NoError(err)
563+
564+
hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
565+
suite.Require().NoError(err)
566+
suite.Require().True(hasFurtherRange)
567+
suite.Require().Equal(int64(1), expectedRangeSize)
568+
569+
_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
570+
suite.Require().NoError(err)
571+
suite.Require().Equal(int64(0), rowsAffected)
572+
573+
// Ensure Duplicate entry '42' for key '_testing_gho.item_id' is ignored correctly
574+
suite.Require().Empty(applier.migrationContext.MigrationLastInsertSQLWarnings)
575+
576+
// Check that the row was inserted
577+
rows, err := suite.db.Query("SELECT * FROM test._testing_gho")
578+
suite.Require().NoError(err)
579+
defer rows.Close()
580+
581+
var count, id, item_id int
582+
for rows.Next() {
583+
err = rows.Scan(&id, &item_id)
584+
suite.Require().NoError(err)
585+
count += 1
586+
}
587+
suite.Require().NoError(rows.Err())
588+
589+
suite.Require().Equal(1, count)
590+
suite.Require().Equal(123456, id)
591+
suite.Require().Equal(42, item_id)
592+
593+
suite.Require().
594+
Equal(int64(1), migrationContext.TotalDMLEventsApplied)
595+
suite.Require().
596+
Equal(int64(0), migrationContext.RowsDeltaEstimate)
597+
}
598+
507599
func TestApplier(t *testing.T) {
508600
suite.Run(t, new(ApplierTestSuite))
509601
}

0 commit comments

Comments
 (0)