Skip to content

Commit

Permalink
APIGOV-29256 - agent status update over gRPC stream (#861)
Browse files Browse the repository at this point in the history
* APIGOV-29256 - new format for User-Agent header

* APIGOV-29256 - watch proto updates
- added optional request type to watch request to identify token refresh or agent status update. Defaults to token refresh for backward compatibility
- added TriggerAction event type to watch event
- added optional action metadata to watch event metadata to hold details about TriggerAction event

* APIGOV-29256 - update user agent in sample

* APIGOV-29256 - update user agent in sample

* APIGOV-29256 - updated watch client for forwarding request using a channel

* APIGOV-29256 - updated stream client to use request queue to send status updates

* APIGOV-29256 - use stream to send status update when using gRPC

* APIGOV-29256 - unit test fixes

* APIGOV-29256 - code review update

* APIGOV-29256 - fix regex to parse commit sha

* APIGOV-29256 - agent dataplane type to component map
includes fix to close wait on error before any watch request send

* APIGOV-29256 - race condition fix

* APIGOV-29256 - Fix user-agent regex - thanks Jason

* APIGOV-29256 - watch client test optimization
  • Loading branch information
vivekschauhan authored Dec 11, 2024
1 parent 99e67fb commit 2d68f3b
Show file tree
Hide file tree
Showing 21 changed files with 1,217 additions and 210 deletions.
28 changes: 23 additions & 5 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,14 +536,13 @@ func GetUserAgent() string {
agentName = agent.cfg.GetAgentName()
isGRPC = agent.cfg.IsUsingGRPC()
}
return util.FormatUserAgent(
return util.NewUserAgent(
config.AgentTypeName,
config.AgentVersion,
config.SDKVersion,
envName,
agentName,
isRunningInDockerContainer(),
isGRPC)
isGRPC).FormatUserAgent()
}

