Skip to content

Commit

Permalink
Merge pull request #305 from Particular/backport-delete-race
Browse files Browse the repository at this point in the history
Incorrect handling of saga completion with optimistic concurrency
  • Loading branch information
SeanFeldman authored Oct 15, 2020
2 parents 804f46a + 5949c7a commit 2814263
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,31 @@ public async Task Complete(IContainSagaData sagaData, SynchronizedStorageSession
{
var table = await GetTable(sagaData.GetType()).ConfigureAwait(false);

var sagaId = sagaData.Id;
var query = GenerateSagaTableQuery<DictionaryTableEntity>(sagaId);
var sagaId = sagaData.Id.ToString();
var meta = context.GetOrCreate<SagaInstanceMetadata>();
if (!meta.TryGetEtag(sagaData, out var etag))
{
etag = "*";
}

var entity = (await table.ExecuteQueryAsync(query).ConfigureAwait(false)).SafeFirstOrDefault();
if (entity == null)
var entity = new DictionaryTableEntity
{
ETag = etag,
PartitionKey = sagaId,
RowKey = sagaId
};
try
{
await table.ExecuteAsync(TableOperation.Delete(entity)).ConfigureAwait(false);
}
catch (StorageException e) when (e.RequestInformation.HttpStatusCode == (int) HttpStatusCode.NotFound)
{
return; // should not try to delete saga data that does not exist, this situation can occur on retry or parallel execution
// should not try to delete saga data that does not exist, this situation can occur on retry or parallel execution
}

await table.DeleteIgnoringNotFound(entity).ConfigureAwait(false);
try
{
await RemoveSecondaryIndex(sagaData, context).ConfigureAwait(false);
await RemoveSecondaryIndex(sagaData, meta).ConfigureAwait(false);
}
catch
{
Expand Down Expand Up @@ -126,10 +138,8 @@ async Task<TSagaData> GetByCorrelationProperty<TSagaData>(string propertyName, o
return query.Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, sagaId.ToString()));
}

Task RemoveSecondaryIndex(IContainSagaData sagaData, ContextBag context)
Task RemoveSecondaryIndex(IContainSagaData sagaData, SagaInstanceMetadata meta)
{
var meta = context.GetOrCreate<SagaInstanceMetadata>();

if (meta.TryGetSecondaryIndexKey(sagaData, out var secondaryIndexKey))
{
return secondaryIndices.RemoveSecondary(sagaData.GetType(), secondaryIndexKey.Value);
Expand All @@ -149,14 +159,9 @@ async Task<DictionaryTableEntity> GetDictionaryTableEntity(string sagaId, Type e
var tableEntity = (await table.ExecuteQueryAsync(query).ConfigureAwait(false)).SafeFirstOrDefault();
return tableEntity;
}
catch (StorageException e)
catch (StorageException e) when(e.RequestInformation.HttpStatusCode == (int) HttpStatusCode.NotFound)
{
if (e.RequestInformation.HttpStatusCode == (int) HttpStatusCode.NotFound)
{
return null;
}

throw;
return null;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Tests/Sagas/When_completing_saga.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public async Task Should_succeed_if_saga_doesnt_exist()
OriginalMessageId = "MooId"
};

await persister.Complete(saga, null, null);
await persister.Complete(saga, null, new ContextBag());

var sagaData = await persister.Get<CompleteSagaData>(saga.Id, null, null);
Assert.IsNull(sagaData);
Expand Down
48 changes: 25 additions & 23 deletions src/Tests/Sagas/When_executing_concurrently.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public async Task Should_not_find_saga_when_primary_is_removed_but_secondary_exi
await Save(persister1, v, Id1).ConfigureAwait(false);

// get by property just to load to cache
await GetByCorrelationProperty(persister2).ConfigureAwait(false);
await GetByCorrelationProperty(persister2, new ContextBag()).ConfigureAwait(false);

await DeletePrimary(Id1).ConfigureAwait(false);

// only secondary exists now, ensure it's null
var byProperty = await GetByCorrelationProperty(persister2).ConfigureAwait(false);
var byProperty = await GetByCorrelationProperty(persister2, new ContextBag()).ConfigureAwait(false);
Assert.IsNull(byProperty);
}

Expand All @@ -60,7 +60,7 @@ public async Task Should_enable_saving_another_saga_with_same_correlation_id_as_
await Save(persister1, v, Id1).ConfigureAwait(false);

// get by property just to load to cache
await GetByCorrelationProperty(persister2).ConfigureAwait(false);
await GetByCorrelationProperty(persister2, new ContextBag()).ConfigureAwait(false);

await DeletePrimary(Id1).ConfigureAwait(false);

Expand All @@ -69,7 +69,7 @@ public async Task Should_enable_saving_another_saga_with_same_correlation_id_as_
// save a new saga with the same correlation id
await Save(persister1, v2, Id2).ConfigureAwait(false);

var saga = await GetByCorrelationProperty(persister2).ConfigureAwait(false);
var saga = await GetByCorrelationProperty(persister2, new ContextBag()).ConfigureAwait(false);
AssertSaga(saga, v2, Id2);
}

Expand Down Expand Up @@ -98,22 +98,24 @@ async Task Should_enable_insert_saga_again(ISagaPersister p)

await Save(persister1, v, Id1).ConfigureAwait(false);

var saga1 = await Get(persister1, Id1).ConfigureAwait(false);
var saga2 = await Get(persister2, Id1).ConfigureAwait(false);
var saga1ByProperty = await GetByCorrelationProperty(persister1).ConfigureAwait(false);
var saga2ByProperty = await GetByCorrelationProperty(persister2).ConfigureAwait(false);
var persister1ContextBag = new ContextBag();
var saga1 = await Get(persister1, Id1, persister1ContextBag).ConfigureAwait(false);
var persister2ContextBag = new ContextBag();
var saga2 = await Get(persister2, Id1, persister2ContextBag).ConfigureAwait(false);
var saga1ByProperty = await GetByCorrelationProperty(persister1, new ContextBag()).ConfigureAwait(false);
var saga2ByProperty = await GetByCorrelationProperty(persister2, new ContextBag()).ConfigureAwait(false);

AssertSaga(saga1, v, Id1);
AssertSaga(saga2, v, Id1);
AssertSaga(saga1ByProperty, v, Id1);
AssertSaga(saga2ByProperty, v, Id1);

await Complete(saga1, persister1).ConfigureAwait(false);
await Complete(saga1, persister1, persister1ContextBag).ConfigureAwait(false);

saga1 = await Get(persister1, Id1).ConfigureAwait(false);
saga2 = await Get(persister2, Id1).ConfigureAwait(false);
saga1ByProperty = await GetByCorrelationProperty(persister1).ConfigureAwait(false);
saga2ByProperty = await GetByCorrelationProperty(persister2).ConfigureAwait(false);
saga1 = await Get(persister1, Id1, new ContextBag()).ConfigureAwait(false);
saga2 = await Get(persister2, Id1, new ContextBag()).ConfigureAwait(false);
saga1ByProperty = await GetByCorrelationProperty(persister1, new ContextBag()).ConfigureAwait(false);
saga2ByProperty = await GetByCorrelationProperty(persister2, new ContextBag()).ConfigureAwait(false);

Assert.IsNull(saga1);
Assert.IsNull(saga2);
Expand All @@ -123,20 +125,20 @@ async Task Should_enable_insert_saga_again(ISagaPersister p)
const string v2 = "2";
await Save(p, v2, Id2).ConfigureAwait(false);

saga1 = await Get(persister1, Id2).ConfigureAwait(false);
saga2 = await Get(persister2, Id2).ConfigureAwait(false);
saga1ByProperty = await GetByCorrelationProperty(persister1).ConfigureAwait(false);
saga2ByProperty = await GetByCorrelationProperty(persister2).ConfigureAwait(false);
saga1 = await Get(persister1, Id2, new ContextBag()).ConfigureAwait(false);
saga2 = await Get(persister2, Id2, new ContextBag()).ConfigureAwait(false);
saga1ByProperty = await GetByCorrelationProperty(persister1, new ContextBag()).ConfigureAwait(false);
saga2ByProperty = await GetByCorrelationProperty(persister2, new ContextBag()).ConfigureAwait(false);

AssertSaga(saga1, v2, Id2);
AssertSaga(saga2, v2, Id2);
AssertSaga(saga1ByProperty, v2, Id2);
AssertSaga(saga2ByProperty, v2, Id2);
}

