Skip to content

Commit

Permalink
filebeat/input/filestream: Remove x-pack dependency (#40483)
Browse files Browse the repository at this point in the history
To fix the failing linter rule that requires OSS code to not depend on x-pack
code, this change removes the dependency on x-pack/dockerlogbeat/pipelinemock
from the Filebeat filestream input. It does this by creating a mock client
that is local to filestream test code ("A little copying is better than a
little dependency").

Fixes #40293
  • Loading branch information
vinit-chauhan authored Sep 3, 2024
1 parent 1f20a39 commit 49582f4
Showing 1 changed file with 70 additions and 2 deletions.
72 changes: 70 additions & 2 deletions filebeat/input/filestream/internal/input-logfile/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (

"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/tests/resources"
"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock"
"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -393,7 +393,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {
func testDefaultHarvesterGroup(t *testing.T, mockHarvester Harvester) *defaultHarvesterGroup {
return &defaultHarvesterGroup{
readers: newReaderGroup(),
pipeline: &pipelinemock.MockPipelineConnector{},
pipeline: &MockPipeline{},
harvester: mockHarvester,
store: testOpenStore(t, "test", nil),
identifier: &sourceIdentifier{"filestream::.global::"},
Expand Down Expand Up @@ -465,3 +465,71 @@ func (tl *testLogger) Errorf(format string, args ...interface{}) {
func (tl *testLogger) String() string {
return (*strings.Builder)(tl).String()
}

// MockClient is a mock implementation of the beat.Client interface.
type MockClient struct {
published []beat.Event // Slice to store published events

closed bool // Flag to indicate if the client is closed
mu sync.Mutex // Mutex to synchronize access to the published events slice
}

// GetEvents returns all the events published by the mock client.
func (m *MockClient) GetEvents() []beat.Event {
m.mu.Lock()
defer m.mu.Unlock()

return m.published
}

// Publish publishes a single event.
func (m *MockClient) Publish(e beat.Event) {
es := make([]beat.Event, 1)
es = append(es, e)

m.PublishAll(es)
}

// PublishAll publishes multiple events.
func (m *MockClient) PublishAll(es []beat.Event) {
m.mu.Lock()
defer m.mu.Unlock()

m.published = append(m.published, es...)
}

// Close closes the mock client.
func (m *MockClient) Close() error {
m.mu.Lock()
defer m.mu.Unlock()

if m.closed {
return fmt.Errorf("mock already closed")
}

m.closed = true
return nil
}

// MockPipeline is a mock implementation of the beat.Pipeline interface.
type MockPipeline struct {
c beat.Client // Client used by the pipeline
mu sync.Mutex // Mutex to synchronize access to the client
}

// ConnectWith connects the mock pipeline with a client using the provided configuration.
func (mp *MockPipeline) ConnectWith(config beat.ClientConfig) (beat.Client, error) {
mp.mu.Lock()
defer mp.mu.Unlock()

c := &MockClient{}

mp.c = c

return c, nil
}

// Connect connects the mock pipeline with a client using the default configuration.
func (mp *MockPipeline) Connect() (beat.Client, error) {
return mp.ConnectWith(beat.ClientConfig{})
}

0 comments on commit 49582f4

Please sign in to comment.