Skip to content

Commit

Permalink
[exporter/fileexporter] Add support for profiles signal
Browse files Browse the repository at this point in the history
  • Loading branch information
haoqixu committed Nov 21, 2024
1 parent 9309021 commit 5f2789e
Show file tree
Hide file tree
Showing 14 changed files with 304 additions and 48 deletions.
4 changes: 3 additions & 1 deletion exporter/fileexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [alpha]: traces, metrics, logs |
| Stability | [development]: profiles |
| | [alpha]: traces, metrics, logs |
| Distributions | [core], [contrib], [k8s] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Ffile%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Ffile) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Ffile%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Ffile) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atingchen](https://www.github.com/atingchen) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
Expand Down
19 changes: 19 additions & 0 deletions exporter/fileexporter/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exporterprofiles"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension"
Expand Down Expand Up @@ -51,28 +53,34 @@ func TestEncoding(t *testing.T) {
require.NoError(t, err)
le, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(), cfg)
require.NoError(t, err)
pe, err := f.(exporterprofiles.Factory).CreateProfiles(context.Background(), exportertest.NewNopSettings(), cfg)
require.NoError(t, err)
host := hostWithEncoding{
map[component.ID]component.Component{id: ext},
}
require.NoError(t, me.Start(context.Background(), host))
require.NoError(t, te.Start(context.Background(), host))
require.NoError(t, le.Start(context.Background(), host))
require.NoError(t, pe.Start(context.Background(), host))
t.Cleanup(func() {
})

require.NoError(t, me.ConsumeMetrics(context.Background(), generateMetrics()))
require.NoError(t, te.ConsumeTraces(context.Background(), generateTraces()))
require.NoError(t, le.ConsumeLogs(context.Background(), generateLogs()))
require.NoError(t, pe.ConsumeProfiles(context.Background(), generateProfiles()))

require.NoError(t, me.Shutdown(context.Background()))
require.NoError(t, te.Shutdown(context.Background()))
require.NoError(t, le.Shutdown(context.Background()))
require.NoError(t, pe.Shutdown(context.Background()))

b, err := os.ReadFile(cfg.Path)
require.NoError(t, err)
require.Contains(t, string(b), `{"resourceMetrics":`)
require.Contains(t, string(b), `{"resourceSpans":`)
require.Contains(t, string(b), `{"resourceLogs":`)
require.Contains(t, string(b), `{"resourceProfiles":`)
}

func generateLogs() plog.Logs {
Expand All @@ -85,6 +93,17 @@ func generateLogs() plog.Logs {
return logs
}

func generateProfiles() pprofile.Profiles {
proflies := pprofile.NewProfiles()
rp := proflies.ResourceProfiles().AppendEmpty()
rp.Resource().Attributes().PutStr("resource", "R1")
p := rp.ScopeProfiles().AppendEmpty().Profiles().AppendEmpty()
p.SetProfileID(pprofile.NewProfileIDEmpty())
p.SetStartTime(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second)))
p.SetEndTime(pcommon.NewTimestampFromTime(time.Now()))
return proflies
}

func generateMetrics() pmetric.Metrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
Expand Down
30 changes: 26 additions & 4 deletions exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles"
"go.opentelemetry.io/collector/exporter/exporterprofiles"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"
Expand Down Expand Up @@ -44,16 +47,18 @@ type FileExporter interface {
consumeTraces(_ context.Context, td ptrace.Traces) error
consumeMetrics(_ context.Context, md pmetric.Metrics) error
consumeLogs(_ context.Context, ld plog.Logs) error
consumeProfiles(_ context.Context, pd pprofile.Profiles) error
}

// NewFactory creates a factory for OTLP exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
return exporterprofiles.NewFactory(
metadata.Type,
createDefaultConfig,
exporter.WithTraces(createTracesExporter, metadata.TracesStability),
exporter.WithMetrics(createMetricsExporter, metadata.MetricsStability),
exporter.WithLogs(createLogsExporter, metadata.LogsStability))
exporterprofiles.WithTraces(createTracesExporter, metadata.TracesStability),
exporterprofiles.WithMetrics(createMetricsExporter, metadata.MetricsStability),
exporterprofiles.WithLogs(createLogsExporter, metadata.LogsStability),
exporterprofiles.WithProfiles(createProfilesExporter, metadata.ProfilesStability))
}

func createDefaultConfig() component.Config {
Expand Down Expand Up @@ -118,6 +123,23 @@ func createLogsExporter(
)
}

func createProfilesExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporterprofiles.Profiles, error) {
fe := getOrCreateFileExporter(cfg, set.Logger)
return exporterhelperprofiles.NewProfilesExporter(
ctx,
set,
cfg,
fe.consumeProfiles,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
)
}

// getOrCreateFileExporter creates a FileExporter and caches it for a particular configuration,
// or returns the already cached one. Caching is required because the factory is asked trace and
// metric receivers separately when it gets CreateTraces() and CreateMetrics()
Expand Down
27 changes: 27 additions & 0 deletions exporter/fileexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,33 @@ func TestCreateLogsError(t *testing.T) {
assert.Error(t, err)
}

