Skip to content

Commit b000b24

Browse files
authored
Replace usage of Fatale with context cancellation (#1639)
* Replace log.Fatale with context-based error propagation

  This patch modifies gh-ost to use a cancellable context instead of log.Fatale() in listenOnPanicAbort. When using gh-ost as a library, this allows the calling application to recover from aborts (e.g. log the failure reason) instead of having the entire process terminate via os.Exit(). Now we store the error and cancel a context to signal all goroutines to stop gracefully.

* Fix shadowing
* Simplify non-blocking poll
* Simplify non-blocking poll in migrator.go
* Fix error return
* Fix hang on blocking channel send
* Add defensive fix for other potential blocking channel send deadlocks
* Add SendWithContext helper to avoid deadlocks
* Fix deadlock on PanicAbort sends
* Use checkAbort
* Fix migration abort race condition
* Remove buffer on PanicAbort channel
1 parent f7862c0 commit b000b24

File tree

9 files changed

+580
-36
lines changed

9 files changed

+580
-36
lines changed

.github/CONTRIBUTING.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,24 @@ Here are a few things you can do that will increase the likelihood of your pull
1919
- Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, consider submitting them as separate pull requests.
2020
- Write a [good commit message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html).
2121

22+
## Development Guidelines
23+
24+
### Channel Safety
25+
26+
When working with channels in goroutines, it's critical to prevent deadlocks that can occur when a channel receiver exits due to an error while senders are still trying to send values. Always use `base.SendWithContext` for channel sends to avoid deadlocks:
27+
28+
```go
29+
// ✅ CORRECT - Uses helper to prevent deadlock
30+
if err := base.SendWithContext(ctx, ch, value); err != nil {
31+
return err // context was cancelled
32+
}
33+
34+
// ❌ WRONG - Can deadlock if receiver exits
35+
ch <- value
36+
```
37+
38+
A buffered destination channel does not remove the risk: once the buffer fills up and the receiver has exited, a bare send blocks forever, so it's important to use `SendWithContext` for buffered channels as well.
39+
2240
## Resources
2341

2442
- [Contributing to Open Source on GitHub](https://guides.github.com/activities/contributing-to-open-source/)

go/base/context.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package base
77

88
import (
9+
"context"
910
"fmt"
1011
"math"
1112
"os"
@@ -225,6 +226,16 @@ type MigrationContext struct {
225226
InCutOverCriticalSectionFlag int64
226227
PanicAbort chan error
227228

229+
// Context for cancellation signaling across all goroutines
230+
// Stored in struct as it spans the entire migration lifecycle, not per-function.
231+
// context.Context is safe for concurrent use by multiple goroutines.
232+
ctx context.Context //nolint:containedctx
233+
cancelFunc context.CancelFunc
234+
235+
// Stores the fatal error that triggered abort
236+
AbortError error
237+
abortMutex *sync.Mutex
238+
228239
OriginalTableColumnsOnApplier *sql.ColumnList
229240
OriginalTableColumns *sql.ColumnList
230241
OriginalTableVirtualColumns *sql.ColumnList
@@ -293,6 +304,7 @@ type ContextConfig struct {
293304
}
294305

295306
func NewMigrationContext() *MigrationContext {
307+
ctx, cancelFunc := context.WithCancel(context.Background())
296308
return &MigrationContext{
297309
Uuid: uuid.NewString(),
298310
defaultNumRetries: 60,
@@ -313,6 +325,9 @@ func NewMigrationContext() *MigrationContext {
313325
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
314326
ColumnRenameMap: make(map[string]string),
315327
PanicAbort: make(chan error),
328+
ctx: ctx,
329+
cancelFunc: cancelFunc,
330+
abortMutex: &sync.Mutex{},
316331
Log: NewDefaultLogger(),
317332
}
318333
}
@@ -982,3 +997,54 @@ func (this *MigrationContext) GetGhostTriggerName(triggerName string) string {
982997
func (this *MigrationContext) ValidateGhostTriggerLengthBelowMaxLength(triggerName string) bool {
983998
return utf8.RuneCountInString(triggerName) <= mysql.MaxTableNameLength
984999
}
1000+
1001+
// GetContext returns the cancellable context.Context that goroutines check to detect a migration abort
1002+
func (this *MigrationContext) GetContext() context.Context {
1003+
return this.ctx
1004+
}
1005+
1006+
// SetAbortError stores the fatal error that triggered abort
1007+
// Only the first error is stored (subsequent errors are ignored)
1008+
func (this *MigrationContext) SetAbortError(err error) {
1009+
this.abortMutex.Lock()
1010+
defer this.abortMutex.Unlock()
1011+
if this.AbortError == nil {
1012+
this.AbortError = err
1013+
}
1014+
}
1015+
1016+
// GetAbortError retrieves the stored abort error
1017+
func (this *MigrationContext) GetAbortError() error {
1018+
this.abortMutex.Lock()
1019+
defer this.abortMutex.Unlock()
1020+
return this.AbortError
1021+
}
1022+
1023+
// CancelContext cancels the migration context to signal all goroutines to stop
1024+
// The cancel function is safe to call multiple times and from multiple goroutines.
1025+
func (this *MigrationContext) CancelContext() {
1026+
if this.cancelFunc != nil {
1027+
this.cancelFunc()
1028+
}
1029+
}
1030+
1031+
// SendWithContext attempts to send a value to a channel, but returns early
1032+
// if the context is cancelled. This prevents goroutine deadlocks when the
1033+
// channel receiver has exited due to an error.
1034+
//
1035+
// Use this instead of bare channel sends (ch <- val) in goroutines to ensure
1036+
// proper cleanup when the migration is aborted.
1037+
//
1038+
// Example:
1039+
//
1040+
// if err := base.SendWithContext(ctx, ch, value); err != nil {
1041+
// return err // context was cancelled
1042+
// }
1043+
func SendWithContext[T any](ctx context.Context, ch chan<- T, val T) error {
1044+
select {
1045+
case ch <- val:
1046+
return nil
1047+
case <-ctx.Done():
1048+
return ctx.Err()
1049+
}
1050+
}

go/base/context_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
package base
77

88
import (
9+
"errors"
910
"os"
1011
"strings"
12+
"sync"
1113
"testing"
1214
"time"
1315

@@ -213,3 +215,58 @@ func TestReadConfigFile(t *testing.T) {
213215
}
214216
}
215217
}
218+
219+
func TestSetAbortError_StoresFirstError(t *testing.T) {
220+
ctx := NewMigrationContext()
221+
222+
err1 := errors.New("first error")
223+
err2 := errors.New("second error")
224+
225+
ctx.SetAbortError(err1)
226+
ctx.SetAbortError(err2)
227+
228+
got := ctx.GetAbortError()
229+
if got != err1 { //nolint:errorlint // Testing pointer equality for sentinel error
230+
t.Errorf("Expected first error %v, got %v", err1, got)
231+
}
232+
}
233+
234+
func TestSetAbortError_ThreadSafe(t *testing.T) {
235+
ctx := NewMigrationContext()
236+
237+
var wg sync.WaitGroup
238+
errs := []error{
239+
errors.New("error 1"),
240+
errors.New("error 2"),
241+
errors.New("error 3"),
242+
}
243+
244+
// Launch 3 goroutines trying to set error concurrently
245+
for _, err := range errs {
246+
wg.Add(1)
247+
go func(e error) {
248+
defer wg.Done()
249+
ctx.SetAbortError(e)
250+
}(err)
251+
}
252+
253+
wg.Wait()
254+
255+
// Should store exactly one of the errors
256+
got := ctx.GetAbortError()
257+
if got == nil {
258+
t.Fatal("Expected error to be stored, got nil")
259+
}
260+
261+
// Verify it's one of the errors we sent
262+
found := false
263+
for _, err := range errs {
264+
if got == err { //nolint:errorlint // Testing pointer equality for sentinel error
265+
found = true
266+
break
267+
}
268+
}
269+
if !found {
270+
t.Errorf("Stored error %v not in list of sent errors", got)
271+
}
272+
}

go/logic/applier.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,17 @@ func (this *Applier) InitiateHeartbeat() {
695695

696696
ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
697697
defer ticker.Stop()
698-
for range ticker.C {
698+
for {
699+
// Check for context cancellation each iteration
700+
ctx := this.migrationContext.GetContext()
701+
select {
702+
case <-ctx.Done():
703+
this.migrationContext.Log.Debugf("Heartbeat injection cancelled")
704+
return
705+
case <-ticker.C:
706+
// Process heartbeat
707+
}
708+
699709
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
700710
return
701711
}
@@ -706,7 +716,8 @@ func (this *Applier) InitiateHeartbeat() {
706716
continue
707717
}
708718
if err := injectHeartbeat(); err != nil {
709-
this.migrationContext.PanicAbort <- fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err)
719+
// Use helper to prevent deadlock if listenOnPanicAbort already exited
720+
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, fmt.Errorf("injectHeartbeat writing failed %d times, last error: %w", numSuccessiveFailures, err))
710721
return
711722
}
712723
}

0 commit comments

Comments
 (0)