Skip to content

Commit

Permalink
Merge pull request #236 from timoreimann/upstream-expose-ready-state
Browse files Browse the repository at this point in the history
Implement and expose ready state.
  • Loading branch information
j1n6 authored May 28, 2017
2 parents 61c92ca + 767b0d1 commit 7e7071d
Show file tree
Hide file tree
Showing 2 changed files with 502 additions and 113 deletions.
258 changes: 146 additions & 112 deletions services/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@ package marathon

import (
"encoding/json"
"github.com/QubitProducts/bamboo/configuration"
"io/ioutil"
"log"
"net/http"
"sort"
"strings"

"github.com/QubitProducts/bamboo/configuration"
)

const taskStateRunning = "TASK_RUNNING"

// Describes an app process running
type Task struct {
Id string
Host string
Port int
Ports []int
Alive bool
State string
Ready bool
}

// A health check on the application
Expand All @@ -36,6 +42,7 @@ type App struct {
HealthCheckPath string
HealthCheckProtocol string
HealthChecks []HealthCheck
ReadinessCheckPath string
Tasks []Task
ServicePort int
ServicePorts []int
Expand All @@ -58,12 +65,6 @@ func (slice AppList) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}

type marathonTaskList []marathonTask

type marathonTasks struct {
Tasks marathonTaskList `json:"tasks"`
}

type HealthCheckResult struct {
Alive bool
}
Expand All @@ -74,12 +75,15 @@ type marathonTask struct {
Host string
Ports []int
ServicePorts []int
State string
StartedAt string
StagedAt string
Version string
HealthCheckResults []HealthCheckResult
}

type marathonTaskList []marathonTask

func (slice marathonTaskList) Len() int {
return len(slice)
}
Expand All @@ -97,11 +101,15 @@ type marathonApps struct {
}

type marathonApp struct {
Id string `json:"id"`
HealthChecks []marathonHealthCheck `json:"healthChecks"`
Ports []int `json:"ports"`
Env map[string]string `json:"env"`
Labels map[string]string `json:"labels"`
Id string `json:"id"`
HealthChecks []marathonHealthCheck `json:"healthChecks"`
Ports []int `json:"ports"`
Env map[string]string `json:"env"`
Labels map[string]string `json:"labels"`
Deployments []deployment `json:"deployments"`
Tasks marathonTaskList `json:"tasks"`
ReadinessChecks []marathonReadinessCheck `json:"readinessChecks"`
ReadinessCheckResults []readinessCheckResult `json:"readinessCheckResults"`
}

