-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexecution.go
180 lines (155 loc) · 4.28 KB
/
execution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package dockerlang
import (
"context"
"fmt"
"io"
"os"
"strings"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
uuid "github.com/satori/go.uuid"
)
// Settings applied to every computation container created by
// ExecutionEngine.Run.
const (
// DOCKERLANG_IMAGE is the image name each computation container is created from.
DOCKERLANG_IMAGE = "dockerlang"
// MEMORY_PORT is the port listed in each container's ExposedPorts.
MEMORY_PORT = "6666"
// The constants below name the environment variables through which Run hands
// the computation type, the computation value, and the comma-joined operand
// list to the container.
COMPUTATION_TYPE_ENV_VAR = "COMPUTATION_TYPE_ENV_VAR"
COMPUTATION_VALUE_ENV_VAR = "COMPUTATION_VALUE_ENV_VAR"
COMPUTATION_DEPS_ENV_VAR = "COMPUTATION_DEPS_ENV_VAR"
)
var (
// executer is the package-level engine. It is assigned by NewExecutionEngine
// and read by ShutdownExecutionEngine; it is nil until NewExecutionEngine has
// gotten as far as constructing the engine.
executer *ExecutionEngine
)
// ExecutionEngine bundles the Docker client and the per-run network used to
// launch computation containers.
type ExecutionEngine struct {
// Docker is the client used for all network and container API calls.
Docker *client.Client
// Guillotine is set to "robespierre" by NewExecutionEngine; nothing else in
// this file reads it.
Guillotine string
// Network is the name of the network created by NewExecutionEngine. Run
// attaches each container to it and ShutdownExecutionEngine removes it.
Network string
}
// ExecutionData describes a single computation to hand to ExecutionEngine.Run.
type ExecutionData struct {
// ComputationType is passed to the container in COMPUTATION_TYPE_ENV_VAR.
ComputationType string
// Value is passed to the container in COMPUTATION_VALUE_ENV_VAR.
Value string
// Operands are joined with commas and passed in COMPUTATION_DEPS_ENV_VAR;
// Run treats them as the dockerlang container ids this computation depends on.
Operands []string
}
// NewExecutionEngine builds an ExecutionEngine and stores it in the
// package-level executer variable.
//
// It sets DOCKER_API_VERSION, creates a Docker client from the environment,
// and creates a network with a freshly generated, unique name. The first error
// encountered is returned unchanged. Note that executer is assigned before the
// network is created, so it is already set if network creation fails.
func NewExecutionEngine() error {
	// Set the API version to use via an environment variable before the
	// client is created.
	// TODO: configure this from the Docker version the user actually runs.
	if err := os.Setenv("DOCKER_API_VERSION", "1.35"); err != nil {
		return err
	}

	cli, err := client.NewEnvClient()
	if err != nil {
		// Most likely Docker is not installed or not running on this system.
		return err
	}

	// Bind the new engine to the package-level variable. The network name
	// embeds a new UUID, so each engine gets its own network.
	executer = &ExecutionEngine{
		Docker:     cli,
		Guillotine: "robespierre",
		Network:    fmt.Sprintf("dockerlang.%s", uuid.NewV4().String()),
	}

	// Create the network that the computation containers will be attached to.
	if _, err = executer.Docker.NetworkCreate(context.TODO(), executer.Network, types.NetworkCreate{}); err != nil {
		return err
	}

	// Building the docker image from the Dockerfile here was attempted but did
	// not work because of relative directories; the attempt is kept below for
	// reference.
	// _, err = executer.Docker.ImageBuild(
	// 	context.TODO(),
	// 	nil, // TODO what is this io.Reader about?
	// 	types.ImageBuildOptions{
	// 		Dockerfile: "../Dockerfile",
	// 		Tags:       []string{DOCKERLANG_IMAGE},
	// 	},
	// )
	// if err != nil {
	// 	return err
	// }
	return nil
}
// ShutdownExecutionEngine removes the network that NewExecutionEngine created
// for the package-level engine.
//
// It does nothing and returns nil when no engine (or no Docker client) has
// been set up, so it is safe to call even if NewExecutionEngine failed early
// or was never called. A network that is already gone is not an error. Any
// other removal failure is returned to the caller rather than raised as a
// panic, so the caller decides how to react.
func ShutdownExecutionEngine() error {
	if executer == nil || executer.Docker == nil {
		// Nothing was constructed, so there is nothing to tear down.
		return nil
	}

	err := executer.Docker.NetworkRemove(context.TODO(), executer.Network)
	if err != nil && !client.IsErrNotFound(err) {
		return err
	}
	return nil
}
// Run creates and starts a new docker container that performs the computation
// described by d, and returns the DockerLang Container Id (dlci) that was used
// as the container's name.
//
// The computation type, value, and comma-joined operand list are passed to the
// container through the COMPUTATION_*_ENV_VAR environment variables. The
// container is attached to the engine's network, and its output stream is
// copied to this process's stdout by a background goroutine, which ends when
// the stream is closed or the copy fails.
//
// An error is returned if d is nil or if creating, attaching to, or starting
// the container fails. If a step fails after the container was created, the
// container is removed (best effort) and the attach stream is closed so that
// neither is left behind.
func (e *ExecutionEngine) Run(d *ExecutionData) (string, error) {
	if d == nil {
		return "", fmt.Errorf("dockerlang: nil ExecutionData")
	}

	// No deadline or cancellation is applied to the docker API calls.
	ctx := context.Background()

	// construct a comma delimited list of dockerlang container ids
	// which we will pass to the container as an environment variable
	dependencies := strings.Join(d.Operands, ",")

	// create the DockerLang Container Id for this computation
	dlci := uuid.NewV4().String()

	// create docker container for this computation
	_, err := e.Docker.ContainerCreate(
		ctx,
		&container.Config{
			ExposedPorts: nat.PortSet{MEMORY_PORT: struct{}{}},
			Image:        DOCKERLANG_IMAGE,
			Env: []string{
				fmt.Sprintf("%s=%s", COMPUTATION_TYPE_ENV_VAR, d.ComputationType),
				fmt.Sprintf("%s=%s", COMPUTATION_VALUE_ENV_VAR, d.Value),
				fmt.Sprintf("%s=%s", COMPUTATION_DEPS_ENV_VAR, dependencies),
			},
			AttachStdout: true,
			Tty:          true,
		},
		nil,
		&network.NetworkingConfig{
			EndpointsConfig: map[string]*network.EndpointSettings{
				e.Network: &network.EndpointSettings{
					NetworkID: e.Network,
				},
			},
		},
		dlci,
	)
	if err != nil {
		return "", err
	}

	// removeContainer discards the container created above when a later step
	// fails. It is best effort: the caller is told about the failure that
	// triggered the cleanup, not about a secondary removal error.
	removeContainer := func() {
		_ = e.Docker.ContainerRemove(ctx, dlci, types.ContainerRemoveOptions{Force: true})
	}

	// setup stdout stream from container
	hijackedResp, err := e.Docker.ContainerAttach(
		ctx,
		dlci,
		types.ContainerAttachOptions{
			Stream: true,
			Stdout: true,
			Stderr: true,
		},
	)
	if err != nil {
		removeContainer()
		return "", err
	}
	go func() {
		defer hijackedResp.Close()
		// Output forwarding is best effort; a copy failure only ends the
		// forwarding and is not reported.
		_, _ = io.Copy(os.Stdout, hijackedResp.Reader)
	}()

	// okay lets start the container...
	err = e.Docker.ContainerStart(
		ctx,
		dlci,
		types.ContainerStartOptions{},
	)
	if err != nil {
		// Closing the stream unblocks the copy goroutine, which would
		// otherwise wait on a container that never ran.
		hijackedResp.Close()
		removeContainer()
		return "", err
	}

	// lets return the DockerLang Container Id for this computation
	return dlci, nil
}