diff --git a/receiver/jaegerreceiver/config.go b/receiver/jaegerreceiver/config.go index bfa43c09e3c8..adb5b087778a 100644 --- a/receiver/jaegerreceiver/config.go +++ b/receiver/jaegerreceiver/config.go @@ -37,10 +37,10 @@ type RemoteSamplingConfig struct { // Protocols is the configuration for the supported protocols. type Protocols struct { - GRPC *configgrpc.ServerConfig `mapstructure:"grpc"` - ThriftHTTP *confighttp.ServerConfig `mapstructure:"thrift_http"` - ThriftBinary *ProtocolUDP `mapstructure:"thrift_binary"` - ThriftCompact *ProtocolUDP `mapstructure:"thrift_compact"` + GRPC *configgrpc.ServerConfig `mapstructure:"grpc"` + ThriftHTTP *confighttp.ServerConfig `mapstructure:"thrift_http"` + ThriftBinaryUDP *ProtocolUDP `mapstructure:"thrift_binary"` + ThriftCompactUDP *ProtocolUDP `mapstructure:"thrift_compact"` } // ProtocolUDP is the configuration for a UDP protocol. @@ -82,8 +82,8 @@ var ( func (cfg *Config) Validate() error { if cfg.GRPC == nil && cfg.ThriftHTTP == nil && - cfg.ThriftBinary == nil && - cfg.ThriftCompact == nil { + cfg.ThriftBinaryUDP == nil && + cfg.ThriftCompactUDP == nil { return errors.New("must specify at least one protocol when using the Jaeger receiver") } @@ -99,14 +99,14 @@ func (cfg *Config) Validate() error { } } - if cfg.ThriftBinary != nil { - if err := checkPortFromEndpoint(cfg.ThriftBinary.Endpoint); err != nil { + if cfg.ThriftBinaryUDP != nil { + if err := checkPortFromEndpoint(cfg.ThriftBinaryUDP.Endpoint); err != nil { return fmt.Errorf("invalid port number for the Thrift UDP Binary endpoint: %w", err) } } - if cfg.ThriftCompact != nil { - if err := checkPortFromEndpoint(cfg.ThriftCompact.Endpoint); err != nil { + if cfg.ThriftCompactUDP != nil { + if err := checkPortFromEndpoint(cfg.ThriftCompactUDP.Endpoint); err != nil { return fmt.Errorf("invalid port number for the Thrift UDP Compact endpoint: %w", err) } } @@ -145,10 +145,10 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { cfg.ThriftHTTP = nil } if !protocols.IsSet(protoThriftBinary) { - cfg.ThriftBinary = nil + cfg.ThriftBinaryUDP = nil } if !protocols.IsSet(protoThriftCompact) { - cfg.ThriftCompact = nil + cfg.ThriftCompactUDP = nil } return nil diff --git a/receiver/jaegerreceiver/config_test.go b/receiver/jaegerreceiver/config_test.go index ee9a05b6e23d..e2bcaeb154c6 100644 --- a/receiver/jaegerreceiver/config_test.go +++ b/receiver/jaegerreceiver/config_test.go @@ -42,7 +42,7 @@ func TestLoadConfig(t *testing.T) { ThriftHTTP: &confighttp.ServerConfig{ Endpoint: ":3456", }, - ThriftCompact: &ProtocolUDP{ + ThriftCompactUDP: &ProtocolUDP{ Endpoint: "0.0.0.0:456", ServerConfigUDP: ServerConfigUDP{ QueueSize: 100_000, @@ -51,7 +51,7 @@ func TestLoadConfig(t *testing.T) { SocketBufferSize: 65_536, }, }, - ThriftBinary: &ProtocolUDP{ + ThriftBinaryUDP: &ProtocolUDP{ Endpoint: "0.0.0.0:789", ServerConfigUDP: ServerConfigUDP{ QueueSize: 1_000, @@ -76,11 +76,11 @@ func TestLoadConfig(t *testing.T) { ThriftHTTP: &confighttp.ServerConfig{ Endpoint: "localhost:14268", }, - ThriftCompact: &ProtocolUDP{ + ThriftCompactUDP: &ProtocolUDP{ Endpoint: "localhost:6831", ServerConfigUDP: defaultServerConfigUDP(), }, - ThriftBinary: &ProtocolUDP{ + ThriftBinaryUDP: &ProtocolUDP{ Endpoint: "localhost:6832", ServerConfigUDP: defaultServerConfigUDP(), }, @@ -97,7 +97,7 @@ func TestLoadConfig(t *testing.T) { Transport: confignet.TransportTypeTCP, }, }, - ThriftCompact: &ProtocolUDP{ + ThriftCompactUDP: &ProtocolUDP{ Endpoint: "localhost:6831", ServerConfigUDP: defaultServerConfigUDP(), }, @@ -183,7 +183,7 @@ func TestInvalidConfig(t *testing.T) { { desc: "thrift-udp-compact-no-port", apply: func(cfg *Config) { - cfg.ThriftCompact = &ProtocolUDP{ + cfg.ThriftCompactUDP = &ProtocolUDP{ Endpoint: "localhost:", } }, @@ -192,7 +192,7 @@ func TestInvalidConfig(t *testing.T) { { desc: "thrift-udp-binary-no-port", apply: func(cfg *Config) { - cfg.ThriftBinary = &ProtocolUDP{ + cfg.ThriftBinaryUDP = &ProtocolUDP{ Endpoint: "localhost:", } }, @@ -220,7 +220,7 @@ func TestInvalidConfig(t *testing.T) { { desc: "port-outside-of-range", apply: func(cfg *Config) { - cfg.ThriftBinary = &ProtocolUDP{ + cfg.ThriftBinaryUDP = &ProtocolUDP{ Endpoint: "localhost:65536", } }, diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index 92d1f0cf5af9..56e4b6084bd4 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -60,11 +60,11 @@ func createDefaultConfig() component.Config { ThriftHTTP: &confighttp.ServerConfig{ Endpoint: defaultHTTPEndpoint, }, - ThriftBinary: &ProtocolUDP{ + ThriftBinaryUDP: &ProtocolUDP{ Endpoint: defaultThriftBinaryEndpoint, ServerConfigUDP: defaultServerConfigUDP(), }, - ThriftCompact: &ProtocolUDP{ + ThriftCompactUDP: &ProtocolUDP{ Endpoint: defaultThriftCompactEndpoint, ServerConfigUDP: defaultServerConfigUDP(), }, @@ -85,28 +85,10 @@ func createTracesReceiver( rCfg := cfg.(*Config) - var config configuration - // Set ports - if rCfg.Protocols.GRPC != nil { - config.GRPCServerConfig = *rCfg.Protocols.GRPC - } - - if rCfg.Protocols.ThriftHTTP != nil { - config.HTTPServerConfig = *rCfg.ThriftHTTP - } - - if rCfg.Protocols.ThriftBinary != nil { - config.AgentBinaryThrift = *rCfg.ThriftBinary - } - - if rCfg.Protocols.ThriftCompact != nil { - config.AgentCompactThrift = *rCfg.ThriftCompact - } - if rCfg.RemoteSampling != nil { set.Logger.Warn("You are using a deprecated no-op `remote_sampling` option which will be removed soon; use a `jaegerremotesampling` extension instead") } // Create the receiver. - return newJaegerReceiver(set.ID, &config, nextConsumer, set) + return newJaegerReceiver(set.ID, rCfg.Protocols, nextConsumer, set) } diff --git a/receiver/jaegerreceiver/factory_test.go b/receiver/jaegerreceiver/factory_test.go index 194a5dcaebcb..6c809c823891 100644 --- a/receiver/jaegerreceiver/factory_test.go +++ b/receiver/jaegerreceiver/factory_test.go @@ -91,7 +91,7 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) { r, err := factory.CreateTraces(context.Background(), set, cfg, nil) assert.NoError(t, err, "unexpected error creating receiver") - assert.Equal(t, "0.0.0.0:14250", r.(*jReceiver).config.GRPCServerConfig.NetAddr.Endpoint, "grpc port should be default") + assert.Equal(t, "0.0.0.0:14250", r.(*jReceiver).config.GRPC.NetAddr.Endpoint, "grpc port should be default") } func TestCreateTLSGPRCEndpoint(t *testing.T) { @@ -144,33 +144,33 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) { r, err := factory.CreateTraces(context.Background(), set, cfg, nil) assert.NoError(t, err, "unexpected error creating receiver") - assert.Equal(t, "localhost:14268", r.(*jReceiver).config.HTTPServerConfig.Endpoint, "http port should be default") + assert.Equal(t, "localhost:14268", r.(*jReceiver).config.ThriftHTTP.Endpoint, "http port should be default") } func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() - cfg.(*Config).Protocols.ThriftBinary = &ProtocolUDP{ + cfg.(*Config).Protocols.ThriftBinaryUDP = &ProtocolUDP{ Endpoint: "0.0.0.0:6832", } set := receivertest.NewNopSettings() r, err := factory.CreateTraces(context.Background(), set, cfg, nil) assert.NoError(t, err, "unexpected error creating receiver") - assert.Equal(t, "0.0.0.0:6832", r.(*jReceiver).config.AgentBinaryThrift.Endpoint, "thrift port should be default") + assert.Equal(t, "0.0.0.0:6832", r.(*jReceiver).config.ThriftBinaryUDP.Endpoint, "thrift port should be default") } func TestCreateInvalidThriftCompactEndpoint(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() - cfg.(*Config).Protocols.ThriftCompact = &ProtocolUDP{ + cfg.(*Config).Protocols.ThriftCompactUDP = &ProtocolUDP{ Endpoint: "0.0.0.0:6831", } set := receivertest.NewNopSettings() r, err := factory.CreateTraces(context.Background(), set, cfg, nil) assert.NoError(t, err, "unexpected error creating receiver") - assert.Equal(t, "0.0.0.0:6831", r.(*jReceiver).config.AgentCompactThrift.Endpoint, "thrift port should be default") + assert.Equal(t, "0.0.0.0:6831", r.(*jReceiver).config.ThriftCompactUDP.Endpoint, "thrift port should be default") } diff --git a/receiver/jaegerreceiver/go.sum b/receiver/jaegerreceiver/go.sum index 3931eafab10b..f07a7fe1dc19 100644 --- a/receiver/jaegerreceiver/go.sum +++ b/receiver/jaegerreceiver/go.sum @@ -189,8 +189,6 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 h1:DheMAlT go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0/go.mod h1:wZcGmeVO9nzP67aYSLDqXNWK87EZWhi7JWj1v7ZXf94= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= -go.opentelemetry.io/otel/exporters/prometheus v0.54.0 h1:rFwzp68QMgtzu9PgP3jm9XaMICI6TsofWWPcBDKwlsU= -go.opentelemetry.io/otel/exporters/prometheus v0.54.0/go.mod h1:QyjcV9qDP6VeK5qPyKETvNjmaaEc7+gqjh4SS0ZYzDU= go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 1c0765052ef1..4e23cbda9696 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -5,9 +5,7 @@ package jaegerreceiver import ( "context" - "fmt" "net" - "net/http" "testing" "time" @@ -15,7 +13,6 @@ import ( "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" "github.com/jaegertracing/jaeger/model" jaegerconvert "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger" - "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/thrift-gen/agent" jaegerthrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/stretchr/testify/assert" @@ -26,7 +23,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" - "google.golang.org/grpc" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" @@ -37,8 +33,8 @@ var jaegerAgent = component.NewIDWithName(metadata.Type, "agent_test") func TestJaegerAgentUDP_ThriftCompact(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - testJaegerAgent(t, addr, &configuration{ - AgentCompactThrift: ProtocolUDP{ + testJaegerAgent(t, addr, Protocols{ + ThriftCompactUDP: &ProtocolUDP{ Endpoint: addr, ServerConfigUDP: defaultServerConfigUDP(), }, @@ -46,8 +42,8 @@ func TestJaegerAgentUDP_ThriftCompact(t *testing.T) { } func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) { - config := &configuration{ - AgentCompactThrift: ProtocolUDP{ + config := Protocols{ + ThriftCompactUDP: &ProtocolUDP{ Endpoint: "0.0.0.0:999999", ServerConfigUDP: defaultServerConfigUDP(), }, @@ -63,8 +59,8 @@ func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) { func TestJaegerAgentUDP_ThriftBinary(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - testJaegerAgent(t, addr, &configuration{ - AgentBinaryThrift: ProtocolUDP{ + testJaegerAgent(t, addr, Protocols{ + ThriftBinaryUDP: &ProtocolUDP{ Endpoint: addr, ServerConfigUDP: defaultServerConfigUDP(), }, @@ -75,8 +71,8 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { // This test confirms that the thrift binary port is opened correctly. This is all we can test at the moment. See above. addr := testutil.GetAvailableLocalAddress(t) - config := &configuration{ - AgentBinaryThrift: ProtocolUDP{ + config := Protocols{ + ThriftBinaryUDP: &ProtocolUDP{ Endpoint: addr, ServerConfigUDP: defaultServerConfigUDP(), }, @@ -85,7 +81,7 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { jr, err := newJaegerReceiver(jaegerAgent, config, nil, set) require.NoError(t, err) - assert.NoError(t, jr.startAgent(componenttest.NewNopHost()), "Start failed") + assert.NoError(t, jr.startAgent(), "Start failed") t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) }) l, err := net.Listen("udp", addr) @@ -97,8 +93,8 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { } func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) { - config := &configuration{ - AgentBinaryThrift: ProtocolUDP{ + config := Protocols{ + ThriftBinaryUDP: &ProtocolUDP{ Endpoint: "0.0.0.0:999999", ServerConfigUDP: defaultServerConfigUDP(), }, @@ -112,60 +108,7 @@ func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) { require.NoError(t, jr.Shutdown(context.Background())) } -func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) { - server := grpc.NewServer(opts...) - lis, err := net.Listen("tcp", "localhost:0") - require.NoError(t, err) - beforeServe(server) - go func() { - err := server.Serve(lis) - assert.NoError(t, err) - }() - return server, lis.Addr() -} - -type mockSamplingHandler struct{} - -func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) { - return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil -} - -func TestJaegerHTTP(t *testing.T) { - s, _ := initializeGRPCTestServer(t, func(s *grpc.Server) { - api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{}) - }) - defer s.GracefulStop() - - endpoint := testutil.GetAvailableLocalAddress(t) - config := &configuration{ - AgentHTTPEndpoint: endpoint, - } - set := receivertest.NewNopSettings() - jr, err := newJaegerReceiver(jaegerAgent, config, nil, set) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) }) - - assert.NoError(t, jr.Start(context.Background(), componenttest.NewNopHost()), "Start failed") - - // allow http server to start - assert.Eventually(t, func() bool { - var conn net.Conn - conn, err = net.Dial("tcp", endpoint) - if err == nil && conn != nil { - conn.Close() - return true - } - return false - }, 10*time.Second, 5*time.Millisecond, "failed to wait for the port to be open") - - resp, err := http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint)) - assert.NoError(t, err, "should not have failed to make request") - assert.NotNil(t, resp) - defer resp.Body.Close() - assert.Equal(t, 500, resp.StatusCode, "should have returned 200") -} - -func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) { +func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig Protocols) { // 1. Create the Jaeger receiver aka "server" sink := new(consumertest.TracesSink) set := receivertest.NewNopSettings() @@ -184,7 +127,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configu require.NoError(t, err, "Start failed") // 2. Then send spans to the Jaeger receiver. - jexp, err := newClientUDP(agentEndpoint, jr.config.AgentBinaryThrift.Endpoint != "") + jexp, err := newClientUDP(agentEndpoint, jr.config.ThriftBinaryUDP != nil) require.NoError(t, err, "Failed to create the Jaeger OpenTelemetry exporter for the live application") // 3. Now finally send some spans diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 3e3b44c83e72..af204faa68f0 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -15,8 +15,6 @@ import ( apacheThrift "github.com/apache/thrift/lib/go/thrift" "github.com/gorilla/mux" - "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" - "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/jaegertracing/jaeger/cmd/agent/app/servers" "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" @@ -24,13 +22,10 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/thrift-gen/agent" - "github.com/jaegertracing/jaeger/thrift-gen/baggage" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" @@ -40,24 +35,13 @@ import ( jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" ) -// configuration defines the behavior and the ports that -// the Jaeger receiver will use. -type configuration struct { - HTTPServerConfig confighttp.ServerConfig - GRPCServerConfig configgrpc.ServerConfig - - AgentCompactThrift ProtocolUDP - AgentBinaryThrift ProtocolUDP - AgentHTTPEndpoint string -} - // Receiver type is used to receive spans that were originally intended to be sent to Jaeger. // This receiver is basically a Jaeger collector. type jReceiver struct { nextConsumer consumer.Traces id component.ID - config *configuration + config Protocols grpc *grpc.Server collectorServer *http.Server @@ -92,7 +76,7 @@ var acceptedThriftFormats = map[string]struct{}{ // also as a Jaeger agent. func newJaegerReceiver( id component.ID, - config *configuration, + config Protocols, nextConsumer consumer.Traces, set receiver.Settings, ) (*jReceiver, error) { @@ -124,7 +108,7 @@ func newJaegerReceiver( } func (jr *jReceiver) Start(ctx context.Context, host component.Host) error { - if err := jr.startAgent(host); err != nil { + if err := jr.startAgent(); err != nil { return err } @@ -168,23 +152,10 @@ func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.T } var ( - _ agent.Agent = (*agentHandler)(nil) - _ api_v2.CollectorServiceServer = (*jReceiver)(nil) - _ configmanager.ClientConfigManager = (*notImplementedConfigManager)(nil) + _ agent.Agent = (*agentHandler)(nil) + _ api_v2.CollectorServiceServer = (*jReceiver)(nil) ) -var errNotImplemented = errors.New("not implemented") - -type notImplementedConfigManager struct{} - -func (notImplementedConfigManager) GetSamplingStrategy(_ context.Context, _ string) (*api_v2.SamplingStrategyResponse, error) { - return nil, errNotImplemented -} - -func (notImplementedConfigManager) GetBaggageRestrictions(_ context.Context, _ string) ([]*baggage.BaggageRestriction, error) { - return nil, errNotImplemented -} - type agentHandler struct { nextConsumer consumer.Traces obsrecv *receiverhelper.ObsReport @@ -223,12 +194,8 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) return &api_v2.PostSpansResponse{}, nil } -func (jr *jReceiver) startAgent(host component.Host) error { - if jr.config == nil { - return nil - } - - if jr.config.AgentBinaryThrift.Endpoint != "" { +func (jr *jReceiver) startAgent() error { + if jr.config.ThriftBinaryUDP != nil { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: jr.id, Transport: agentTransportBinary, @@ -242,14 +209,14 @@ func (jr *jReceiver) startAgent(host component.Host) error { nextConsumer: jr.nextConsumer, obsrecv: obsrecv, } - processor, err := jr.buildProcessor(jr.config.AgentBinaryThrift.Endpoint, jr.config.AgentBinaryThrift.ServerConfigUDP, apacheThrift.NewTBinaryProtocolFactoryConf(nil), h) + processor, err := jr.buildProcessor(jr.config.ThriftBinaryUDP.Endpoint, jr.config.ThriftBinaryUDP.ServerConfigUDP, apacheThrift.NewTBinaryProtocolFactoryConf(nil), h) if err != nil { return err } jr.agentProcessors = append(jr.agentProcessors, processor) } - if jr.config.AgentCompactThrift.Endpoint != "" { + if jr.config.ThriftCompactUDP != nil { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: jr.id, Transport: agentTransportCompact, @@ -262,7 +229,7 @@ func (jr *jReceiver) startAgent(host component.Host) error { nextConsumer: jr.nextConsumer, obsrecv: obsrecv, } - processor, err := jr.buildProcessor(jr.config.AgentCompactThrift.Endpoint, jr.config.AgentCompactThrift.ServerConfigUDP, apacheThrift.NewTCompactProtocolFactoryConf(nil), h) + processor, err := jr.buildProcessor(jr.config.ThriftCompactUDP.Endpoint, jr.config.ThriftCompactUDP.ServerConfigUDP, apacheThrift.NewTCompactProtocolFactoryConf(nil), h) if err != nil { return err } @@ -277,18 +244,6 @@ func (jr *jReceiver) startAgent(host component.Host) error { }(processor) } - if jr.config.AgentHTTPEndpoint != "" { - jr.agentServer = httpserver.NewHTTPServer(jr.config.AgentHTTPEndpoint, ¬ImplementedConfigManager{}, metrics.NullFactory, jr.settings.Logger) - - jr.goroutines.Add(1) - go func() { - defer jr.goroutines.Done() - if err := jr.agentServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) && err != nil { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(fmt.Errorf("jaeger agent server error: %w", err))) - } - }() - } - return nil } @@ -370,20 +325,16 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques } func (jr *jReceiver) startCollector(ctx context.Context, host component.Host) error { - if jr.config == nil { - return nil - } - - if jr.config.HTTPServerConfig.Endpoint != "" { - cln, err := jr.config.HTTPServerConfig.ToListener(ctx) + if jr.config.ThriftHTTP != nil { + cln, err := jr.config.ThriftHTTP.ToListener(ctx) if err != nil { return fmt.Errorf("failed to bind to Collector address %q: %w", - jr.config.HTTPServerConfig.Endpoint, err) + jr.config.ThriftHTTP.Endpoint, err) } nr := mux.NewRouter() nr.HandleFunc("/api/traces", jr.HandleThriftHTTPBatch).Methods(http.MethodPost) - jr.collectorServer, err = jr.config.HTTPServerConfig.ToServer(ctx, host, jr.settings.TelemetrySettings, nr) + jr.collectorServer, err = jr.config.ThriftHTTP.ToServer(ctx, host, jr.settings.TelemetrySettings, nr) if err != nil { return err } @@ -397,16 +348,16 @@ func (jr *jReceiver) startCollector(ctx context.Context, host component.Host) er }() } - if jr.config.GRPCServerConfig.NetAddr.Endpoint != "" { + if jr.config.GRPC != nil { var err error - jr.grpc, err = jr.config.GRPCServerConfig.ToServer(ctx, host, jr.settings.TelemetrySettings) + jr.grpc, err = jr.config.GRPC.ToServer(ctx, host, jr.settings.TelemetrySettings) if err != nil { return fmt.Errorf("failed to build the options for the Jaeger gRPC Collector: %w", err) } - ln, err := jr.config.GRPCServerConfig.NetAddr.Listen(ctx) + ln, err := jr.config.GRPC.NetAddr.Listen(ctx) if err != nil { - return fmt.Errorf("failed to bind to gRPC address %q: %w", jr.config.GRPCServerConfig.NetAddr, err) + return fmt.Errorf("failed to bind to gRPC address %q: %w", jr.config.GRPC.NetAddr, err) } api_v2.RegisterCollectorServiceServer(jr.grpc, jr) diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index 31062bf57791..5377850bdc12 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -45,7 +45,7 @@ var jaegerReceiver = component.MustNewIDWithName("jaeger", "receiver_test") func TestTraceSource(t *testing.T) { set := receivertest.NewNopSettings() - jr, err := newJaegerReceiver(jaegerReceiver, &configuration{}, nil, set) + jr, err := newJaegerReceiver(jaegerReceiver, Protocols{}, nil, set) require.NoError(t, err) require.NotNil(t, jr) } @@ -77,8 +77,8 @@ func TestThriftHTTPBodyDecode(t *testing.T) { func TestReception(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // 1. Create the Jaeger receiver aka "server" - config := &configuration{ - HTTPServerConfig: confighttp.ServerConfig{ + config := Protocols{ + ThriftHTTP: &confighttp.ServerConfig{ Endpoint: addr, }, } @@ -110,7 +110,7 @@ func TestReception(t *testing.T) { func TestPortsNotOpen(t *testing.T) { // an empty config should result in no open ports - config := &configuration{} + config := Protocols{} sink := new(consumertest.TracesSink) @@ -139,8 +139,8 @@ func TestPortsNotOpen(t *testing.T) { func TestGRPCReception(t *testing.T) { // prepare - config := &configuration{ - GRPCServerConfig: configgrpc.ServerConfig{ + config := Protocols{ + GRPC: &configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ Endpoint: testutil.GetAvailableLocalAddress(t), Transport: confignet.TransportTypeTCP, @@ -156,7 +156,7 @@ func TestGRPCReception(t *testing.T) { require.NoError(t, jr.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) }) - conn, err := grpc.NewClient(config.GRPCServerConfig.NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(config.GRPC.NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer conn.Close() @@ -194,7 +194,7 @@ func TestGRPCReceptionWithTLS(t *testing.T) { }, } - grpcServerSettings := configgrpc.ServerConfig{ + grpcServerSettings := &configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ Endpoint: testutil.GetAvailableLocalAddress(t), Transport: confignet.TransportTypeTCP, @@ -202,8 +202,8 @@ func TestGRPCReceptionWithTLS(t *testing.T) { TLSSetting: tlsCreds, } - config := &configuration{ - GRPCServerConfig: grpcServerSettings, + config := Protocols{ + GRPC: grpcServerSettings, } sink := new(consumertest.TracesSink) @@ -335,8 +335,8 @@ func grpcFixture(t *testing.T, t1 time.Time, d1, d2 time.Duration) *api_v2.PostS } func TestSampling(t *testing.T) { - config := &configuration{ - GRPCServerConfig: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{ + config := Protocols{ + GRPC: &configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{ Endpoint: testutil.GetAvailableLocalAddress(t), Transport: confignet.TransportTypeTCP, }}, @@ -350,7 +350,7 @@ func TestSampling(t *testing.T) { require.NoError(t, jr.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, jr.Shutdown(context.Background())) }) - conn, err := grpc.NewClient(config.GRPCServerConfig.NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(config.GRPC.NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) assert.NoError(t, err) defer conn.Close()