diff --git a/caddy/caddy_test.go b/caddy/caddy_test.go index 41a5daa4ef..1bf6fc066b 100644 --- a/caddy/caddy_test.go +++ b/caddy/caddy_test.go @@ -967,6 +967,74 @@ func TestMaxWaitTime(t *testing.T) { require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status") } +func TestMaxWaitTimeWorker(t *testing.T) { + tester := caddytest.NewTester(t) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + metrics + + frankenphp { + num_threads 2 + max_wait_time 1ns + worker { + num 1 + name service + file ../testdata/sleep.php + } + } + } + + localhost:`+testPort+` { + route { + root ../testdata + php + } + } + `, "caddyfile") + + // send 10 requests simultaneously, at least one request should be stalled longer than 1ns + // since we only have 1 thread, this will cause a 504 Gateway Timeout + wg := sync.WaitGroup{} + success := atomic.Bool{} + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10000&iteration=1", t) + if statusCode == http.StatusGatewayTimeout { + success.Store(true) + } + wg.Done() + }() + } + wg.Wait() + require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status") + + // Fetch metrics + resp, err := http.Get("http://localhost:2999/metrics") + require.NoError(t, err, "failed to fetch metrics") + defer resp.Body.Close() + + // Read and parse metrics + metrics := new(bytes.Buffer) + _, err = metrics.ReadFrom(resp.Body) + + expectedMetrics := ` + # TYPE frankenphp_worker_queue_depth gauge + frankenphp_worker_queue_depth{worker="service"} 0 + ` + + ctx := caddy.ActiveContext() + require.NoError(t, + testutil.GatherAndCompare( + ctx.GetMetricsRegistry(), + strings.NewReader(expectedMetrics), + "frankenphp_worker_queue_depth", + )) +} + func getStatusCode(url string, t *testing.T) int { req, err := http.NewRequest("GET", url, nil) require.NoError(t, err) diff --git a/context.go b/context.go index 0336e07223..822a5ff5c6 100644 --- a/context.go +++ b/context.go @@ -152,7 +152,10 @@ func (fc *frankenPHPContext) reject(statusCode int, message string) { if rw != nil { rw.WriteHeader(statusCode) _, _ = rw.Write([]byte(message)) - rw.(http.Flusher).Flush() + + if f, ok := rw.(http.Flusher); ok { + f.Flush() + } } fc.closeContext() diff --git a/worker.go b/worker.go index 22d904a2f9..3c9455b775 100644 --- a/worker.go +++ b/worker.go @@ -225,6 +225,7 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): + metrics.DequeuedWorkerRequest(worker.name) // the request has timed out stalling fc.reject(504, "Gateway Timeout") return