@@ -130,6 +130,18 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
130130 }
131131}
132132
133+ func (this * Migrator ) retryBatchCopyWithHooks (operation func () error , notFatalHint ... bool ) (err error ) {
134+ wrappedOperation := func () error {
135+ if err := operation (); err != nil {
136+ this .hooksExecutor .onBatchCopyRetry ()
137+ return err
138+ }
139+ return nil
140+ }
141+
142+ return this .retryOperation (wrappedOperation , notFatalHint ... )
143+ }
144+
133145// retryOperation attempts up to `count` attempts at running given function,
134146// exiting as soon as it returns with non-error.
135147func (this * Migrator ) retryOperation (operation func () error , notFatalHint ... bool ) (err error ) {
@@ -1232,28 +1244,28 @@ func (this *Migrator) iterateChunks() error {
12321244 return nil
12331245 }
12341246 copyRowsFunc := func () error {
1235- if atomic .LoadInt64 (& this .rowCopyCompleteFlag ) == 1 || atomic .LoadInt64 (& hasNoFurtherRangeFlag ) == 1 {
1236- // Done.
1237- // There's another such check down the line
1238- return nil
1239- }
1240-
1241- // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
1242-
1243- hasFurtherRange := false
1244- expectedRangeSize := int64 (0 )
1245- if err := this .retryOperation (func () (e error ) {
1246- hasFurtherRange , expectedRangeSize , e = this .applier .CalculateNextIterationRangeEndValues ()
1247- return e
1248- }); err != nil {
1249- return terminateRowIteration (err )
1250- }
1251- if ! hasFurtherRange {
1252- atomic .StoreInt64 (& hasNoFurtherRangeFlag , 1 )
1253- return terminateRowIteration (nil )
1254- }
12551247 // Copy task:
12561248 applyCopyRowsFunc := func () error {
1249+ if atomic .LoadInt64 (& this .rowCopyCompleteFlag ) == 1 || atomic .LoadInt64 (& hasNoFurtherRangeFlag ) == 1 {
1250+ // Done.
1251+ // There's another such check down the line
1252+ return nil
1253+ }
1254+
1255+ // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
1256+
1257+ hasFurtherRange := false
1258+ // TODO: figure out how to rewrite this double retry?
1259+ if err := this .retryOperation (func () (e error ) {
1260+ hasFurtherRange , e = this .applier .CalculateNextIterationRangeEndValues ()
1261+ return e
1262+ }); err != nil {
1263+ return terminateRowIteration (err )
1264+ }
1265+ if ! hasFurtherRange {
1266+ atomic .StoreInt64 (& hasNoFurtherRangeFlag , 1 )
1267+ return terminateRowIteration (nil )
1268+ }
12571269 if atomic .LoadInt64 (& this .rowCopyCompleteFlag ) == 1 {
12581270 // No need for more writes.
12591271 // This is the de-facto place where we avoid writing in the event of completed cut-over.
@@ -1286,7 +1298,7 @@ func (this *Migrator) iterateChunks() error {
12861298 atomic .AddInt64 (& this .migrationContext .Iteration , 1 )
12871299 return nil
12881300 }
1289- if err := this .retryOperation (applyCopyRowsFunc ); err != nil {
1301+ if err := this .retryBatchCopyWithHooks (applyCopyRowsFunc ); err != nil {
12901302 return terminateRowIteration (err )
12911303 }
12921304 return nil
0 commit comments