From 615662e1ef6ff461b1d2d10ec70853e26065cc3f Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Sat, 11 Jan 2025 14:44:09 +0100 Subject: [PATCH] Adjusts timers and passes the main thread directly to scaling. --- frankenphp.go | 5 +++-- internal/cpu/cpu_fallback.go | 3 +-- internal/cpu/cpu_unix.go | 3 +-- internal/memory/memory_fallback.go | 2 -- phpmainthread.go | 6 +++--- phpmainthread_test.go | 11 +++++++---- scaling.go | 30 ++++++++++++++++++++---------- 7 files changed, 35 insertions(+), 25 deletions(-) diff --git a/frankenphp.go b/frankenphp.go index e8d93d7ce..e30db7b6b 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -333,7 +333,8 @@ func Init(options ...Option) error { logger.Warn(`ZTS is not enabled, only 1 thread will be available, recompile PHP using the "--enable-zts" configuration option or performance will be degraded`) } - if err := initPHPThreads(totalThreadCount, maxThreadCount, opt.phpIniOverrides); err != nil { + mainThread, err := initPHPThreads(totalThreadCount, maxThreadCount, opt.phpIniOverrides) + if err != nil { return err } @@ -348,7 +349,7 @@ func Init(options ...Option) error { return err } - initAutoScaling(mainThread.numThreads, mainThread.maxThreads) + initAutoScaling(mainThread) if c := logger.Check(zapcore.InfoLevel, "FrankenPHP started 🐘"); c != nil { c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", mainThread.numThreads), zap.Int("max_threads", mainThread.maxThreads)) diff --git a/internal/cpu/cpu_fallback.go b/internal/cpu/cpu_fallback.go index a8171e1b3..a0ec2890e 100644 --- a/internal/cpu/cpu_fallback.go +++ b/internal/cpu/cpu_fallback.go @@ -10,11 +10,10 @@ import ( // The fallback always determines that the CPU limits are not reached func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{}) bool { - timer := time.NewTimer(probeTime) select { case <-abort: return false - case <-timer.C: + case <-time.After(probeTime): return true } } diff --git a/internal/cpu/cpu_unix.go b/internal/cpu/cpu_unix.go index 333809dc3..b1c0e2a09 100644 --- a/internal/cpu/cpu_unix.go +++ b/internal/cpu/cpu_unix.go @@ -23,11 +23,10 @@ func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{} C.clock_gettime(C.CLOCK_MONOTONIC, &start) C.clock_gettime(C.CLOCK_PROCESS_CPUTIME_ID, &cpuStart) - timer := time.NewTimer(probeTime) select { case <-abort: return false - case <-timer.C: + case <-time.After(probeTime): } C.clock_gettime(C.CLOCK_MONOTONIC, &end) diff --git a/internal/memory/memory_fallback.go b/internal/memory/memory_fallback.go index bc90c802a..cbf42865f 100644 --- a/internal/memory/memory_fallback.go +++ b/internal/memory/memory_fallback.go @@ -4,8 +4,6 @@ package memory -import "syscall" - // Return 0 if the total system memory cannot be determined func TotalSysMemory() uint64 { return 0 diff --git a/phpmainthread.go b/phpmainthread.go index cd1d71111..5b812775c 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -28,7 +28,7 @@ var ( // start the main PHP thread // start a fixed number of inactive PHP threads // reserve a fixed number of possible PHP threads -func initPHPThreads(numThreads int, numMaxThreads int, phpIniOverrides map[string]string) error { +func initPHPThreads(numThreads int, numMaxThreads int, phpIniOverrides map[string]string) (*phpMainThread, error) { mainThread = &phpMainThread{ state: newThreadState(), done: make(chan struct{}), @@ -45,7 +45,7 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIniOverrides map[strin phpThreads = []*phpThread{initialThread} if err := mainThread.start(); err != nil { - return err + return nil, err } // initialize all other threads @@ -67,7 +67,7 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIniOverrides map[strin } ready.Wait() - return nil + return mainThread, nil } // ThreadDebugStatus prints the state of all PHP threads - debugging purposes only diff --git a/phpmainthread_test.go b/phpmainthread_test.go index 0db0bcc87..6980da065 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -17,8 +17,9 @@ import ( var testDataPath, _ = filepath.Abs("./testdata") func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { - logger = zap.NewNop() // the logger needs to not be nil - assert.NoError(t, initPHPThreads(1, 1, nil)) // boot 1 thread + logger = zap.NewNop() // the logger needs to not be nil + _, err := initPHPThreads(1, 1, nil) // boot 1 thread + assert.NoError(t, err) assert.Len(t, phpThreads, 1) assert.Equal(t, 0, phpThreads[0].threadIndex) @@ -30,7 +31,8 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { func TestTransitionRegularThreadToWorkerThread(t *testing.T) { logger = zap.NewNop() - assert.NoError(t, initPHPThreads(1, 1, nil)) + _, err := initPHPThreads(1, 1, nil) + assert.NoError(t, err) // transition to regular thread convertToRegularThread(phpThreads[0]) @@ -53,7 +55,8 @@ func TestTransitionRegularThreadToWorkerThread(t *testing.T) { func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) { logger = zap.NewNop() - assert.NoError(t, initPHPThreads(1, 1, nil)) + _, err := initPHPThreads(1, 1, nil) + assert.NoError(t, err) firstWorker := getDummyWorker("transition-worker-1.php") secondWorker := getDummyWorker("transition-worker-2.php") diff --git a/scaling.go b/scaling.go index c41171c70..8d268c1ec 100644 --- a/scaling.go +++ b/scaling.go @@ -42,13 +42,16 @@ var ( WorkerNotFoundError = errors.New("worker not found for given filename") ) -func initAutoScaling(numThreads int, maxThreads int) { - if maxThreads <= numThreads { +func initAutoScaling(mainThread *phpMainThread) { + if mainThread.maxThreads <= mainThread.numThreads { return } - maxScaledThreads := maxThreads - numThreads + maxScaledThreads := mainThread.maxThreads - mainThread.numThreads + scalingMu.Lock() autoScaledThreads = make([]*phpThread, 0, maxScaledThreads) + scalingMu.Unlock() + go startUpscalingThreads(mainThread.done, maxScaledThreads) go startDownScalingThreads(mainThread.done) } @@ -197,8 +200,13 @@ func startUpscalingThreads(done chan struct{}, maxScaledThreads int) { scaledThreadCount := len(autoScaledThreads) scalingMu.Unlock() if scaledThreadCount >= maxScaledThreads { - time.Sleep(upscaleCheckTime) - continue + // we have reached max_threads, check again later + select { + case <-done: + return + case <-time.After(downScaleCheckTime): + continue + } } select { @@ -207,8 +215,12 @@ func startUpscalingThreads(done chan struct{}, maxScaledThreads int) { // if the request has not been stalled long enough, wait and repeat if timeSinceStalled < minStallTime { - time.Sleep(upscaleCheckTime) - continue + select { + case <-done: + return + case <-time.After(minStallTime - timeSinceStalled): + continue + } } // if the request has been stalled long enough, scale @@ -224,14 +236,12 @@ func startUpscalingThreads(done chan struct{}, maxScaledThreads int) { } func startDownScalingThreads(done chan struct{}) { - timer := time.NewTimer(downScaleCheckTime) for { select { case <-done: return - case <-timer.C: + case <-time.After(downScaleCheckTime): deactivateThreads() - timer.Reset(downScaleCheckTime) } } }