Skip to content

Commit 2f7b987

Browse files
authored
feat: dequeue worker request on timeout (#1540)
1 parent 1d74b2c commit 2f7b987

3 files changed

Lines changed: 73 additions & 1 deletion

File tree

caddy/caddy_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -967,6 +967,74 @@ func TestMaxWaitTime(t *testing.T) {
967967
require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status")
968968
}
969969

970+
func TestMaxWaitTimeWorker(t *testing.T) {
971+
tester := caddytest.NewTester(t)
972+
tester.InitServer(`
973+
{
974+
skip_install_trust
975+
admin localhost:2999
976+
http_port `+testPort+`
977+
metrics
978+
979+
frankenphp {
980+
num_threads 2
981+
max_wait_time 1ns
982+
worker {
983+
num 1
984+
name service
985+
file ../testdata/sleep.php
986+
}
987+
}
988+
}
989+
990+
localhost:`+testPort+` {
991+
route {
992+
root ../testdata
993+
php
994+
}
995+
}
996+
`, "caddyfile")
997+
998+
// send 10 requests simultaneously, at least one request should be stalled longer than 1ns
999+
// since we only have 1 thread, this will cause a 504 Gateway Timeout
1000+
wg := sync.WaitGroup{}
1001+
success := atomic.Bool{}
1002+
wg.Add(10)
1003+
for i := 0; i < 10; i++ {
1004+
go func() {
1005+
statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10000&iteration=1", t)
1006+
if statusCode == http.StatusGatewayTimeout {
1007+
success.Store(true)
1008+
}
1009+
wg.Done()
1010+
}()
1011+
}
1012+
wg.Wait()
1013+
require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status")
1014+
1015+
// Fetch metrics
1016+
resp, err := http.Get("http://localhost:2999/metrics")
1017+
require.NoError(t, err, "failed to fetch metrics")
1018+
defer resp.Body.Close()
1019+
1020+
// Read and parse metrics
1021+
metrics := new(bytes.Buffer)
1022+
_, err = metrics.ReadFrom(resp.Body)
1023+
1024+
expectedMetrics := `
1025+
# TYPE frankenphp_worker_queue_depth gauge
1026+
frankenphp_worker_queue_depth{worker="service"} 0
1027+
`
1028+
1029+
ctx := caddy.ActiveContext()
1030+
require.NoError(t,
1031+
testutil.GatherAndCompare(
1032+
ctx.GetMetricsRegistry(),
1033+
strings.NewReader(expectedMetrics),
1034+
"frankenphp_worker_queue_depth",
1035+
))
1036+
}
1037+
9701038
func getStatusCode(url string, t *testing.T) int {
9711039
req, err := http.NewRequest("GET", url, nil)
9721040
require.NoError(t, err)

context.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,10 @@ func (fc *frankenPHPContext) reject(statusCode int, message string) {
152152
if rw != nil {
153153
rw.WriteHeader(statusCode)
154154
_, _ = rw.Write([]byte(message))
155-
rw.(http.Flusher).Flush()
155+
156+
if f, ok := rw.(http.Flusher); ok {
157+
f.Flush()
158+
}
156159
}
157160

158161
fc.closeContext()

worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) {
225225
case scaleChan <- fc:
226226
// the request has triggered scaling, continue to wait for a thread
227227
case <-timeoutChan(maxWaitTime):
228+
metrics.DequeuedWorkerRequest(worker.name)
228229
// the request has timed out stalling
229230
fc.reject(504, "Gateway Timeout")
230231
return

0 commit comments

Comments
 (0)