Skip to content

Commit 0b2d3c9

Browse files
feat: per worker max threads (#1962)
* adds worker max_threads * Adds tests for all calculation cases. * Adds max_threads limitation to test. * Removes the test sleep. * Adds max_threads to error message. * correctly uses continue. * Fixes logic with only worker max_threads set. * Adjust comments. * Removes unnecessary check. * Fixes comment. * suggestions by @dunlgas. * copilot suggestions. * Renames logger. * review --------- Co-authored-by: Kévin Dunglas <kevin@dunglas.fr>
1 parent 75a48e8 commit 0b2d3c9

8 files changed

Lines changed: 108 additions & 9 deletions

File tree

caddy/admin_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,10 @@ func TestAutoScaleWorkerThreads(t *testing.T) {
9494
frankenphp {
9595
max_threads 10
9696
num_threads 2
97-
worker ../testdata/sleep.php 1
97+
worker ../testdata/sleep.php {
98+
num 1
99+
max_threads 3
100+
}
98101
}
99102
}
100103
@@ -128,8 +131,8 @@ func TestAutoScaleWorkerThreads(t *testing.T) {
128131
}
129132
}
130133

131-
// assert that there are now more threads than before
132-
assert.NotEqual(t, amountOfThreads, 2)
134+
assert.NotEqual(t, amountOfThreads, 2, "at least one thread should have been auto-scaled")
135+
assert.LessOrEqual(t, amountOfThreads, 4, "at most 3 max_threads + 1 regular thread should be present")
133136
}
134137

135138
// Note this test requires at least 2x40MB available memory for the process

caddy/app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ func (f *FrankenPHPApp) Start() error {
149149
frankenphp.WithWorkerEnv(w.Env),
150150
frankenphp.WithWorkerWatchMode(w.Watch),
151151
frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures),
152+
frankenphp.WithWorkerMaxThreads(w.MaxThreads),
152153
}
153154

154155
opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, workerOpts...))

caddy/workerconfig.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type workerConfig struct {
2828
FileName string `json:"file_name,omitempty"`
2929
// Num sets the number of workers to start.
3030
Num int `json:"num,omitempty"`
31+
// MaxThreads sets the maximum number of threads for this worker.
32+
MaxThreads int `json:"max_threads,omitempty"`
3133
// Env sets an extra environment variable to the given value. Can be specified more than once for multiple environment variables.
3234
Env map[string]string `json:"env,omitempty"`
3335
// Directories to watch for file changes
@@ -85,6 +87,17 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {
8587
}
8688

8789
wc.Num = int(v)
90+
case "max_threads":
91+
if !d.NextArg() {
92+
return wc, d.ArgErr()
93+
}
94+
95+
v, err := strconv.ParseUint(d.Val(), 10, 32)
96+
if err != nil {
97+
return wc, d.WrapErr(err)
98+
}
99+
100+
wc.MaxThreads = int(v)
88101
case "env":
89102
args := d.RemainingArgs()
90103
if len(args) != 2 {
@@ -125,7 +138,7 @@ func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {
125138

126139
wc.MaxConsecutiveFailures = v
127140
default:
128-
return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures", v)
141+
return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v)
129142
}
130143
}
131144