// setCentralConfig - Sets the central config
Expand Down Expand Up @@ -625,16 +624,32 @@ func UpdateStatusWithPrevious(status, prevStatus, description string) {
func UpdateStatusWithContext(ctx context.Context, status, prevStatus, description string) {
agent.status = status
logger := ctx.Value(ctxLogger).(log.FieldLogger)
if agent.cfg != nil && agent.cfg.IsUsingGRPC() {
updateStatusOverStream(status, prevStatus, description)
return
}

if agent.agentResourceManager != nil {
err := agent.agentResourceManager.UpdateAgentStatus(status, prevStatus, description)
if err != nil {
logger.WithError(err).Warnf("could not update the agent status reference")
logger.WithError(err).Warn("could not update the agent status reference")
}
} else {
logger.WithField("status", agent.status).Trace("skipping status update, agent resource manager is not initialized")
}
}

func updateStatusOverStream(status, prevStatus, description string) {
if agent.streamer != nil {
err := agent.streamer.UpdateAgentStatus(status, prevStatus, description)
if err != nil {
logger.WithError(err).Warn("could not update the agent status reference")
}
} else {
logger.WithField("status", agent.status).Trace("skipping status update, stream client is not initialized")
}
}

func setupSignalProcessor() {
if !agent.agentFeaturesCfg.ProcessSystemSignalsEnabled() {
return
Expand Down Expand Up @@ -712,7 +727,10 @@ func setupProfileSignalProcessor(cpuProfile, memProfile string) {

// cleanUp - AgentCleanup
func cleanUp() {
UpdateStatusWithPrevious(AgentStopped, AgentRunning, "")
// stopped status updated with gRPC watch
if !agent.cfg.IsUsingGRPC() {
UpdateStatusWithPrevious(AgentStopped, AgentRunning, "")
}
}

func newHandlers() []handler.Handler {
Expand Down
120 changes: 120 additions & 0 deletions pkg/agent/events/requestqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package events

import (
"context"
"errors"
"sync"

"github.com/Axway/agent-sdk/pkg/util/log"

"github.com/Axway/agent-sdk/pkg/watchmanager/proto"
)

type RequestQueue interface {
Start()
Write(request *proto.Request) error
Stop()
IsActive() bool
}

// requestQueue
type requestQueue struct {
cancel context.CancelFunc
ctx context.Context
logger log.FieldLogger
requestCh chan *proto.Request
receiveCh chan *proto.Request
isActive bool
lock *sync.Mutex
}

// NewRequestQueueFunc type for creating a new request queue
type NewRequestQueueFunc func(requestCh chan *proto.Request) RequestQueue

// NewRequestQueue creates a new queue for the requests to be sent for watch subscription
func NewRequestQueue(requestCh chan *proto.Request) RequestQueue {
ctx, cancel := context.WithCancel(context.Background())
logger := log.NewFieldLogger().
WithComponent("requestQueue").
WithPackage("sdk.agent.events")

return &requestQueue{
cancel: cancel,
ctx: ctx,
logger: logger,
requestCh: requestCh,
receiveCh: make(chan *proto.Request, 1),
lock: &sync.Mutex{},
}
}

func (q *requestQueue) Stop() {
q.lock.Lock()
defer q.lock.Unlock()

if q.cancel != nil {
q.cancel()
}
}

func (q *requestQueue) IsActive() bool {
q.lock.Lock()
defer q.lock.Unlock()

return q.isActive
}

func (q *requestQueue) Write(request *proto.Request) error {
q.lock.Lock()
defer q.lock.Unlock()

if !q.isActive {
return errors.New("request queue is not active")
}

if q.receiveCh != nil {
q.logger.WithField("requestType", request.RequestType).Trace("received stream request")
q.receiveCh <- request
}
return nil
}

func (q *requestQueue) Start() {
go func() {
q.lock.Lock()
q.isActive = true
q.lock.Unlock()

defer func() {
q.lock.Lock()
defer q.lock.Unlock()
q.isActive = false
}()

for {
if q.process() {
break
}
}
}()
}

func (q *requestQueue) process() bool {
done := false
select {
case req := <-q.receiveCh:
q.logger.WithField("requestType", req.RequestType).Trace("forwarding stream request")
q.requestCh <- req
q.logger.WithField("requestType", req.RequestType).Trace("stream request forwarded")
case <-q.ctx.Done():
q.logger.Trace("stream request queue has been gracefully stopped")
done = true
if q.receiveCh != nil {
close(q.receiveCh)
q.receiveCh = nil
}
break
}

return done
}
64 changes: 64 additions & 0 deletions pkg/agent/events/requestqueue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package events

import (
"sync"
"testing"
"time"

"github.com/Axway/agent-sdk/pkg/watchmanager/proto"
"github.com/stretchr/testify/assert"
)

func TestRequestQueue(t *testing.T) {
cases := []struct {
name string
writeErr bool
queueActivated bool
}{
{
name: "write error when queue is not active",
writeErr: true,
queueActivated: false,
},
{
name: "write success with active queue",
queueActivated: true,
},
}
for _, tc := range cases {
requestCh := make(chan *proto.Request, 1)
t.Run(tc.name, func(t *testing.T) {
q := NewRequestQueue(requestCh)
var receivedReq *proto.Request
wg := sync.WaitGroup{}
if tc.queueActivated {
wg.Add(1)
q.Start()
time.Sleep(1 * time.Second)
go func() {
receivedReq = <-requestCh
q.Stop()
wg.Done()
}()
}

req := &proto.Request{
RequestType: proto.RequestType_AGENT_STATUS.Enum(),
AgentStatus: &proto.AgentStatus{
State: "running",
},
}
writeErr := q.Write(req)
if tc.writeErr {
assert.NotNil(t, writeErr)
return
}
assert.Nil(t, writeErr)

wg.Wait()
assert.Equal(t, req, receivedReq)
assert.False(t, q.IsActive())
})
}

}
4 changes: 2 additions & 2 deletions pkg/agent/resource/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (a *agentResourceManager) FetchAgentResource() error {
func (a *agentResourceManager) UpdateAgentStatus(status, prevStatus, message string) error {
if a.cfg == nil || a.cfg.GetAgentName() == "" {
a.logger.WithField("status", status).
WithField("previous-status", prevStatus).
WithField("previousStatus", prevStatus).
Trace("skipping agent status update, agent name config not set")
return nil
}
Expand All @@ -148,7 +148,7 @@ func (a *agentResourceManager) UpdateAgentStatus(status, prevStatus, message str

if a.agentResource == nil {
a.logger.WithField("status", status).
WithField("previous-status", prevStatus).
WithField("previousStatus", prevStatus).
Trace("skipping agent status update, agent resource not initialized")
return nil
}
Expand Down
46 changes: 31 additions & 15 deletions pkg/agent/statusupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ type agentStatusUpdate struct {
immediateStatusChange bool
typeOfStatusUpdate string
prevStatus string
prevStatusDetail string
logger log.FieldLogger
initialExecution bool
}

var periodicStatusUpdate *agentStatusUpdate
Expand All @@ -65,6 +67,14 @@ func (su *agentStatusUpdate) Status() error {
}

func (su *agentStatusUpdate) Execute() error {
if su.typeOfStatusUpdate == immediate && !su.initialExecution {
su.initialExecution = true
if previousStatus == "" {
previousStatus = AgentRunning
}
return nil
}

id, _ := uuid.NewUUID()
log := su.logger.WithField("status-update-id", id)

Expand All @@ -84,20 +94,20 @@ func (su *agentStatusUpdate) Execute() error {
if su.typeOfStatusUpdate == periodic {
// always update on the periodic status update, even if the status has not changed
log.
WithField("previous-status", previousStatus).
WithField("previous-status-detail", previousStatusDetail).
WithField("new-status", status).
WithField("new-status-detail", statusDetail).
WithField("previousStatus", previousStatus).
WithField("previousStatusDetail", previousStatusDetail).
WithField("newStatus", status).
WithField("newStatusDetail", statusDetail).
Debugf("%s -- Last activity updated", su.typeOfStatusUpdate)
UpdateStatusWithContext(ctx, status, previousStatus, statusDetail)
su.previousActivityTime = su.currentActivityTime
} else if previousStatus != status || previousStatusDetail != statusDetail {
// if the status has changed then report that on the immediate check
log.
WithField("previous-status", previousStatus).
WithField("previous-status-detail", previousStatusDetail).
WithField("new-status", status).
WithField("new-status-detail", statusDetail).
WithField("previousStatus", previousStatus).
WithField("previousStatusDetail", previousStatusDetail).
WithField("newStatus", status).
WithField("newStatusDetail", statusDetail).
Debug("status is changing")
UpdateStatusWithContext(ctx, status, previousStatus, statusDetail)
su.previousActivityTime = su.currentActivityTime
Expand All @@ -117,7 +127,9 @@ func StartAgentStatusUpdate() {
logger.WithError(err).Error("not starting status update jobs")
return
}
startPeriodicStatusUpdate(logger)
if !agent.cfg.IsUsingGRPC() {
startPeriodicStatusUpdate(logger)
}
startImmediateStatusUpdate(logger)
}

Expand All @@ -126,7 +138,7 @@ func startPeriodicStatusUpdate(logger log.FieldLogger) {
interval := agent.cfg.GetReportActivityFrequency()
periodicStatusUpdate = &agentStatusUpdate{
typeOfStatusUpdate: periodic,
logger: logger.WithField("status-check", periodic),
logger: logger.WithField("statusCheck", periodic),
}
_, err := jobs.RegisterIntervalJobWithName(periodicStatusUpdate, interval, "Status Update")

Expand All @@ -142,7 +154,7 @@ func startImmediateStatusUpdate(logger log.FieldLogger) {
immediateStatusUpdate = &agentStatusUpdate{
immediateStatusChange: true,
typeOfStatusUpdate: immediate,
logger: logger.WithField("status-check", immediate),
logger: logger.WithField("statusCheck", immediate),
}
_, err := jobs.RegisterDetachedIntervalJobWithName(immediateStatusUpdate, interval, "Immediate Status Update")

Expand All @@ -156,13 +168,16 @@ func (su *agentStatusUpdate) getCombinedStatus(ctx context.Context) (string, str
status := su.getJobPoolStatus(ctx)
statusDetail := ""
if status != AgentRunning {
statusDetail = "agent job pool not running"
// do not clean the previous unhealthy status detail until the agent is in running state
if su.prevStatusDetail != "" {
statusDetail = su.prevStatusDetail
}
}

hcStatus, hcStatusDetail := su.getHealthcheckStatus(ctx)
entry := log.WithField("pool-status", status).
WithField("healthcheck-status", hcStatus).
WithField("healthcheck-status-detail", hcStatusDetail)
entry := log.WithField("poolStatus", status).
WithField("healthCheckStatus", hcStatus).
WithField("healthCheckStatusDetail", hcStatusDetail)

if hcStatus != AgentRunning {
entry.Info("agent not in running status")
Expand All @@ -175,6 +190,7 @@ func (su *agentStatusUpdate) getCombinedStatus(ctx context.Context) (string, str
}

su.prevStatus = status
su.prevStatusDetail = statusDetail
return status, statusDetail
}

Expand Down
Loading

0 comments on commit 2d68f3b

Please sign in to comment.