Skip to content

Commit 8e87d00

Browse files
committed
Starts separate opcache_reset request flow once all threads are stopped.
1 parent d1e28d5 commit 8e87d00

5 files changed

Lines changed: 73 additions & 69 deletions

File tree

frankenphp.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -343,11 +343,9 @@ PHP_FUNCTION(frankenphp_getenv) {
343343

344344
/* {{{ thread-safe opcache reset */
345345
PHP_FUNCTION(frankenphp_opcache_reset) {
346-
if (go_schedule_opcache_reset(thread_index)) {
347-
orig_opcache_reset(INTERNAL_FUNCTION_PARAM_PASSTHRU);
348-
}
346+
go_schedule_opcache_reset(thread_index);
349347

350-
RETVAL_FALSE;
348+
RETVAL_TRUE;
351349
} /* }}} */
352350

353351
/* {{{ Fetch all HTTP request headers */
@@ -1270,11 +1268,15 @@ int frankenphp_execute_script_cli(char *script, int argc, char **argv,
12701268
}
12711269

12721270
int frankenphp_reset_opcache(void) {
1271+
php_request_startup();
12731272
zend_function *opcache_reset =
12741273
zend_hash_str_find_ptr(CG(function_table), ZEND_STRL("opcache_reset"));
12751274
if (opcache_reset) {
1275+
((zend_internal_function *)opcache_reset)->handler = orig_opcache_reset;
12761276
zend_call_known_function(opcache_reset, NULL, NULL, NULL, 0, NULL, NULL);
1277+
((zend_internal_function *)opcache_reset)->handler = ZEND_FN(frankenphp_opcache_reset);
12771278
}
1279+
php_request_shutdown((void *)0);
12781280

12791281
return 0;
12801282
}

frankenphp.go

Lines changed: 33 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,9 @@ var (
5959
contextKey = contextKeyStruct{}
6060
serverHeader = []string{"FrankenPHP"}
6161

62-
isRunning bool
63-
isOpcacheResetting atomic.Bool
64-
threadsAreRestarting atomic.Bool
65-
onServerShutdown []func()
62+
isRunning bool
63+
restartCounter atomic.Int32
64+
onServerShutdown []func()
6665

6766
// Set default values to make Shutdown() idempotent
6867
globalMu sync.Mutex
@@ -759,68 +758,49 @@ func go_is_context_done(threadIndex C.uintptr_t) C.bool {
759758
}
760759

761760
//export go_schedule_opcache_reset
762-
func go_schedule_opcache_reset(threadIndex C.uintptr_t) C.bool {
763-
if isOpcacheResetting.CompareAndSwap(false, true) {
764-
restartThreadsForOpcacheReset(phpThreads[threadIndex])
765-
return C.bool(true)
761+
func go_schedule_opcache_reset(threadIndex C.uintptr_t) {
762+
if restartCounter.CompareAndSwap(0, 1) {
763+
go restartThreadsForOpcacheReset()
766764
}
767-
768-
// always call the original opcache_reset if already restarting
769-
return C.bool(phpThreads[threadIndex].state.Is(state.Restarting))
770765
}
771766

772767
// restart all threads for an opcache_reset
773-
func restartThreadsForOpcacheReset(callingThread *phpThread) {
774-
if threadsAreRestarting.Load() {
775-
// ignore reloads while a restart is already ongoing
776-
return
777-
}
778-
768+
func restartThreadsForOpcacheReset() {
779769
// disallow scaling threads while restarting workers
780770
scalingMu.Lock()
781771
defer scalingMu.Unlock()
782772

783-
// drain workers
784-
globalLogger.Info("Restarting all PHP threads for opcache_reset")
785-
threadsToRestart := drainWorkerThreads()
786-
787-
// drain regular threads
788-
globalLogger.Info("Draining regular PHP threads for opcache_reset")
789-
wg := sync.WaitGroup{}
790-
for _, thread := range regularThreads {
791-
if thread.state.Is(state.Ready) {
792-
threadsToRestart = append(threadsToRestart, thread)
793-
thread.state.Set(state.Restarting)
794-
close(thread.drainChan)
795-
796-
wg.Go(func() {
797-
thread.state.WaitFor(state.Yielding)
798-
})
799-
}
773+
threadsToRestart := drainWorkerThreads(true)
774+
775+
for _, thread := range threadsToRestart {
776+
thread.drainChan = make(chan struct{})
777+
thread.state.Set(state.Ready)
800778
}
779+
}
801780

802-
wg.Done() // ignore the calling thread
803-
done := make(chan struct{})
804-
go func() {
805-
wg.Wait()
806-
close(done)
807-
}()
781+
func scheduleOpcacheReset(thread *phpThread) {
782+
restartCounter.Add(-1)
783+
if restartCounter.Load() != 1 {
784+
return // only the last restarting thread will trigger an actual opcache_reset
785+
}
786+
workerThread, ok := thread.handler.(*workerThread)
787+
fc, _ := newDummyContext("/opcache_reset")
788+
if ok && workerThread.worker != nil {
789+
workerThread.dummyFrankenPHPContext = fc
790+
defer func() { workerThread.dummyFrankenPHPContext = nil }()
791+
}
808792

809-
select {
810-
case <-done:
811-
// all other threads are drained
812-
case <-time.After(time.Second):
813-
// probably a deadlock, continue anyway and hope for the best
793+
regularThread, ok := thread.handler.(*regularThread)
794+
if ok {
795+
regularThread.contextHolder.frankenPHPContext = fc
796+
defer func() { regularThread.contextHolder.frankenPHPContext = nil }()
814797
}
815798

816-
go func() {
817-
callingThread.state.WaitFor(state.Yielding)
818-
for _, thread := range threadsToRestart {
819-
thread.drainChan = make(chan struct{})
820-
thread.state.Set(state.Ready)
821-
isOpcacheResetting.Store(false)
822-
}
823-
}()
799+
globalLogger.Info("resetting opcache in all threads")
800+
C.frankenphp_reset_opcache()
801+
802+
// all threads should have restarted now
803+
restartCounter.Store(0)
824804
}
825805

826806
// ExecuteScriptCLI executes the PHP script passed as parameter.

threadregular.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func (handler *regularThread) beforeScriptExecution() string {
5050
return handler.waitForRequest()
5151
case state.Restarting:
5252
handler.state.Set(state.Yielding)
53+
scheduleOpcacheReset(handler.thread)
5354
handler.state.WaitFor(state.Ready, state.ShuttingDown)
5455
return handler.beforeScriptExecution()
5556

threadworker.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func (handler *workerThread) beforeScriptExecution() string {
5151
handler.worker.onThreadShutdown(handler.thread.threadIndex)
5252
}
5353
handler.state.Set(state.Yielding)
54+
scheduleOpcacheReset(handler.thread)
5455
handler.state.WaitFor(state.Ready, state.ShuttingDown)
5556
return handler.beforeScriptExecution()
5657
case state.Ready, state.TransitionComplete:
@@ -226,12 +227,6 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {
226227
globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "shutting down", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex))
227228
}
228229

229-
// flush the opcache when restarting due to watcher or admin api
230-
// note: this is done right before frankenphp_handle_request() returns 'false'
231-
if handler.state.Is(state.Restarting) {
232-
C.frankenphp_reset_opcache()
233-
}
234-
235230
return false, nil
236231
case requestCH = <-handler.thread.requestChan:
237232
case requestCH = <-handler.worker.requestChan:

worker.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ func newWorker(o workerOpt) (*worker, error) {
171171

172172
// EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown
173173
func DrainWorkers() {
174-
_ = drainWorkerThreads()
174+
_ = drainWorkerThreads(false)
175175
}
176176

177-
func drainWorkerThreads() []*phpThread {
177+
func drainWorkerThreads(withRegularThreads bool) []*phpThread {
178178
var (
179179
ready sync.WaitGroup
180180
drainedThreads []*phpThread
@@ -192,7 +192,7 @@ func drainWorkerThreads() []*phpThread {
192192
// we'll proceed to restart all other threads anyway
193193
continue
194194
}
195-
195+
restartCounter.Add(1)
196196
close(thread.drainChan)
197197
drainedThreads = append(drainedThreads, thread)
198198

@@ -205,6 +205,31 @@ func drainWorkerThreads() []*phpThread {
205205
worker.threadMutex.RUnlock()
206206
}
207207

208+
if withRegularThreads {
209+
regularThreadMu.RLock()
210+
ready.Add(len(regularThreads))
211+
212+
for _, thread := range regularThreads {
213+
if !thread.state.RequestSafeStateChange(state.Restarting) {
214+
ready.Done()
215+
216+
// no state change allowed == thread is shutting down
217+
// we'll proceed to restart all other threads anyway
218+
continue
219+
}
220+
restartCounter.Add(1)
221+
close(thread.drainChan)
222+
drainedThreads = append(drainedThreads, thread)
223+
224+
go func(thread *phpThread) {
225+
thread.state.WaitFor(state.Yielding)
226+
ready.Done()
227+
}(thread)
228+
}
229+
230+
regularThreadMu.RUnlock()
231+
}
232+
208233
ready.Wait()
209234

210235
return drainedThreads
@@ -213,13 +238,14 @@ func drainWorkerThreads() []*phpThread {
213238
// RestartWorkers attempts to restart all workers gracefully
214239
// All workers must be restarted at the same time to prevent issues with opcache resetting.
215240
func RestartWorkers() {
216-
threadsAreRestarting.Store(true)
217-
defer threadsAreRestarting.Store(false)
241+
if !restartCounter.CompareAndSwap(0, 1) {
242+
return // another restart is already in progress
243+
}
218244
// disallow scaling threads while restarting workers
219245
scalingMu.Lock()
220246
defer scalingMu.Unlock()
221247

222-
threadsToRestart := drainWorkerThreads()
248+
threadsToRestart := drainWorkerThreads(false)
223249

224250
for _, thread := range threadsToRestart {
225251
thread.drainChan = make(chan struct{})

0 commit comments

Comments
 (0)