Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion caddy/caddy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/dunglas/frankenphp/internal/fastabs"

Expand Down Expand Up @@ -63,6 +64,8 @@ type FrankenPHPApp struct {
Workers []workerConfig `json:"workers,omitempty"`
// Overwrites the default php ini configuration
PhpIni map[string]string `json:"php_ini,omitempty"`
// The maximum amount of time a request may be stalled waiting for a thread
MaxWaitTime time.Duration `json:"max_wait_time,omitempty"`

metrics frankenphp.Metrics
logger *zap.Logger
Expand Down Expand Up @@ -93,6 +96,7 @@ func (f *FrankenPHPApp) Start() error {
frankenphp.WithLogger(f.logger),
frankenphp.WithMetrics(f.metrics),
frankenphp.WithPhpIni(f.PhpIni),
frankenphp.WithMaxWaitTime(f.MaxWaitTime),
}
for _, w := range f.Workers {
opts = append(opts, frankenphp.WithWorkers(repl.ReplaceKnown(w.FileName, ""), w.Num, w.Env, w.Watch))
Expand All @@ -119,6 +123,7 @@ func (f *FrankenPHPApp) Stop() error {
// reset configuration so it doesn't bleed into later tests
f.Workers = nil
f.NumThreads = 0
f.MaxWaitTime = 0

return nil
}
Expand Down Expand Up @@ -156,6 +161,17 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
}

f.MaxThreads = int(v)
case "max_wait_time":
if !d.NextArg() {
return d.ArgErr()
}

v, err := time.ParseDuration(d.Val())
if err != nil {
return errors.New("max_wait_time must be a valid duration (example: 10s)")
}

f.MaxWaitTime = v
case "php_ini":
parseIniLine := func(d *caddyfile.Dispenser) error {
key := d.Val()
Expand Down Expand Up @@ -266,7 +282,7 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {

f.Workers = append(f.Workers, wc)
default:
allowedDirectives := "num_threads, max_threads, php_ini, worker"
allowedDirectives := "num_threads, max_threads, php_ini, worker, max_wait_time"
return wrongSubDirectiveError("frankenphp", allowedDirectives, d.Val())
}
}
Expand Down
51 changes: 51 additions & 0 deletions caddy/caddy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/dunglas/frankenphp"
Expand Down Expand Up @@ -748,3 +749,53 @@ func TestOsEnv(t *testing.T) {
"ENV1=value1,ENV2=value2",
)
}

func TestMaxWaitTime(t *testing.T) {
tester := caddytest.NewTester(t)
tester.InitServer(`
{
skip_install_trust
admin localhost:2999
http_port `+testPort+`

frankenphp {
num_threads 1
max_wait_time 1ns
}
}

localhost:`+testPort+` {
route {
root ../testdata
php
}
}
`, "caddyfile")

// send 10 requests simultaneously, at least one request should be stalled longer than 1ns
// since we only have 1 thread, this will cause a 504 Gateway Timeout
wg := sync.WaitGroup{}
success := atomic.Bool{}
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
statusCode := getStatusCode("http://localhost:"+testPort+"/sleep.php?sleep=10", t)
if statusCode == http.StatusGatewayTimeout {
success.Store(true)
}
wg.Done()
}()
}
wg.Wait()

require.True(t, success.Load(), "At least one request should have failed with a 504 Gateway Timeout status")
}

func getStatusCode(url string, t *testing.T) int {
req, err := http.NewRequest("GET", url, nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
return resp.StatusCode
}
13 changes: 13 additions & 0 deletions frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"strings"
"sync"
"syscall"
"time"
"unsafe"

"go.uber.org/zap"
Expand Down Expand Up @@ -69,6 +70,8 @@ var (
logger *zap.Logger

metrics Metrics = nullMetrics{}

maxWaitTime time.Duration
)

type syslogLevel int
Expand Down Expand Up @@ -221,6 +224,8 @@ func Init(options ...Option) error {
metrics = opt.metrics
}

maxWaitTime = opt.maxWaitTime

totalThreadCount, workerThreadCount, maxThreadCount, err := calculateMaxThreads(opt)
if err != nil {
return err
Expand Down Expand Up @@ -623,3 +628,11 @@ func executePHPFunction(functionName string) bool {

return C.frankenphp_execute_php_function(cFunctionName) == 1
}

func timeoutChan(timeout time.Duration) <-chan time.Time {
if timeout == 0 {
return nil
}

return time.After(timeout)
}
5 changes: 3 additions & 2 deletions frankenphp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,12 @@ func TestFailingWorker(t *testing.T) {
}

func TestEnv(t *testing.T) {
testEnv(t, &testOptions{nbParallelRequests:1})
testEnv(t, &testOptions{nbParallelRequests: 1})
}
func TestEnvWorker(t *testing.T) {
testEnv(t, &testOptions{nbParallelRequests:1, workerScript: "env/test-env.php"})
testEnv(t, &testOptions{nbParallelRequests: 1, workerScript: "env/test-env.php"})
}

// testEnv cannot be run in parallel due to https://github.com/golang/go/issues/63567
func testEnv(t *testing.T, opts *testOptions) {
assert.NoError(t, os.Setenv("EMPTY", ""))
Expand Down
24 changes: 18 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package frankenphp

import (
"time"

"go.uber.org/zap"
)

Expand All @@ -11,12 +13,13 @@ type Option func(h *opt) error
//
// If you change this, also update the Caddy module and the documentation.
type opt struct {
numThreads int
maxThreads int
workers []workerOpt
logger *zap.Logger
metrics Metrics
phpIni map[string]string
numThreads int
maxThreads int
workers []workerOpt
logger *zap.Logger
metrics Metrics
phpIni map[string]string
maxWaitTime time.Duration
}

type workerOpt struct {
Expand Down Expand Up @@ -76,3 +79,12 @@ func WithPhpIni(overrides map[string]string) Option {
return nil
}
}

// WithMaxWaitTime configures the max time a request may be stalled waiting for a thread.
func WithMaxWaitTime(maxWaitTime time.Duration) Option {
return func(o *opt) error {
o.maxWaitTime = maxWaitTime

return nil
}
}
4 changes: 4 additions & 0 deletions threadregular.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func handleRequestWithRegularPHPThreads(fc *frankenPHPContext) {
return
case scaleChan <- fc:
// the request has triggered scaling, continue to wait for a thread
case <-timeoutChan(maxWaitTime):
// the request has timed out stalling
fc.reject(504, "Gateway Timeout")
return
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (worker *worker) handleRequest(fc *frankenPHPContext) {
return
case scaleChan <- fc:
// the request has triggered scaling, continue to wait for a thread
case <-timeoutChan(maxWaitTime):
// the request has timed out stalling
fc.reject(504, "Gateway Timeout")
return
}
}
}