frankenphp.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func Config() PHPConfig {
155155

156156
func calculateMaxThreads(opt *opt) (numWorkers int, _ error) {
157157
maxProcs := runtime.GOMAXPROCS(0) * 2
158+
maxThreadsFromWorkers := 0
158159

159160
for i, w := range opt.workers {
160161
if w.num <= 0 {
@@ -164,12 +165,34 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) {
164165
metrics.TotalWorkers(w.name, w.num)
165166

166167
numWorkers += opt.workers[i].num
168+
169+
if w.maxThreads > 0 {
170+
if w.maxThreads < w.num {
171+
return 0, fmt.Errorf("worker max_threads (%d) must be greater or equal to worker num (%d) (%q)", w.maxThreads, w.num, w.fileName)
172+
}
173+
174+
if w.maxThreads > opt.maxThreads && opt.maxThreads > 0 {
175+
return 0, fmt.Errorf("worker max_threads (%d) cannot be greater than total max_threads (%d) (%q)", w.maxThreads, opt.maxThreads, w.fileName)
176+
}
177+
178+
maxThreadsFromWorkers += w.maxThreads - w.num
179+
}
167180
}
168181

169182
numThreadsIsSet := opt.numThreads > 0
170183
maxThreadsIsSet := opt.maxThreads != 0
171184
maxThreadsIsAuto := opt.maxThreads < 0 // maxthreads < 0 signifies auto mode (see phpmaintread.go)
172185

186+
// if max_threads is only defined in workers, scale up to the sum of all worker max_threads
187+
if !maxThreadsIsSet && maxThreadsFromWorkers > 0 {
188+
maxThreadsIsSet = true
189+
if numThreadsIsSet {
190+
opt.maxThreads = opt.numThreads + maxThreadsFromWorkers
191+
} else {
192+
opt.maxThreads = numWorkers + 1 + maxThreadsFromWorkers
193+
}
194+
}
195+
173196
if numThreadsIsSet && !maxThreadsIsSet {
174197
opt.maxThreads = opt.numThreads
175198
if opt.numThreads <= numWorkers {
@@ -188,7 +211,7 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) {
188211
return numWorkers, nil
189212
}
190213

191-
if !numThreadsIsSet {
214+
if !maxThreadsIsSet && !numThreadsIsSet {
192215
if numWorkers >= maxProcs {
193216
// Start at least as many threads as workers, and keep a free thread to handle requests in non-worker mode
194217
opt.numThreads = numWorkers + 1

options.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type workerOpt struct {
3434
name string
3535
fileName string
3636
num int
37+
maxThreads int
3738
env PreparedEnv
3839
watch []string
3940
maxConsecutiveFailures int
@@ -159,6 +160,15 @@ func WithWorkerEnv(env map[string]string) WorkerOption {
159160
}
160161
}
161162

163+
// WithWorkerMaxThreads sets the max number of threads for this specific worker
164+
func WithWorkerMaxThreads(num int) WorkerOption {
165+
return func(w *workerOpt) error {
166+
w.maxThreads = num
167+
168+
return nil
169+
}
170+
}
171+
162172
// WithWorkerWatchMode sets directories to watch for file changes
163173
func WithWorkerWatchMode(watch []string) WorkerOption {
164174
return func(w *workerOpt) error {

phpmainthread_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,23 +284,42 @@ func TestCorrectThreadCalculation(t *testing.T) {
284284
testThreadCalculation(t, 2, -1, &opt{maxThreads: -1, workers: oneWorkerThread})
285285
testThreadCalculation(t, 2, -1, &opt{numThreads: 2, maxThreads: -1})
286286

287+
// max_threads should be thread minimum + sum of worker max_threads
288+
testThreadCalculation(t, 2, 6, &opt{workers: []workerOpt{{num: 1, maxThreads: 5}}})
289+
testThreadCalculation(t, 6, 9, &opt{workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 4, maxThreads: 4}}})
290+
testThreadCalculation(t, 10, 14, &opt{numThreads: 10, workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 3, maxThreads: 4}}})
291+
292+
// max_threads should remain equal to overall max_threads
293+
testThreadCalculation(t, 2, 5, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 3}}})
294+
testThreadCalculation(t, 3, 5, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 4}, {num: 1, maxThreads: 4}}})
295+
287296
// not enough num threads
288297
testThreadCalculationError(t, &opt{numThreads: 1, workers: oneWorkerThread})
289298
testThreadCalculationError(t, &opt{numThreads: 1, maxThreads: 1, workers: oneWorkerThread})
290299

