Skip to content

Commit

Permalink
bugfix: avoid logging non-200 response and don't publish mesasge when…
Browse files Browse the repository at this point in the history
… there is err
  • Loading branch information
khorshuheng committed Apr 18, 2024
1 parent 09ff6f5 commit ceb306f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
5 changes: 4 additions & 1 deletion api/pkg/inference-logger/logger/mlobs_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/rand"
"net/http"

"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -168,16 +169,18 @@ func (m *MLObsSink) buildNewKafkaMessage(predictionLog *upiv1.PredictionLog) (*k

func (m *MLObsSink) Sink(rawLogEntries []*LogEntry) error {
for _, rawLogEntry := range rawLogEntries {
if rand.Float64() >= SamplingRate {
if rawLogEntry.ResponsePayload.StatusCode != http.StatusOK || rand.Float64() >= SamplingRate {
continue
}
predictionLog, err := m.newPredictionLog(rawLogEntry)
if err != nil {
m.logger.Errorf("unable to convert log entry: %v", err)
continue
}
kafkaMessage, err := m.buildNewKafkaMessage(predictionLog)
if err != nil {
m.logger.Errorf("unable to build kafka message: %v", err)
continue
}
err = m.producer.Produce(kafkaMessage, m.producer.Events())
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def poll_new_logs(self) -> List[PredictionLog]:
return [
parse_message_to_prediction_log(msg.value())
for msg in messages
if (msg is not None and msg.error() is None)
if (msg is not None and msg.error() is None and msg.value() is not None)
]

def commit(self):
Expand Down

0 comments on commit ceb306f

Please sign in to comment.