Skip to content

Commit a0e2801

Browse files
fix: Fix race condition on batchsender when sending resources from multiple goroutines (#2405)
1 parent b7188f1 commit a0e2801

File tree

2 files changed

+64
-20
lines changed

2 files changed

+64
-20
lines changed

scheduler/batchsender/batch_sender.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,20 @@ const (
1919
// - If the current batch has reached the batch size, it will be sent immediately
2020
// - Otherwise, a timer will be started to send the current batch after the batch timeout
2121
type BatchSender struct {
22-
sendFn func(any)
23-
items []any
24-
timer *time.Timer
25-
itemsLock sync.Mutex
22+
sendFn func(any)
23+
items []any
24+
timer *time.Timer
25+
mu sync.Mutex
2626
}
2727

2828
func NewBatchSender(sendFn func(any)) *BatchSender {
2929
return &BatchSender{sendFn: sendFn}
3030
}
3131

3232
func (bs *BatchSender) Send(item any) {
33+
bs.mu.Lock()
34+
defer bs.mu.Unlock()
35+
3336
if bs.timer != nil {
3437
bs.timer.Stop()
3538
}
@@ -39,34 +42,29 @@ func (bs *BatchSender) Send(item any) {
3942
// If item is already a slice, send it directly
4043
// together with the current batch
4144
if len(items) > 1 {
42-
bs.flush(items...)
45+
bs.flushLocked(items...)
4346
return
4447
}
4548

4649
// Otherwise, add item to the current batch
47-
bs.appendToBatch(items...)
50+
bs.items = append(bs.items, items...)
4851

4952
// If the current batch has reached the batch size, send it
5053
if len(bs.items) >= batchSize {
51-
bs.flush()
54+
bs.flushLocked()
5255
return
5356
}
5457

5558
// Otherwise, start a timer to send the current batch after the batch timeout
56-
bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() })
57-
}
58-
59-
func (bs *BatchSender) appendToBatch(items ...any) {
60-
bs.itemsLock.Lock()
61-
defer bs.itemsLock.Unlock()
62-
63-
bs.items = append(bs.items, items...)
59+
bs.timer = time.AfterFunc(batchTimeout, func() {
60+
bs.mu.Lock()
61+
defer bs.mu.Unlock()
62+
bs.flushLocked()
63+
})
6464
}
6565

66-
func (bs *BatchSender) flush(items ...any) {
67-
bs.itemsLock.Lock()
68-
defer bs.itemsLock.Unlock()
69-
66+
// flushLocked sends all buffered items. Must be called with bs.mu held.
67+
func (bs *BatchSender) flushLocked(items ...any) {
7068
bs.items = append(bs.items, items...)
7169

7270
if len(bs.items) == 0 {
@@ -78,8 +76,11 @@ func (bs *BatchSender) flush(items ...any) {
7876
}
7977

8078
func (bs *BatchSender) Close() {
79+
bs.mu.Lock()
80+
defer bs.mu.Unlock()
81+
8182
if bs.timer != nil {
8283
bs.timer.Stop()
8384
}
84-
bs.flush()
85+
bs.flushLocked()
8586
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package batchsender
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
)
8+
9+
// This test verifies there is no data race between Send() and the timer-triggered flush.
10+
func TestSend_ConcurrentWithTimerFlush(_ *testing.T) {
11+
// The race occurs when:
12+
// 1. a Send() call schedules a timer via time.AfterFunc
13+
// 2. the timer fires and calls flush() on a separate goroutine
14+
// 3. another Send() reads bs.items concurrently.
15+
//
16+
// To trigger this, we send items from multiple goroutines with delays around batchTimeout so the timer fires between Sends.
17+
var mu sync.Mutex
18+
var received []any
19+
20+
const numGoroutines = 5
21+
const sendsPerGoroutine = 20
22+
23+
bs := NewBatchSender(func(items any) {
24+
mu.Lock()
25+
defer mu.Unlock()
26+
received = append(received, items)
27+
})
28+
29+
var wg sync.WaitGroup
30+
wg.Add(numGoroutines)
31+
for range numGoroutines {
32+
go func() {
33+
defer wg.Done()
34+
for range sendsPerGoroutine {
35+
bs.Send("item")
36+
time.Sleep(batchTimeout + 10*time.Millisecond)
37+
}
38+
}()
39+
}
40+
41+
wg.Wait()
42+
bs.Close()
43+
}

0 commit comments

Comments
 (0)