Skip to content

Commit

Permalink
#170: Handling 401 error to mark the model as permanently unhealthy
Browse files Browse the repository at this point in the history
  • Loading branch information
roma-glushko committed Mar 17, 2024
1 parent 3742035 commit 27656ea
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 19 deletions.
1 change: 1 addition & 0 deletions pkg/providers/clients/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

var (
// ErrProviderUnavailable is the generic sentinel for a provider that could not
// serve a request; the OpenAI error mapper falls back to it for the remaining
// server and client errors.
ErrProviderUnavailable = errors.New("provider is not available")
// ErrUnauthorized reports that the provider rejected the request because the
// API key is wrong or not set. The OpenAI error mapper returns it for HTTP 401
// responses, and the health tracker flags the model as unhealthy when it sees it.
ErrUnauthorized = errors.New("API key is wrong or not set")
// ErrChatStreamNotImplemented reports that the provider has no streaming chat
// API implementation.
ErrChatStreamNotImplemented = errors.New("streaming chat API is not implemented for provider")
)

Expand Down
27 changes: 13 additions & 14 deletions pkg/providers/lang.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,25 @@ func (m *LanguageModel) ChatStream(ctx context.Context, req *schemas.ChatRequest
return nil, err
}

streamResultC := make(chan *clients.ChatStreamResult)

go func() {
defer close(streamResultC)
startedAt := time.Now()
err = stream.Open()
chunkLatency := time.Since(startedAt)

startedAt := time.Now()
err = stream.Open()
chunkLatency := time.Since(startedAt)
// the first chunk latency
m.chatStreamLatency.Add(float64(chunkLatency))

// the first chunk latency
m.chatStreamLatency.Add(float64(chunkLatency))
if err != nil {
// if connection was not even open, we should not send our clients any messages about this failure

if err != nil {
m.healthTracker.TrackErr(err)
m.healthTracker.TrackErr(err)

// if connection was not even open, we should not send our clients any messages about this failure
return nil, err
}

return
}
streamResultC := make(chan *clients.ChatStreamResult)

go func() {
defer close(streamResultC)
defer stream.Close()

for {
Expand Down
4 changes: 4 additions & 0 deletions pkg/providers/openai/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (m *ErrorMapper) Map(resp *http.Response) error {
return clients.NewRateLimitError(&cooldownDelay)
}

if resp.StatusCode == http.StatusUnauthorized {
return clients.ErrUnauthorized
}

// Server & client errors result in the same error to keep gateway resilient
return clients.ErrProviderUnavailable
}
18 changes: 13 additions & 5 deletions pkg/routers/health/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,32 @@ import (

// Tracker tracks errors and general health of model provider
type Tracker struct {
errBudget *TokenBucket
rateLimit *RateLimitTracker
unauthorized bool
errBudget *TokenBucket
rateLimit *RateLimitTracker
}

func NewTracker(budget *ErrorBudget) *Tracker {
return &Tracker{
rateLimit: NewRateLimitTracker(),
errBudget: NewTokenBucket(budget.TimePerTokenMicro(), budget.Budget()),
unauthorized: false,
rateLimit: NewRateLimitTracker(),
errBudget: NewTokenBucket(budget.TimePerTokenMicro(), budget.Budget()),
}
}

func (t *Tracker) Healthy() bool {
return !t.rateLimit.Limited() && t.errBudget.HasTokens()
return !t.unauthorized && !t.rateLimit.Limited() && t.errBudget.HasTokens()
}

func (t *Tracker) TrackErr(err error) {
var rateLimitErr *clients.RateLimitError

if errors.Is(err, clients.ErrUnauthorized) {
t.unauthorized = true

return
}

if errors.As(err, &rateLimitErr) {
t.rateLimit.SetLimited(rateLimitErr.UntilReset())

Expand Down
1 change: 1 addition & 0 deletions pkg/routers/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (r *LangRouter) ChatStream(

langModel := model.(providers.LangModel)
modelRespC, err := langModel.ChatStream(ctx, req)

if err != nil {
r.tel.L().Error(
"Lang model failed to create streaming chat request",
Expand Down

0 comments on commit 27656ea

Please sign in to comment.