From b95f7ee6ecfb6ede909f0c1a8ec77d42ea6f0302 Mon Sep 17 00:00:00 2001 From: Marcel Tyszkiewicz Date: Fri, 16 May 2025 11:05:35 +0200 Subject: [PATCH 1/2] change logic for batch size checking --- writers/batchwriter/batchwriter.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index af1b0bcb23..76dcb69615 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -278,7 +278,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag w.deleteStaleMessages = append(w.deleteStaleMessages, m) l := int64(len(w.deleteStaleMessages)) w.deleteStaleLock.Unlock() - if w.batchSize > 0 && l > w.batchSize { + if w.batchSize > 0 && l >= w.batchSize { if err := w.flushDeleteStaleTables(ctx); err != nil { return err } @@ -298,7 +298,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag w.deleteRecordMessages = append(w.deleteRecordMessages, m) l := int64(len(w.deleteRecordMessages)) w.deleteRecordLock.Unlock() - if w.batchSize > 0 && l > w.batchSize { + if w.batchSize > 0 && l >= w.batchSize { if err := w.flushDeleteRecordTables(ctx); err != nil { return err } @@ -322,7 +322,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag w.migrateTableMessages = append(w.migrateTableMessages, m) l := int64(len(w.migrateTableMessages)) w.migrateTableLock.Unlock() - if w.batchSize > 0 && l > w.batchSize { + if w.batchSize > 0 && l >= w.batchSize { if err := w.flushMigrateTables(ctx); err != nil { return err } From 05b421261b2d926c13e61cfe9b70e97e8a70b6a5 Mon Sep 17 00:00:00 2001 From: Marcel Tyszkiewicz Date: Mon, 19 May 2025 12:30:53 +0200 Subject: [PATCH 2/2] refactor limit check --- writers/batchwriter/batchwriter.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/writers/batchwriter/batchwriter.go b/writers/batchwriter/batchwriter.go index 76dcb69615..9bbd67020f 100644 --- a/writers/batchwriter/batchwriter.go +++ b/writers/batchwriter/batchwriter.go @@ -278,7 +278,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag w.deleteStaleMessages = append(w.deleteStaleMessages, m) l := int64(len(w.deleteStaleMessages)) w.deleteStaleLock.Unlock() - if w.batchSize > 0 && l >= w.batchSize { + if w.isLimitReached(l) { if err := w.flushDeleteStaleTables(ctx); err != nil { return err } @@ -298,7 +298,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag w.deleteRecordMessages = append(w.deleteRecordMessages, m) l := int64(len(w.deleteRecordMessages)) w.deleteRecordLock.Unlock() - if w.batchSize > 0 && l >= w.batchSize { + if w.isLimitReached(l) { if err := w.flushDeleteRecordTables(ctx); err != nil { return err } @@ -322,7 +322,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag w.migrateTableMessages = append(w.migrateTableMessages, m) l := int64(len(w.migrateTableMessages)) w.migrateTableLock.Unlock() - if w.batchSize > 0 && l >= w.batchSize { + if w.isLimitReached(l) { if err := w.flushMigrateTables(ctx); err != nil { return err } @@ -332,6 +332,12 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag return nil } +func (w *BatchWriter) isLimitReached(rowCount int64) bool { + limit := batch.CappedAt(0, w.batchSize) + limit.AddRows(rowCount) + return limit.ReachedLimit() +} + func (w *BatchWriter) startWorker(_ context.Context, msg *message.WriteInsert) error { w.workersLock.RLock() md := msg.Record.Schema().Metadata()