diff --git a/caddy/caddy.go b/caddy/caddy.go index 1e5cc8da27..ebcde3fcaf 100644 --- a/caddy/caddy.go +++ b/caddy/caddy.go @@ -11,6 +11,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/dunglas/frankenphp/internal/fastabs" @@ -63,6 +64,8 @@ type FrankenPHPApp struct { Workers []workerConfig `json:"workers,omitempty"` // Overwrites the default php ini configuration PhpIni map[string]string `json:"php_ini,omitempty"` + // The maximum amount of time a request may be stalled waiting for a thread + MaxWaitTime time.Duration `json:"max_wait_time,omitempty"` metrics frankenphp.Metrics logger *zap.Logger @@ -93,6 +96,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithLogger(f.logger), frankenphp.WithMetrics(f.metrics), frankenphp.WithPhpIni(f.PhpIni), + frankenphp.WithMaxWaitTime(f.MaxWaitTime), } for _, w := range f.Workers { opts = append(opts, frankenphp.WithWorkers(repl.ReplaceKnown(w.FileName, ""), w.Num, w.Env, w.Watch)) @@ -119,6 +123,7 @@ func (f *FrankenPHPApp) Stop() error { // reset configuration so it doesn't bleed into later tests f.Workers = nil f.NumThreads = 0 + f.MaxWaitTime = 0 return nil } @@ -156,6 +161,17 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } f.MaxThreads = int(v) + case "max_wait_time": + if !d.NextArg() { + return d.ArgErr() + } + + v, err := time.ParseDuration(d.Val()) + if err != nil { + return errors.New("max_wait_time must be a valid duration (example: 10s)") + } + + f.MaxWaitTime = v case "php_ini": parseIniLine := func(d *caddyfile.Dispenser) error { key := d.Val() @@ -266,7 +282,7 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { f.Workers = append(f.Workers, wc) default: - allowedDirectives := "num_threads, max_threads, php_ini, worker" + allowedDirectives := "num_threads, max_threads, php_ini, worker, max_wait_time" return wrongSubDirectiveError("frankenphp", allowedDirectives, d.Val()) } } diff --git a/caddy/caddy_test.go b/caddy/caddy_test.go index 2ff8bf207b..efc4241dcb 100644 --- a/caddy/caddy_test.go +++ b/caddy/caddy_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "testing" "github.com/dunglas/frankenphp" @@ -748,3 +749,53 @@ func TestOsEnv(t *testing.T) { "ENV1=value1,ENV2=value2", ) } + +func TestMaxWaitTime(t *testing.T) { + tester := caddytest.NewTester(t) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + + frankenphp { + num_threads 1 + max_wait_time 1ns + } + } + + localhost:`+testPort+` { + route { + root ../testdata + php + } + } + `, "caddyfile") + + // send 10 requests simultaneously, at least one request should be stalled longer than 1ns + // since we only have 1 thread, this will cause a 504 Gateway Timeout + wg := sync.WaitGroup{} + success := atomic.Bool{} + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10", t) + if statusCode == http.StatusGatewayTimeout { + success.Store(true) + } + wg.Done() + }() + } + wg.Wait() + + require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status") +} + +func getStatusCode(url string, t *testing.T) int { + req, err := http.NewRequest("GET", url, nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + return resp.StatusCode +} diff --git a/frankenphp.go b/frankenphp.go index 48f71be381..a2e7b18e8e 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -41,6 +41,7 @@ import ( "strings" "sync" "syscall" + "time" "unsafe" "go.uber.org/zap" @@ -69,6 +70,8 @@ var ( logger *zap.Logger metrics Metrics = nullMetrics{} + + maxWaitTime time.Duration ) type syslogLevel int @@ -221,6 +224,8 @@ func Init(options ...Option) error { metrics = opt.metrics } + maxWaitTime = opt.maxWaitTime + totalThreadCount, workerThreadCount, maxThreadCount, err := calculateMaxThreads(opt) if err != nil { return err @@ -623,3 +628,11 @@ func executePHPFunction(functionName string) bool { return C.frankenphp_execute_php_function(cFunctionName) == 1 } + +func timeoutChan(timeout time.Duration) <-chan time.Time { + if timeout == 0 { + return nil + } + + return time.After(timeout) +} diff --git a/frankenphp_test.go b/frankenphp_test.go index 731fa32e85..30fc8323d2 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -672,11 +672,12 @@ func TestFailingWorker(t *testing.T) { } func TestEnv(t *testing.T) { - testEnv(t, &testOptions{nbParallelRequests:1}) + testEnv(t, &testOptions{nbParallelRequests: 1}) } func TestEnvWorker(t *testing.T) { - testEnv(t, &testOptions{nbParallelRequests:1, workerScript: "env/test-env.php"}) + testEnv(t, &testOptions{nbParallelRequests: 1, workerScript: "env/test-env.php"}) } + // testEnv cannot be run in parallel due to https://github.com/golang/go/issues/63567 func testEnv(t *testing.T, opts *testOptions) { assert.NoError(t, os.Setenv("EMPTY", "")) diff --git a/options.go b/options.go index 6d1a5ebd21..c3821205f1 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,8 @@ package frankenphp import ( + "time" + "go.uber.org/zap" ) @@ -11,12 +13,13 @@ type Option func(h *opt) error // // If you change this, also update the Caddy module and the documentation. type opt struct { - numThreads int - maxThreads int - workers []workerOpt - logger *zap.Logger - metrics Metrics - phpIni map[string]string + numThreads int + maxThreads int + workers []workerOpt + logger *zap.Logger + metrics Metrics + phpIni map[string]string + maxWaitTime time.Duration } type workerOpt struct { @@ -76,3 +79,12 @@ func WithPhpIni(overrides map[string]string) Option { return nil } } + +// WithMaxWaitTime configures the max time a request may be stalled waiting for a thread. +func WithMaxWaitTime(maxWaitTime time.Duration) Option { + return func(o *opt) error { + o.maxWaitTime = maxWaitTime + + return nil + } +} diff --git a/threadregular.go b/threadregular.go index 19588d1a7c..ac119208a3 100644 --- a/threadregular.go +++ b/threadregular.go @@ -116,6 +116,10 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) { return case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread + case <-timeoutChan(maxWaitTime): + // the request has timed out stalling + fc.reject(504, "Gateway Timeout") + return } } } diff --git a/worker.go b/worker.go index 598be8c630..aff577c34c 100644 --- a/worker.go +++ b/worker.go @@ -198,6 +198,10 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) { return case scaleChan <- fc: // the request has triggered scaling, continue to wait for a thread + case <-timeoutChan(maxWaitTime): + // the request has timed out stalling + fc.reject(504, "Gateway Timeout") + return } } }