Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into es-direct-serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny committed Jan 10, 2025
2 parents 7ba2575 + 4d17daf commit 80976d0
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 117 deletions.
121 changes: 61 additions & 60 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (tl testLogger) Errorf(_ context.Context, format string, args ...any) {
tl.t.Logf(format, args...)
}

func defaultConnectingHandler(connectionCallbacks server.ConnectionCallbacksStruct) func(request *http.Request) types.ConnectionResponse {
func defaultConnectingHandler(connectionCallbacks types.ConnectionCallbacks) func(request *http.Request) types.ConnectionResponse {
return func(_ *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{
Accept: true,
Expand All @@ -73,11 +73,11 @@ func defaultConnectingHandler(connectionCallbacks server.ConnectionCallbacksStru
}
}

// onConnectingFuncFactory is a function that will be given to server.CallbacksStruct as
// onConnectingFuncFactory is a function that will be given to types.ConnectionCallbacks as
// OnConnectingFunc. This allows changing the ConnectionCallbacks both from the newOpAMPServer
// caller and inside of newOpAMP Server, and for custom implementations of the value for `Accept`
// in types.ConnectionResponse.
type onConnectingFuncFactory func(connectionCallbacks server.ConnectionCallbacksStruct) func(request *http.Request) types.ConnectionResponse
type onConnectingFuncFactory func(connectionCallbacks types.ConnectionCallbacks) func(request *http.Request) types.ConnectionResponse

type testingOpAMPServer struct {
addr string
Expand All @@ -87,38 +87,38 @@ type testingOpAMPServer struct {
shutdown func()
}

func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.ConnectionCallbacks) *testingOpAMPServer {
s := newUnstartedOpAMPServer(t, connectingCallback, callbacks)
s.start()
return s
}

func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.ConnectionCallbacks) *testingOpAMPServer {
var agentConn atomic.Value
var isAgentConnected atomic.Bool
var didShutdown atomic.Bool
connectedChan := make(chan bool)
s := server.New(testLogger{t: t})
onConnectedFunc := callbacks.OnConnectedFunc
callbacks.OnConnectedFunc = func(ctx context.Context, conn types.Connection) {
onConnectedFunc := callbacks.OnConnected
callbacks.OnConnected = func(ctx context.Context, conn types.Connection) {
if onConnectedFunc != nil {
onConnectedFunc(ctx, conn)
}
agentConn.Store(conn)
isAgentConnected.Store(true)
connectedChan <- true
}
onConnectionCloseFunc := callbacks.OnConnectionCloseFunc
callbacks.OnConnectionCloseFunc = func(conn types.Connection) {
onConnectionCloseFunc := callbacks.OnConnectionClose
callbacks.OnConnectionClose = func(conn types.Connection) {
isAgentConnected.Store(false)
connectedChan <- false
if onConnectionCloseFunc != nil {
onConnectionCloseFunc(conn)
}
}
handler, _, err := s.Attach(server.Settings{
Callbacks: server.CallbacksStruct{
OnConnectingFunc: connectingCallback(callbacks),
Callbacks: types.Callbacks{
OnConnecting: connectingCallback(callbacks),
},
})
require.NoError(t, err)
Expand Down Expand Up @@ -211,8 +211,8 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -287,8 +287,8 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) {
require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0o600))

connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.ConnectionCallbacks{
OnConnected: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
})
Expand Down Expand Up @@ -331,19 +331,20 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {

configuredChan := make(chan struct{})
connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
OnMessageFunc: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash()
if bytes.Equal(lastCfgHash, hash) {
close(configuredChan)
}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler,
types.ConnectionCallbacks{
OnConnected: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
OnMessage: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash()
if bytes.Equal(lastCfgHash, hash) {
close(configuredChan)
}

return &protobufs.ServerToAgent{}
},
})
return &protobufs.ServerToAgent{}
},
})
defer server.shutdown()

// The supervisor is started without a running OpAMP server.
Expand Down Expand Up @@ -415,8 +416,8 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.Health != nil {
healthReport.Store(message.Health)
}
Expand Down Expand Up @@ -501,8 +502,8 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
capabilities.Store(message.Capabilities)

return &protobufs.ServerToAgent{}
Expand Down Expand Up @@ -556,8 +557,8 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.AgentDescription != nil {
agentDescription.Store(message.AgentDescription)
}
Expand Down Expand Up @@ -602,8 +603,8 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -713,8 +714,8 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.AgentDescription != nil {
select {
case agentDescMessageChan <- message:
Expand Down Expand Up @@ -866,8 +867,8 @@ func TestSupervisorRestartCommand(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.Health != nil {
healthReport.Store(message.Health)
}
Expand Down Expand Up @@ -948,7 +949,7 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
initialServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{})
types.ConnectionCallbacks{})

s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr})

Expand All @@ -960,11 +961,11 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
newServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnConnectedFunc: func(_ context.Context, _ types.Connection) {
types.ConnectionCallbacks{
OnConnected: func(_ context.Context, _ types.Connection) {
connectedToNewServer.Store(true)
},
OnMessageFunc: func(_ context.Context, _ types.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent {
OnMessage: func(_ context.Context, _ types.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent {
return &protobufs.ServerToAgent{}
},
})
Expand Down Expand Up @@ -999,8 +1000,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
initialServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -1043,8 +1044,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
newServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -1087,8 +1088,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
select {
case agentIDChan <- message.InstanceUid:
default:
Expand Down Expand Up @@ -1163,8 +1164,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
select {
case agentIDChan <- message.InstanceUid:
default:
Expand Down Expand Up @@ -1242,7 +1243,7 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{},
types.ConnectionCallbacks{},
)

s := newSupervisor(t, "basic", map[string]string{
Expand Down Expand Up @@ -1270,8 +1271,8 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -1386,8 +1387,8 @@ func TestSupervisorLogging(t *testing.T) {
require.NoError(t, os.WriteFile(remoteCfgFilePath, marshalledRemoteCfg, 0o600))

connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.ConnectionCallbacks{
OnConnected: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
})
Expand Down Expand Up @@ -1449,8 +1450,8 @@ func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -1586,8 +1587,8 @@ func TestSupervisorOpAmpServerPort(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down
3 changes: 1 addition & 2 deletions cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/knadh/koanf/providers/file v1.1.2
github.com/knadh/koanf/providers/rawbytes v0.1.0
github.com/knadh/koanf/v2 v2.1.2
github.com/open-telemetry/opamp-go v0.17.0
github.com/open-telemetry/opamp-go v0.18.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/config/configopaque v1.23.0
go.opentelemetry.io/collector/config/configtls v1.23.0
Expand All @@ -29,7 +29,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions cmd/opampsupervisor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 80976d0

Please sign in to comment.