Skip to content

Commit

Permalink
add -duration option
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiwara committed Nov 30, 2021
1 parent 288cfeb commit c06aaf8
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 51 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@

*~
.envrc
cmd/lambroll/lambroll
cmd/tracer/tracer
dist/
tracer
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
tracer: *.go *.go cmd/tracer/*
go build -o $@ cmd/tracer/main.go

install:
go install github.com/fujiwara/tracer/cmd/tracer
12 changes: 9 additions & 3 deletions cmd/tracer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -19,14 +21,18 @@ func main() {
if err != nil {
panic(err)
}
t, err := tracer.New(ctx, sess)
t, err := tracer.NewWithSession(ctx, sess)
if err != nil {
panic(err)
}
if len(os.Args) < 2 {
flag.DurationVar(&t.Duration, "duration", time.Minute, "fetch logs duration from created / before stopping")
flag.Parse()

args := flag.Args()
if len(args) < 2 {
usage()
}
if err := t.Run(os.Args[1], os.Args[2]); err != nil {
if err := t.Run(args[0], args[1]); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
Expand Down
123 changes: 76 additions & 47 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"log"
"os"
"sort"
"strings"
Expand All @@ -21,11 +22,14 @@ const timeFormat = "2006-01-02T15:04:05.000Z07:00"

var epochBase = time.Unix(0, 0)

var MaxFetchLogs = 100

type Tracer struct {
ctx context.Context
ecs *ecs.ECS
logs *cloudwatchlogs.CloudWatchLogs
timeline *Timeline
Duration time.Duration
}

func (t *Tracer) AddEvent(ts *time.Time, source, message string) {
Expand Down Expand Up @@ -78,16 +82,20 @@ func (e *TimeLineEvent) String() string {
return fmt.Sprintf("%s\t%s\t%s\n", ts.Format(timeFormat), e.Source, e.Message)
}

func New(ctx context.Context, sess *session.Session) (*Tracer, error) {
tracer := &Tracer{
func NewWithSession(ctx context.Context, sess *session.Session) (*Tracer, error) {
return New(ctx, ecs.New(sess), cloudwatchlogs.New(sess))
}

func New(ctx context.Context, ecsSv *ecs.ECS, logsSv *cloudwatchlogs.CloudWatchLogs) (*Tracer, error) {
return &Tracer{
ctx: ctx,
ecs: ecs.New(sess),
logs: cloudwatchlogs.New(sess),
ecs: ecsSv,
logs: logsSv,
timeline: &Timeline{
seen: make(map[string]bool),
},
}
return tracer, nil
Duration: time.Minute,
}, nil
}

func newEvent(ts *time.Time, src, msg string) *TimeLineEvent {
Expand Down Expand Up @@ -136,6 +144,25 @@ func (t *Tracer) traceTask(cluster string, taskID string) (*ecs.Task, error) {
t.AddEvent(task.StoppingAt, "TASK", "StoppedCode:"+*task.StopCode)
}
t.AddEvent(task.ExecutionStoppedAt, "TASK", "Execution stopped")

for _, c := range task.Containers {
containerName := *c.Name
msg := fmt.Sprintf(*c.LastStatus)
if c.ExitCode != nil {
msg += fmt.Sprintf(" (exit code: %d)", *c.ExitCode)
}
if c.Reason != nil {
msg += fmt.Sprintf(" (reason: %s)", *c.Reason)
}
var ts *time.Time
if aws.StringValue(c.LastStatus) == "RUNNING" {
ts = now()
} else {
ts = task.StoppedAt
}
t.AddEvent(ts, "CONTAINER:"+containerName, msg)
}

return task, nil
}

Expand All @@ -161,26 +188,19 @@ func (t *Tracer) traceLogs(task *ecs.Task) error {
wg.Add(1)
go func() {
defer wg.Done()
t.followLogs(containerName, logGroup, logStream, task.CreatedAt, task.StoppingAt)
// head of logs
t.fetchLogs(containerName, logGroup, logStream, nil, aws.Time(task.CreatedAt.Add(t.Duration)))

// tail of logs
var end time.Time
if task.StoppingAt != nil {
end = *task.StoppingAt
} else {
end = time.Now()
}
t.fetchLogs(containerName, logGroup, logStream, aws.Time(end.Add(-t.Duration)), nil)
}()
}
for _, c := range task.Containers {
containerName := *c.Name
msg := fmt.Sprintf(*c.LastStatus)
if c.ExitCode != nil {
msg += fmt.Sprintf(" (exit code: %d)", *c.ExitCode)
}
if c.Reason != nil {
msg += fmt.Sprintf(" (reason: %s)", *c.Reason)
}
var ts *time.Time
if aws.StringValue(c.LastStatus) == "RUNNING" {
ts = now()
} else {
ts = task.StoppedAt
}
t.AddEvent(ts, "CONTAINER:"+containerName, msg)
}
wg.Wait()
return nil
}
Expand All @@ -195,36 +215,45 @@ func taskID(task *ecs.Task) string {
return an[strings.LastIndex(an, "/")+1:]
}

func (t *Tracer) followLogs(containerName, group, stream string, start, end *time.Time) error {
res, err := t.logs.GetLogEventsWithContext(t.ctx, &cloudwatchlogs.GetLogEventsInput{
func (t *Tracer) fetchLogs(containerName, group, stream string, from, to *time.Time) error {
var nextToken *string
in := &cloudwatchlogs.GetLogEventsInput{
LogGroupName: aws.String(group),
LogStreamName: aws.String(stream),
StartTime: aws.Int64(timeToInt64msec(*start)),
Limit: aws.Int64(100),
})
if err != nil {
return err
}
for _, e := range res.Events {
ts := msecToTime(aws.Int64Value(e.Timestamp))
t.AddEvent(&ts, "CONTAINER:"+containerName, aws.StringValue(e.Message))
if from != nil {
in.StartTime = aws.Int64(timeToInt64msec(*from))
} else {
in.StartFromHead = aws.Bool(true)
}
if end == nil {
return nil
if to != nil {
in.EndTime = aws.Int64(timeToInt64msec(*to))
}

res, err = t.logs.GetLogEventsWithContext(t.ctx, &cloudwatchlogs.GetLogEventsInput{
LogGroupName: aws.String(group),
LogStreamName: aws.String(stream),
StartTime: aws.Int64(timeToInt64msec(*end) - 300*1000), // 5 minutes
Limit: aws.Int64(1000),
})
if err != nil {
return err
}
for _, e := range res.Events {
ts := msecToTime(aws.Int64Value(e.Timestamp))
t.AddEvent(&ts, "CONTAINER:"+containerName, aws.StringValue(e.Message))
fetched := 0
for {
if nextToken != nil {
in.NextToken = nextToken
in.StartFromHead = nil
}
log.Printf("fetching logs %s", in.GoString())
res, err := t.logs.GetLogEventsWithContext(t.ctx, in)
if err != nil {
return err
}
fetched++
for _, e := range res.Events {
ts := msecToTime(aws.Int64Value(e.Timestamp))
t.AddEvent(&ts, "CONTAINER:"+containerName, aws.StringValue(e.Message))
}
if aws.StringValue(nextToken) == aws.StringValue(res.NextForwardToken) {
break
}
if fetched >= MaxFetchLogs {
break
}
nextToken = res.NextForwardToken
}
return nil
}
Expand Down

0 comments on commit c06aaf8

Please sign in to comment.