Skip to content

Commit

Permalink
Merge pull request #4 from fujiwara/output
Browse files Browse the repository at this point in the history
add -sns option
  • Loading branch information
fujiwara authored Feb 16, 2022
2 parents e761ecf + 0a8616e commit 82cd3a3
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 37 deletions.
20 changes: 10 additions & 10 deletions cmd/tracer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ func main() {
panic(err)
}
var showVersion bool
flag.DurationVar(&t.Duration, "duration", time.Minute, "fetch logs duration from created / before stopping")
opt := tracer.RunOption{}
flag.DurationVar(&opt.Duration, "duration", time.Minute, "fetch logs duration from created / before stopping")
flag.BoolVar(&showVersion, "version", false, "show the version")
flag.BoolVar(&opt.Stdout, "stdout", true, "output to stdout")
flag.StringVar(&opt.SNSTopicArn, "sns", "", "SNS topic ARN")
flag.VisitAll(envToFlag)
flag.Parse()

Expand All @@ -51,17 +54,14 @@ func main() {
}

if onLambda() {
lambda.Start(t.LambdaHandler)
lambda.Start(t.LambdaHandlerFunc(&opt))
return
}
args := flag.Args()
switch len(args) {
case 0:
args = []string{"", ""}
case 1:
args = append(args, "")
}
if err := t.Run(args[0], args[1]); err != nil {

args := make([]string, 2)
copy(args, flag.Args())

if err := t.Run(args[0], args[1], &opt); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
Expand Down
14 changes: 8 additions & 6 deletions lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"fmt"
)

