Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added option to run existing swarm service #49

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,27 @@ command = touch /tmp/example
schedule = @hourly
command = touch /tmp/example


[job-service-run "service-executed-on-new-container"]
schedule = 0,20,40 * * * *
image = ubuntu
network = swarm_network
command = touch /tmp/example

[job-service-run "job-executed-on-existing-service"]
schedule = 0,20,40 * * * *
service = my-service
```

#### Docker labels configurations

In order to use this type of configuration, ofelia needs access to the Docker socket.

```sh
docker run -it --rm \
-v /var/run/docker.sock:/var/run/docker.sock:ro \
--label ofelia.job-local.my-test-job.schedule="@every 5s" \
--label ofelia.job-local.my-test-job.command="date" \
mcuadros/ofelia:latest daemon --docker
```

#### Docker labels configurations
Expand Down
145 changes: 130 additions & 15 deletions core/runservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,62 @@ type RunServiceJob struct {
Delete bool `default:"true"`
Image string
Network string
Service string
}

// NewRunServiceJob returns a RunServiceJob that talks to Docker through the
// given client. All other fields are left at their zero values.
func NewRunServiceJob(c *docker.Client) *RunServiceJob {
	job := new(RunServiceJob)
	job.Client = c
	return job
}

// Main method for running a service-based job
// If the service has been provided it will start a new task for the existing service
// Otherwise it will create a new service based on the image and other parameters
func (j *RunServiceJob) Run(ctx *Context) error {
if err := j.pullImage(); err != nil {
return err

if j.Image != "" {
if err := j.pullImage(); err != nil {
return err
}
}

svc, err := j.buildService()
var svcID string
if j.Service == "" {
svc, err := j.buildService()

if err != nil {
return err
}
if err != nil {
return err
}

svcID = svc.ID
ctx.Logger.Noticef("Created service %s for job %s\n", svcID, j.Name)
} else {
svc, err := j.inspectService(ctx, j.Service)
if err != nil {
return err
}
svcID = svc.ID
ctx.Logger.Noticef("Found service %s for job %s\n", svcID, j.Name)

ctx.Logger.Noticef("Created service %s for job %s\n", svc.ID, j.Name)
_, err = j.scaleService(ctx, svcID, false)
if err != nil {
return err
}

_, err = j.scaleService(ctx, svcID, true)
if err != nil {
return err
}
}

if err := j.watchContainer(ctx, svc.ID); err != nil {
if err := j.watchContainer(ctx, svcID); err != nil {
return err
}

return j.deleteService(ctx, svc.ID)
if j.Service == "" {
return j.deleteService(ctx, svcID)
} else {
return nil
}
}

func (j *RunServiceJob) pullImage() error {
Expand Down Expand Up @@ -96,6 +128,48 @@ func (j *RunServiceJob) buildService() (*swarm.Service, error) {
return svc, err
}

// scaleService changes the replica count of an existing service by one.
//
// When up is true one replica is added; otherwise one replica is removed,
// unless the service already has zero replicas, in which case nothing is
// changed. Only services running in replicated mode can be scaled; for any
// other service an error is returned instead of dereferencing a nil replica
// count.
//
// The returned service is the state inspected before the update was sent, not
// the state after scaling.
func (j *RunServiceJob) scaleService(ctx *Context, svcID string, up bool) (*swarm.Service, error) {
	svc, err := j.inspectService(ctx, svcID)
	if err != nil {
		return nil, err
	}

	// Mode.Replicated (and its Replicas pointer) is nil for services that are
	// not in replicated mode.
	replicated := svc.Spec.Mode.Replicated
	if replicated == nil || replicated.Replicas == nil {
		return nil, fmt.Errorf("Failed to scale service %s: service is not in replicated mode", svcID)
	}

	replicas := *replicated.Replicas
	if up {
		replicas++
	} else {
		// If there are already 0 replicas of the service, there is no need to
		// scale down.
		if replicas == 0 {
			return svc, nil
		}
		replicas--
	}

	updateSvcOpts := docker.UpdateServiceOptions{}

	// The old spec is required, otherwise defaults will override the service.
	updateSvcOpts.ServiceSpec = svc.Spec
	updateSvcOpts.Name = svc.Spec.Name
	updateSvcOpts.Version = svc.Version.Index

	// Replace the pointer instead of writing through it, so the inspected
	// spec held by svc is left untouched.
	updateSvcOpts.Mode.Replicated = &swarm.ReplicatedService{
		Replicas: &replicas,
	}

	// Do the actual scaling.
	if err := j.Client.UpdateService(svcID, updateSvcOpts); err != nil {
		return nil, err
	}

	// Give docker the time to do the scaling.
	time.Sleep(time.Millisecond * 1000)
	return svc, nil
}

const (

// TODO are these const defined somewhere in the docker API?
Expand All @@ -110,9 +184,9 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error {

ctx.Logger.Noticef("Checking for service ID %s (%s) termination\n", svcID, j.Name)

svc, err := j.Client.InspectService(svcID)
svc, err := j.inspectService(ctx, svcID)
if err != nil {
return fmt.Errorf("Failed to inspect service %s: %s", svcID, err.Error())
return err
}

// On every tick, check if all the services have completed, or have error out
Expand All @@ -123,12 +197,14 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error {
defer wg.Done()
for _ = range svcChecker.C {

// TODO will not work with longer existing services
// TODO doesn't work
if svc.CreatedAt.After(time.Now().Add(maxProcessDuration)) {
err = ErrMaxTimeRunning
return
}

taskExitCode, found := j.findtaskstatus(ctx, svc.ID)
taskExitCode, found := j.findTaskStatus(ctx, svc.ID)

if found {
exitCode = taskExitCode
Expand All @@ -140,19 +216,28 @@ func (j *RunServiceJob) watchContainer(ctx *Context, svcID string) error {
wg.Wait()

ctx.Logger.Noticef("Service ID %s (%s) has completed with exit code %d\n", svcID, j.Name, exitCode)

switch exitCode {
case 0:
return nil
case -1:
return ErrUnexpected
default:
return fmt.Errorf("error non-zero exit code: %d", exitCode)
}
return err
}

func (j *RunServiceJob) findtaskstatus(ctx *Context, taskID string) (int, bool) {
func (j *RunServiceJob) findTaskStatus(ctx *Context, svcID string) (int, bool) {
taskFilters := make(map[string][]string)
taskFilters["service"] = []string{taskID}
taskFilters["service"] = []string{svcID}

tasks, err := j.Client.ListTasks(docker.ListTasksOptions{
Filters: taskFilters,
})

if err != nil {
ctx.Logger.Errorf("Failed to find task ID %s. Considering the task terminated: %s\n", taskID, err.Error())
ctx.Logger.Errorf("Failed to find tasks fo service %s. Considering the task terminated: %s\n", svcID, err.Error())
return 0, false
}

Expand Down Expand Up @@ -186,6 +271,20 @@ func (j *RunServiceJob) findtaskstatus(ctx *Context, taskID string) (int, bool)
if exitCode == 0 && task.Status.State == swarm.TaskStateRejected {
exitCode = 255 // force non-zero exit for task rejected
}

err = j.Client.GetServiceLogs(docker.LogsServiceOptions{
Service: svcID,
Stderr: true,
Stdout: true,
Follow: false,
ErrorStream: ctx.Execution.ErrorStream,
OutputStream: ctx.Execution.OutputStream,
})
if err != nil {
ctx.Logger.Errorf("Error getting logs for service: %s - %s \n", svcID, err.Error())
return 0, false
}

done = true
break
}
Expand All @@ -211,3 +310,19 @@ func (j *RunServiceJob) deleteService(ctx *Context, svcID string) error {
return err

}

// inspectService looks up a swarm service through the Docker client.
//
// If the job is configured with an existing service (j.Service is set), that
// name takes precedence and svcID is ignored; otherwise svcID is inspected.
// A client error is wrapped with the name that was actually looked up.
func (j *RunServiceJob) inspectService(ctx *Context, svcID string) (*swarm.Service, error) {
	target := svcID
	if j.Service != "" {
		target = j.Service
	}

	svc, err := j.Client.InspectService(target)
	if err != nil {
		return nil, fmt.Errorf("Failed to inspect service %s: %s", target, err.Error())
	}
	return svc, nil
}