type marathonHealthCheck struct {
Expand All @@ -110,110 +118,92 @@ type marathonHealthCheck struct {
PortIndex int `json:"portIndex"`
}

func fetchMarathonApps(endpoint string, conf *configuration.Configuration) (map[string]marathonApp, error) {
client := &http.Client{}
req, _ := http.NewRequest("GET", endpoint+"/v2/apps", nil)
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json")
if len(conf.Marathon.User) > 0 && len(conf.Marathon.Password) > 0 {
req.SetBasicAuth(conf.Marathon.User, conf.Marathon.Password)
}
response, err := client.Do(req)
type marathonReadinessCheck struct {
Path string `json:"path"`
}

if err != nil {
return nil, err
}
type deployment struct {
ID string `json:"id"`
}

defer response.Body.Close()
var appResponse marathonApps
type readinessCheckResult struct {
TaskID string `json:"taskId"`
Ready bool `json:"ready"`
}

contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}
/*
Apps returns a struct that describes Marathon current app and their
sub tasks information.
err = json.Unmarshal(contents, &appResponse)
if err != nil {
return nil, err
}
Parameters:
endpoint: Marathon HTTP endpoint, e.g. http://localhost:8080
*/
func FetchApps(maraconf configuration.Marathon, conf *configuration.Configuration) (AppList, error) {
var marathonApps []marathonApp
var err error

dataById := map[string]marathonApp{}
// Try all configured endpoints until one succeeds or we exhaust the list,
// whichever comes first.
for _, url := range maraconf.Endpoints() {
marathonApps, err = fetchMarathonApps(url, conf)
if err == nil {
for _, marathonApp := range marathonApps {
sort.Sort(marathonApp.Tasks)
}
apps := createApps(marathonApps)
sort.Sort(apps)
return apps, nil
}
}
// return last error
return nil, err
}

for _, appConfig := range appResponse.Apps {
dataById[appConfig.Id] = appConfig
func fetchMarathonApps(endpoint string, conf *configuration.Configuration) ([]marathonApp, error) {
var appResponse marathonApps
if err := parseJSON(endpoint+"/v2/apps?embed=app.tasks&embed=app.deployments&embed=app.readiness", conf, &appResponse); err != nil {
return nil, err
}

return dataById, nil
return appResponse.Apps, nil
}

func fetchTasks(endpoint string, conf *configuration.Configuration) (map[string]marathonTaskList, error) {
func parseJSON(url string, conf *configuration.Configuration, out interface{}) error {
client := &http.Client{}
req, _ := http.NewRequest("GET", endpoint+"/v2/tasks", nil)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/json")
if len(conf.Marathon.User) > 0 && len(conf.Marathon.Password) > 0 {
req.SetBasicAuth(conf.Marathon.User, conf.Marathon.Password)
}
response, err := client.Do(req)

var tasks marathonTasks

response, err := client.Do(req)
if err != nil {
return nil, err
return err
}

contents, err := ioutil.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
return nil, err
}

err = json.Unmarshal(contents, &tasks)
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}

taskList := tasks.Tasks
sort.Sort(taskList)

tasksById := map[string]marathonTaskList{}
for _, task := range taskList {
if tasksById[task.AppId] == nil {
tasksById[task.AppId] = marathonTaskList{}
}
tasksById[task.AppId] = append(tasksById[task.AppId], task)
return err
}

for _, task_list := range tasksById {
sort.Sort(task_list)
err = json.Unmarshal(contents, &out)
if err != nil {
return err
}

return tasksById, nil
return nil
}

func calculateTaskHealth(healthCheckResults []HealthCheckResult, healthChecks []marathonHealthCheck) bool {
//If we don't even have health check results for every health check, don't count the task as healthy
if len(healthChecks) > len(healthCheckResults) {
return false
}
for _, healthCheck := range healthCheckResults {
if !healthCheck.Alive {
return false
}
}
return true
}

func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]marathonApp) AppList {
func createApps(marathonApps []marathonApp) AppList {
apps := AppList{}

for appId, mApp := range marathonApps {

for _, mApp := range marathonApps {
appId := mApp.Id
// Try to handle old app id format without slashes
appPath := appId
if !strings.HasPrefix(appId, "/") {
appPath = "/" + appId
}
appPath := "/" + strings.TrimPrefix(mApp.Id, "/")

// build App from marathonApp
app := App{
Expand All @@ -222,6 +212,7 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m
EscapedId: strings.Replace(appId, "/", "::", -1),
HealthCheckPath: parseHealthCheckPath(mApp.HealthChecks),
HealthCheckProtocol: parseHealthCheckProtocol(mApp.HealthChecks),
ReadinessCheckPath: parseReadinessCheckPath(mApp.ReadinessChecks),
Env: mApp.Env,
Labels: mApp.Labels,
SplitId: strings.Split(appId, "/"),
Expand All @@ -244,14 +235,16 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m

// build Tasks for this App
tasks := []Task{}
for _, mTask := range tasksById[appId] {
for _, mTask := range mApp.Tasks {
if len(mTask.Ports) > 0 {
t := Task{
Id: mTask.Id,
Host: mTask.Host,
Port: mTask.Ports[0],
Ports: mTask.Ports,
Alive: calculateTaskHealth(mTask.HealthCheckResults, mApp.HealthChecks),
State: mTask.State,
Ready: calculateReadiness(mTask, mApp),
}
tasks = append(tasks, t)
}
Expand Down Expand Up @@ -297,41 +290,82 @@ func parseHealthCheckProtocol(checks []marathonHealthCheck) string {
return ""
}

/*
Apps returns a struct that describes Marathon current app and their
sub tasks information.
Parameters:
endpoint: Marathon HTTP endpoint, e.g. http://localhost:8080
*/
func FetchApps(maraconf configuration.Marathon, conf *configuration.Configuration) (AppList, error) {
func parseReadinessCheckPath(checks []marathonReadinessCheck) string {
if len(checks) > 0 {
return checks[0].Path
}

var applist AppList
var err error
return ""
}

// try all configured endpoints until one succeeds
for _, url := range maraconf.Endpoints() {
applist, err = _fetchApps(url, conf)
if err == nil {
return applist, err
func calculateTaskHealth(healthCheckResults []HealthCheckResult, healthChecks []marathonHealthCheck) bool {
// If we don't even have health check results for every health check, don't
// count the task as healthy.
if len(healthChecks) > len(healthCheckResults) {
return false
}
for _, healthCheck := range healthCheckResults {
if !healthCheck.Alive {
return false
}
}
// return last error
return nil, err
return true
}

func _fetchApps(url string, conf *configuration.Configuration) (AppList, error) {
tasks, err := fetchTasks(url, conf)
if err != nil {
return nil, err
func calculateReadiness(task marathonTask, maraApp marathonApp) bool {
switch {
case task.State != taskStateRunning:
// By definition, a task not running cannot be ready.
log.Printf("task %s app %s: ready = false [task state %s != required state %s]", task.Id, maraApp.Id, task.State, taskStateRunning)
return false

case len(maraApp.Deployments) == 0:
// We only care about readiness during deployments; post-deployment readiness
// should be covered by a separate HAProxy health check definition.
log.Printf("task %s app %s: ready = true [no deployment ongoing]", task.Id, maraApp.Id)
return true

case len(maraApp.ReadinessChecks) == 0:
// Applications without configured readiness checks are always considered
// ready.
log.Printf("task %s app %s: ready = true [no readiness checks on app]", task.Id, maraApp.Id)
return true
}

marathonApps, err := fetchMarathonApps(url, conf)
if err != nil {
return nil, err
// Loop through all readiness check results and return the results for
// matching task IDs.
for _, readinessCheckResult := range maraApp.ReadinessCheckResults {
if readinessCheckResult.TaskID == task.Id {
log.Printf("task %s app %s: ready = %t [evaluating readiness check ready state]", task.Id, maraApp.Id, readinessCheckResult.Ready)
return readinessCheckResult.Ready
}
}

apps := createApps(tasks, marathonApps)
sort.Sort(apps)
return apps, nil
// There's a corner case sometimes hit where the first new task of a
// deployment goes from TASK_STAGING to TASK_RUNNING without a corresponding
// health check result being included in the API response. This only happens
// in a very short (yet unlucky) time frame and does not repeat for subsequent
// tasks of the same deployment.
// We identify this situation by checking that we are looking at a part of the
// deployment representing a new task (i.e., it has the most recent version
// timestamp while other timestamps exist as well). If that's the case, we
// err on the side of caution and mark it as non-ready.
versions := map[string]bool{}
var maxVersion string
for _, task := range maraApp.Tasks {
versions[task.Version] = true
if maxVersion == "" || maxVersion < task.Version {
maxVersion = task.Version
}
}
if len(versions) > 1 && task.Version == maxVersion {
log.Printf("task %s app %s: ready = false [new task with version %s not included in readiness check results yet]", task.Id, maraApp.Id, maxVersion)
return false
}

// Finally, we can be certain this task is not part of the deployment (i.e.,
// it's an old task that's going to transition into the TASK_KILLING and/or
// TASK_KILLED state as new tasks' readiness checks gradually turn green.)
log.Printf("task %s app %s: ready = true [task not involved in deployment]", task.Id, maraApp.Id)
return true
}
Loading

0 comments on commit 7e7071d

Please sign in to comment.