diff --git a/task_executor/README.md b/task_executor/README.md
index aa3e453..9180d7d 100644
--- a/task_executor/README.md
+++ b/task_executor/README.md
@@ -60,7 +60,7 @@ docker run --name meca_executor_test --gpus=all -v /var/run/docker.sock:/var/ru
 Retrieve stats

 ```sh
-curl http://172.18.0.255:2591/stats -X POST
+curl http://172.18.0.255:2591/stats
 ```

 Provide resource limit for the task
diff --git a/task_executor/executor.go b/task_executor/executor.go
index 679d363..1175546 100644
--- a/task_executor/executor.go
+++ b/task_executor/executor.go
@@ -78,8 +78,10 @@ func newTaskTracker() *taskTracker {

 func (t *taskTracker) add(id string, task Task) {
 	t.mu.Lock()
-	t.tasks[id] = &taskHandle{task: task}
-	t.tasks[id].ref.Add(1)
+	if _, ok := t.tasks[id]; !ok {
+		t.tasks[id] = &taskHandle{task: task}
+		t.tasks[id].ref.Add(1)
+	}
 	t.tasks[id].updateLastUsed()
 	t.mu.Unlock()
 }
@@ -108,6 +110,15 @@ func (t *taskTracker) remove(id string) {
 	delete(t.tasks, id)
 }

+func (t *taskTracker) dropCache() {
+	t.mu.Lock()
+	for id, h := range t.tasks {
+		h.release()
+		delete(t.tasks, id)
+	}
+	t.mu.Unlock()
+}
+
 // timeout in minutes
 func (t *taskTracker) clean(timeout int) {
 	t.mu.RLock()
@@ -128,6 +139,33 @@ func (t *taskTracker) clean(timeout int) {
 	t.mu.RUnlock()
 }

+type MecaRequestHandle struct {
+	ctx      context.Context
+	complete chan struct{}
+	err      error
+	output   []byte
+
+	taskCfg TaskConfig
+	input   []byte
+}
+
+func NewMecaRequestHandle(ctx context.Context, taskCfg TaskConfig, input []byte) *MecaRequestHandle {
+	return &MecaRequestHandle{complete: make(chan struct{}, 1), ctx: ctx, taskCfg: taskCfg, input: input}
+}
+
+func (h *MecaRequestHandle) Done() {
+	h.complete <- struct{}{}
+}
+
+func (h *MecaRequestHandle) Wait() {
+	select {
+	case <-h.complete:
+		log.Printf("request handle wait completed")
+	case <-h.ctx.Done():
+		log.Printf("request handle wait canceled")
+	}
+}
+
 type MecaExecutor struct {
 	timeout int
 	tracker *taskTracker
@@ -141,6 +179,10 @@ type MecaExecutor struct {
 	stopChn           chan<- struct{}
 	cleanerStoppedChn <-chan struct{}
 	pauseMu           sync.RWMutex
+
+	reqQueue             chan *MecaRequestHandle
+	reqStopChn           chan<- struct{}  // stop the request manager
+	reqManagerStoppedChn <-chan struct{}  // wait for the request manager to stop
 }

 func NewMecaExecutorFromConfig(cfg MecaExecutorConfig) *MecaExecutor {
@@ -173,15 +215,150 @@ func (meca *MecaExecutor) cleaner(stopChn <-chan struct{}, cleanerStoppedChn cha
 	}
 }

+func (meca *MecaExecutor) handleOneRequest(req *MecaRequestHandle) {
+
+	// get the taskid as key for tracker.
+	taskId := GetTaskId(req.taskCfg)
+
+	log.Printf("image ready")
+	// get the task handle
+	var h *taskHandle
+	for {
+		if h = meca.tracker.get(taskId); h != nil {
+			break
+		}
+		log.Printf("adding task")
+		task, err := meca.fac.Build(taskId, req.taskCfg)
+		if err != nil {
+			req.err = err
+			req.Done()
+			return
+		}
+		meca.tracker.add(taskId, task)
+		log.Printf("task handle added")
+	}
+	log.Printf("task handle ready")
+
+	retryDueToResourceShortage := 0
+	for {
+		time.Sleep(time.Millisecond * time.Duration(retryDueToResourceShortage))
+		var err error
+		// initialize the task once
+		if !h.initialized.Load() {
+			h.initMu.Lock()
+			if !h.initialized.Load() {
+				// reserve the resources
+				// reserve the gpu
+				var gpus []int
+				if req.taskCfg.Rsrc.UseGPU {
+					// set to 10 percent utilization for filtering out GPUs in use.
+					if gpus, err = meca.rm.ReserveGPU(int(req.taskCfg.Rsrc.GPUCount), 10); err != nil {
+						h.initMu.Unlock()
+						if err != errMECANotEnoughResource {
+							req.err = err
+							req.Done()
+							h.release()
+							return
+						}
+						// on resource shortage, drop the tracker's cached tasks so their resources can be reclaimed
+						meca.tracker.dropCache()
+						retryDueToResourceShortage++
+						continue
+					}
+				}
+
+				// reserve the cpu and memory
+				if err := meca.rm.Reserve(float64(req.taskCfg.Rsrc.CPU), int(req.taskCfg.Rsrc.MEM)); err != nil {
+					if req.taskCfg.Rsrc.UseGPU {
+						meca.rm.ReleaseGPU(gpus)
+					}
+					h.initMu.Unlock()
+					if err != errMECANotEnoughResource {
+						req.err = err
+						req.Done()
+						h.release()
+						return
+					}
+					meca.tracker.dropCache()
+					retryDueToResourceShortage++
+					continue
+				}
+				// retry port allocation on init failure
+				port := port_alloc()
+				retryCount := 0
+				for {
+					if err = h.task.Init(req.ctx, "", port, gpus); err == nil {
+						// register the release callback when the task initializes successfully
+						releaseCpu := float64(req.taskCfg.Rsrc.CPU)
+						releaseMem := int(req.taskCfg.Rsrc.MEM)
+						h.releaseCb = func() error {
+							meca.rm.ReleaseGPU(gpus)
+							return meca.rm.Release(releaseCpu, releaseMem)
+						}
+						h.initialized.Store(true)
+						break
+					} else if retryCount > 10 {
+						// init failed, release resources
+						meca.rm.ReleaseGPU(gpus)
+						meca.rm.Release(float64(req.taskCfg.Rsrc.CPU), int(req.taskCfg.Rsrc.MEM))
+						break
+					}
+					retryCount++
+					port = port_alloc()
+				}
+			}
+			h.initMu.Unlock()
+		}
+		if err != nil {
+			req.err = err
+			req.Done()
+			h.release()
+			return
+		}
+		log.Printf("task with config %v added: %v ", req.taskCfg, h.task)
+		go func() {
+			defer req.Done()
+			req.output, req.err = h.task.Execute(req.ctx, req.input)
+			h.release()
+		}()
+		return
+	}
+}
+
+func (meca *MecaExecutor) consumeRequests(reqQueue <-chan *MecaRequestHandle, reqStopChn <-chan struct{}, reqManagerStoppedChn chan<- struct{}) {
+	scheduledToStop := 0 // stopping is deferred until all pending requests are handled, so this counter tracks how many callers are waiting for the stopped signal on reqManagerStoppedChn
+	for {
+		select {
+		case req := <-reqQueue:
+			meca.handleOneRequest(req)
+		case <-reqStopChn:
+			scheduledToStop++
+		default:
+			if scheduledToStop > 0 {
+				reqManagerStoppedChn <- struct{}{}
+				scheduledToStop--
+			}
+			time.Sleep(time.Millisecond)
+		}
+	}
+}
+
 // will launch cleaner goroutine to remove tasks after timeout
 func (meca *MecaExecutor) Start() {
 	if !meca.started {
 		stopChn := make(chan struct{})
 		cleanerStoppedChn := make(chan struct{})
+		reqStopChn := make(chan struct{})
+		reqManagerStoppedChn := make(chan struct{})
+		reqQueue := make(chan *MecaRequestHandle, 100)
 		meca.rm.Start()
 		go meca.cleaner(stopChn, cleanerStoppedChn)
+		go meca.consumeRequests(reqQueue, reqStopChn, reqManagerStoppedChn)
 		meca.stopChn = stopChn
 		meca.cleanerStoppedChn = cleanerStoppedChn
+		meca.reqQueue = reqQueue
+		meca.reqStopChn = reqStopChn
+		meca.reqManagerStoppedChn = reqManagerStoppedChn
 		meca.started = true
 	}
 }
@@ -193,6 +370,8 @@ func (meca *MecaExecutor) Stop() {
 		meca.stopChn <- struct{}{}
 		meca.rm.Stop()
 		<-meca.cleanerStoppedChn
+		meca.reqStopChn <- struct{}{}
+		<-meca.reqManagerStoppedChn
 	}
 	meca.pauseMu.Unlock()
 }
@@ -208,6 +387,8 @@ func (meca *MecaExecutor) Pause() {
 		// avoid concurrent pause and unpause
 		meca.stopChn <- struct{}{}
 		<-meca.cleanerStoppedChn
+		meca.reqStopChn <- struct{}{}
+		<-meca.reqManagerStoppedChn
 	}
 	meca.pauseMu.Unlock()
 }
@@ -288,6 +469,17 @@ func (meca *MecaExecutor) Execute(ctx context.Context, taskCfg TaskConfig, input
 			return nil, errors.New("not enough GPU available")
 		}
 	}
+	// validate cpu and mem setting
+	if taskCfg.Rsrc.CPU <= 0 || taskCfg.Rsrc.MEM <= 0 {
+		return nil, errors.New("invalid resource limit")
+	} else {
+		maxCpu, maxMem := meca.rm.GetMaxCPUAndMem()
+		if float64(taskCfg.Rsrc.CPU) > maxCpu || uint64(taskCfg.Rsrc.MEM) > maxMem {
+			return nil, errors.New("resource limit exceeds the maximum")
+		}
+	}
+
+	// after validation, the task is assumed to be eventually processed once earlier requests have finished.

 	// translate the runtime type
 	if len(taskCfg.Runtime) > 0 {
@@ -301,102 +493,18 @@ func (meca *MecaExecutor) Execute(ctx context.Context, taskCfg TaskConfig, input

 	// ensure the task uses correct version of image
 	// TODO (expose more control for version later)
-	imageUpdate := false
 	if found, err := meca.repo.Exists(taskCfg.ImageId, ""); err != nil && err != errUnFoundImageVersion {
 		return nil, err
 	} else if !found {
 		if err := meca.repo.Fetch(taskCfg.ImageId, "", ""); err != nil {
 			return nil, err
 		}
-		imageUpdate = true
-	}
-
-	// if an image update is performed, launch a new task and schedule an cleanup of the last task
-	// here the id is the uid for an image.
-	// TODO: when id is not uid for an image (using image tag), we should construct uid for task id here, and release the old task handle under the old uid
-
-	// get the taskid as key for tracker.
-	taskId := GetTaskId(taskCfg)
-
-	if imageUpdate {
-		task, err := meca.fac.Build(taskId, taskCfg)
-		if err != nil {
-			return nil, err
-		}
-		meca.tracker.add(taskId, task)
 	}
-
-	log.Printf("image ready")
-	// get the task handle
-	var h *taskHandle
-	for {
-		if h = meca.tracker.get(taskId); h != nil {
-			break
-		}
-		log.Printf("adding task")
-		task, err := meca.fac.Build(taskId, taskCfg)
-		if err != nil {
-			return nil, err
-		}
-		meca.tracker.add(taskId, task)
-		log.Printf("task handle added")
-	}
-	log.Printf("task handle ready")
-	defer h.release()
-
-	var err error
-	// initialize the task once
-	if !h.initialized.Load() {
-		h.initMu.Lock()
-		if !h.initialized.Load() {
-			// reserve the resources
-			// reserve the gpu
-			var gpus []int
-			if taskCfg.Rsrc.UseGPU {
-				// set to 10 percent utilization for filtering out GPUs in use.
-				if gpus, err = meca.rm.ReserveGPU(int(taskCfg.Rsrc.GPUCount), 10); err != nil {
-					h.initMu.Unlock()
-					return nil, err
-				}
-			}
-
-			// reserve the cpu and memory
-			if err := meca.rm.Reserve(float64(taskCfg.Rsrc.CPU), int(taskCfg.Rsrc.MEM)); err != nil {
-				if taskCfg.Rsrc.UseGPU {
-					meca.rm.ReleaseGPU(gpus)
-				}
-				h.initMu.Unlock()
-				return nil, err
-			}
-			// retry in port allocation
-			port := port_alloc()
-			retryCount := 0
-			for {
-				if err = h.task.Init(ctx, "", port, gpus); err == nil {
-					// register release callback when the task init successfully
-					releaseCpu := float64(taskCfg.Rsrc.CPU)
-					releaseMem := int(taskCfg.Rsrc.MEM)
-					h.releaseCb = func() error {
-						meca.rm.ReleaseGPU(gpus)
-						return meca.rm.Release(releaseCpu, releaseMem)
-					}
-					h.initialized.Store(true)
-					break
-				} else if retryCount > 10 {
-					// init failed, release resources
-					meca.rm.ReleaseGPU(gpus)
-					meca.rm.Release(float64(taskCfg.Rsrc.CPU), int(taskCfg.Rsrc.MEM))
-					break
-				}
-				retryCount++
-				port = port_alloc()
-			}
-		}
-		h.initMu.Unlock()
-	}
-	if err != nil {
-		return nil, err
-	}
-	log.Printf("task with config %v added: %v ", taskCfg, h.task)
-	return h.task.Execute(ctx, input)
+	// if an image update is performed, the image id (a uid) will be different; a new task is launched and a cleanup of the last task will eventually be scheduled.
+	req := NewMecaRequestHandle(ctx, taskCfg, input)
+	meca.reqQueue <- req
+	req.Wait()
+	log.Printf("request handle completed: %v, err: %v", req.output, req.err)
+	return req.output, req.err
 }
diff --git a/task_executor/resource.go b/task_executor/resource.go
index 82724a2..bd81f6b 100644
--- a/task_executor/resource.go
+++ b/task_executor/resource.go
@@ -229,6 +229,7 @@ type ResourceManager interface {
 	UpdateConfig(cpu float64, mem int) string
 	Reserve(cpu float64, mem int) error
 	Release(cpu float64, mem int) error
+	GetMaxCPUAndMem() (float64, uint64)
 	GetGPUCount() int
 	ReserveGPU(count int, utilLimit int) ([]int, error)
 	ReleaseGPU([]int) error
@@ -344,6 +345,10 @@ func (m *resourceManager) Release(cpu float64, mem int) error {
 	return nil
 }

+func (m *resourceManager) GetMaxCPUAndMem() (float64, uint64) {
+	return float64(m.taskTotalCPU) / 10, m.taskTotalMEM
+}
+
 func (m *resourceManager) GetGPUCount() int {
 	if !m.hasGPU {
 		return 0
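
Note on the pattern this diff introduces: `Execute` now wraps each call in a `MecaRequestHandle`, sends it to `reqQueue`, and blocks in `Wait()` until the single `consumeRequests` goroutine calls `Done()` (or the caller's context is canceled). The sketch below is a minimal, self-contained illustration of that produce/wait/consume shape; `requestHandle`, `consume`, and the other names are illustrative stand-ins, not identifiers from this repo.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// requestHandle mirrors MecaRequestHandle: the producer blocks on Wait()
// until the consumer calls Done() or the request context is canceled.
type requestHandle struct {
	ctx      context.Context
	complete chan struct{}
	input    string
	output   string
	err      error
}

func newRequestHandle(ctx context.Context, input string) *requestHandle {
	// a buffer of 1 lets Done() return even if Wait() already gave up
	// because the context was canceled
	return &requestHandle{ctx: ctx, complete: make(chan struct{}, 1), input: input}
}

func (h *requestHandle) Done() { h.complete <- struct{}{} }

func (h *requestHandle) Wait() {
	select {
	case <-h.complete:
	case <-h.ctx.Done():
		h.err = h.ctx.Err()
	}
}

// consume drains the queue one request at a time, so resource
// reservation and task initialization never race across requests.
func consume(queue <-chan *requestHandle, stop <-chan struct{}) {
	for {
		select {
		case req := <-queue:
			req.output = "processed: " + req.input // stand-in for task init + execute
			req.Done()
		case <-stop:
			return
		}
	}
}

func main() {
	queue := make(chan *requestHandle, 100) // same capacity as reqQueue in the diff
	stop := make(chan struct{})
	go consume(queue, stop)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	req := newRequestHandle(ctx, "task-1")
	queue <- req
	req.Wait()
	fmt.Printf("output=%q err=%v\n", req.output, req.err)

	close(stop)
}
```

Because there is a single consumer goroutine, requests are strictly serialized; with `reqQueue` buffered at 100, a burst beyond the buffer makes `Execute` block on the channel send until the consumer catches up.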
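`handleOneRequest` also keeps the existing double-checked initialization: the `initialized` atomic flag makes the common path lock-free, while `initMu` plus a second check ensures the expensive `Init` runs at most once per task handle. A compact sketch of that idiom, with a hypothetical `lazyTask` standing in for `taskHandle`:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lazyTask stands in for taskHandle: initialized gives a lock-free fast
// path, and the mutex plus re-check makes init run at most once.
type lazyTask struct {
	initialized atomic.Bool
	initMu      sync.Mutex
}

func (t *lazyTask) ensureInit(init func() error) error {
	if t.initialized.Load() { // fast path: already initialized
		return nil
	}
	t.initMu.Lock()
	defer t.initMu.Unlock()
	if !t.initialized.Load() { // re-check under the lock
		if err := init(); err != nil {
			return err
		}
		t.initialized.Store(true)
	}
	return nil
}

func main() {
	var t lazyTask
	var wg sync.WaitGroup
	inits := 0
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = t.ensureInit(func() error { inits++; return nil })
		}()
	}
	wg.Wait()
	fmt.Println("init ran", inits, "times") // always 1
}
```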
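On `errMECANotEnoughResource`, the handler drops the tracker's cached tasks (so idle tasks release their reservations via `dropCache`) and retries after a linearly growing sleep of `retryDueToResourceShortage` milliseconds. A simplified sketch of that drop-and-backoff loop; `cache`, `reserveWithRetry`, and `errNotEnoughResource` are illustrative stand-ins:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotEnoughResource = errors.New("not enough resource")

type cache struct{ tasks map[string]struct{} }

// dropCache frees idle cached tasks so their resources can be reclaimed.
func (c *cache) dropCache() { c.tasks = map[string]struct{}{} }

// reserveWithRetry mirrors the shape of handleOneRequest's shortage
// handling: drop the cache, back off linearly, then retry.
func reserveWithRetry(c *cache, reserve func() error) error {
	for attempt := 0; ; attempt++ {
		time.Sleep(time.Duration(attempt) * time.Millisecond)
		err := reserve()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errNotEnoughResource) {
			return err // non-retryable failure
		}
		c.dropCache()
	}
}

func main() {
	c := &cache{tasks: map[string]struct{}{"idle-task": {}}}
	tries := 0
	err := reserveWithRetry(c, func() error {
		tries++
		if tries < 3 {
			return errNotEnoughResource // succeed on the third attempt
		}
		return nil
	})
	fmt.Println("reserved after", tries, "tries, err:", err)
}
```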