diff --git a/internal/trigger/trigger/trigger.go b/internal/trigger/trigger/trigger.go index 6fb00026d..097d74141 100644 --- a/internal/trigger/trigger/trigger.go +++ b/internal/trigger/trigger/trigger.go @@ -456,6 +456,11 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i ec.Extensions[primitive.XVanusEventbus] = t.subscription.RetryEventbusID.Key() var writeAttempt int for { + select { + case <-ctx.Done(): + return + default: + } writeAttempt++ startTime := time.Now() _, err := api.AppendOne(ctx, t.timerEventWriter, e) @@ -471,7 +476,9 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i if writeAttempt >= t.config.MaxWriteAttempt { return } - time.Sleep(time.Second) + if !util.SleepWithContext(ctx, time.Second) { + return + } } else { break } @@ -491,6 +498,11 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso ec.Extensions[primitive.DeadLetterReason] = reason var writeAttempt int for { + select { + case <-ctx.Done(): + return + default: + } writeAttempt++ startTime := time.Now() _, err := api.AppendOne(ctx, t.dlEventWriter, e) @@ -505,7 +517,9 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso if writeAttempt >= t.config.MaxWriteAttempt { return } - time.Sleep(time.Second) + if !util.SleepWithContext(ctx, time.Second) { + return + } } else { break } @@ -602,12 +616,12 @@ func (t *trigger) Stop(ctx context.Context) error { t.reader.Close() t.retryEventReader.Close() t.stop() + t.pool.Release() close(t.eventCh) close(t.retryEventCh) close(t.sendCh) close(t.batchSendCh) t.wg.Wait() - t.pool.Release() t.offsetManager.Close() t.state = TriggerStopped log.Info(ctx).