func TestCreateProfiles(t *testing.T) {
cfg := &Config{
FormatType: formatTypeJSON,
Path: tempFileName(t),
}
exp, err := createProfilesExporter(
context.Background(),
exportertest.NewNopSettings(),
cfg)
assert.NoError(t, err)
require.NotNil(t, exp)
assert.NoError(t, exp.Shutdown(context.Background()))
}

func TestCreateProfilesError(t *testing.T) {
cfg := &Config{
FormatType: formatTypeJSON,
}
e, err := createProfilesExporter(
context.Background(),
exportertest.NewNopSettings(),
cfg)
require.NoError(t, err)
err = e.Start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
}

func TestNewFileWriter(t *testing.T) {
type args struct {
cfg *Config
Expand Down
9 changes: 9 additions & 0 deletions exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -43,6 +44,14 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
return e.writer.export(buf)
}

func (e *fileExporter) consumeProfiles(_ context.Context, pd pprofile.Profiles) error {
buf, err := e.marshaller.marshalProfiles(pd)
if err != nil {
return err
}
return e.writer.export(buf)
}

// Start starts the flush timer if set.
func (e *fileExporter) Start(_ context.Context, host component.Host) error {
var err error
Expand Down
95 changes: 95 additions & 0 deletions exporter/fileexporter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"
Expand Down Expand Up @@ -452,6 +453,100 @@ func TestFileLogsExporterErrors(t *testing.T) {
assert.NoError(t, fe.Shutdown(context.Background()))
}

func TestFileProfilesExporter(t *testing.T) {
type args struct {
conf *Config
unmarshaler pprofile.Unmarshaler
}
tests := []struct {
name string
args args
}{
{
name: "json: default configuration",
args: args{
conf: &Config{
Path: tempFileName(t),
FormatType: "json",
},
unmarshaler: &pprofile.JSONUnmarshaler{},
},
},
{
name: "json: compression configuration",
args: args{
conf: &Config{
Path: tempFileName(t),
FormatType: "json",
Compression: compressionZSTD,
},
unmarshaler: &pprofile.JSONUnmarshaler{},
},
},
{
name: "Proto: default configuration",
args: args{
conf: &Config{
Path: tempFileName(t),
FormatType: "proto",
},
unmarshaler: &pprofile.ProtoUnmarshaler{},
},
},
{
name: "Proto: compression configuration",
args: args{
conf: &Config{
Path: tempFileName(t),
FormatType: "proto",
Compression: compressionZSTD,
},
unmarshaler: &pprofile.ProtoUnmarshaler{},
},
},
{
name: "Proto: compression configuration--rotation",
args: args{
conf: &Config{
Path: tempFileName(t),
FormatType: "proto",
Compression: compressionZSTD,
Rotation: &Rotation{
MaxMegabytes: 3,
MaxDays: 0,
MaxBackups: defaultMaxBackups,
LocalTime: false,
},
},
unmarshaler: &pprofile.ProtoUnmarshaler{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// TODO
})
}
}

func TestFileProfilesExporterErrors(t *testing.T) {
mf := &errorWriter{}
fe := &fileExporter{
marshaller: &marshaller{
formatType: formatTypeJSON,
profilesMarshaler: profilesMarshalers[formatTypeJSON],
compressor: noneCompress,
},
writer: &fileWriter{
file: mf,
exporter: exportMessageAsLine,
},
}
require.NotNil(t, fe)

// TODO
}

func TestExportMessageAsBuffer(t *testing.T) {
path := tempFileName(t)
fe := &fileExporter{
Expand Down
3 changes: 1 addition & 2 deletions exporter/fileexporter/generated_package_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions exporter/fileexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ require (
go.opentelemetry.io/collector/consumer v0.114.0
go.opentelemetry.io/collector/consumer/consumererror v0.114.0
go.opentelemetry.io/collector/exporter v0.114.0
go.opentelemetry.io/collector/exporter/exporterhelper/exporterhelperprofiles v0.114.0
go.opentelemetry.io/collector/exporter/exporterprofiles v0.114.0
go.opentelemetry.io/collector/exporter/exportertest v0.114.0
go.opentelemetry.io/collector/extension/extensiontest v0.114.0
go.opentelemetry.io/collector/pdata v1.20.0
go.opentelemetry.io/collector/pdata/pprofile v0.114.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
Expand All @@ -44,13 +47,13 @@ require (
github.com/rogpeppe/go-internal v1.12.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.20.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.114.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/consumer/consumertest v0.114.0 // indirect
go.opentelemetry.io/collector/exporter/exporterprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/extension v0.114.0 // indirect
go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.114.0 // indirect
go.opentelemetry.io/collector/pipeline v0.114.0 // indirect
go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/receiver v0.114.0 // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.114.0 // indirect
go.opentelemetry.io/collector/receiver/receivertest v0.114.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions exporter/fileexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5f2789e

Please sign in to comment.