Skip to content

Commit dadeb5a

Browse files
perf: tail latency with goSched (#2033)
Alternate implementation to #2016 that doesn't reduce RPS with lower amounts of threads
1 parent abaf03c commit dadeb5a

5 files changed

Lines changed: 57 additions & 26 deletions

File tree

caddy/mercure-skip.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
//go:build nomercure
2+
23
package caddy
34

45
import (

caddy/workerconfig.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type workerConfig struct {
3939
// MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick)
4040
MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"`
4141

42-
requestOptions []frankenphp.RequestOption
42+
requestOptions []frankenphp.RequestOption
4343
}
4444

4545
func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {

frankenphp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func Init(options ...Option) error {
307307
return err
308308
}
309309

310-
regularRequestChan = make(chan contextHolder, opt.numThreads-workerThreadCount)
310+
regularRequestChan = make(chan contextHolder)
311311
regularThreads = make([]*phpThread, 0, opt.numThreads-workerThreadCount)
312312
for i := 0; i < opt.numThreads-workerThreadCount; i++ {
313313
convertToRegularThread(getInactivePHPThread())

threadregular.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package frankenphp
22

33
import (
44
"context"
5+
"runtime"
56
"sync"
7+
"sync/atomic"
68
)
79

810
// representation of a non-worker PHP thread
@@ -16,9 +18,10 @@ type regularThread struct {
1618
}
1719

1820
var (
19-
regularThreads []*phpThread
20-
regularThreadMu = &sync.RWMutex{}
21-
regularRequestChan chan contextHolder
21+
regularThreads []*phpThread
22+
regularThreadMu = &sync.RWMutex{}
23+
regularRequestChan chan contextHolder
24+
queuedRegularThreads = atomic.Int32{}
2225
)
2326

2427
func convertToRegularThread(thread *phpThread) {
@@ -81,6 +84,7 @@ func (handler *regularThread) waitForRequest() string {
8184
// go back to beforeScriptExecution
8285
return handler.beforeScriptExecution()
8386
case ch = <-regularRequestChan:
87+
case ch = <-handler.thread.requestChan:
8488
}
8589

8690
handler.ctx = ch.ctx
@@ -100,23 +104,35 @@ func (handler *regularThread) afterRequest() {
100104
func handleRequestWithRegularPHPThreads(ch contextHolder) error {
101105
metrics.StartRequest()
102106

103-
select {
104-
case regularRequestChan <- ch:
105-
// a thread was available to handle the request immediately
106-
<-ch.frankenPHPContext.done
107-
metrics.StopRequest()
108-
109-
return nil
110-
default:
111-
// no thread was available
107+
runtime.Gosched()
108+
109+
if queuedRegularThreads.Load() == 0 {
110+
regularThreadMu.RLock()
111+
for _, thread := range regularThreads {
112+
select {
113+
case thread.requestChan <- ch:
114+
regularThreadMu.RUnlock()
115+
<-ch.frankenPHPContext.done
116+
metrics.StopRequest()
117+
118+
return nil
119+
default:
120+
// thread was not available
121+
}
122+
}
123+
regularThreadMu.RUnlock()
112124
}
113125

114126
// if no thread was available, mark the request as queued and fan it out to all threads
127+
queuedRegularThreads.Add(1)
115128
metrics.QueuedRequest()
129+
116130
for {
117131
select {
118132
case regularRequestChan <- ch:
133+
queuedRegularThreads.Add(-1)
119134
metrics.DequeuedRequest()
135+
120136
<-ch.frankenPHPContext.done
121137
metrics.StopRequest()
122138

@@ -125,7 +141,9 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error {
125141
// the request has triggered scaling, continue to wait for a thread
126142
case <-timeoutChan(maxWaitTime):
127143
// the request has timed out stalling
144+
queuedRegularThreads.Add(-1)
128145
metrics.DequeuedRequest()
146+
metrics.StopRequest()
129147

130148
ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded)
131149

worker.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"fmt"
77
"os"
88
"path/filepath"
9+
"runtime"
910
"strings"
1011
"sync"
12+
"sync/atomic"
1113
"time"
1214

1315
"github.com/dunglas/frankenphp/internal/fastabs"
@@ -28,6 +30,7 @@ type worker struct {
2830
maxConsecutiveFailures int
2931
onThreadReady func(int)
3032
onThreadShutdown func(int)
33+
queuedRequests atomic.Int32
3134
}
3235

3336
var (
@@ -253,24 +256,30 @@ func (worker *worker) isAtThreadLimit() bool {
253256
func (worker *worker) handleRequest(ch contextHolder) error {
254257
metrics.StartWorkerRequest(worker.name)
255258

256-
// dispatch requests to all worker threads in order
257-
worker.threadMutex.RLock()
258-
for _, thread := range worker.threads {
259-
select {
260-
case thread.requestChan <- ch:
261-
worker.threadMutex.RUnlock()
262-
<-ch.frankenPHPContext.done
263-
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
259+
runtime.Gosched()
264260

265-
return nil
266-
default:
267-
// thread is busy, continue
261+
if worker.queuedRequests.Load() == 0 {
262+
// dispatch requests to all worker threads in order
263+
worker.threadMutex.RLock()
264+
for _, thread := range worker.threads {
265+
select {
266+
case thread.requestChan <- ch:
267+
worker.threadMutex.RUnlock()
268+
<-ch.frankenPHPContext.done
269+
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
270+
271+
return nil
272+
default:
273+
// thread is busy, continue
274+
}
268275
}
276+
worker.threadMutex.RUnlock()
269277
}
270-
worker.threadMutex.RUnlock()
271278

272279
// if no thread was available, mark the request as queued and apply the scaling strategy
280+
worker.queuedRequests.Add(1)
273281
metrics.QueuedWorkerRequest(worker.name)
282+
274283
for {
275284
workerScaleChan := scaleChan
276285
if worker.isAtThreadLimit() {
@@ -279,6 +288,7 @@ func (worker *worker) handleRequest(ch contextHolder) error {
279288

280289
select {
281290
case worker.requestChan <- ch:
291+
worker.queuedRequests.Add(-1)
282292
metrics.DequeuedWorkerRequest(worker.name)
283293
<-ch.frankenPHPContext.done
284294
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
@@ -288,7 +298,9 @@ func (worker *worker) handleRequest(ch contextHolder) error {
288298
// the request has triggered scaling, continue to wait for a thread
289299
case <-timeoutChan(maxWaitTime):
290300
// the request has timed out stalling
301+
worker.queuedRequests.Add(-1)
291302
metrics.DequeuedWorkerRequest(worker.name)
303+
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
292304

293305
ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded)
294306

0 commit comments

Comments (0)