Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions writers/batchwriter/batchwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
w.deleteStaleMessages = append(w.deleteStaleMessages, m)
l := int64(len(w.deleteStaleMessages))
w.deleteStaleLock.Unlock()
if w.batchSize > 0 && l > w.batchSize {
if w.batchSize > 0 && l >= w.batchSize {
if err := w.flushDeleteStaleTables(ctx); err != nil {
return err
}
Expand All @@ -298,7 +298,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
w.deleteRecordMessages = append(w.deleteRecordMessages, m)
l := int64(len(w.deleteRecordMessages))
w.deleteRecordLock.Unlock()
if w.batchSize > 0 && l > w.batchSize {
if w.batchSize > 0 && l >= w.batchSize {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if w.batchSize > 0 && l >= w.batchSize {
limit := batch.CappedAt(0, w.batchSize)
limit.AddRows(l)
if limit.ReachedLimit() {

I think this is more aligned with the rest of the codebase e.g.

if len(toFlush) > 0 || rest != nil || s.limit.ReachedLimit() {

same comment for other places where we check the limit

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 05b4212. Tested and it works correctly

if err := w.flushDeleteRecordTables(ctx); err != nil {
return err
}
Expand All @@ -322,7 +322,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
w.migrateTableMessages = append(w.migrateTableMessages, m)
l := int64(len(w.migrateTableMessages))
w.migrateTableLock.Unlock()
if w.batchSize > 0 && l > w.batchSize {
if w.batchSize > 0 && l >= w.batchSize {
if err := w.flushMigrateTables(ctx); err != nil {
return err
}
Expand Down
Loading