From 0f9c99d6cd6e1d9212c11aa861586945af2f19df Mon Sep 17 00:00:00 2001 From: xdlbdy Date: Thu, 6 Apr 2023 10:21:45 +0800 Subject: [PATCH] fix: close subscription resource release Signed-off-by: xdlbdy --- internal/trigger/trigger/trigger.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/internal/trigger/trigger/trigger.go b/internal/trigger/trigger/trigger.go index 6fb00026d..097d74141 100644 --- a/internal/trigger/trigger/trigger.go +++ b/internal/trigger/trigger/trigger.go @@ -456,6 +456,11 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i ec.Extensions[primitive.XVanusEventbus] = t.subscription.RetryEventbusID.Key() var writeAttempt int for { + select { + case <-ctx.Done(): + return + default: + } writeAttempt++ startTime := time.Now() _, err := api.AppendOne(ctx, t.timerEventWriter, e) @@ -471,7 +476,9 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i if writeAttempt >= t.config.MaxWriteAttempt { return } - time.Sleep(time.Second) + if !util.SleepWithContext(ctx, time.Second) { + return + } } else { break } @@ -491,6 +498,11 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso ec.Extensions[primitive.DeadLetterReason] = reason var writeAttempt int for { + select { + case <-ctx.Done(): + return + default: + } writeAttempt++ startTime := time.Now() _, err := api.AppendOne(ctx, t.dlEventWriter, e) @@ -505,7 +517,9 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso if writeAttempt >= t.config.MaxWriteAttempt { return } - time.Sleep(time.Second) + if !util.SleepWithContext(ctx, time.Second) { + return + } } else { break } @@ -602,12 +616,12 @@ func (t *trigger) Stop(ctx context.Context) error { t.reader.Close() t.retryEventReader.Close() t.stop() + t.pool.Release() close(t.eventCh) close(t.retryEventCh) close(t.sendCh) close(t.batchSendCh) t.wg.Wait() - t.pool.Release() t.offsetManager.Close() t.state = TriggerStopped log.Info(ctx).