Skip to content

Commit a21d3f4

Browse files
committed
actually allow multiple workers per script filename
1 parent a4016df commit a21d3f4

6 files changed

Lines changed: 71 additions & 31 deletions

File tree

caddy/caddy.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"encoding/json"
1010
"errors"
1111
"fmt"
12+
"github.com/davecgh/go-spew/spew"
1213
"net/http"
1314
"path/filepath"
1415
"strconv"
@@ -121,11 +122,13 @@ func (f *FrankenPHPApp) Start() error {
121122
// Add workers from FrankenPHPApp configuration
122123
for _, w := range f.Workers {
123124
opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, w.Env, w.Watch, w.ModuleID))
125+
caddy.Log().Warn(fmt.Sprintf("Starting FrankenPHP with worker: %s", spew.Sdump(w)))
124126
}
125127

126128
// Add workers from FrankenPHPModule configurations
127129
for _, w := range sharedState.Workers {
128130
opts = append(opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, w.Env, w.Watch, w.ModuleID))
131+
caddy.Log().Warn(fmt.Sprintf("Starting FrankenPHP with worker: %s", spew.Sdump(w)))
129132
}
130133

131134
frankenphp.Shutdown()
@@ -444,8 +447,8 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error {
444447
hash := sha256.Sum256(data)
445448
f.ModuleID = binary.LittleEndian.Uint64(hash[:8])
446449

447-
for _, w := range f.Workers {
448-
w.ModuleID = f.ModuleID
450+
for i := range f.Workers {
451+
f.Workers[i].ModuleID = f.ModuleID
449452
}
450453
sharedState.Workers = append(sharedState.Workers, f.Workers...)
451454
f.logger.Warn(fmt.Sprintf("Initialized FrankenPHP module %d with %d workers", f.ModuleID, len(f.Workers)))

frankenphp.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"bytes"
3333
"errors"
3434
"fmt"
35+
"github.com/davecgh/go-spew/spew"
3536
"io"
3637
"net/http"
3738
"os"
@@ -405,11 +406,23 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
405406
}
406407

407408
// Detect if a worker is available to handle this request
408-
if worker, ok := workers[fc.scriptFilename]; ok {
409-
// can handle with a global worker, or a module worker from the matching module
410-
if worker.moduleID == 0 || worker.moduleID == fc.moduleID {
411-
worker.handleRequest(fc)
412-
return nil
409+
if workersList, ok := workers[fc.scriptFilename]; ok {
410+
workersString := ""
411+
for _, worker := range workersList {
412+
envString := ""
413+
for k, v := range worker.env {
414+
envString += k + "=" + v + ","
415+
}
416+
workersString += strconv.FormatUint(worker.moduleID, 10) + ":" + envString + "\n"
417+
}
418+
fc.logger.Info(fmt.Sprintf("Available workers: %s", workersString))
419+
// Look for a worker with matching moduleID or a global worker (moduleID == 0)
420+
for _, worker := range workersList {
421+
if worker.moduleID == 0 || worker.moduleID == fc.moduleID {
422+
fc.logger.Warn(fmt.Sprintf("Handled %d by worker %d with env: %s", fc.moduleID, worker.moduleID, spew.Sdump(worker.env)))
423+
worker.handleRequest(fc)
424+
return nil
425+
}
413426
}
414427
}
415428

phpmainthread_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func TestFinishBootingAWorkerScript(t *testing.T) {
179179

180180
func getDummyWorker(fileName string) *worker {
181181
if workers == nil {
182-
workers = make(map[string]*worker)
182+
workers = make(map[string][]*worker)
183183
}
184184
worker, _ := newWorker(workerOpt{
185185
fileName: testDataPath + "/" + fileName,
@@ -211,9 +211,9 @@ func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpT
211211
thread.boot()
212212
}
213213
},
214-
func(thread *phpThread) { convertToWorkerThread(thread, workers[worker1Path]) },
214+
func(thread *phpThread) { convertToWorkerThread(thread, workers[worker1Path][0]) },
215215
convertToInactiveThread,
216-
func(thread *phpThread) { convertToWorkerThread(thread, workers[worker2Path]) },
216+
func(thread *phpThread) { convertToWorkerThread(thread, workers[worker2Path][0]) },
217217
convertToInactiveThread,
218218
}
219219
}

scaling.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,19 @@ func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext,
160160
}
161161

162162
// if the request has been stalled long enough, scale
163-
if worker, ok := workers[fc.scriptFilename]; ok {
164-
scaleWorkerThread(worker)
163+
if workersList, ok := workers[fc.scriptFilename]; ok {
164+
// Look for a worker with matching moduleID or a global worker (moduleID == 0)
165+
workerFound := false
166+
for _, worker := range workersList {
167+
if worker.moduleID == 0 || worker.moduleID == fc.moduleID {
168+
scaleWorkerThread(worker)
169+
workerFound = true
170+
break
171+
}
172+
}
173+
if !workerFound {
174+
scaleRegularThread()
175+
}
165176
} else {
166177
scaleRegularThread()
167178
}

scaling_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) {
4343
autoScaledThread := phpThreads[2]
4444

4545
// scale up
46-
scaleWorkerThread(workers[workerPath])
46+
scaleWorkerThread(workers[workerPath][0])
4747
assert.Equal(t, stateReady, autoScaledThread.state.get())
4848

4949
// on down-scale, the thread will be marked as inactive

worker.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package frankenphp
44
import "C"
55
import (
66
"fmt"
7+
"sort"
78
"sync"
89
"time"
910

@@ -24,12 +25,12 @@ type worker struct {
2425
}
2526

2627
var (
27-
workers map[string]*worker
28+
workers map[string][]*worker
2829
watcherIsEnabled bool
2930
)
3031

3132
func initWorkers(opt []workerOpt) error {
32-
workers = make(map[string]*worker, len(opt))
33+
workers = make(map[string][]*worker, len(opt))
3334
workersReady := sync.WaitGroup{}
3435
directoriesToWatch := getDirectoriesToWatch(opt)
3536
watcherIsEnabled = len(directoriesToWatch) > 0
@@ -84,7 +85,17 @@ func newWorker(o workerOpt) (*worker, error) {
8485
requestChan: make(chan *frankenPHPContext),
8586
moduleID: o.moduleID,
8687
}
87-
workers[absFileName] = w
88+
89+
// Check if we already have workers for this filename
90+
if _, ok := workers[absFileName]; !ok {
91+
workers[absFileName] = make([]*worker, 0)
92+
}
93+
workers[absFileName] = append(workers[absFileName], w)
94+
95+
// Sort workers by descending moduleID, this way FrankenPHPApp::ServeHTTP will prefer a module-specific worker over a global one
96+
sort.Slice(workers[absFileName], func(i, j int) bool {
97+
return workers[absFileName][i].moduleID > workers[absFileName][j].moduleID
98+
})
8899

89100
return w, nil
90101
}
@@ -97,23 +108,25 @@ func DrainWorkers() {
97108
func drainWorkerThreads() []*phpThread {
98109
ready := sync.WaitGroup{}
99110
drainedThreads := make([]*phpThread, 0)
100-
for _, worker := range workers {
101-
worker.threadMutex.RLock()
102-
ready.Add(len(worker.threads))
103-
for _, thread := range worker.threads {
104-
if !thread.state.requestSafeStateChange(stateRestarting) {
105-
// no state change allowed == thread is shutting down
106-
// we'll proceed to restart all other threads anyways
107-
continue
111+
for _, workersList := range workers {
112+
for _, worker := range workersList {
113+
worker.threadMutex.RLock()
114+
ready.Add(len(worker.threads))
115+
for _, thread := range worker.threads {
116+
if !thread.state.requestSafeStateChange(stateRestarting) {
117+
// no state change allowed == thread is shutting down
118+
// we'll proceed to restart all other threads anyways
119+
continue
120+
}
121+
close(thread.drainChan)
122+
drainedThreads = append(drainedThreads, thread)
123+
go func(thread *phpThread) {
124+
thread.state.waitFor(stateYielding)
125+
ready.Done()
126+
}(thread)
108127
}
109-
close(thread.drainChan)
110-
drainedThreads = append(drainedThreads, thread)
111-
go func(thread *phpThread) {
112-
thread.state.waitFor(stateYielding)
113-
ready.Done()
114-
}(thread)
128+
worker.threadMutex.RUnlock()
115129
}
116-
worker.threadMutex.RUnlock()
117130
}
118131
ready.Wait()
119132

0 commit comments

Comments
 (0)