This repository has been archived by the owner on Aug 18, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 45
/
sched_resources.go
110 lines (87 loc) · 2.73 KB
/
sched_resources.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package sectorstorage
import (
"sync"
"github.com/filecoin-project/sector-storage/storiface"
)
// withResources runs cb while the resources described by r are accounted as
// in use on the worker identified by id, whose capacity is wr.
//
// While canHandleRequest reports that the request does not fit, the call
// blocks on a condition variable. That condition variable is created lazily
// from locker the first time a caller has to wait and is reused afterwards.
// sync.Cond.Wait unlocks and re-locks its locker, so the caller must hold
// locker when calling withResources.
//
// The reservation is released, and waiters are woken, in a deferred call so
// that a panic inside cb cannot leave the resources permanently marked as
// used. The error returned by cb is passed through unchanged.
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
	for !a.canHandleRequest(r, id, wr) {
		if a.cond == nil {
			a.cond = sync.NewCond(locker)
		}
		a.cond.Wait()
	}

	a.add(wr, r)
	defer func() {
		a.free(wr, r)
		// The condition variable only exists if some caller has had to wait.
		if a.cond != nil {
			a.cond.Broadcast()
		}
	}()

	return cb()
}
// add records the resources r as in use on a worker whose capacity is wr.
//
// The GPU is marked as used only when r can use it. A request that does not
// use the GPU leaves the flag untouched, so it cannot clear the reservation
// of a GPU task that is still running; this mirrors free, which only clears
// the flag for GPU-capable requests.
//
// A multi-threaded request accounts for all of the worker's CPUs, any other
// request for r.Threads. Both memory counters grow by the request's
// MinMemory and MaxMemory respectively.
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
	if r.CanGPU {
		a.gpuUsed = true
	}

	if r.MultiThread() {
		a.cpuUse += wr.CPUs
	} else {
		a.cpuUse += uint64(r.Threads)
	}

	a.memUsedMin += r.MinMemory
	a.memUsedMax += r.MaxMemory
}
// free undoes the accounting of a request r on a worker whose capacity is wr.
//
// Both memory counters shrink by the request's MinMemory and MaxMemory. The
// CPU counter shrinks by all of the worker's CPUs for a multi-threaded
// request and by r.Threads otherwise. The GPU flag is cleared only when the
// request was GPU-capable.
func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
	a.memUsedMax -= r.MaxMemory
	a.memUsedMin -= r.MinMemory

	// A multi-threaded request occupied every CPU of the worker.
	cpus := wr.CPUs
	if !r.MultiThread() {
		cpus = uint64(r.Threads)
	}
	a.cpuUse -= cpus

	if r.CanGPU {
		a.gpuUsed = false
	}
}
// canHandleRequest reports whether a request needing needRes fits on worker
// wid, given the worker's capacity res and the resources already accounted
// for in a. Each rejection is logged at debug level with its reason.
//
// The checks, in order:
//   - reserved + used + requested minimum memory must fit in physical memory;
//   - reserved + used + requested maximum memory must fit in physical + swap;
//   - a multi-threaded request needs the CPU counter to be zero, any other
//     request needs its thread count to fit in the remaining CPUs;
//   - a GPU-capable request on a worker with GPUs needs the GPU to be unused.
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool {
	// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
	overhead := res.MemReserved + needRes.BaseMinMemory

	physNeeded := overhead + a.memUsedMin + needRes.MinMemory
	if physNeeded > res.MemPhysical {
		log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, physNeeded/mib, res.MemPhysical/mib)
		return false
	}

	virtNeeded := overhead + a.memUsedMax + needRes.MaxMemory
	virtTotal := res.MemSwap + res.MemPhysical
	if virtNeeded > virtTotal {
		log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, virtNeeded/mib, virtTotal/mib)
		return false
	}

	multi := needRes.MultiThread()
	switch {
	case multi && a.cpuUse > 0:
		log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs)
		return false
	case !multi && a.cpuUse+uint64(needRes.Threads) > res.CPUs:
		log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs)
		return false
	}

	if needRes.CanGPU && len(res.GPUs) > 0 && a.gpuUsed {
		log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
		return false
	}

	return true
}
// utilization returns the largest of three usage ratios for a worker whose
// capacity is wr:
//   - CPUs in use over the worker's CPUs;
//   - minimum memory in use plus reserved memory, over physical memory;
//   - maximum memory in use plus reserved memory, over physical + swap.
//
// The ratios are computed in floating point; no guard is applied for a zero
// denominator.
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
	highest := float64(a.cpuUse) / float64(wr.CPUs)

	memRatios := [2]float64{
		float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical),
		float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap),
	}
	for _, ratio := range memRatios {
		if ratio > highest {
			highest = ratio
		}
	}

	return highest
}