Skip to content

Commit

Permalink
fetch service event logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiwara committed Dec 3, 2021
1 parent e41be6f commit 19e8c70
Showing 1 changed file with 42 additions and 8 deletions.
50 changes: 42 additions & 8 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type Tracer struct {
logs *cloudwatchlogs.CloudWatchLogs
timeline *Timeline
Duration time.Duration

headBegin time.Time
headEnd time.Time
tailBegin time.Time
tailEnd time.Time
}

func (t *Tracer) AddEvent(ts *time.Time, source, message string) {
Expand Down Expand Up @@ -129,6 +134,21 @@ func (t *Tracer) traceTask(cluster string, taskID string) (*ecs.Task, error) {
return nil, errors.New("no tasks found")
}
task := res.Tasks[0]

t.headBegin = task.CreatedAt.Add(-t.Duration)
t.headEnd = task.CreatedAt.Add(t.Duration)
if task.StoppingAt != nil {
t.tailBegin = task.StoppingAt.Add(-t.Duration)
} else {
t.tailBegin = time.Now().Add(-t.Duration)
}
t.tailEnd = time.Now()

taskGroup := strings.SplitN(aws.StringValue(task.Group), ":", 2)
if len(taskGroup) == 2 && taskGroup[0] == "service" {
t.fetchServiceEvents(cluster, taskGroup[1])
}

t.AddEvent(task.CreatedAt, "TASK", "Created")
t.AddEvent(task.ConnectivityAt, "TASK", "Connected")
t.AddEvent(task.StartedAt, "TASK", "Started")
Expand Down Expand Up @@ -188,16 +208,10 @@ func (t *Tracer) traceLogs(task *ecs.Task) error {
go func() {
defer wg.Done()
// head of logs
t.fetchLogs(containerName, logGroup, logStream, nil, aws.Time(task.CreatedAt.Add(t.Duration)))
t.fetchLogs(containerName, logGroup, logStream, &t.headBegin, &t.headEnd)

// tail of logs
var end time.Time
if task.StoppingAt != nil {
end = *task.StoppingAt
} else {
end = time.Now()
}
t.fetchLogs(containerName, logGroup, logStream, aws.Time(end.Add(-t.Duration)), nil)
t.fetchLogs(containerName, logGroup, logStream, &t.tailBegin, nil)
}()
}
wg.Wait()
Expand All @@ -214,6 +228,26 @@ func taskID(task *ecs.Task) string {
return an[strings.LastIndex(an, "/")+1:]
}

func (t *Tracer) fetchServiceEvents(cluster, service string) error {
res, err := t.ecs.DescribeServicesWithContext(t.ctx, &ecs.DescribeServicesInput{
Cluster: &cluster,
Services: []*string{&service},
})
if err != nil {
return errors.Wrap(err, "failed to describe services")
}
if len(res.Services) == 0 {
return errors.New("no services found")
}
for _, e := range res.Services[0].Events {
ts := *e.CreatedAt
if ts.After(t.headBegin) && ts.Before(t.headEnd) || ts.After(t.tailBegin) && ts.Before(t.tailEnd) {
t.AddEvent(e.CreatedAt, "SERVICE", *e.Message)
}
}
return nil
}

func (t *Tracer) fetchLogs(containerName, group, stream string, from, to *time.Time) error {
var nextToken *string
in := &cloudwatchlogs.GetLogEventsInput{
Expand Down

0 comments on commit 19e8c70

Please sign in to comment.