Skip to content

Commit

Permalink
ci: add fluentd basic e2e tests (#71)
Browse files Browse the repository at this point in the history
- breaking out the 'log generation' piece out of the aws log test file
into the main test file as func sendTestLogByContainerd so that it can
be reused for every driver (which seems like a good idea)
- add a fluentd test that does validation
- adds fluentd to the GitHub action (note this launches a docker
containerd with the fluentd server)
- adds fluentd test to makefile

Signed-off-by: Ziwen Ning <[email protected]>
  • Loading branch information
ningziwen authored Sep 19, 2023
1 parent 9ee39e1 commit 1e30706
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 55 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,40 @@ jobs:
run: sudo make build
- name: test-e2e
run: sudo -E make test-e2e-for-awslogs # containerd interaction requires sudo and aws cloudwatch interaction requires passing env vars
e2e-tests-for-fluentd:
strategy:
fail-fast: false
matrix:
go: [ '1.20', '1.21' ]
os: [ ubuntu-latest ] # TODO: Add Windows e2e tests: https://github.com/aws/shim-loggers-for-containerd/issues/68
name: E2E tests / fluentd / ${{ matrix.os }} / Go ${{ matrix.go }}
runs-on: ${{ matrix.os }}
permissions:
id-token: write
contents: write
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go }}
cache: false
- name: install and start containerd
shell: bash
run: sudo scripts/install-containerd
- name: start fluentd local endpoint
shell: bash
run: |
: # not using github action env because env map cannot be defined in terms of other vars in the map. public.ecr.aws/docker/library/fluentd:v1.16-debian-1
FLUENTD_LOG_DIR=${GITHUB_WORKSPACE}/fluentd-logs
FLUENTD_PORT=24224
FLUENTD_IMAGE=public.ecr.aws/docker/library/fluentd:v1.16-debian-1
: # Fluentd container is not using root user so need 777 to make it writable. https://docs.github.com/en/actions/using-github-hosted-runners/about-github-hosted-runners#docker-container-filesystem
sudo mkdir -m 777 $FLUENTD_LOG_DIR
docker run -d -p $FLUENTD_PORT:24224 -p $FLUENTD_PORT:24224/udp -v $FLUENTD_LOG_DIR:/fluentd/log $FLUENTD_IMAGE
- name: build
run: sudo make build
- name: test-e2e
run: sudo make test-e2e-for-fluentd # containerd interaction requires sudo
go-mod-tidy-check:
runs-on: ubuntu-latest
steps:
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ test-e2e:
test-e2e-for-aws-logs:
go test -timeout 30m ./e2e -test.v -ginkgo.v --binary "$(AWS_CONTAINERD_LOGGERS_BINARY)" --log-driver "awslogs"

.PHONY: test-e2e-for-fluentd
test-e2e-for-fluentd:
go test -timeout 30m ./e2e -test.v -ginkgo.v --binary "$(AWS_CONTAINERD_LOGGERS_BINARY)" --log-driver "fluentd"

.PHONY: coverage
coverage:
go test -tags unit $(shell go list ./... | grep -v e2e) -coverprofile=test-coverage.out
Expand Down
78 changes: 23 additions & 55 deletions e2e/awslogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@ package e2e

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
)
Expand All @@ -28,31 +24,22 @@ const (
testAwslogsRegion = "us-west-2"
testAwsLogsStream = "test-stream"
testAwsLogsGroup = "test-shim-logger"
testAwsLogsMessage = "test-e2e-log"
)