static Task Complete(IContainSagaData saga, ISagaPersister persister)
static Task Complete(IContainSagaData saga, ISagaPersister persister, ContextBag contextBag)
{
return persister.Complete(saga, null, null);
return persister.Complete(saga, null, contextBag);
}

static void AssertSaga(ConcurrentSagaData saga, string value, Guid id)
Expand All @@ -147,14 +149,14 @@ static void AssertSaga(ConcurrentSagaData saga, string value, Guid id)
Assert.AreEqual(value, saga.Value);
}

static Task<ConcurrentSagaData> Get(ISagaPersister persister, Guid id)
static Task<ConcurrentSagaData> Get(ISagaPersister persister, Guid id, ContextBag contextBag)
{
return persister.Get<ConcurrentSagaData>(id, null, new ContextBag());
return persister.Get<ConcurrentSagaData>(id, null, contextBag);
}

static Task<ConcurrentSagaData> GetByCorrelationProperty(ISagaPersister persister)
static Task<ConcurrentSagaData> GetByCorrelationProperty(ISagaPersister persister, ContextBag contextBag)
{
return persister.Get<ConcurrentSagaData>(SagaCorrelationPropertyValue.Name, SagaCorrelationPropertyValue.Value, null, new ContextBag());
return persister.Get<ConcurrentSagaData>(SagaCorrelationPropertyValue.Name, SagaCorrelationPropertyValue.Value, null, contextBag);
}

static Task Save(ISagaPersister persister, string value, Guid id)
Expand Down

0 comments on commit 2814263

Please sign in to comment.