Skip to content

Commit

Permalink
Support logs scraper (#12116)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
This PR added support for logs scraper

<!-- Issue number if applicable -->
#### Link to tracking issue
Relevant to #11238 

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Added

<!--Describe the documentation added.-->
#### Documentation
Added

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
sincejune authored Jan 22, 2025
1 parent 9757ead commit 6740a28
Show file tree
Hide file tree
Showing 7 changed files with 558 additions and 51 deletions.
25 changes: 25 additions & 0 deletions .chloggen/support-logs-scraper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: scraper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support logs scraper

# One or more tracking issues or pull requests related to the change
issues: [12116]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
34 changes: 33 additions & 1 deletion scraper/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,19 @@ type Settings struct {
type Factory interface {
component.Factory

// CreateLogs creates a Logs scraper based on this config.
// If the scraper type does not support logs,
// this function returns the error [pipeline.ErrSignalNotSupported].
CreateLogs(ctx context.Context, set Settings, cfg component.Config) (Logs, error)

// CreateMetrics creates a Metrics scraper based on this config.
// If the scraper type does not support metrics,
// this function returns the error [pipeline.ErrSignalNotSupported].
// Implementers can assume `next` is never nil.
CreateMetrics(ctx context.Context, set Settings, cfg component.Config) (Metrics, error)

// LogsStability gets the stability level of the Logs scraper.
LogsStability() component.StabilityLevel

// MetricsStability gets the stability level of the Metrics scraper.
MetricsStability() component.StabilityLevel

Expand All @@ -59,7 +66,9 @@ func (f factoryOptionFunc) applyOption(o *factory) {
type factory struct {
cfgType component.Type
component.CreateDefaultConfigFunc
CreateLogsFunc
CreateMetricsFunc
logsStabilityLevel component.StabilityLevel
metricsStabilityLevel component.StabilityLevel
}

Expand All @@ -69,13 +78,28 @@ func (f *factory) Type() component.Type {

func (f *factory) unexportedFactoryFunc() {}

func (f *factory) LogsStability() component.StabilityLevel {
return f.logsStabilityLevel
}

func (f *factory) MetricsStability() component.StabilityLevel {
return f.metricsStabilityLevel
}

// CreateLogsFunc is the equivalent of Factory.CreateLogs().
type CreateLogsFunc func(context.Context, Settings, component.Config) (Logs, error)

// CreateMetricsFunc is the equivalent of Factory.CreateMetrics().
type CreateMetricsFunc func(context.Context, Settings, component.Config) (Metrics, error)

// CreateLogs implements Factory.CreateLogs.
func (f CreateLogsFunc) CreateLogs(ctx context.Context, set Settings, cfg component.Config) (Logs, error) {
if f == nil {
return nil, pipeline.ErrSignalNotSupported
}
return f(ctx, set, cfg)
}

// CreateMetrics implements Factory.CreateMetrics.
func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg component.Config) (Metrics, error) {
if f == nil {
Expand All @@ -84,6 +108,14 @@ func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg
return f(ctx, set, cfg)
}

// WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level.
func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.logsStabilityLevel = sl
o.CreateLogsFunc = createLogs
})
}

// WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level.
func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand Down
15 changes: 13 additions & 2 deletions scraper/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func TestNewFactory(t *testing.T) {
func() component.Config { return &defaultCfg })
assert.EqualValues(t, testType, f.Type())
assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig())
_, err := f.CreateMetrics(context.Background(), nopSettings(), &defaultCfg)
_, err := f.CreateLogs(context.Background(), nopSettings(), &defaultCfg)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
_, err = f.CreateMetrics(context.Background(), nopSettings(), &defaultCfg)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
}

Expand All @@ -41,12 +43,17 @@ func TestNewFactoryWithOptions(t *testing.T) {
f := NewFactory(
testType,
func() component.Config { return &defaultCfg },
WithLogs(createLogs, component.StabilityLevelAlpha),
WithMetrics(createMetrics, component.StabilityLevelAlpha))
assert.EqualValues(t, testType, f.Type())
assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig())

assert.Equal(t, component.StabilityLevelAlpha, f.LogsStability())
_, err := f.CreateLogs(context.Background(), Settings{}, &defaultCfg)
require.NoError(t, err)

assert.Equal(t, component.StabilityLevelAlpha, f.MetricsStability())
_, err := f.CreateMetrics(context.Background(), Settings{}, &defaultCfg)
_, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg)
require.NoError(t, err)
}

Expand Down Expand Up @@ -87,6 +94,10 @@ func TestMakeFactoryMap(t *testing.T) {
}
}

func createLogs(context.Context, Settings, component.Config) (Logs, error) {
return NewLogs(newTestScrapeLogsFunc(nil))
}

func createMetrics(context.Context, Settings, component.Config) (Metrics, error) {
return NewMetrics(newTestScrapeMetricsFunc(nil))
}
44 changes: 44 additions & 0 deletions scraper/scraperhelper/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
Expand Down Expand Up @@ -179,6 +180,30 @@ func (sc *controller[T]) startScraping() {
}()
}

// NewLogsController creates a receiver.Logs with the configured options, that can control multiple scraper.Logs.
func NewLogsController(cfg *ControllerConfig,
rSet receiver.Settings,
nextConsumer consumer.Logs,
options ...ControllerOption,
) (receiver.Logs, error) {
co := getOptions(options)
scrapers := make([]scraper.Logs, 0, len(co.factoriesWithConfig))
for _, fwc := range co.factoriesWithConfig {
set := getSettings(fwc.f.Type(), rSet)
s, err := fwc.f.CreateLogs(context.Background(), set, fwc.cfg)
if err != nil {
return nil, err
}
s, err = wrapObsLogs(s, rSet.ID, set.ID, set.TelemetrySettings)
if err != nil {
return nil, err
}
scrapers = append(scrapers, s)
}
return newController[scraper.Logs](
cfg, rSet, scrapers, func(c *controller[scraper.Logs]) { scrapeLogs(c, nextConsumer) }, co.tickerCh)
}

// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics.
func NewMetricsController(cfg *ControllerConfig,
rSet receiver.Settings,
Expand All @@ -203,6 +228,25 @@ func NewMetricsController(cfg *ControllerConfig,
cfg, rSet, scrapers, func(c *controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh)
}

func scrapeLogs(c *controller[scraper.Logs], nextConsumer consumer.Logs) {
ctx, done := withScrapeContext(c.timeout)
defer done()

logs := plog.NewLogs()
for i := range c.scrapers {
md, err := c.scrapers[i].ScrapeLogs(ctx)
if err != nil && !scrapererror.IsPartialScrapeError(err) {
continue
}
md.ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
}

logRecordCount := logs.LogRecordCount()
ctx = c.obsrecv.StartMetricsOp(ctx)
err := nextConsumer.ConsumeLogs(ctx, logs)
c.obsrecv.EndMetricsOp(ctx, "", logRecordCount, err)
}

func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) {
ctx, done := withScrapeContext(c.timeout)
defer done()
Expand Down
Loading

0 comments on commit 6740a28

Please sign in to comment.