diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index b12e8a7ce..7b5efd9fb 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -160,6 +160,10 @@ Table name prefix to be used on the temporary tables. Add this flag when executing on a 1st generation Google Cloud Platform (GCP). +### gtid + +Add this flag to enable support for [MySQL replication GTIDs](https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-concepts.html) for replication positioning. This requires `gtid_mode` and `enforce_gtid_consistency` to be set to `ON`. + ### heartbeat-interval-millis Default 100. See [`subsecond-lag`](subsecond-lag.md) for details. diff --git a/go/base/context.go b/go/base/context.go index e4008cdd6..0a1cae739 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -108,6 +108,7 @@ type MigrationContext struct { // This is useful when connecting to a MySQL instance where the external port // may not match the internal port. SkipPortValidation bool + UseGTIDs bool config ContextConfig configMutex *sync.Mutex diff --git a/go/base/context_test.go b/go/base/context_test.go index f3323b9c1..f87bc9f13 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2021 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ diff --git a/go/binlog/binlog_entry.go b/go/binlog/binlog_entry.go index 5650accdd..69a2fc31d 100644 --- a/go/binlog/binlog_entry.go +++ b/go/binlog/binlog_entry.go @@ -1,5 +1,5 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2022 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -7,23 +7,14 @@ package binlog import ( "fmt" + "github.com/github/gh-ost/go/mysql" ) // BinlogEntry describes an entry in the binary log type BinlogEntry struct { Coordinates mysql.BinlogCoordinates - EndLogPos uint64 - - DmlEvent *BinlogDMLEvent -} - -// NewBinlogEntry creates an empty, ready to go BinlogEntry object -func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry { - binlogEntry := &BinlogEntry{ - Coordinates: mysql.BinlogCoordinates{LogFile: logFile, LogPos: int64(logPos)}, - } - return binlogEntry + DmlEvent *BinlogDMLEvent } // NewBinlogEntryAt creates an empty, ready to go BinlogEntry object @@ -34,13 +25,6 @@ func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry { return binlogEntry } -// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned -func (this *BinlogEntry) Duplicate() *BinlogEntry { - binlogEntry := NewBinlogEntry(this.Coordinates.LogFile, uint64(this.Coordinates.LogPos)) - binlogEntry.EndLogPos = this.EndLogPos - return binlogEntry -} - // String() returns a string representation of this binlog entry func (this *BinlogEntry) String() string { return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent) diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index d42ba1f30..d690a9f65 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -17,17 +17,20 @@ import ( gomysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" + uuid "github.com/google/uuid" "golang.org/x/net/context" ) type GoMySQLReader struct { - migrationContext *base.MigrationContext - connectionConfig *mysql.ConnectionConfig - binlogSyncer *replication.BinlogSyncer - binlogStreamer *replication.BinlogStreamer - currentCoordinates mysql.BinlogCoordinates - currentCoordinatesMutex *sync.Mutex - LastAppliedRowsEventHint mysql.BinlogCoordinates + migrationContext *base.MigrationContext + connectionConfig *mysql.ConnectionConfig + binlogSyncer *replication.BinlogSyncer + binlogStreamer *replication.BinlogStreamer + currentCoordinates mysql.BinlogCoordinates + currentCoordinatesMutex *sync.Mutex + // LastTrxCoords are the coordinates of the last transaction completely read. + // If using the file coordinates it is binlog position of the transaction's XID event. + LastTrxCoords mysql.BinlogCoordinates } func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader { @@ -35,7 +38,6 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader { return &GoMySQLReader{ migrationContext: migrationContext, connectionConfig: connectionConfig, - currentCoordinates: mysql.BinlogCoordinates{}, currentCoordinatesMutex: &sync.Mutex{}, binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{ ServerID: uint32(migrationContext.ReplicaServerId), @@ -46,8 +48,8 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader { Password: connectionConfig.Password, TLSConfig: connectionConfig.TLSConfig(), UseDecimal: true, - MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts, TimestampStringLocation: time.UTC, + MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts, }), } } @@ -58,35 +60,33 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin return this.migrationContext.Log.Errorf("Empty coordinates at ConnectBinlogStreamer()") } + this.currentCoordinatesMutex.Lock() + defer this.currentCoordinatesMutex.Unlock() this.currentCoordinates = coordinates - this.migrationContext.Log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates) - // Start sync with specified binlog file and position - this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{ - Name: this.currentCoordinates.LogFile, - Pos: uint32(this.currentCoordinates.LogPos), - }) - + this.migrationContext.Log.Infof("Connecting binlog streamer at %+v", coordinates) + + // Start sync with specified GTID set or binlog file and position + if this.migrationContext.UseGTIDs { + coords := coordinates.(*mysql.GTIDBinlogCoordinates) + this.binlogStreamer, err = this.binlogSyncer.StartSyncGTID(coords.GTIDSet) + } else { + coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates) + this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{ + Name: coords.LogFile, + Pos: uint32(coords.LogPos)}, + ) + } return err } -func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates { +func (this *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates { this.currentCoordinatesMutex.Lock() defer this.currentCoordinatesMutex.Unlock() - returnCoordinates := this.currentCoordinates - return &returnCoordinates + return this.currentCoordinates.Clone() } -// StreamEvents func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { - if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) { - return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates) - } - - if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) { - this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates) - return nil - } - + currentCoords := this.GetCurrentBinlogCoordinates() dml := ToEventDML(ev.Header.EventType.String()) if dml == NotDML { return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String()) @@ -97,7 +97,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven // We do both at the same time continue } - binlogEntry := NewBinlogEntryAt(this.currentCoordinates) + binlogEntry := NewBinlogEntryAt(currentCoords) binlogEntry.DmlEvent = NewBinlogDMLEvent( string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table), @@ -118,13 +118,13 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) } } + // The channel will do the throttling. Whoever is reading from the channel // decides whether action is taken synchronously (meaning we wait before // next iteration) or asynchronously (we keep pushing more events) // In reality, reads will be synchronous entriesChannel <- binlogEntry } - this.LastAppliedRowsEventHint = this.currentCoordinates return nil } @@ -141,23 +141,56 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if err != nil { return err } - func() { + + // Update binlog coords if using file-based coords. + // GTID coordinates are updated on receiving GTID events. + if !this.migrationContext.UseGTIDs { this.currentCoordinatesMutex.Lock() - defer this.currentCoordinatesMutex.Unlock() - this.currentCoordinates.LogPos = int64(ev.Header.LogPos) - this.currentCoordinates.EventSize = int64(ev.Header.EventSize) - }() + coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates) + prevCoords := coords.Clone().(*mysql.FileBinlogCoordinates) + coords.LogPos = int64(ev.Header.LogPos) + coords.EventSize = int64(ev.Header.EventSize) + if coords.IsLogPosOverflowBeyond4Bytes(prevCoords) { + this.currentCoordinatesMutex.Unlock() + return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", coords) + } + this.currentCoordinatesMutex.Unlock() + } - switch binlogEvent := ev.Event.(type) { + switch event := ev.Event.(type) { + case *replication.GTIDEvent: + if !this.migrationContext.UseGTIDs { + continue + } + sid, err := uuid.FromBytes(event.SID) + if err != nil { + return err + } + this.currentCoordinatesMutex.Lock() + if this.LastTrxCoords != nil { + this.currentCoordinates = this.LastTrxCoords.Clone() + } + coords := this.currentCoordinates.(*mysql.GTIDBinlogCoordinates) + trxGset := gomysql.NewUUIDSet(sid, gomysql.Interval{Start: event.GNO, Stop: event.GNO + 1}) + coords.GTIDSet.AddSet(trxGset) + this.currentCoordinatesMutex.Unlock() case *replication.RotateEvent: - func() { - this.currentCoordinatesMutex.Lock() - defer this.currentCoordinatesMutex.Unlock() - this.currentCoordinates.LogFile = string(binlogEvent.NextLogName) - }() - this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), binlogEvent.NextLogName) + if this.migrationContext.UseGTIDs { + continue + } + this.currentCoordinatesMutex.Lock() + coords := this.currentCoordinates.(*mysql.FileBinlogCoordinates) + coords.LogFile = string(event.NextLogName) + this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", coords.LogFile, int64(ev.Header.LogPos), event.NextLogName) + this.currentCoordinatesMutex.Unlock() + case *replication.XIDEvent: + if this.migrationContext.UseGTIDs { + this.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)} + } else { + this.LastTrxCoords = this.currentCoordinates.Clone() + } case *replication.RowsEvent: - if err := this.handleRowsEvent(ev, binlogEvent, entriesChannel); err != nil { + if err := this.handleRowsEvent(ev, event, entriesChannel); err != nil { return err } } diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index feabd8658..6391cf4fb 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -87,6 +87,7 @@ func main() { flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.") flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).") flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.") + flag.BoolVar(&migrationContext.UseGTIDs, "gtid", false, "(experimental) set to 'true' to use MySQL GTIDs for binlog positioning.") executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit") flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust") diff --git a/go/logic/applier.go b/go/logic/applier.go index 9dcc61840..30ac97695 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2021 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -990,11 +990,11 @@ func (this *Applier) StopReplication() error { return err } - readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.migrationContext.ApplierMySQLVersion, this.db) + readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.migrationContext.ApplierMySQLVersion, this.db, this.migrationContext.UseGTIDs) if err != nil { return err } - this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates) + this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", readBinlogCoordinates, executeBinlogCoordinates) return nil } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index b6d80fda7..7a7dc8424 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -73,6 +73,11 @@ func (this *Inspector) InitDBConnections() (err error) { if err := this.validateBinlogs(); err != nil { return err } + if this.migrationContext.UseGTIDs { + if err := this.validateGTIDConfig(); err != nil { + return err + } + } if err := this.applyBinlogFormat(); err != nil { return err } @@ -379,7 +384,7 @@ func (this *Inspector) applyBinlogFormat() error { // validateBinlogs checks that binary log configuration is good to go func (this *Inspector) validateBinlogs() error { - query := `select /* gh-ost */ @@global.log_bin, @@global.binlog_format` + query := `select /* gh-ost */@@global.log_bin, @@global.binlog_format` var hasBinaryLogs bool if err := this.db.QueryRow(query).Scan(&hasBinaryLogs, &this.migrationContext.OriginalBinlogFormat); err != nil { return err @@ -418,6 +423,22 @@ func (this *Inspector) validateBinlogs() error { return nil } +// validateGTIDConfig checks that the GTID configuration is good to go +func (this *Inspector) validateGTIDConfig() error { + var gtidMode, enforceGtidConsistency string + query := `select @@global.gtid_mode, @@global.enforce_gtid_consistency` + if err := this.db.QueryRow(query).Scan(>idMode, &enforceGtidConsistency); err != nil { + return err + } + enforceGtidConsistency = strings.ToUpper(enforceGtidConsistency) + if strings.ToUpper(gtidMode) != "ON" || (enforceGtidConsistency != "ON" && enforceGtidConsistency != "1") { + return fmt.Errorf("%s must have gtid_mode=ON and enforce_gtid_consistency=ON to use GTID support", this.connectionConfig.Key.String()) + } + + this.migrationContext.Log.Infof("gtid config validated on %s", this.connectionConfig.Key.String()) + return nil +} + // validateLogSlaveUpdates checks that binary log log_slave_updates is set. This test is not required when migrating on replica or when migrating directly on master func (this *Inspector) validateLogSlaveUpdates() error { query := `select /* gh-ost */ @@global.log_slave_updates` diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 880b9b4c5..4d7074b22 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1075,7 +1075,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { return } - currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() + currentBinlogCoordinates := this.eventsStreamer.GetCurrentBinlogCoordinates() status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, @@ -1140,7 +1140,7 @@ func (this *Migrator) initiateStreaming() error { if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } - this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates()) + this.migrationContext.SetRecentBinlogCoordinates(this.eventsStreamer.GetCurrentBinlogCoordinates()) } }() return nil diff --git a/go/logic/server.go b/go/logic/server.go index b5d05b758..45e5b2bd4 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -1,5 +1,5 @@ /* - Copyright 2022 GitHub Inc. + Copyright 2021 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 20bcf4275..fd0240ffd 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -16,6 +16,7 @@ import ( "github.com/github/gh-ost/go/binlog" "github.com/github/gh-ost/go/mysql" + gomysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/openark/golib/sqlutils" ) @@ -28,7 +29,7 @@ type BinlogEventListener struct { const ( EventsChannelBufferSize = 1 - ReconnectStreamerSleepSeconds = 5 + ReconnectStreamerSleepSeconds = 1 ) // EventsStreamer reads data from binary logs and streams it on. It acts as a publisher, @@ -38,7 +39,7 @@ type EventsStreamer struct { db *gosql.DB dbVersion string migrationContext *base.MigrationContext - initialBinlogCoordinates *mysql.BinlogCoordinates + initialBinlogCoordinates mysql.BinlogCoordinates listeners [](*BinlogEventListener) listenersMutex *sync.Mutex eventsChannel chan *binlog.BinlogEntry @@ -124,35 +125,39 @@ func (this *EventsStreamer) InitDBConnections() (err error) { } // initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica -func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error { +func (this *EventsStreamer) initBinlogReader(binlogCoordinates mysql.BinlogCoordinates) error { goMySQLReader := binlog.NewGoMySQLReader(this.migrationContext) - if err := goMySQLReader.ConnectBinlogStreamer(*binlogCoordinates); err != nil { + if err := goMySQLReader.ConnectBinlogStreamer(binlogCoordinates); err != nil { return err } this.binlogReader = goMySQLReader return nil } -func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates { +func (this *EventsStreamer) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates { return this.binlogReader.GetCurrentBinlogCoordinates() } -func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordinates { - return &mysql.BinlogCoordinates{LogFile: this.GetCurrentBinlogCoordinates().LogFile, LogPos: 4} -} - // readCurrentBinlogCoordinates reads master status from hooked server func (this *EventsStreamer) readCurrentBinlogCoordinates() error { binaryLogStatusTerm := mysql.ReplicaTermFor(this.dbVersion, "master status") query := fmt.Sprintf("show /* gh-ost readCurrentBinlogCoordinates */ %s", binaryLogStatusTerm) foundMasterStatus := false err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { - this.initialBinlogCoordinates = &mysql.BinlogCoordinates{ - LogFile: m.GetString("File"), - LogPos: m.GetInt64("Position"), + if this.migrationContext.UseGTIDs { + execGtidSet := m.GetString("Executed_Gtid_Set") + gtidSet, err := gomysql.ParseMysqlGTIDSet(execGtidSet) + if err != nil { + return err + } + this.initialBinlogCoordinates = &mysql.GTIDBinlogCoordinates{GTIDSet: gtidSet.(*gomysql.MysqlGTIDSet)} + } else { + this.initialBinlogCoordinates = &mysql.FileBinlogCoordinates{ + LogFile: m.GetString("File"), + LogPos: m.GetInt64("Position"), + } } foundMasterStatus = true - return nil }) if err != nil { @@ -161,7 +166,7 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error { if !foundMasterStatus { return fmt.Errorf("Got no results from SHOW %s. Bailing out", strings.ToUpper(binaryLogStatusTerm)) } - this.migrationContext.Log.Debugf("Streamer binlog coordinates: %+v", *this.initialBinlogCoordinates) + this.migrationContext.Log.Debugf("Streamer binlog coordinates: %+v", this.initialBinlogCoordinates) return nil } @@ -175,13 +180,16 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { } } }() - // The next should block and execute forever, unless there's a serious error - var successiveFailures int64 - var lastAppliedRowsEventHint mysql.BinlogCoordinates + // The next should block and execute forever, unless there's a serious error. + var successiveFailures int + var reconnectCoords mysql.BinlogCoordinates for { if canStopStreaming() { return nil } + // We will reconnect the binlog streamer at the coordinates + // of the last trx that was read completely from the streamer. + // Since row event application is idempotent, it's OK if we reapply some events. if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil { if canStopStreaming() { return nil @@ -192,22 +200,27 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { time.Sleep(ReconnectStreamerSleepSeconds * time.Second) // See if there's retry overflow - if this.binlogReader.LastAppliedRowsEventHint.Equals(&lastAppliedRowsEventHint) { + if this.migrationContext.BinlogSyncerMaxReconnectAttempts > 0 && successiveFailures >= this.migrationContext.BinlogSyncerMaxReconnectAttempts { + return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", successiveFailures, reconnectCoords) + } + + // Reposition at same coordinates + if this.binlogReader.LastTrxCoords != nil { + reconnectCoords = this.binlogReader.LastTrxCoords.Clone() + } else { + reconnectCoords = this.initialBinlogCoordinates.Clone() + } + if !reconnectCoords.SmallerThan(this.GetCurrentBinlogCoordinates()) { successiveFailures += 1 } else { successiveFailures = 0 } - if successiveFailures >= this.migrationContext.MaxRetries() { - return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", successiveFailures, this.GetReconnectBinlogCoordinates()) - } - // Reposition at same binlog file. - lastAppliedRowsEventHint = this.binlogReader.LastAppliedRowsEventHint - this.migrationContext.Log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint) - if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil { + this.migrationContext.Log.Infof("Reconnecting EventsStreamer... Will resume at %+v", reconnectCoords) + _ = this.binlogReader.Close() + if err := this.initBinlogReader(reconnectCoords); err != nil { return err } - this.binlogReader.LastAppliedRowsEventHint = lastAppliedRowsEventHint } } } diff --git a/go/mysql/binlog.go b/go/mysql/binlog.go index 5f52f1d4e..01790da29 100644 --- a/go/mysql/binlog.go +++ b/go/mysql/binlog.go @@ -6,96 +6,12 @@ package mysql -import ( - "fmt" - "strconv" - "strings" -) - -// BinlogCoordinates described binary log coordinates in the form of log file & log position. -type BinlogCoordinates struct { - LogFile string - LogPos int64 - EventSize int64 -} - -// ParseBinlogCoordinates will parse an InstanceKey from a string representation such as 127.0.0.1:3306 -func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) { - tokens := strings.SplitN(logFileLogPos, ":", 2) - if len(tokens) != 2 { - return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos) - } - - if logPos, err := strconv.ParseInt(tokens[1], 10, 0); err != nil { - return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1]) - } else { - return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil - } -} - -// DisplayString returns a user-friendly string representation of these coordinates -func (this *BinlogCoordinates) DisplayString() string { - return fmt.Sprintf("%s:%d", this.LogFile, this.LogPos) -} - -// String returns a user-friendly string representation of these coordinates -func (this BinlogCoordinates) String() string { - return this.DisplayString() -} - -// Equals tests equality of this coordinate and another one. -func (this *BinlogCoordinates) Equals(other *BinlogCoordinates) bool { - if other == nil { - return false - } - return this.LogFile == other.LogFile && this.LogPos == other.LogPos -} - -// IsEmpty returns true if the log file is empty, unnamed -func (this *BinlogCoordinates) IsEmpty() bool { - return this.LogFile == "" -} - -// SmallerThan returns true if this coordinate is strictly smaller than the other. -func (this *BinlogCoordinates) SmallerThan(other *BinlogCoordinates) bool { - if this.LogFile < other.LogFile { - return true - } - if this.LogFile == other.LogFile && this.LogPos < other.LogPos { - return true - } - return false -} - -// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. -// We do NOT compare the type so we can not use this.Equals() -func (this *BinlogCoordinates) SmallerThanOrEquals(other *BinlogCoordinates) bool { - if this.SmallerThan(other) { - return true - } - return this.LogFile == other.LogFile && this.LogPos == other.LogPos -} - -// IsLogPosOverflowBeyond4Bytes returns true if the coordinate endpos is overflow beyond 4 bytes. -// The binlog event end_log_pos field type is defined as uint32, 4 bytes. -// https://github.com/go-mysql-org/go-mysql/blob/master/replication/event.go -// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header -// Issue: https://github.com/github/gh-ost/issues/1366 -func (this *BinlogCoordinates) IsLogPosOverflowBeyond4Bytes(preCoordinate *BinlogCoordinates) bool { - if preCoordinate == nil { - return false - } - if preCoordinate.IsEmpty() { - return false - } - - if this.LogFile != preCoordinate.LogFile { - return false - } - - if preCoordinate.LogPos+this.EventSize >= 1<<32 { - // Unexpected rows event, the previous binlog log_pos + current binlog event_size is overflow 4 bytes - return true - } - return false +type BinlogCoordinates interface { + String() string + DisplayString() string + IsEmpty() bool + Equals(other BinlogCoordinates) bool + SmallerThan(other BinlogCoordinates) bool + SmallerThanOrEquals(other BinlogCoordinates) bool + Clone() BinlogCoordinates } diff --git a/go/mysql/binlog_file.go b/go/mysql/binlog_file.go new file mode 100644 index 000000000..426e54076 --- /dev/null +++ b/go/mysql/binlog_file.go @@ -0,0 +1,208 @@ +/* + Copyright 2015 Shlomi Noach, courtesy Booking.com + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package mysql + +import ( + "errors" + "fmt" + "regexp" + "strconv" + "strings" +) + +var detachPattern *regexp.Regexp + +func init() { + detachPattern, _ = regexp.Compile(`//([^/:]+):([\d]+)`) // e.g. `//binlog.01234:567890` +} + +// FileBinlogCoordinates described binary log coordinates in the form of a binlog file & log position. +type FileBinlogCoordinates struct { + LogFile string + LogPos int64 + EventSize int64 +} + +func NewFileBinlogCoordinates(logFile string, logPos int64) *FileBinlogCoordinates { + return &FileBinlogCoordinates{ + LogFile: logFile, + LogPos: logPos, + } +} + +// ParseFileBinlogCoordinates parses a log file/position string into a *BinlogCoordinates struct. +func ParseFileBinlogCoordinates(logFileLogPos string) (*FileBinlogCoordinates, error) { + tokens := strings.SplitN(logFileLogPos, ":", 2) + if len(tokens) != 2 { + return nil, fmt.Errorf("ParseFileBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos) + } + + if logPos, err := strconv.ParseInt(tokens[1], 10, 0); err != nil { + return nil, fmt.Errorf("ParseFileBinlogCoordinates: invalid pos: %s", tokens[1]) + } else { + return &FileBinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil + } +} + +// DisplayString returns a user-friendly string representation of these coordinates +func (this *FileBinlogCoordinates) DisplayString() string { + return fmt.Sprintf("%s:%d", this.LogFile, this.LogPos) +} + +// String returns a user-friendly string representation of these coordinates +func (this FileBinlogCoordinates) String() string { + return this.DisplayString() +} + +// Equals tests equality of this coordinate and another one. +func (this *FileBinlogCoordinates) Equals(other BinlogCoordinates) bool { + coord, ok := other.(*FileBinlogCoordinates) + if !ok || other == nil { + return false + } + return this.LogFile == coord.LogFile && this.LogPos == coord.LogPos +} + +// IsEmpty returns true if the log file is empty, unnamed +func (this *FileBinlogCoordinates) IsEmpty() bool { + return this.LogFile == "" +} + +// SmallerThan returns true if this coordinate is strictly smaller than the other. +func (this *FileBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool { + coord, ok := other.(*FileBinlogCoordinates) + if !ok || other == nil { + return false + } + if this.LogFile < coord.LogFile { + return true + } + if this.LogFile == coord.LogFile && this.LogPos < coord.LogPos { + return true + } + return false +} + +// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. +// We do NOT compare the type so we can not use this.Equals() +func (this *FileBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { + coord, ok := other.(*FileBinlogCoordinates) + if !ok || other == nil { + return false + } + if this.SmallerThan(other) { + return true + } + return this.LogFile == coord.LogFile && this.LogPos == coord.LogPos // No Type comparison +} + +// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's. +func (this *FileBinlogCoordinates) FileSmallerThan(other BinlogCoordinates) bool { + coord, ok := other.(*FileBinlogCoordinates) + if !ok || other == nil { + return false + } + return this.LogFile < coord.LogFile +} + +// FileNumberDistance returns the numeric distance between this coordinate's file number and the other's. +// Effectively it means "how many rotates/FLUSHes would make these coordinates's file reach the other's" +func (this *FileBinlogCoordinates) FileNumberDistance(other *FileBinlogCoordinates) int { + thisNumber, _ := this.FileNumber() + otherNumber, _ := other.FileNumber() + return otherNumber - thisNumber +} + +// FileNumber returns the numeric value of the file, and the length in characters representing the number in the filename. +// Example: FileNumber() of mysqld.log.000789 is (789, 6) +func (this *FileBinlogCoordinates) FileNumber() (int, int) { + tokens := strings.Split(this.LogFile, ".") + numPart := tokens[len(tokens)-1] + numLen := len(numPart) + fileNum, err := strconv.Atoi(numPart) + if err != nil { + return 0, 0 + } + return fileNum, numLen +} + +// PreviousFileCoordinatesBy guesses the filename of the previous binlog/relaylog, by given offset (number of files back) +func (this *FileBinlogCoordinates) PreviousFileCoordinatesBy(offset int) (BinlogCoordinates, error) { + result := &FileBinlogCoordinates{} + + fileNum, numLen := this.FileNumber() + if fileNum == 0 { + return result, errors.New("Log file number is zero, cannot detect previous file") + } + newNumStr := fmt.Sprintf("%d", (fileNum - offset)) + newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr + + tokens := strings.Split(this.LogFile, ".") + tokens[len(tokens)-1] = newNumStr + result.LogFile = strings.Join(tokens, ".") + return result, nil +} + +// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog +func (this *FileBinlogCoordinates) PreviousFileCoordinates() (BinlogCoordinates, error) { + return this.PreviousFileCoordinatesBy(1) +} + +// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog +func (this *FileBinlogCoordinates) NextFileCoordinates() (BinlogCoordinates, error) { + result := &FileBinlogCoordinates{} + + fileNum, numLen := this.FileNumber() + newNumStr := fmt.Sprintf("%d", (fileNum + 1)) + newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr + + tokens := strings.Split(this.LogFile, ".") + tokens[len(tokens)-1] = newNumStr + result.LogFile = strings.Join(tokens, ".") + return result, nil +} + +// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's. +func (this *FileBinlogCoordinates) DetachedCoordinates() (isDetached bool, detachedLogFile string, detachedLogPos string) { + detachedCoordinatesSubmatch := detachPattern.FindStringSubmatch(this.LogFile) + if len(detachedCoordinatesSubmatch) == 0 { + return false, "", "" + } + return true, detachedCoordinatesSubmatch[1], detachedCoordinatesSubmatch[2] +} + +func (this *FileBinlogCoordinates) Clone() BinlogCoordinates { + return &FileBinlogCoordinates{ + LogPos: this.LogPos, + LogFile: this.LogFile, + EventSize: this.EventSize, + } +} + +// IsLogPosOverflowBeyond4Bytes returns true if the coordinate endpos is overflow beyond 4 bytes. +// The binlog event end_log_pos field type is defined as uint32, 4 bytes. +// https://github.com/go-mysql-org/go-mysql/blob/master/replication/event.go +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header +// Issue: https://github.com/github/gh-ost/issues/1366 +func (this *FileBinlogCoordinates) IsLogPosOverflowBeyond4Bytes(preCoordinate *FileBinlogCoordinates) bool { + if preCoordinate == nil { + return false + } + if preCoordinate.IsEmpty() { + return false + } + + if this.LogFile != preCoordinate.LogFile { + return false + } + + if preCoordinate.LogPos+this.EventSize >= 1<<32 { + // Unexpected rows event, the previous binlog log_pos + current binlog event_size is overflow 4 bytes + return true + } + return false +} diff --git a/go/mysql/binlog_file_test.go b/go/mysql/binlog_file_test.go new file mode 100644 index 000000000..f12f5514f --- /dev/null +++ b/go/mysql/binlog_file_test.go @@ -0,0 +1,136 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package mysql + +import ( + "math" + "testing" + + gomysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/openark/golib/log" + "github.com/stretchr/testify/require" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestBinlogCoordinates(t *testing.T) { + c1 := FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c2 := FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c3 := FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 5000} + c4 := FileBinlogCoordinates{LogFile: "mysql-bin.00112", LogPos: 104} + + gtidSet1, _ := gomysql.ParseMysqlGTIDSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:23") + gtidSet2, _ := gomysql.ParseMysqlGTIDSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:100") + gtidSet3, _ := gomysql.ParseMysqlGTIDSet("7F80FA47-FF33-71A1-AE01-B80CC7823548:100") + gtidSetBig1, _ := gomysql.ParseMysqlGTIDSet(`08dc06d7-c27c-11ea-b204-e4434b77a5ce:1-1497873603, +0b4ff540-a712-11ea-9857-e4434b2a1c98:1-4315312982, +19636248-246d-11e9-ab0d-0263df733a8e:1, +1c8cd5dd-8c79-11eb-ae94-e4434b27ee9c:1-18850436, +3342d1ad-bda0-11ea-ba96-e4434b28e6e0:1-475232304, +3bcd300c-c811-11e9-9970-e4434b714c24:1-6209943929, +418b92ed-d6f6-11e8-b18f-246e961e5ed0:1-3299395227, +4465ebe1-2bcc-11e9-8913-e4434b21c560:1-4724945648, +48e2bc1d-d66d-11e8-bf56-a0369f9437b8:1, +492e2980-4518-11e9-92c6-e4434b3eca94:1-4926754392`) + gtidSetBig2, _ := gomysql.ParseMysqlGTIDSet(`08dc06d7-c27c-11ea-b204-e4434b77a5ce:1-1497873603, +0b4ff540-a712-11ea-9857-e4434b2a1c98:1-4315312982, +19636248-246d-11e9-ab0d-0263df733a8e:1, +1c8cd5dd-8c79-11eb-ae94-e4434b27ee9c:1-18850436, +3342d1ad-bda0-11ea-ba96-e4434b28e6e0:1-475232304, +3bcd300c-c811-11e9-9970-e4434b714c24:1-6209943929, +418b92ed-d6f6-11e8-b18f-246e961e5ed0:1-3299395227, +4465ebe1-2bcc-11e9-8913-e4434b21c560:1-4724945648, +48e2bc1d-d66d-11e8-bf56-a0369f9437b8:1, +492e2980-4518-11e9-92c6-e4434b3eca94:1-4926754399`) + + c5 := GTIDBinlogCoordinates{GTIDSet: gtidSet1.(*gomysql.MysqlGTIDSet)} + c6 := GTIDBinlogCoordinates{GTIDSet: gtidSet1.(*gomysql.MysqlGTIDSet)} + c7 := GTIDBinlogCoordinates{GTIDSet: gtidSet2.(*gomysql.MysqlGTIDSet)} + c8 := GTIDBinlogCoordinates{GTIDSet: gtidSet3.(*gomysql.MysqlGTIDSet)} + c9 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig1.(*gomysql.MysqlGTIDSet)} + c10 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig2.(*gomysql.MysqlGTIDSet)} + + require.True(t, c5.Equals(&c6)) + require.True(t, c1.Equals(&c2)) + require.False(t, c1.Equals(&c3)) + require.False(t, c1.Equals(&c4)) + require.False(t, c1.SmallerThan(&c2)) + require.True(t, c1.SmallerThan(&c3)) + require.True(t, c1.SmallerThan(&c4)) + require.True(t, c3.SmallerThan(&c4)) + require.False(t, c3.SmallerThan(&c2)) + require.False(t, c4.SmallerThan(&c2)) + require.False(t, c4.SmallerThan(&c3)) + require.True(t, c1.SmallerThanOrEquals(&c2)) + require.True(t, c1.SmallerThanOrEquals(&c3)) + require.True(t, c1.SmallerThanOrEquals(&c2)) + require.True(t, c1.SmallerThanOrEquals(&c3)) + require.True(t, c6.SmallerThanOrEquals(&c7)) + require.True(t, c7.SmallerThanOrEquals(&c8)) + require.True(t, c9.SmallerThanOrEquals(&c9)) + require.True(t, c9.SmallerThanOrEquals(&c10)) +} + +func TestBinlogCoordinatesAsKey(t *testing.T) { + m := make(map[BinlogCoordinates]bool) + + c1 := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c2 := &FileBinlogCoordinates{LogFile: "mysql-bin.00022", LogPos: 104} + c3 := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c4 := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 222} + + m[c1] = true + m[c2] = true + m[c3] = true + m[c4] = true + + require.Len(t, m, 4) +} + +func TestIsLogPosOverflowBeyond4Bytes(t *testing.T) { + { + var preCoordinates *FileBinlogCoordinates + curCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 10321, EventSize: 1100} + require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 1100, EventSize: 1100} + curCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1100)), EventSize: 1100} + require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00016", LogPos: 1100, EventSize: 1100} + curCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1100)), EventSize: 1100} + require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 1001, EventSize: 1000} + curCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 1000, EventSize: 1000} + curCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 999, EventSize: 1000} + curCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + require.True(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(math.MaxUint32 - 500)), EventSize: 1000} + curCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + require.True(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32, EventSize: 1000} + curCoordinates := &FileBinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + require.True(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } +} diff --git a/go/mysql/binlog_gtid.go b/go/mysql/binlog_gtid.go new file mode 100644 index 000000000..d7b86c04f --- /dev/null +++ b/go/mysql/binlog_gtid.go @@ -0,0 +1,87 @@ +/* + Copyright 2022 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package mysql + +import ( + gomysql "github.com/go-mysql-org/go-mysql/mysql" +) + +// GTIDBinlogCoordinates describe binary log coordinates in MySQL GTID format. +type GTIDBinlogCoordinates struct { + GTIDSet *gomysql.MysqlGTIDSet + UUIDSet *gomysql.UUIDSet +} + +// NewGTIDBinlogCoordinates parses a MySQL GTID set into a *GTIDBinlogCoordinates struct. +func NewGTIDBinlogCoordinates(gtidSet string) (*GTIDBinlogCoordinates, error) { + set, err := gomysql.ParseMysqlGTIDSet(gtidSet) + return >IDBinlogCoordinates{ + GTIDSet: set.(*gomysql.MysqlGTIDSet), + }, err +} + +// DisplayString returns a user-friendly string representation of these current UUID set or the full GTID set. +func (this *GTIDBinlogCoordinates) DisplayString() string { + if this.UUIDSet != nil { + return this.UUIDSet.String() + } + return this.String() +} + +// String returns a user-friendly string representation of these full GTID set. +func (this GTIDBinlogCoordinates) String() string { + return this.GTIDSet.String() +} + +// Equals tests equality of this coordinate and another one. +func (this *GTIDBinlogCoordinates) Equals(other BinlogCoordinates) bool { + if other == nil || this.IsEmpty() || other.IsEmpty() { + return false + } + + otherCoords, ok := other.(*GTIDBinlogCoordinates) + if !ok { + return false + } + + return this.GTIDSet.Equal(otherCoords.GTIDSet) +} + +// IsEmpty returns true if the GTID set is empty. +func (this *GTIDBinlogCoordinates) IsEmpty() bool { + return this.GTIDSet == nil +} + +// SmallerThan returns true if this coordinate is strictly smaller than the other. +func (this *GTIDBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool { + if other == nil || this.IsEmpty() || other.IsEmpty() { + return false + } + otherCoords, ok := other.(*GTIDBinlogCoordinates) + if !ok { + return false + } + + // if 'this' does not contain the same sets we assume we are behind 'other'. + // there are probably edge cases where this isn't true + return !this.GTIDSet.Contain(otherCoords.GTIDSet) +} + +// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. +func (this *GTIDBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool { + return this.Equals(other) || this.SmallerThan(other) +} + +func (this *GTIDBinlogCoordinates) Clone() BinlogCoordinates { + out := >IDBinlogCoordinates{} + if this.GTIDSet != nil { + out.GTIDSet = this.GTIDSet.Clone().(*gomysql.MysqlGTIDSet) + } + if this.UUIDSet != nil { + out.UUIDSet = this.UUIDSet.Clone() + } + return out +} diff --git a/go/mysql/binlog_test.go b/go/mysql/binlog_test.go deleted file mode 100644 index 4e7a9c7db..000000000 --- a/go/mysql/binlog_test.go +++ /dev/null @@ -1,98 +0,0 @@ -/* - Copyright 2016 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package mysql - -import ( - "math" - "testing" - - "github.com/openark/golib/log" - "github.com/stretchr/testify/require" -) - -func init() { - log.SetLevel(log.ERROR) -} - -func TestBinlogCoordinates(t *testing.T) { - c1 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} - c2 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} - c3 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 5000} - c4 := BinlogCoordinates{LogFile: "mysql-bin.00112", LogPos: 104} - - require.True(t, c1.Equals(&c2)) - require.False(t, c1.Equals(&c3)) - require.False(t, c1.Equals(&c4)) - require.False(t, c1.SmallerThan(&c2)) - require.True(t, c1.SmallerThan(&c3)) - require.True(t, c1.SmallerThan(&c4)) - require.True(t, c3.SmallerThan(&c4)) - require.False(t, c3.SmallerThan(&c2)) - require.False(t, c4.SmallerThan(&c2)) - require.False(t, c4.SmallerThan(&c3)) - - require.True(t, c1.SmallerThanOrEquals(&c2)) - require.True(t, c1.SmallerThanOrEquals(&c3)) -} - -func TestBinlogCoordinatesAsKey(t *testing.T) { - m := make(map[BinlogCoordinates]bool) - - c1 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} - c2 := BinlogCoordinates{LogFile: "mysql-bin.00022", LogPos: 104} - c3 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} - c4 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 222} - - m[c1] = true - m[c2] = true - m[c3] = true - m[c4] = true - - require.Len(t, m, 3) -} - -func TestIsLogPosOverflowBeyond4Bytes(t *testing.T) { - { - var preCoordinates *BinlogCoordinates - curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 10321, EventSize: 1100} - require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) - } - { - preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 1100, EventSize: 1100} - curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1100)), EventSize: 1100} - require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) - } - { - preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00016", LogPos: 1100, EventSize: 1100} - curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1100)), EventSize: 1100} - require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) - } - { - preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 1001, EventSize: 1000} - curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} - require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) - } - { - preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 1000, EventSize: 1000} - curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} - require.False(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) - } - { - preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 999, EventSize: 1000} - curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} - require.True(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) - } - { - preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(math.MaxUint32 - 500)), EventSize: 1000} - curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} - require.True(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) - } - { - preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32, EventSize: 1000} - curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} - require.True(t, curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) - } -} diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 71146d70a..7d57afbf1 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -1,5 +1,5 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2022 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -159,28 +159,43 @@ func GetMasterConnectionConfigSafe(dbVersion string, connectionConfig *Connectio return GetMasterConnectionConfigSafe(dbVersion, masterConfig, visitedKeys, allowMasterMaster) } -func GetReplicationBinlogCoordinates(dbVersion string, db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, executeBinlogCoordinates *BinlogCoordinates, err error) { +func GetReplicationBinlogCoordinates(dbVersion string, db *gosql.DB, gtid bool) (readBinlogCoordinates, executeBinlogCoordinates BinlogCoordinates, err error) { showReplicaStatusQuery := fmt.Sprintf("show %s", ReplicaTermFor(dbVersion, `slave status`)) err = sqlutils.QueryRowsMap(db, showReplicaStatusQuery, func(m sqlutils.RowMap) error { - readBinlogCoordinates = &BinlogCoordinates{ - LogFile: m.GetString(ReplicaTermFor(dbVersion, "Master_Log_File")), - LogPos: m.GetInt64(ReplicaTermFor(dbVersion, "Read_Master_Log_Pos")), - } - executeBinlogCoordinates = &BinlogCoordinates{ - LogFile: m.GetString(ReplicaTermFor(dbVersion, "Relay_Master_Log_File")), - LogPos: m.GetInt64(ReplicaTermFor(dbVersion, "Exec_Master_Log_Pos")), + if gtid { + executeBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Executed_Gtid_Set")) + if err != nil { + return err + } + readBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Retrieved_Gtid_Set")) + if err != nil { + return err + } + } else { + readBinlogCoordinates = NewFileBinlogCoordinates( + m.GetString(ReplicaTermFor(dbVersion, "Master_Log_File")), + m.GetInt64(ReplicaTermFor(dbVersion, "Read_Master_Log_Pos")), + ) + executeBinlogCoordinates = NewFileBinlogCoordinates( + m.GetString(ReplicaTermFor(dbVersion, "Relay_Master_Log_File")), + m.GetInt64(ReplicaTermFor(dbVersion, "Exec_Master_Log_Pos")), + ) } return nil }) return readBinlogCoordinates, executeBinlogCoordinates, err } -func GetSelfBinlogCoordinates(dbVersion string, db *gosql.DB) (selfBinlogCoordinates *BinlogCoordinates, err error) { +func GetSelfBinlogCoordinates(dbVersion string, db *gosql.DB, gtid bool) (selfBinlogCoordinates BinlogCoordinates, err error) { binaryLogStatusTerm := ReplicaTermFor(dbVersion, "master status") err = sqlutils.QueryRowsMap(db, fmt.Sprintf("show %s", binaryLogStatusTerm), func(m sqlutils.RowMap) error { - selfBinlogCoordinates = &BinlogCoordinates{ - LogFile: m.GetString("File"), - LogPos: m.GetInt64("Position"), + if gtid { + selfBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Executed_Gtid_Set")) + } else { + selfBinlogCoordinates = NewFileBinlogCoordinates( + m.GetString("File"), + m.GetInt64("Position"), + ) } return nil }) diff --git a/localtests/gtid/create.sql b/localtests/gtid/create.sql new file mode 100644 index 000000000..c3e7f8d7d --- /dev/null +++ b/localtests/gtid/create.sql @@ -0,0 +1,19 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + t varchar(128) charset utf8mb4, + primary key(id) +) auto_increment=1; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, md5(rand())); +end ;; diff --git a/localtests/gtid/extra_args b/localtests/gtid/extra_args new file mode 100644 index 000000000..ac1e8e795 --- /dev/null +++ b/localtests/gtid/extra_args @@ -0,0 +1 @@ +--gtid diff --git a/localtests/gtid/gtid_mode b/localtests/gtid/gtid_mode new file mode 100644 index 000000000..76371f28f --- /dev/null +++ b/localtests/gtid/gtid_mode @@ -0,0 +1 @@ +ON diff --git a/localtests/gtid/ignore_versions b/localtests/gtid/ignore_versions new file mode 100644 index 000000000..7acd3f06f --- /dev/null +++ b/localtests/gtid/ignore_versions @@ -0,0 +1 @@ +(5.5) diff --git a/localtests/keyword-column/extra_args b/localtests/keyword-column/extra_args index 5d73843b0..4b091d601 100644 --- a/localtests/keyword-column/extra_args +++ b/localtests/keyword-column/extra_args @@ -1 +1 @@ ---alter='add column `index` int unsigned' \ +--alter='add column `index` int unsigned' diff --git a/localtests/test.sh b/localtests/test.sh index 5cdc65f7e..7fe9c2ab2 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -13,6 +13,7 @@ default_ghost_binary=/tmp/gh-ost-test ghost_binary="" docker=false toxiproxy=false +gtid=false storage_engine=innodb exec_command_file=/tmp/gh-ost-test.bash ghost_structure_output_file=/tmp/gh-ost-test.ghost.structure.sql @@ -27,10 +28,11 @@ master_port= replica_host= replica_port= original_sql_mode= +current_gtid_mode= sysbench_pid= OPTIND=1 -while getopts "b:s:dt" OPTION; do +while getopts "b:s:dtg" OPTION; do case $OPTION in b) ghost_binary="$OPTARG" @@ -44,6 +46,9 @@ while getopts "b:s:dt" OPTION; do d) docker=true ;; + g) + gtid=true + ;; esac done shift $((OPTIND - 1)) @@ -65,6 +70,13 @@ verify_master_and_replica() { original_sql_mode="$(gh-ost-test-mysql-master -e "select @@global.sql_mode" -s -s)" echo "sql_mode on master is ${original_sql_mode}" + current_gtid_mode=$(gh-ost-test-mysql-master -s -s -e "select @@global.gtid_mode" 2>/dev/null || echo unsupported) + current_enforce_gtid_consistency=$(gh-ost-test-mysql-master -s -s -e "select @@global.enforce_gtid_consistency" 2>/dev/null || echo unsupported) + current_master_server_uuid=$(gh-ost-test-mysql-master -s -s -e "select @@global.server_uuid" 2>/dev/null || echo unsupported) + current_replica_server_uuid=$(gh-ost-test-mysql-replica -s -s -e "select @@global.server_uuid" 2>/dev/null || echo unsupported) + echo "gtid_mode on master is ${current_gtid_mode} with enforce_gtid_consistency=${current_enforce_gtid_consistency}" + echo "server_uuid on master is ${current_master_server_uuid}, replica is ${current_replica_server_uuid}" + echo "Gracefully sleeping for 3 seconds while replica is setting up..." sleep 3 @@ -191,6 +203,14 @@ test_single() { start_replication echo_dot + if [ -f $tests_path/$test_name/gtid_mode ]; then + target_gtid_mode=$(cat $tests_path/$test_name/gtid_mode) + if [ "$current_gtid_mode" != "$target_gtid_mode" ]; then + echo "gtid_mode is ${current_gtid_mode}, expected ${target_gtid_mode}" + exit 1 + fi + fi + if [ -f $tests_path/$test_name/sql_mode ]; then gh-ost-test-mysql-master --default-character-set=utf8mb4 test -e "set @@global.sql_mode='$(cat $tests_path/$test_name/sql_mode)'" gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "set @@global.sql_mode='$(cat $tests_path/$test_name/sql_mode)'" @@ -210,6 +230,9 @@ test_single() { if [ -f $tests_path/$test_name/extra_args ]; then extra_args=$(cat $tests_path/$test_name/extra_args) fi + if [ "$gtid" = true ]; then + extra_args+=" --gtid" + fi if [ "$toxiproxy" = true ]; then extra_args+=" --skip-port-validation" fi @@ -252,7 +275,7 @@ test_single() { fi # - cmd="$ghost_binary \ + cmd="GOTRACEBACK=crash $ghost_binary \ --user=gh-ost \ --password=gh-ost \ --host=$replica_host \ diff --git a/localtests/trivial/extra_args b/localtests/trivial/extra_args index 8b6320aa1..75bbe43a4 100644 --- a/localtests/trivial/extra_args +++ b/localtests/trivial/extra_args @@ -1 +1 @@ ---throttle-query='select false' \ +--throttle-query='select false' diff --git a/script/docker-gh-ost-replica-tests b/script/docker-gh-ost-replica-tests index 4c068e6d6..e267d4f89 100755 --- a/script/docker-gh-ost-replica-tests +++ b/script/docker-gh-ost-replica-tests @@ -63,7 +63,7 @@ create_toxiproxy() { -H "Content-Type: application/json" \ -d '{"name": "limit_data_downstream", "type": "limit_data", - "attributes": {"bytes": 300000}}' + "attributes": {"bytes": 1000000}}' echo }