Update task executor to reclaim resources immediately from idle containers when resources are required for new tasks
hugy718 committed Apr 1, 2024
1 parent 62682b1 commit 25d6f2c
Showing 3 changed files with 206 additions and 93 deletions.
2 changes: 1 addition & 1 deletion task_executor/README.md
@@ -60,7 +60,7 @@ docker run --name meca_executor_test --gpus=all -v /var/run/docker.sock:/var/ru
Retrieve stats

```sh
curl http://172.18.0.255:2591/stats -X POST
curl http://172.18.0.255:2591/stats
```

Provide resource limit for the task
292 changes: 200 additions & 92 deletions task_executor/executor.go
@@ -78,8 +78,10 @@ func newTaskTracker() *taskTracker {

func (t *taskTracker) add(id string, task Task) {
t.mu.Lock()
t.tasks[id] = &taskHandle{task: task}
t.tasks[id].ref.Add(1)
if _, ok := t.tasks[id]; !ok {
t.tasks[id] = &taskHandle{task: task}
t.tasks[id].ref.Add(1)
}
t.tasks[id].updateLastUsed()
t.mu.Unlock()
}
@@ -108,6 +110,15 @@ func (t *taskTracker) remove(id string) {
delete(t.tasks, id)
}

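// dropCache releases every cached task handle so that idle containers are
// torn down and their resources reclaimed (assuming release() frees the
// task once its reference count drops to zero).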
func (t *taskTracker) dropCache() {
t.mu.Lock()
for id, h := range t.tasks {
h.release()
delete(t.tasks, id)
}
t.mu.Unlock()
}
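
The taskHandle internals (ref, release, updateLastUsed) are not shown in this hunk. A minimal, self-contained sketch of the refcounted-release pattern that add and dropCache appear to rely on (handle and cleanup are illustrative names, not the package's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// handle mimics taskHandle's assumed lifecycle: a reference count plus a
// cleanup action that runs when the count reaches zero.
type handle struct {
	ref     atomic.Int64
	cleanup func()
}

func (h *handle) acquire() { h.ref.Add(1) }

func (h *handle) release() {
	if h.ref.Add(-1) == 0 {
		h.cleanup() // e.g. stop the idle container and free its resources
	}
}

func main() {
	h := &handle{cleanup: func() { fmt.Println("container stopped, resources reclaimed") }}
	h.acquire() // tracker's reference, taken on first add
	h.acquire() // an in-flight request
	h.release() // request finishes: still cached
	h.release() // dropCache evicts: count hits zero, cleanup runs
}
```

Under this reading, the tracker holds one reference from add and each in-flight request presumably takes another via get, so an evicted task is only torn down once its last request finishes.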

// timeout in minutes
func (t *taskTracker) clean(timeout int) {
t.mu.RLock()
@@ -128,6 +139,33 @@ func (t *taskTracker) clean(timeout int) {
t.mu.RUnlock()
}

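// MecaRequestHandle carries one task request through the executor's request
// queue; the caller blocks on Wait() until the consumer signals Done() or
// the request's context is canceled.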
type MecaRequestHandle struct {
ctx context.Context
complete chan struct{}
err error
output []byte

taskCfg TaskConfig
input []byte
}

func NewMecaRequestHandle(ctx context.Context, taskCfg TaskConfig, input []byte) *MecaRequestHandle {
return &MecaRequestHandle{complete: make(chan struct{}, 1), ctx: ctx, taskCfg: taskCfg, input: input}
}

func (h *MecaRequestHandle) Done() {
h.complete <- struct{}{}
}

func (h *MecaRequestHandle) Wait() {
select {
case <-h.complete:
log.Printf("request handle wait completed")
case <-h.ctx.Done():
log.Printf("request handle wait canceled")
}
}
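
Because complete is buffered with capacity 1, Done() never blocks, even if the waiter has already given up via context cancellation. A self-contained sketch of the same handshake (simplified names; not part of this commit):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// requestHandle is a reduced version of MecaRequestHandle: a buffered
// completion channel raced against context cancellation.
type requestHandle struct {
	ctx      context.Context
	complete chan struct{}
	output   []byte
	err      error
}

func newRequestHandle(ctx context.Context) *requestHandle {
	return &requestHandle{ctx: ctx, complete: make(chan struct{}, 1)}
}

// Done signals completion; the buffer guarantees the send never blocks.
func (h *requestHandle) Done() { h.complete <- struct{}{} }

// Wait returns when the request completes or its context is canceled.
func (h *requestHandle) Wait() {
	select {
	case <-h.complete:
		fmt.Println("completed")
	case <-h.ctx.Done():
		fmt.Println("canceled:", h.ctx.Err())
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	h := newRequestHandle(ctx)
	go func() {
		h.output = []byte("result") // produced by the worker
		h.Done()
	}()
	h.Wait()
	fmt.Printf("output=%q err=%v\n", h.output, h.err)
}
```

In Execute below, Wait is what turns the queued, asynchronous request back into a synchronous call for the caller.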

type MecaExecutor struct {
timeout int
tracker *taskTracker
@@ -141,6 +179,10 @@ type MecaExecutor struct {
stopChn chan<- struct{}
cleanerStoppedChn <-chan struct{}
pauseMu sync.RWMutex

reqQueue chan *MecaRequestHandle
reqStopChn chan<- struct{} // stop the request manager
reqManagerStoppedChn <-chan struct{} // wait for the request manager to stop
}

func NewMecaExecutorFromConfig(cfg MecaExecutorConfig) *MecaExecutor {
@@ -173,15 +215,150 @@ func (meca *MecaExecutor) cleaner(stopChn <-chan struct{}, cleanerStoppedChn cha
}
}

func (meca *MecaExecutor) handleOneRequest(req *MecaRequestHandle) {

// get the task id as the key for the tracker.
taskId := GetTaskId(req.taskCfg)

log.Printf("image ready")
// get the task handle
var h *taskHandle
for {
if h = meca.tracker.get(taskId); h != nil {
break
}
log.Printf("adding task")
task, err := meca.fac.Build(taskId, req.taskCfg)
if err != nil {
req.err = err
req.Done()
return
}
meca.tracker.add(taskId, task)
log.Printf("task handle added")
}
log.Printf("task handle ready")

retryDueToResourceShortage := 0
for {
time.Sleep(time.Millisecond * time.Duration(retryDueToResourceShortage))
var err error
// initialize the task once
if !h.initialized.Load() {
h.initMu.Lock()
if !h.initialized.Load() {
// reserve the resources
// reserve the gpu
var gpus []int
if req.taskCfg.Rsrc.UseGPU {
// use a 10 percent utilization threshold to filter out GPUs already in use.
if gpus, err = meca.rm.ReserveGPU(int(req.taskCfg.Rsrc.GPUCount), 10); err != nil {
h.initMu.Unlock()
if err != errMECANotEnoughResource {
req.err = err
req.Done()
h.release()
return
}
// on resource shortage, drop the tracker's task cache so idle containers release their resources
meca.tracker.dropCache()
retryDueToResourceShortage++
continue
}
}

// reserve the cpu and memory
if err := meca.rm.Reserve(float64(req.taskCfg.Rsrc.CPU), int(req.taskCfg.Rsrc.MEM)); err != nil {
if req.taskCfg.Rsrc.UseGPU {
meca.rm.ReleaseGPU(gpus)
}
h.initMu.Unlock()
if err != errMECANotEnoughResource {
req.err = err
req.Done()
h.release()
return
}
meca.tracker.dropCache()
retryDueToResourceShortage++
continue
}
// retry port allocation on failure
port := port_alloc()
retryCount := 0
for {
if err = h.task.Init(req.ctx, "", port, gpus); err == nil {
// register the release callback once the task initializes successfully
releaseCpu := float64(req.taskCfg.Rsrc.CPU)
releaseMem := int(req.taskCfg.Rsrc.MEM)
h.releaseCb = func() error {
meca.rm.ReleaseGPU(gpus)
return meca.rm.Release(releaseCpu, releaseMem)
}
h.initialized.Store(true)
break
} else if retryCount > 10 {
// init failed, release resources
meca.rm.ReleaseGPU(gpus)
meca.rm.Release(float64(req.taskCfg.Rsrc.CPU), int(req.taskCfg.Rsrc.MEM))
break
}
retryCount++
port = port_alloc()
}
}
h.initMu.Unlock()
}
if err != nil {
req.err = err
req.Done()
h.release()
return
}
log.Printf("task with config %v added: %v ", req.taskCfg, h.task)
go func() {
defer req.Done()
req.output, req.err = h.task.Execute(req.ctx, req.input)
h.release()
}()
return
}
}
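
The loop above implements the commit's headline behavior: a reservation failure with errMECANotEnoughResource drops the tracker cache so idle containers release their resources, then the request retries after a linearly growing sleep, while any other error aborts the request. A condensed, self-contained sketch of that control flow (reserve, dropIdleCache, and the error value are stand-ins, not the package's API):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotEnoughResource = errors.New("not enough resource")

// reserve pretends resources are busy for the first few attempts.
func reserve(attempt int) error {
	if attempt < 3 {
		return errNotEnoughResource
	}
	return nil
}

// dropIdleCache stands in for tracker.dropCache(): evict idle tasks so
// their containers hand cpu/mem/gpu back to the resource manager.
func dropIdleCache() { fmt.Println("evicting idle tasks to reclaim resources") }

func main() {
	retry := 0
	for {
		// linear backoff: sleep retry milliseconds before the next attempt
		time.Sleep(time.Millisecond * time.Duration(retry))
		err := reserve(retry)
		if err == nil {
			fmt.Println("reserved after", retry, "retries")
			return
		}
		if !errors.Is(err, errNotEnoughResource) {
			fmt.Println("fatal:", err) // non-shortage errors abort the request
			return
		}
		dropIdleCache()
		retry++
	}
}
```

In the real loop the reservation happens under h.initMu, and the GPU, cpu, and memory reservations are rolled back individually when a later step fails.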

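// consumeRequests serializes request handling: resource reservation and task
// initialization run one request at a time in this goroutine, while the
// actual execution is spawned asynchronously inside handleOneRequest.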
func (meca *MecaExecutor) consumeRequests(reqQueue <-chan *MecaRequestHandle, reqStopChn <-chan struct{}, reqManagerStoppedChn chan<- struct{}) {
scheduledToStop := 0 // stopping is deferred until all queued requests are drained, so this counter tracks how many callers are waiting for the stopped signal on reqManagerStoppedChn
for {
select {
case req := <-reqQueue:
meca.handleOneRequest(req)
case <-reqStopChn:
scheduledToStop++
default:
if scheduledToStop > 0 {
reqManagerStoppedChn <- struct{}{}
scheduledToStop--
}
time.Sleep(time.Millisecond)
}
}
}

// launches the cleaner goroutine to remove tasks after the timeout, and the request manager goroutine to serve queued requests
func (meca *MecaExecutor) Start() {
if !meca.started {
stopChn := make(chan struct{})
cleanerStoppedChn := make(chan struct{})
reqStopChn := make(chan struct{})
reqManagerStoppedChn := make(chan struct{})
reqQueue := make(chan *MecaRequestHandle, 100)
meca.rm.Start()
go meca.cleaner(stopChn, cleanerStoppedChn)
go meca.consumeRequests(reqQueue, reqStopChn, reqManagerStoppedChn)
meca.stopChn = stopChn
meca.cleanerStoppedChn = cleanerStoppedChn
meca.reqQueue = reqQueue
meca.reqStopChn = reqStopChn
meca.reqManagerStoppedChn = reqManagerStoppedChn
meca.started = true
}
}
@@ -193,6 +370,8 @@ func (meca *MecaExecutor) Stop() {
meca.stopChn <- struct{}{}
meca.rm.Stop()
<-meca.cleanerStoppedChn
meca.reqStopChn <- struct{}{}
<-meca.reqManagerStoppedChn
}
meca.pauseMu.Unlock()
}
@@ -208,6 +387,8 @@ func (meca *MecaExecutor) Pause() {
// avoid concurrent pause and unpause
meca.stopChn <- struct{}{}
<-meca.cleanerStoppedChn
meca.reqStopChn <- struct{}{}
<-meca.reqManagerStoppedChn
}
meca.pauseMu.Unlock()
}
@@ -288,6 +469,17 @@ func (meca *MecaExecutor) Execute(ctx context.Context, taskCfg TaskConfig, input
return nil, errors.New("not enough GPU available")
}
}
// validate cpu and mem setting
if taskCfg.Rsrc.CPU <= 0 || taskCfg.Rsrc.MEM <= 0 {
return nil, errors.New("invalid resource limit")
} else {
maxCpu, maxMem := meca.rm.GetMaxCPUAndMem()
if float64(taskCfg.Rsrc.CPU) > maxCpu || uint64(taskCfg.Rsrc.MEM) > maxMem {
return nil, errors.New("resource limit exceeds the maximum")
}
}

// after validation, the task is assumed to eventually be processed once all other tasks have finished.

// translate the runtime type
if len(taskCfg.Runtime) > 0 {
@@ -301,102 +493,18 @@

// ensure the task uses the correct version of the image
// TODO (expose more control for version later)
imageUpdate := false
if found, err := meca.repo.Exists(taskCfg.ImageId, ""); err != nil && err != errUnFoundImageVersion {
return nil, err
} else if !found {
if err := meca.repo.Fetch(taskCfg.ImageId, "", ""); err != nil {
return nil, err
}
imageUpdate = true
}

// if an image update is performed, launch a new task and schedule a cleanup of the last task
// here the id is the uid for an image.
// TODO: when id is not uid for an image (using image tag), we should construct uid for task id here, and release the old task handle under the old uid

// get the task id as the key for the tracker.
taskId := GetTaskId(taskCfg)

if imageUpdate {
task, err := meca.fac.Build(taskId, taskCfg)
if err != nil {
return nil, err
}
meca.tracker.add(taskId, task)
}

log.Printf("image ready")
// get the task handle
var h *taskHandle
for {
if h = meca.tracker.get(taskId); h != nil {
break
}
log.Printf("adding task")
task, err := meca.fac.Build(taskId, taskCfg)
if err != nil {
return nil, err
}
meca.tracker.add(taskId, task)
log.Printf("task handle added")
}
log.Printf("task handle ready")
defer h.release()

var err error
// initialize the task once
if !h.initialized.Load() {
h.initMu.Lock()
if !h.initialized.Load() {
// reserve the resources
// reserve the gpu
var gpus []int
if taskCfg.Rsrc.UseGPU {
// use a 10 percent utilization threshold to filter out GPUs already in use.
if gpus, err = meca.rm.ReserveGPU(int(taskCfg.Rsrc.GPUCount), 10); err != nil {
h.initMu.Unlock()
return nil, err
}
}

// reserve the cpu and memory
if err := meca.rm.Reserve(float64(taskCfg.Rsrc.CPU), int(taskCfg.Rsrc.MEM)); err != nil {
if taskCfg.Rsrc.UseGPU {
meca.rm.ReleaseGPU(gpus)
}
h.initMu.Unlock()
return nil, err
}
// retry port allocation on failure
port := port_alloc()
retryCount := 0
for {
if err = h.task.Init(ctx, "", port, gpus); err == nil {
// register the release callback once the task initializes successfully
releaseCpu := float64(taskCfg.Rsrc.CPU)
releaseMem := int(taskCfg.Rsrc.MEM)
h.releaseCb = func() error {
meca.rm.ReleaseGPU(gpus)
return meca.rm.Release(releaseCpu, releaseMem)
}
h.initialized.Store(true)
break
} else if retryCount > 10 {
// init failed, release resources
meca.rm.ReleaseGPU(gpus)
meca.rm.Release(float64(taskCfg.Rsrc.CPU), int(taskCfg.Rsrc.MEM))
break
}
retryCount++
port = port_alloc()
}
}
h.initMu.Unlock()
}
if err != nil {
return nil, err
}
log.Printf("task with config %v added: %v ", taskCfg, h.task)
return h.task.Execute(ctx, input)
// if an image update is performed, the image id will differ (it is a uid); a new task is launched and a cleanup of the old task is eventually scheduled.
req := NewMecaRequestHandle(ctx, taskCfg, input)
meca.reqQueue <- req
req.Wait()
log.Printf("request handle completed: %v, err: %v", req.output, req.err)
return req.output, req.err
}
5 changes: 5 additions & 0 deletions task_executor/resource.go
@@ -229,6 +229,7 @@ type ResourceManager interface {
UpdateConfig(cpu float64, mem int) string
Reserve(cpu float64, mem int) error
Release(cpu float64, mem int) error
GetMaxCPUAndMem() (float64, uint64)
GetGPUCount() int
ReserveGPU(count int, utilLimit int) ([]int, error)
ReleaseGPU([]int) error
@@ -344,6 +345,10 @@ func (m *resourceManager) Release(cpu float64, mem int) error {
return nil
}

func (m *resourceManager) GetMaxCPUAndMem() (float64, uint64) {
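// assumption: taskTotalCPU is tracked in tenths of a core, hence the
// division by 10 to report whole cores; taskTotalMEM is returned as-is.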
return float64(m.taskTotalCPU) / 10, m.taskTotalMEM
}

func (m *resourceManager) GetGPUCount() int {
if !m.hasGPU {
return 0
