diff --git a/cmd/dequeuer/main.go b/cmd/dequeuer/main.go index c5402f0283..188202f2f7 100644 --- a/cmd/dequeuer/main.go +++ b/cmd/dequeuer/main.go @@ -57,6 +57,7 @@ func main() { flag.StringVar(&clusterUID, "cluster-uid", "", "cluster unique identifier") flag.StringVar(&probesPath, "probes-path", "", "path to the probes spec") flag.StringVar(&queueURL, "queue", "", "target queue URL from which the api messages will be dequeued") + // TODO add queue_timeout_seconds flag.StringVar(&apiKind, "api-kind", "", fmt.Sprintf("api kind (%s|%s)", userconfig.BatchAPIKind.String(), userconfig.AsyncAPIKind.String())) flag.StringVar(&apiName, "api-name", "", "api name") flag.StringVar(&jobID, "job-id", "", "job ID") diff --git a/docs/workloads/async/configuration.md b/docs/workloads/async/configuration.md index 798a697871..0c468e2c78 100644 --- a/docs/workloads/async/configuration.md +++ b/docs/workloads/async/configuration.md @@ -6,6 +6,7 @@ pod: # pod configuration (required) port: # port to which requests will be sent (default: 8080; exported as $CORTEX_PORT) max_concurrency: # maximum number of requests that will be concurrently sent into the container (default: 1, max allowed: 100) + queue_timeout_seconds: # maximum amount of time a request can be queued before beginning to be processed containers: # configurations for the containers to run (at least one constainer must be provided) - name: # name of the container (required) image: # docker image to use for the container (required) diff --git a/docs/workloads/realtime/configuration.md b/docs/workloads/realtime/configuration.md index b918ceb130..95f08a31cb 100644 --- a/docs/workloads/realtime/configuration.md +++ b/docs/workloads/realtime/configuration.md @@ -7,6 +7,7 @@ port: # port to which requests will be sent (default: 8080; exported as $CORTEX_PORT) max_concurrency: # maximum number of requests that will be concurrently sent into the container (default: 1) max_queue_length: # maximum number of requests per replica which will be queued (beyond max_concurrency) before requests are rejected with error code 503 (default: 100) + queue_timeout_seconds: # maximum amount of time a request can be queued before beginning to be processed containers: # configurations for the containers to run (at least one constainer must be provided) - name: # name of the container (required) image: # docker image to use for the container (required) diff --git a/pkg/dequeuer/async_handler.go b/pkg/dequeuer/async_handler.go index c2065cdb9b..17a943ba88 100644 --- a/pkg/dequeuer/async_handler.go +++ b/pkg/dequeuer/async_handler.go @@ -28,7 +28,10 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/sqs" awslib "github.com/cortexlabs/cortex/pkg/lib/aws" + "github.com/cortexlabs/cortex/pkg/lib/debug" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/pointer" + s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/types/async" "go.uber.org/zap" @@ -75,15 +78,26 @@ func (h *AsyncMessageHandler) Handle(message *sqs.Message) error { return errors.ErrorUnexpected("got unexpected sqs message with empty or nil body") } + var msgSentTime *time.Time + if msgSentTimestamp, ok := message.Attributes[sqs.MessageSystemAttributeNameSentTimestamp]; ok { + if msgSentTimestamp != nil { + if parsed, ok := s.ParseInt64(*msgSentTimestamp); ok { + msgSentTime = pointer.Time(time.UnixMilli(parsed)) + } + } + } + requestID := *message.Body - err := h.handleMessage(requestID) + err := h.handleMessage(requestID, msgSentTime) if err != nil { return err } return nil } -func (h *AsyncMessageHandler) handleMessage(requestID string) error { +func (h *AsyncMessageHandler) handleMessage(requestID string, msgSentTime *time.Time) error { + debug.Ppg(msgSentTime) + h.log.Infow("processing workload", "id", requestID) err := h.updateStatus(requestID, async.StatusInProgress) diff --git a/pkg/dequeuer/dequeuer.go b/pkg/dequeuer/dequeuer.go index beb42d0f1e..7f375a14f9 100644 --- a/pkg/dequeuer/dequeuer.go +++ b/pkg/dequeuer/dequeuer.go @@ -84,6 +84,7 @@ func (d *SQSDequeuer) ReceiveMessage() (*sqs.Message, error) { MessageAttributeNames: aws.StringSlice(_messageAttributes), VisibilityTimeout: d.visibilityTimeout, WaitTimeSeconds: d.waitTimeSeconds, + AttributeNames: aws.StringSlice([]string{sqs.MessageSystemAttributeNameSentTimestamp}), }) if err != nil { diff --git a/pkg/lib/k8s/virtual_service.go b/pkg/lib/k8s/virtual_service.go index 98ce122c47..98f39e053b 100644 --- a/pkg/lib/k8s/virtual_service.go +++ b/pkg/lib/k8s/virtual_service.go @@ -45,6 +45,7 @@ type VirtualServiceSpec struct { Labels map[string]string Annotations map[string]string Headers *istionetworking.Headers + Retries *int32 } type Destination struct { @@ -153,6 +154,14 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ httpRoutes = append(httpRoutes, exactMatch, prefixMatch) } + if spec.Retries != nil { + for i := range httpRoutes { + httpRoutes[i].Retries = &istionetworking.HTTPRetry{ + Attempts: *spec.Retries, + } + } + } + virtualService := &istioclientnetworking.VirtualService{ TypeMeta: _virtualServiceTypeMeta, ObjectMeta: kmeta.ObjectMeta{ diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go index 7a6824d14d..339fa07b54 100644 --- a/pkg/operator/resources/realtimeapi/k8s_specs.go +++ b/pkg/operator/resources/realtimeapi/k8s_specs.go @@ -149,6 +149,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService { }, PrefixPath: api.Networking.Endpoint, Rewrite: pointer.String("/"), + Retries: pointer.Int32(0), Annotations: api.ToK8sAnnotations(), Labels: map[string]string{ "apiName": api.Name, diff --git a/pkg/operator/resources/trafficsplitter/k8s_specs.go b/pkg/operator/resources/trafficsplitter/k8s_specs.go index 5c6834885f..2c87cb53a5 100644 --- a/pkg/operator/resources/trafficsplitter/k8s_specs.go +++ b/pkg/operator/resources/trafficsplitter/k8s_specs.go @@ -32,6 +32,7 @@ func virtualServiceSpec(trafficSplitter *spec.API) *istioclientnetworking.Virtua Destinations: getTrafficSplitterDestinations(trafficSplitter), ExactPath: trafficSplitter.Networking.Endpoint, Rewrite: pointer.String("/"), + Retries: pointer.Int32(0), Annotations: trafficSplitter.ToK8sAnnotations(), Labels: map[string]string{ "apiName": trafficSplitter.Name, diff --git a/pkg/proxy/breaker.go b/pkg/proxy/breaker.go index ff00f35c38..59d0ddd445 100644 --- a/pkg/proxy/breaker.go +++ b/pkg/proxy/breaker.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "time" "go.uber.org/atomic" ) @@ -29,6 +30,7 @@ import ( var ( // ErrRequestQueueFull indicates the breaker queue depth was exceeded. ErrRequestQueueFull = errors.New("pending request queue full") + ErrQueueTimeout = errors.New("queue timeout") ) // BreakerParams defines the parameters of the breaker. @@ -139,7 +141,8 @@ func (b *Breaker) Maybe(ctx context.Context, thunk func()) error { defer b.releasePending() // Wait for capacity in the active queue. - if err := b.sem.acquire(ctx); err != nil { + // TODO use the actual timeout + if err := b.sem.acquireWithTimeout(ctx, 0); err != nil { return err } // Defer releasing capacity in the active. @@ -239,6 +242,38 @@ func (s *semaphore) acquire(ctx context.Context) error { } } +// acquireWithTimeout acquires capacity from the semaphore, with a timeout. +// if timeout <= 0, no timeout is used +func (s *semaphore) acquireWithTimeout(ctx context.Context, timeout time.Duration) error { + if timeout <= 0 { + return s.acquire(ctx) + } + + timer := time.NewTimer(timeout) + + for { + old := s.state.Load() + capacity, in := unpack(old) + + if in >= capacity { + select { + case <-timer.C: + return ErrQueueTimeout + case <-ctx.Done(): + return ctx.Err() + case <-s.queue: + } + // Force reload state. + continue + } + + in++ + if s.state.CAS(old, pack(capacity, in)) { + return nil + } + } +} + // release releases capacity in the semaphore. // If the semaphore capacity was reduced in between and as a result inFlight is greater // than capacity, we don't wake up goroutines as they'd not get any capacity anyway. diff --git a/pkg/proxy/handler.go b/pkg/proxy/handler.go index 06e349ab1f..0a0420bea1 100644 --- a/pkg/proxy/handler.go +++ b/pkg/proxy/handler.go @@ -35,7 +35,7 @@ func Handler(breaker *Breaker, next http.Handler) http.HandlerFunc { if err := breaker.Maybe(r.Context(), func() { next.ServeHTTP(w, r) }); err != nil { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrRequestQueueFull) { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrRequestQueueFull) || errors.Is(err, ErrQueueTimeout) { http.Error(w, err.Error(), http.StatusServiceUnavailable) } else { w.WriteHeader(http.StatusInternalServerError)