Skip to content

Commit

Permalink
kvflowcontroller: add trace statement at end of wait
Browse files Browse the repository at this point in the history
Epic: none

Release note: None
  • Loading branch information
sumeerbhola committed Sep 21, 2023
1 parent 378b181 commit 9379c63
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (c *Controller) Admit(
class := admissionpb.WorkClassFromPri(pri)
c.metrics.onWaiting(class)

// The value of logged transitions to true if there is any waiting.
logged := false
tstart := c.clock.PhysicalTime()
for {
Expand All @@ -155,10 +156,15 @@ func (c *Controller) Admit(
// applying flow control to their specific work class.
bypass := c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass
if tokens > 0 || bypass {
if log.V(2) {
log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)",
pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode())
waitDuration := c.clock.PhysicalTime().Sub(tstart)
const formatStr = "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)"
if logged {
// Always trace if there is any waiting.
log.VEventf(ctx, 2, formatStr, pri, connection.Stream(), tokens, waitDuration, c.mode())
} else if log.V(2) {
log.Infof(ctx, formatStr, pri, connection.Stream(), tokens, waitDuration, c.mode())
}
// Else, common case, no waiting and logging verbosity is not high.

// TODO(irfansharif): Right now we continue forwarding admission
// grants to request while the available tokens > 0, which can lead
Expand All @@ -179,7 +185,7 @@ func (c *Controller) Admit(
// that count is greater than some small multiple of GOMAXPROCS.

b.signal() // signal a waiter, if any
c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart))
c.metrics.onAdmitted(class, waitDuration)
if bypass {
return false, nil
}
Expand All @@ -195,12 +201,23 @@ func (c *Controller) Admit(
select {
case <-b.wait(): // wait for a signal
case <-connection.Disconnected():
c.metrics.onBypassed(class, c.clock.PhysicalTime().Sub(tstart))
waitDuration := c.clock.PhysicalTime().Sub(tstart)
log.VEventf(ctx, 2,
"bypassed as stream disconnected (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)",
pri, connection.Stream(), tokens, waitDuration, c.mode())
c.metrics.onBypassed(class, waitDuration)
return true, nil
case <-ctx.Done():
if ctx.Err() != nil {
c.metrics.onErrored(class, c.clock.PhysicalTime().Sub(tstart))
waitDuration := c.clock.PhysicalTime().Sub(tstart)
log.VEventf(ctx, 2,
"canceled after waiting (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)",
pri, connection.Stream(), tokens, waitDuration, c.mode())
c.metrics.onErrored(class, waitDuration)
}
// Else ...
// TODO(sumeer): when would error be non-nil?

return false, ctx.Err()
}
}
Expand Down

0 comments on commit 9379c63

Please sign in to comment.