func (t *Tracer) LambdaHandler(ctx context.Context, event *ECSTaskEvent) error {
fmt.Println(event.String())
lastStatus := event.Detail.LastStatus
if lastStatus != "STOPPED" {
return nil
func (t *Tracer) LambdaHandlerFunc(opt *RunOption) func(ctx context.Context, event *ECSTaskEvent) error {
return func(ctx context.Context, event *ECSTaskEvent) error {
fmt.Println(event.String())
lastStatus := event.Detail.LastStatus
if lastStatus != "STOPPED" {
return nil
}
return t.Run(event.Detail.ClusterArn, event.Detail.TaskArn, opt)
}
return t.Run(event.Detail.ClusterArn, event.Detail.TaskArn)
}
115 changes: 94 additions & 21 deletions tracer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tracer

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -14,10 +15,15 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/pkg/errors"
)

const timeFormat = "2006-01-02T15:04:05.000Z07:00"
const (
snsMaxPayloadSize = 256 * 1024
)

var TimeFormat = "2006-01-02T15:04:05.000Z07:00"

var epochBase = time.Unix(0, 0)

Expand All @@ -27,15 +33,17 @@ type Tracer struct {
ctx context.Context
ecs *ecs.ECS
logs *cloudwatchlogs.CloudWatchLogs
sns *sns.SNS
timeline *Timeline
Duration time.Duration
w io.Writer
buf *bytes.Buffer

now time.Time
headBegin time.Time
headEnd time.Time
tailBegin time.Time
tailEnd time.Time

option *RunOption
}

func (t *Tracer) AddEvent(ts *time.Time, source, message string) {
Expand Down Expand Up @@ -85,23 +93,23 @@ type TimeLineEvent struct {

func (e *TimeLineEvent) String() string {
ts := e.Timestamp.In(time.Local)
return fmt.Sprintf("%s\t%s\t%s\n", ts.Format(timeFormat), e.Source, e.Message)
return fmt.Sprintf("%s\t%s\t%s\n", ts.Format(TimeFormat), e.Source, e.Message)
}

func NewWithSession(ctx context.Context, sess *session.Session) (*Tracer, error) {
return New(ctx, ecs.New(sess), cloudwatchlogs.New(sess))
func New(ctx context.Context) (*Tracer, error) {
return NewWithSession(ctx, session.Must(session.NewSession()))
}

func New(ctx context.Context, ecsSv *ecs.ECS, logsSv *cloudwatchlogs.CloudWatchLogs) (*Tracer, error) {
func NewWithSession(ctx context.Context, sess *session.Session) (*Tracer, error) {
return &Tracer{
ctx: ctx,
ecs: ecsSv,
logs: logsSv,
ecs: ecs.New(sess),
logs: cloudwatchlogs.New(sess),
sns: sns.New(sess),
timeline: &Timeline{
seen: make(map[string]bool),
},
w: os.Stdout,
Duration: time.Minute,
buf: new(bytes.Buffer),
}, nil
}

Expand All @@ -113,8 +121,17 @@ func newEvent(ts *time.Time, src, msg string) *TimeLineEvent {
}
}

func (t *Tracer) Run(cluster string, taskID string) error {
type RunOption struct {
Stdout bool
SNSTopicArn string
Duration time.Duration
}

func (t *Tracer) Run(cluster string, taskID string, opt *RunOption) error {
t.now = time.Now()
t.option = opt

defer func() { t.report(cluster, taskID) }()

if cluster == "" {
return t.listClusters()
Expand All @@ -131,10 +148,58 @@ func (t *Tracer) Run(cluster string, taskID string) error {
if err := t.traceLogs(task); err != nil {
return err
}
t.timeline.Print(t.w)

return nil
}

func (t *Tracer) report(cluster, taskID string) {
opt := t.option
if opt.Stdout {
fmt.Fprintln(os.Stdout, subject(cluster, taskID))
if _, err := t.WriteTo(os.Stdout); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}
if opt.SNSTopicArn != "" {
if err := t.Publish(opt.SNSTopicArn, cluster, taskID); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}
}

func (t *Tracer) WriteTo(w io.Writer) (int64, error) {
n, err := io.WriteString(w, t.buf.String())
return int64(n), err
}

func subject(cluster, taskID string) string {
s := "Tracer:"
if taskID != "" {
s += " " + taskID
} else if cluster != "" {
s += " tasks"
}
if cluster != "" {
s += " on " + cluster
} else {
s += " clusters"
}
return s
}

func (t *Tracer) Publish(topicArn, cluster, taskID string) error {
msg := t.buf.String()
if len(msg) >= snsMaxPayloadSize {
msg = msg[:snsMaxPayloadSize]
}
_, err := t.sns.PublishWithContext(t.ctx, &sns.PublishInput{
Message: &msg,
Subject: aws.String(subject(cluster, taskID)),
TopicArn: &topicArn,
})
return err
}

func (t *Tracer) traceTask(cluster string, taskID string) (*ecs.Task, error) {
res, err := t.ecs.DescribeTasksWithContext(t.ctx, &ecs.DescribeTasksInput{
Cluster: &cluster,
Expand Down Expand Up @@ -192,6 +257,8 @@ func (t *Tracer) traceTask(cluster string, taskID string) (*ecs.Task, error) {
}

func (t *Tracer) traceLogs(task *ecs.Task) error {
defer t.timeline.Print(t.buf)

res, err := t.ecs.DescribeTaskDefinitionWithContext(t.ctx, &ecs.DescribeTaskDefinitionInput{
TaskDefinition: task.TaskDefinitionArn,
})
Expand Down Expand Up @@ -311,7 +378,10 @@ func (t *Tracer) listClusters() error {
clusters = append(clusters, arnToName(aws.StringValue(c)))
}
sort.Strings(clusters)
fmt.Println(strings.Join(clusters, "\n"))
for _, c := range clusters {
t.buf.WriteString(c)
t.buf.WriteByte('\n')
}
return nil
}

Expand All @@ -337,7 +407,8 @@ func (t *Tracer) listTasks(cluster, status string) error {
return errors.Wrap(err, "failed to describe tasks")
}
for _, task := range res.Tasks {
fmt.Println(strings.Join(taskToColumns(task), "\t"))
t.buf.WriteString(strings.Join(taskToColumns(task), "\t"))
t.buf.WriteRune('\n')
}
if nextToken = listRes.NextToken; nextToken == nil {
break
Expand All @@ -347,20 +418,22 @@ func (t *Tracer) listTasks(cluster, status string) error {
}

func (t *Tracer) setBoundaries(task *ecs.Task) {
t.headBegin = task.CreatedAt.Add(-t.Duration)
d := t.option.Duration

t.headBegin = task.CreatedAt.Add(-d)
if task.StartedAt != nil {
t.headEnd = task.StartedAt.Add(t.Duration)
t.headEnd = task.StartedAt.Add(d)
} else {
t.headEnd = task.CreatedAt.Add(t.Duration)
t.headEnd = task.CreatedAt.Add(d)
}

if task.StoppingAt != nil {
t.tailBegin = task.StoppingAt.Add(-t.Duration)
t.tailBegin = task.StoppingAt.Add(-d)
} else {
t.tailBegin = t.now.Add(-t.Duration)
t.tailBegin = t.now.Add(-d)
}
if task.StoppedAt != nil {
t.tailEnd = task.StoppedAt.Add(t.Duration)
t.tailEnd = task.StoppedAt.Add(d)
} else {
t.tailEnd = t.now
}
Expand Down

0 comments on commit 82cd3a3

Please sign in to comment.