Skip to content

Commit

Permalink
Recover interrupted eventhubs subscriptions (#3344)
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Verst <[email protected]>
  • Loading branch information
berndverst committed Feb 8, 2024
1 parent ba865c5 commit adf5cdb
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions internal/component/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,16 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
Handler: retryHandler,
}

subscriptionLoopFinished := make(chan bool, 1)

// Process all partition clients as they come in
go func() {
subscriberLoop := func() {
for {
// This will block until a new partition client is available
// It returns nil if processor.Run terminates or if the context is canceled
partitionClient := processor.NextPartitionClient(subscribeCtx)
if partitionClient == nil {
subscriptionLoopFinished <- true
return
}
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())
Expand All @@ -329,15 +332,37 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
}
}()
}
}()
}

// Start the processor
go func() {
// This is a blocking call that runs until the context is canceled
err = processor.Run(subscribeCtx)
// Do not log context.Canceled which happens at shutdown
if err != nil && !errors.Is(err, context.Canceled) {
aeh.logger.Errorf("Error from event processor: %v", err)
for {
go subscriberLoop()
// This is a blocking call that runs until the context is canceled
err = processor.Run(subscribeCtx)
// Exit if the context is canceled
if err != nil && errors.Is(err, context.Canceled) {
return
}
if err != nil {
aeh.logger.Errorf("Error from event processor: %v", err)
} else {
aeh.logger.Debugf("Event processor terminated without error")
}
// wait for subscription loop finished signal
select {
case <-subscribeCtx.Done():
return
case <-subscriptionLoopFinished:
// noop
}
// Waiting here is not strictly necessary, however, we will wait for a short time to increase the likelihood of transient errors having disappeared
select {
case <-subscribeCtx.Done():
return
case <-time.After(5 * time.Second):
// noop - continue the for loop
}
}
}()

Expand Down

0 comments on commit adf5cdb

Please sign in to comment.