Skip to content

Commit

Permalink
Adjusts timers and passes the main thread directly to scaling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alliballibaba2 committed Jan 11, 2025
1 parent 0545e1b commit 615662e
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 25 deletions.
5 changes: 3 additions & 2 deletions frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ func Init(options ...Option) error {
logger.Warn(`ZTS is not enabled, only 1 thread will be available, recompile PHP using the "--enable-zts" configuration option or performance will be degraded`)
}

if err := initPHPThreads(totalThreadCount, maxThreadCount, opt.phpIniOverrides); err != nil {
mainThread, err := initPHPThreads(totalThreadCount, maxThreadCount, opt.phpIniOverrides)
if err != nil {
return err
}

Expand All @@ -348,7 +349,7 @@ func Init(options ...Option) error {
return err
}

initAutoScaling(mainThread.numThreads, mainThread.maxThreads)
initAutoScaling(mainThread)

if c := logger.Check(zapcore.InfoLevel, "FrankenPHP started 🐘"); c != nil {
c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", mainThread.numThreads), zap.Int("max_threads", mainThread.maxThreads))
Expand Down
3 changes: 1 addition & 2 deletions internal/cpu/cpu_fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import (

// The fallback always determines that the CPU limits are not reached
func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{}) bool {
timer := time.NewTimer(probeTime)
select {
case <-abort:
return false
case <-timer.C:
case <-time.After(probeTime):
return true
}
}
3 changes: 1 addition & 2 deletions internal/cpu/cpu_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{}
C.clock_gettime(C.CLOCK_MONOTONIC, &start)
C.clock_gettime(C.CLOCK_PROCESS_CPUTIME_ID, &cpuStart)

timer := time.NewTimer(probeTime)
select {
case <-abort:
return false
case <-timer.C:
case <-time.After(probeTime):
}

C.clock_gettime(C.CLOCK_MONOTONIC, &end)
Expand Down
2 changes: 0 additions & 2 deletions internal/memory/memory_fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package memory

import "syscall"

// Return 0 if the total system memory cannot be determined
func TotalSysMemory() uint64 {
return 0
Expand Down
6 changes: 3 additions & 3 deletions phpmainthread.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
// start the main PHP thread
// start a fixed number of inactive PHP threads
// reserve a fixed number of possible PHP threads
func initPHPThreads(numThreads int, numMaxThreads int, phpIniOverrides map[string]string) error {
func initPHPThreads(numThreads int, numMaxThreads int, phpIniOverrides map[string]string) (*phpMainThread, error) {
mainThread = &phpMainThread{
state: newThreadState(),
done: make(chan struct{}),
Expand All @@ -45,7 +45,7 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIniOverrides map[strin
phpThreads = []*phpThread{initialThread}

if err := mainThread.start(); err != nil {
return err
return nil, err
}

// initialize all other threads
Expand All @@ -67,7 +67,7 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIniOverrides map[strin
}
ready.Wait()

return nil
return mainThread, nil
}

// ThreadDebugStatus prints the state of all PHP threads - debugging purposes only
Expand Down
11 changes: 7 additions & 4 deletions phpmainthread_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import (
var testDataPath, _ = filepath.Abs("./testdata")

func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) {
logger = zap.NewNop() // the logger needs to not be nil
assert.NoError(t, initPHPThreads(1, 1, nil)) // boot 1 thread
logger = zap.NewNop() // the logger needs to not be nil
_, err := initPHPThreads(1, 1, nil) // boot 1 thread
assert.NoError(t, err)

assert.Len(t, phpThreads, 1)
assert.Equal(t, 0, phpThreads[0].threadIndex)
Expand All @@ -30,7 +31,8 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) {

func TestTransitionRegularThreadToWorkerThread(t *testing.T) {
logger = zap.NewNop()
assert.NoError(t, initPHPThreads(1, 1, nil))
_, err := initPHPThreads(1, 1, nil)
assert.NoError(t, err)

// transition to regular thread
convertToRegularThread(phpThreads[0])
Expand All @@ -53,7 +55,8 @@ func TestTransitionRegularThreadToWorkerThread(t *testing.T) {

func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) {
logger = zap.NewNop()
assert.NoError(t, initPHPThreads(1, 1, nil))
_, err := initPHPThreads(1, 1, nil)
assert.NoError(t, err)
firstWorker := getDummyWorker("transition-worker-1.php")
secondWorker := getDummyWorker("transition-worker-2.php")

Expand Down
30 changes: 20 additions & 10 deletions scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ var (
WorkerNotFoundError = errors.New("worker not found for given filename")
)

func initAutoScaling(numThreads int, maxThreads int) {
if maxThreads <= numThreads {
func initAutoScaling(mainThread *phpMainThread) {
if mainThread.maxThreads <= mainThread.numThreads {
return
}

maxScaledThreads := maxThreads - numThreads
maxScaledThreads := mainThread.maxThreads - mainThread.numThreads
scalingMu.Lock()
autoScaledThreads = make([]*phpThread, 0, maxScaledThreads)
scalingMu.Unlock()

go startUpscalingThreads(mainThread.done, maxScaledThreads)
go startDownScalingThreads(mainThread.done)
}
Expand Down Expand Up @@ -197,8 +200,13 @@ func startUpscalingThreads(done chan struct{}, maxScaledThreads int) {
scaledThreadCount := len(autoScaledThreads)
scalingMu.Unlock()
if scaledThreadCount >= maxScaledThreads {
time.Sleep(upscaleCheckTime)
continue
// we have reached max_threads, check again later
select {
case <-done:
return
case <-time.After(downScaleCheckTime):
continue
}
}

select {
Expand All @@ -207,8 +215,12 @@ func startUpscalingThreads(done chan struct{}, maxScaledThreads int) {

// if the request has not been stalled long enough, wait and repeat
if timeSinceStalled < minStallTime {
time.Sleep(upscaleCheckTime)
continue
select {
case <-done:
return
case <-time.After(minStallTime - timeSinceStalled):
continue
}
}

// if the request has been stalled long enough, scale
Expand All @@ -224,14 +236,12 @@ func startUpscalingThreads(done chan struct{}, maxScaledThreads int) {
}

func startDownScalingThreads(done chan struct{}) {
timer := time.NewTimer(downScaleCheckTime)
for {
select {
case <-done:
return
case <-timer.C:
case <-time.After(downScaleCheckTime):
deactivateThreads()
timer.Reset(downScaleCheckTime)
}
}
}
Expand Down

0 comments on commit 615662e

Please sign in to comment.