Skip to content

Commit

Permalink
Preparation for variable rate heartbeats (#49433)
Browse files Browse the repository at this point in the history
* Revert "Add unstable envvar to disable server keepalives (#43342)"

This reverts commit 1e2d772.

* Allow changing a VariableDuration by more than one

* Get rid of control log pings

* Dedicated lightweight jittered ticker for the inventory

* Split off per-protocol tickers

* Tests for jittered ticker

* Use a start barrier in TestVariableDurationIncDec

* Explain value to be passed to Advance

* Add explanations in jitteredticker tests

* Rename jitteredticker to delay
  • Loading branch information
espadolini authored Nov 27, 2024
1 parent aed09a1 commit e4378be
Show file tree
Hide file tree
Showing 10 changed files with 782 additions and 515 deletions.
588 changes: 295 additions & 293 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2553,10 +2553,10 @@ message InventoryPingRequest {
// ServerID is the ID of the instance to ping.
string ServerID = 1;

// ControlLog forces the ping to use the standard "commit then act" model of control log synchronization
// for the ping. This significantly increases the amount of time it takes for the ping request to
// complete, but is useful for testing/debugging control log issues.
bool ControlLog = 2;
// ControlLog used to signal that the ping should use the control log synchronization.
//
// Deprecated: the control log is unsupported and unsound to use.
bool ControlLog = 2 [deprecated = true];
}

// InventoryPingResponse returns the result of an inventory ping initiated via an
Expand Down
76 changes: 2 additions & 74 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4831,94 +4831,22 @@ func (a *Server) GetInventoryConnectedServiceCount(service types.SystemRole) uin
}

func (a *Server) PingInventory(ctx context.Context, req proto.InventoryPingRequest) (proto.InventoryPingResponse, error) {
const pingAttempt = "ping-attempt"
const pingSuccess = "ping-success"
const maxAttempts = 16
stream, ok := a.inventory.GetControlStream(req.ServerID)
if !ok {
return proto.InventoryPingResponse{}, trace.NotFound("no control stream found for server %q", req.ServerID)
}

id := mathrand.Uint64()

if !req.ControlLog {
// this ping doesn't pass through the control log, so just execute it immediately.
d, err := stream.Ping(ctx, id)
return proto.InventoryPingResponse{
Duration: d,
}, trace.Wrap(err)
}

// matchEntry is used to check if our log entry has been included
// in the control log.
matchEntry := func(entry types.InstanceControlLogEntry) bool {
return entry.Type == pingAttempt && entry.ID == id
}

var included bool
for i := 1; i <= maxAttempts; i++ {
stream.VisitInstanceState(func(ref inventory.InstanceStateRef) (update inventory.InstanceStateUpdate) {
// check if we've already successfully included the ping entry
if ref.LastHeartbeat != nil {
if slices.IndexFunc(ref.LastHeartbeat.GetControlLog(), matchEntry) >= 0 {
included = true
return
}
}

// if the entry pending already, we just need to wait
if slices.IndexFunc(ref.QualifiedPendingControlLog, matchEntry) >= 0 {
return
}

// either this is the first iteration, or the pending control log was reset.
update.QualifiedPendingControlLog = append(update.QualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: pingAttempt,
ID: id,
Time: time.Now(),
})
stream.HeartbeatInstance()
return
})

if included {
// entry appeared in control log
break
}

// pause briefly, then re-sync our state. note that this strategy is not scalable. control log usage is intended only
// for periodic operations. control-log based pings are a mechanism for testing/debugging only, hence the use of a
// simple sleep loop.
select {
case <-time.After(time.Millisecond * 100 * time.Duration(i)):
case <-stream.Done():
return proto.InventoryPingResponse{}, trace.Errorf("control stream closed during ping attempt")
case <-ctx.Done():
return proto.InventoryPingResponse{}, trace.Wrap(ctx.Err())
}
}

if !included {
return proto.InventoryPingResponse{}, trace.LimitExceeded("failed to include ping %d in control log for instance %q (max attempts exceeded)", id, req.ServerID)
if req.ControlLog { //nolint:staticcheck // SA1019. Checking deprecated field that may be sent by older clients.
return proto.InventoryPingResponse{}, trace.BadParameter("ControlLog pings are not supported")
}

d, err := stream.Ping(ctx, id)
if err != nil {
return proto.InventoryPingResponse{}, trace.Wrap(err)
}

stream.VisitInstanceState(func(_ inventory.InstanceStateRef) (update inventory.InstanceStateUpdate) {
update.UnqualifiedPendingControlLog = append(update.UnqualifiedPendingControlLog, types.InstanceControlLogEntry{
Type: pingSuccess,
ID: id,
Labels: map[string]string{
"duration": d.String(),
},
})
return
})
stream.HeartbeatInstance()

return proto.InventoryPingResponse{
Duration: d,
}, nil
Expand Down
Loading

0 comments on commit e4378be

Please sign in to comment.