Skip to content

Commit

Permalink
Update Pop Receipt (#1066)
Browse files Browse the repository at this point in the history
* WIP

* Refactor

* nit
  • Loading branch information
wsugarman authored May 29, 2024
1 parent 5ed67a5 commit 584babe
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
5 changes: 5 additions & 0 deletions src/DurableTask.AzureStorage/MessageData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public MessageData()
internal long TotalMessageSizeBytes { get; set; }

internal MessageFormatFlags MessageFormat { get; set; }

internal void Update(UpdateReceipt receipt)
{
this.OriginalQueueMessage = this.OriginalQueueMessage.Update(receipt);
}
}

/// <summary>
Expand Down
5 changes: 3 additions & 2 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
e.ToString());

// Abandon the message so we can try it again later.
await this.AbandonMessageAsync(queueMessage);
// Note: We will fetch the message again from the queue before retrying, so no need to read the receipt
_ = await this.AbandonMessageAsync(queueMessage);
return;
}

Expand Down Expand Up @@ -191,7 +192,7 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
}

// This overload is intended for cases where we aren't able to deserialize an instance of MessageData.
public Task AbandonMessageAsync(QueueMessage queueMessage)
public Task<UpdateReceipt?> AbandonMessageAsync(QueueMessage queueMessage)
{
this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.MessageId, out _);
return base.AbandonMessageAsync(
Expand Down
22 changes: 17 additions & 5 deletions src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,22 +212,29 @@ await this.storageQueue.AddMessageAsync(
return initialVisibilityDelay;
}

public virtual Task AbandonMessageAsync(MessageData message, SessionBase? session = null)
public virtual async Task AbandonMessageAsync(MessageData message, SessionBase? session = null)
{
QueueMessage queueMessage = message.OriginalQueueMessage;
TaskMessage taskMessage = message.TaskMessage;
OrchestrationInstance instance = taskMessage.OrchestrationInstance;
long sequenceNumber = message.SequenceNumber;

return this.AbandonMessageAsync(
UpdateReceipt? receipt = await this.AbandonMessageAsync(
queueMessage,
taskMessage,
instance,
session?.TraceActivityId,
sequenceNumber);

// If we've successfully abandoned the message, update the pop receipt
// (even though we'll likely no longer interact with this message)
if (receipt is not null)
{
message.Update(receipt);
}
}

protected async Task AbandonMessageAsync(
protected async Task<UpdateReceipt?> AbandonMessageAsync(
QueueMessage queueMessage,
TaskMessage? taskMessage,
OrchestrationInstance? instance,
Expand Down Expand Up @@ -276,7 +283,7 @@ protected async Task AbandonMessageAsync(
{
// We "abandon" the message by settings its visibility timeout using an exponential backoff algorithm.
// This allows it to be reprocessed on this node or another node at a later time, hopefully successfully.
await this.storageQueue.UpdateMessageAsync(
return await this.storageQueue.UpdateMessageAsync(
queueMessage,
TimeSpan.FromSeconds(numSecondsToWait),
traceActivityId);
Expand All @@ -293,6 +300,8 @@ await this.storageQueue.UpdateMessageAsync(
taskEventId,
details: $"Caller: {nameof(AbandonMessageAsync)}",
queueMessage.PopReceipt);

return null;
}
}

Expand All @@ -316,10 +325,13 @@ public async Task RenewMessageAsync(MessageData message, SessionBase session)

try
{
await this.storageQueue.UpdateMessageAsync(
UpdateReceipt receipt = await this.storageQueue.UpdateMessageAsync(
queueMessage,
this.MessageVisibilityTimeout,
session?.TraceActivityId);

// Update the pop receipt
message.Update(receipt);
}
catch (Exception e)
{
Expand Down
5 changes: 3 additions & 2 deletions src/DurableTask.AzureStorage/Storage/Queue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ await this.queueClient
this.stats.MessagesSent.Increment();
}

public async Task UpdateMessageAsync(QueueMessage queueMessage, TimeSpan visibilityTimeout, Guid? clientRequestId = null, CancellationToken cancellationToken = default)
public async Task<UpdateReceipt> UpdateMessageAsync(QueueMessage queueMessage, TimeSpan visibilityTimeout, Guid? clientRequestId = null, CancellationToken cancellationToken = default)
{
using IDisposable scope = OperationContext.CreateClientRequestScope(clientRequestId);
await this.queueClient
UpdateReceipt receipt = await this.queueClient
.UpdateMessageAsync(
queueMessage.MessageId,
queueMessage.PopReceipt,
Expand All @@ -71,6 +71,7 @@ await this.queueClient
.DecorateFailure();

this.stats.MessagesUpdated.Increment();
return receipt;
}

public async Task DeleteMessageAsync(QueueMessage queueMessage, Guid? clientRequestId = null, CancellationToken cancellationToken = default)
Expand Down

0 comments on commit 584babe

Please sign in to comment.