var testAwslogs = func() {
// These tests are run in serial because we only define one log driver instance.
ginkgo.Describe("awslogs shim logger", ginkgo.Serial, func() { //nolint:typecheck
ginkgo.Describe("awslogs shim logger", ginkgo.Serial, func() {
var cwClient *cloudwatchlogs.Client
ginkgo.BeforeEach(func() {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(testAwslogsRegion))
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
cwClient = cloudwatchlogs.NewFromConfig(cfg)
_, err = cwClient.DeleteLogStream(context.TODO(), &cloudwatchlogs.DeleteLogStreamInput{
LogStreamName: aws.String(testAwsLogsStream),
LogGroupName: aws.String(testAwsLogsGroup),
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
cleanupAwslogs(cwClient, testAwsLogsGroup, testAwsLogsStream)
})
ginkgo.AfterEach(func() {
_, err := cwClient.DeleteLogStream(context.TODO(), &cloudwatchlogs.DeleteLogStreamInput{
LogStreamName: aws.String(testAwsLogsStream),
LogGroupName: aws.String(testAwsLogsGroup),
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
cleanupAwslogs(cwClient, testAwsLogsGroup, testAwsLogsStream)
})
ginkgo.It("should send logs to awslogs log driver", func() { //nolint:typecheck
ginkgo.It("should send logs to awslogs log driver", func() {
args := map[string]string{
logDriverTypeKey: awslogsDriverName,
containerIdKey: testContainerId,
Expand All @@ -63,45 +50,26 @@ var testAwslogs = func() {
awslogsStreamKey: testAwsLogsStream,
}
creator := cio.BinaryIO(*Binary, args)
// Create a new client connected to the containerd daemon
client, err := containerd.New(containerdAddress)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer client.Close()
// Create a new context with a customized namespace
ctx := namespaces.WithNamespace(context.Background(), "testAwslogs")
// Pull an image
image, err := client.Pull(ctx, testImage, containerd.WithPullUnpack)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
// Create a new container with the pulled image
container, err := client.NewContainer(ctx, testContainerId, containerd.WithImage(image),
containerd.WithNewSnapshot("test-snapshot", image), containerd.WithNewSpec(oci.WithImageConfig(image),
oci.WithProcessArgs("/bin/sh", "-c", fmt.Sprintf("echo '%s'", testAwsLogsMessage))))
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer container.Delete(ctx, containerd.WithSnapshotCleanup) //nolint:errcheck // testing only
// Create a new task from the container and start it
task, err := container.NewTask(ctx, creator)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer task.Delete(ctx) //nolint:errcheck // testing only

err = task.Start(ctx)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
sendTestLogByContainerd(creator, testLog)
validateTestLogsInAwslogs(cwClient, testAwsLogsGroup, testAwsLogsStream, testLog)
})
})
}

statusC, err := task.Wait(ctx)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
// Waiting for the task to finish
status := <-statusC
code, _, err := status.Result()
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(code).Should(gomega.Equal(uint32(0)))
func validateTestLogsInAwslogs(client *cloudwatchlogs.Client, logGroupName string, logStreamName string, testLog string) {
cwOutput, err := client.GetLogEvents(context.TODO(), &cloudwatchlogs.GetLogEventsInput{
LogStreamName: aws.String(logGroupName),
LogGroupName: aws.String(logStreamName),
Limit: aws.Int32(1),
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(*cwOutput.Events[0].Message).Should(gomega.Equal(testLog))
}

// Validating in AWS logs
cwOutput, err := cwClient.GetLogEvents(context.TODO(), &cloudwatchlogs.GetLogEventsInput{
LogStreamName: aws.String(testAwsLogsStream),
LogGroupName: aws.String(testAwsLogsGroup),
Limit: aws.Int32(1),
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(*cwOutput.Events[0].Message).Should(gomega.Equal(testAwsLogsMessage))
})
func cleanupAwslogs(client *cloudwatchlogs.Client, logGroupName string, logStreamName string) {
_, err := client.DeleteLogStream(context.TODO(), &cloudwatchlogs.DeleteLogStreamInput{
LogStreamName: aws.String(logGroupName),
LogGroupName: aws.String(logStreamName),
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
75 changes: 75 additions & 0 deletions e2e/fluentd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package e2e

import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"strings"

"github.com/containerd/containerd/cio"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
)

const (
fluentdLogDirName = "./../fluentd-logs"
)

var testFluentd = func() {
// These tests are run in serial because we only define one log driver instance.
ginkgo.Describe("fluentd shim logger", ginkgo.Serial, func() {
ginkgo.It("should send logs to fluentd log driver", func() {
args := map[string]string{
logDriverTypeKey: fluentdDriverName,
containerIdKey: testContainerId,
containerNameKey: testContainerName,
"--verbose": "true",
}
creator := cio.BinaryIO(*Binary, args)
sendTestLogByContainerd(creator, testLog)
validateTestLogsInFluentd(fluentdLogDirName, testLog)
})
})
}

func validateTestLogsInFluentd(dirName string, testLog string) {
// For single test, there are 3 files in Fluentd log dir: "data.<hash>.log", "data.<hash>.log.meta" and "data.log".
// For example: "data.b60581c99383f387cfaba1fc90272852e.log", "data.b60581c99383f387cfaba1fc90272852e.log.meta" and "data.log"
// "data.<hash>.log" has the logs that the tests sent.
// "data.<hash>.log" can have multiple lines of records following time sequence. Here is a sample content with 3 lines.
// 2023-09-16T22:54:11+00:00 123456789012 {"source":"stdout","log":"test-e2e-log","container_id":"123456789012","container_name":"test-container"}
// 2023-09-16T22:54:30+00:00 123456789012 {"container_id":"123456789012","container_name":"test-container","source":"stdout","log":"test-e2e-log"}
// 2023-09-16T22:56:17+00:00 123456789012 {"container_id":"123456789012","container_name":"test-container","source":"stdout","log":"test-e2e-log"}
// The following steps retrieves the "log" field of the third string parsed by tab of the last line to validate the tests sent.
var fileName string
err := filepath.Walk(dirName, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if strings.HasPrefix(info.Name(), "data.") && strings.HasSuffix(info.Name(), ".log") && info.Name() != "data.log" {
fileName = path
}
return nil
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(fileName).ShouldNot(gomega.Equal(""))
file, err := os.Open(fileName)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer file.Close()
var lastLine string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
lastLine = scanner.Text()
}
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
contentParts := strings.Split(lastLine, "\t")
gomega.Expect(len(contentParts)).Should(gomega.Equal(3))
var logContent map[string]string
err = json.Unmarshal([]byte(contentParts[2]), &logContent)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(logContent["log"]).Should(gomega.Equal(testLog))
}
44 changes: 44 additions & 0 deletions e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
package e2e

import (
"context"
"flag"
"fmt"
"testing"

"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
)
Expand All @@ -15,12 +21,14 @@ const (
// LogDriver options
logDriverTypeKey = "--log-driver"
awslogsDriverName = "awslogs"
fluentdDriverName = "fluentd"
containerIdKey = "--container-id"
containerNameKey = "--container-name"
testContainerId = "test-container-id"
testContainerName = "test-container-name"
containerdAddress = "/run/containerd/containerd.sock"
testImage = "public.ecr.aws/docker/library/ubuntu:latest"
testLog = "test-e2e-log"
)

var (
Expand All @@ -36,8 +44,44 @@ func TestShimLoggers(t *testing.T) {
if *LogDriver == awslogsDriverName || *LogDriver == "" {
testAwslogs()
}
if *LogDriver == fluentdDriverName || *LogDriver == "" {
testFluentd()
}
})

gomega.RegisterFailHandler(ginkgo.Fail)
ginkgo.RunSpecs(t, description)
}

func sendTestLogByContainerd(creator cio.Creator, testLog string) {
// Create a new client connected to the containerd daemon
client, err := containerd.New(containerdAddress)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer client.Close()
// Create a new context with a customized namespace
ctx := namespaces.WithNamespace(context.Background(), "testShimLoggers")
// Pull an image
image, err := client.Pull(ctx, testImage, containerd.WithPullUnpack)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
// Create a new container with the pulled image
container, err := client.NewContainer(ctx, testContainerId, containerd.WithImage(image),
containerd.WithNewSnapshot("test-snapshot", image), containerd.WithNewSpec(oci.WithImageConfig(image),
oci.WithProcessArgs("/bin/sh", "-c", fmt.Sprintf("echo '%s'", testLog))))
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer container.Delete(ctx, containerd.WithSnapshotCleanup) //nolint:errcheck // testing only
// Create a new task from the container and start it
task, err := container.NewTask(ctx, creator)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer task.Delete(ctx) //nolint:errcheck // testing only

err = task.Start(ctx)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())

statusC, err := task.Wait(ctx)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
// Waiting for the task to finish
status := <-statusC
code, _, err := status.Result()
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(code).Should(gomega.Equal(uint32(0)))
}

0 comments on commit 1e30706

Please sign in to comment.