291300
// not enough max_threads
292301
testThreadCalculationError(t, &opt{numThreads: 2, maxThreads: 1})
293302
testThreadCalculationError(t, &opt{maxThreads: 1, workers: oneWorkerThread})
303+
304+
// worker max_threads is bigger than overall max_threads
305+
testThreadCalculationError(t, &opt{maxThreads: 5, workers: []workerOpt{{num: 1, maxThreads: 10}}})
306+
307+
// worker max_threads is smaller than num_threads
308+
testThreadCalculationError(t, &opt{workers: []workerOpt{{num: 3, maxThreads: 2}}})
294309
}
295310

296311
func testThreadCalculation(t *testing.T, expectedNumThreads int, expectedMaxThreads int, o *opt) {
312+
t.Helper()
313+
297314
_, err := calculateMaxThreads(o)
298315
assert.NoError(t, err, "no error should be returned")
299316
assert.Equal(t, expectedNumThreads, o.numThreads, "num_threads must be correct")
300317
assert.Equal(t, expectedMaxThreads, o.maxThreads, "max_threads must be correct")
301318
}
302319

303320
func testThreadCalculationError(t *testing.T, o *opt) {
321+
t.Helper()
322+
304323
_, err := calculateMaxThreads(o)
305324
assert.Error(t, err, "configuration must error")
306325
}

scaling.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,21 @@ func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext,
171171
}
172172

173173
// if the request has been stalled long enough, scale
174-
if fc.worker != nil {
175-
scaleWorkerThread(fc.worker)
176-
} else {
174+
if fc.worker == nil {
177175
scaleRegularThread()
176+
continue
178177
}
178+
179+
// check for max worker threads here again in case requests overflowed while waiting
180+
if fc.worker.isAtThreadLimit() {
181+
if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
182+
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "cannot scale worker thread, max threads reached for worker", slog.String("worker", fc.worker.name))
183+
}
184+
185+
continue
186+
}
187+
188+
scaleWorkerThread(fc.worker)
179189
case <-done:
180190
return
181191
}

worker.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type worker struct {
1818
name string
1919
fileName string
2020
num int
21+
maxThreads int
2122
env PreparedEnv
2223
requestChan chan contextHolder
2324
threads []*phpThread
@@ -127,6 +128,7 @@ func newWorker(o workerOpt) (*worker, error) {
127128
name: o.name,
128129
fileName: absFileName,
129130
num: o.num,
131+
maxThreads: o.maxThreads,
130132
env: o.env,
131133
requestChan: make(chan contextHolder),
132134
threads: make([]*phpThread, 0, o.num),
@@ -228,6 +230,19 @@ func (worker *worker) countThreads() int {
228230
return l
229231
}
230232

233+
// check if max_threads has been reached
234+
func (worker *worker) isAtThreadLimit() bool {
235+
if worker.maxThreads <= 0 {
236+
return false
237+
}
238+
239+
worker.threadMutex.RLock()
240+
atMaxThreads := len(worker.threads) >= worker.maxThreads
241+
worker.threadMutex.RUnlock()
242+
243+
return atMaxThreads
244+
}
245+
231246
func (worker *worker) handleRequest(ch contextHolder) error {
232247
metrics.StartWorkerRequest(worker.name)
233248

@@ -250,14 +265,19 @@ func (worker *worker) handleRequest(ch contextHolder) error {
250265
// if no thread was available, mark the request as queued and apply the scaling strategy
251266
metrics.QueuedWorkerRequest(worker.name)
252267
for {
268+
workerScaleChan := scaleChan
269+
if worker.isAtThreadLimit() {
270+
workerScaleChan = nil // max_threads for this worker reached, do not attempt scaling
271+
}
272+
253273
select {
254274
case worker.requestChan <- ch:
255275
metrics.DequeuedWorkerRequest(worker.name)
256276
<-ch.frankenPHPContext.done
257277
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
258278

259279
return nil
260-
case scaleChan <- ch.frankenPHPContext:
280+
case workerScaleChan <- ch.frankenPHPContext:
261281
// the request has triggered scaling, continue to wait for a thread
262282
case <-timeoutChan(maxWaitTime):
263283
// the request has timed out stalling

0 commit comments

Comments
 (0)