Skip to content

Commit

Permalink
Merge pull request #378 from Particular/changes-for-minor
Browse files Browse the repository at this point in the history
Bring back the necessary changes to 3.1.0
  • Loading branch information
danielmarbach authored Mar 1, 2021
2 parents f85a7b7 + 596d311 commit 6f80606
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ static class WellKnownConfigurationKeys
public const string SagaStorageAssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey = "AzureSagaStorage.SagaStorageAssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey";
public const string SagaStorageCompatibilityMode = "AzureSagaStorage.EnableCompatibilityMode";
public const string SagaStorageConventionalTablePrefix = "AzureSagaStorage.ConventionalTablePrefix";
public const string SagaJsonSerializer = "AzureSagaStorage.JsonSerializerSettings";
public const string SagaReaderCreator = "AzureSagaStorage.ReaderCreator";
public const string SagaWriterCreator = "AzureSagaStorage.WriterCreator";

public const string SubscriptionStorageTableName = "AzureSubscriptionStorage.TableName";
public const string SubscriptionStorageCacheFor = "AzureSubscriptionStorage.CacheFor";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,29 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Extensibility;
using Microsoft.Azure.Cosmos.Table;
using Newtonsoft.Json;
using Sagas;

class AzureSagaPersister : ISagaPersister
{
public AzureSagaPersister(IProvideCloudTableClient tableClientProvider, bool disableTableCreation, bool compatibilityMode, SecondaryIndex secondaryIndex, string conventionalTablePrefix)
public AzureSagaPersister(
IProvideCloudTableClient tableClientProvider,
bool disableTableCreation,
bool compatibilityMode,
SecondaryIndex secondaryIndex,
string conventionalTablePrefix,
JsonSerializer jsonSerializer,
Func<TextReader, JsonReader> readerCreator,
Func<TextWriter, JsonWriter> writerCreator)
{
this.writerCreator = writerCreator;
this.readerCreator = readerCreator;
this.jsonSerializer = jsonSerializer;
this.conventionalTablePrefix = conventionalTablePrefix;
this.compatibilityMode = compatibilityMode;
this.disableTableCreation = disableTableCreation;
Expand All @@ -31,15 +44,15 @@ public async Task Save(IContainSagaData sagaData, SagaCorrelationProperty correl
{
PartitionKey = partitionKey.PartitionKey,
RowKey = sagaData.Id.ToString(),
});
}, jsonSerializer, writerCreator);

var table = await GetTableAndCreateIfNotExists(storageSession, sagaDataType)
.ConfigureAwait(false);

sagaDataEntityToSave.Table = table;

var meta = context.GetOrCreate<SagaInstanceMetadata>();
meta.Entities[sagaData] = sagaDataEntityToSave;
meta.Entities[sagaData.Id] = sagaDataEntityToSave;

storageSession.Add(new SagaSave(partitionKey, sagaDataEntityToSave));
}
Expand All @@ -50,9 +63,9 @@ public Task Update(IContainSagaData sagaData, SynchronizedStorageSession session
var partitionKey = GetPartitionKey(context, sagaData.Id);

var meta = context.GetOrCreate<SagaInstanceMetadata>();
var sagaDataEntityToUpdate = meta.Entities[sagaData];
var sagaDataEntityToUpdate = meta.Entities[sagaData.Id];

var sagaAsDictionaryTableEntity = DictionaryTableEntityExtensions.ToDictionaryTableEntity(sagaData, sagaDataEntityToUpdate);
var sagaAsDictionaryTableEntity = DictionaryTableEntityExtensions.ToDictionaryTableEntity(sagaData, sagaDataEntityToUpdate, jsonSerializer, writerCreator);

storageSession.Add(new SagaUpdate(partitionKey, sagaAsDictionaryTableEntity));

Expand Down Expand Up @@ -84,9 +97,9 @@ public async Task<TSagaData> Get<TSagaData>(Guid sagaId, SynchronizedStorageSess

readSagaDataEntity.Table = tableToReadFrom;

var sagaData = DictionaryTableEntityExtensions.ToSagaData<TSagaData>(readSagaDataEntity);
var sagaData = DictionaryTableEntityExtensions.ToSagaData<TSagaData>(readSagaDataEntity, jsonSerializer, readerCreator);
var meta = context.GetOrCreate<SagaInstanceMetadata>();
meta.Entities[sagaData] = readSagaDataEntity;
meta.Entities[sagaData.Id] = readSagaDataEntity;
return sagaData;
}

Expand Down Expand Up @@ -168,7 +181,7 @@ public Task Complete(IContainSagaData sagaData, SynchronizedStorageSession sessi
{
var storageSession = (StorageSession)session;
var meta = context.GetOrCreate<SagaInstanceMetadata>();
var sagaDataEntityToDelete = meta.Entities[sagaData];
var sagaDataEntityToDelete = meta.Entities[sagaData.Id];
var partitionKey = GetPartitionKey(context, sagaData.Id);

storageSession.Add(new SagaDelete(partitionKey, sagaDataEntityToDelete));
Expand Down Expand Up @@ -199,20 +212,23 @@ static TableEntityPartitionKey GetPartitionKey(ContextBag context, Guid sagaData
return partitionKey;
}

bool disableTableCreation;
CloudTableClient client;
SecondaryIndex secondaryIndex;
readonly bool disableTableCreation;
readonly CloudTableClient client;
readonly SecondaryIndex secondaryIndex;
const string SecondaryIndexIndicatorProperty = "NServiceBus_2ndIndexKey";
static ConcurrentDictionary<string, bool> tableCreated = new ConcurrentDictionary<string, bool>();
static readonly ConcurrentDictionary<string, bool> tableCreated = new ConcurrentDictionary<string, bool>();
private readonly bool compatibilityMode;
private readonly string conventionalTablePrefix;
readonly JsonSerializer jsonSerializer;
readonly Func<TextReader, JsonReader> readerCreator;
readonly Func<TextWriter, JsonWriter> writerCreator;

/// <summary>
/// Holds saga instance related metadata in a scope of a <see cref="ContextBag" />.
/// </summary>
class SagaInstanceMetadata
{
public Dictionary<object, DictionaryTableEntity> Entities { get; } = new Dictionary<object, DictionaryTableEntity>();
public Dictionary<Guid, DictionaryTableEntity> Entities { get; } = new Dictionary<Guid, DictionaryTableEntity>();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
namespace NServiceBus
{
using System;
using System.IO;
using Configuration.AdvancedExtensibility;
using Microsoft.Azure.Cosmos.Table;
using Newtonsoft.Json;
using Persistence.AzureTable;

/// <summary>
Expand Down Expand Up @@ -46,6 +49,50 @@ public static partial class ConfigureAzureSagaStorage
return config;
}

/// <summary>
/// Overrides the default settings used by the saga property serializer used for complex property types serialization that is not supported by default with tables.
/// </summary>
public static PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> JsonSettings(this PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> config, JsonSerializerSettings jsonSerializerSettings)
{
Guard.AgainstNull(nameof(config), config);
Guard.AgainstNull(nameof(jsonSerializerSettings), jsonSerializerSettings);

var settings = config.GetSettings();
settings.Set(WellKnownConfigurationKeys.SagaJsonSerializer, Newtonsoft.Json.JsonSerializer.Create(jsonSerializerSettings));

return config;
}

/// <summary>
/// Overrides the reader creator to customize data deserialization.
/// </summary>
public static PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> ReaderCreator(this PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> config, Func<TextReader, JsonReader> readerCreator)
{
Guard.AgainstNull(nameof(config), config);
Guard.AgainstNull(nameof(readerCreator), readerCreator);

var settings = config.GetSettings();

settings.Set(WellKnownConfigurationKeys.SagaReaderCreator, readerCreator);

return config;
}

/// <summary>
/// Overrides the writer creator to customize data serialization.
/// </summary>
public static PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> WriterCreator(this PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> config, Func<StringWriter, JsonWriter> writerCreator)
{
Guard.AgainstNull(nameof(config), config);
Guard.AgainstNull(nameof(writerCreator), writerCreator);

var settings = config.GetSettings();

settings.Set(WellKnownConfigurationKeys.SagaWriterCreator, writerCreator);

return config;
}

/// <summary>
/// Configures the backward compatibility specific settings.
/// </summary>
Expand All @@ -56,4 +103,4 @@ public static CompatibilitySettings Compatibility(this PersistenceExtensions<Azu
return new CompatibilitySettings(config.GetSettings());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ namespace NServiceBus.Persistence.AzureTable
using System.Collections.Concurrent;
using System.Linq.Expressions;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

static class DictionaryTableEntityExtensions
{
public static TEntity ToSagaData<TEntity>(DictionaryTableEntity entity)
where TEntity : IContainSagaData
{
return (TEntity)ToSagaData(typeof(TEntity), entity);
}
public static TEntity ToSagaData<TEntity>(DictionaryTableEntity entity, JsonSerializer jsonSerializer, Func<TextReader, JsonReader> readerCreator)
where TEntity : IContainSagaData =>
(TEntity)ToSagaData(typeof(TEntity), entity, jsonSerializer, readerCreator);

private static object ToSagaData(Type sagaDataType, DictionaryTableEntity entity)
static object ToSagaData(Type sagaDataType, DictionaryTableEntity entity, JsonSerializer jsonSerializer, Func<TextReader, JsonReader> readerCreator)
{
var toCreate = Activator.CreateInstance(sagaDataType);
foreach (var accessor in GetPropertyAccessors(sagaDataType))
Expand Down Expand Up @@ -51,9 +48,10 @@ private static object ToSagaData(Type sagaDataType, DictionaryTableEntity entity
// possibly serialized JSON.NET value
try
{
using (var stringReader = new StringReader(value.StringValue))
using (var reader = new StringReader(value.StringValue))
using (var jsonReader = readerCreator(reader))
{
var deserialized = jsonSerializer.Deserialize(stringReader, type);
var deserialized = jsonSerializer.Deserialize(jsonReader, type);
accessor.Setter(toCreate, deserialized);
}
}
Expand All @@ -71,7 +69,7 @@ private static object ToSagaData(Type sagaDataType, DictionaryTableEntity entity
return toCreate;
}

public static DictionaryTableEntity ToDictionaryTableEntity(object sagaData, DictionaryTableEntity toPersist)
public static DictionaryTableEntity ToDictionaryTableEntity(object sagaData, DictionaryTableEntity toPersist, JsonSerializer jsonSerializer, Func<TextWriter, JsonWriter> writerCreator)
{
foreach (var accessor in GetPropertyAccessors(sagaData.GetType()))
{
Expand Down Expand Up @@ -118,17 +116,18 @@ public static DictionaryTableEntity ToDictionaryTableEntity(object sagaData, Dic
}
else
{
using (var sw = new StringWriter())
using (var stringWriter = new StringWriter())
using (var writer = writerCreator(stringWriter))
{
try
{
jsonSerializerWithNonAbstractDefaultContractResolver.Serialize(sw, value, type);
jsonSerializer.Serialize(writer, value, type);
}
catch (Exception)
{
throw new NotSupportedException($"The property type '{type.Name}' is not supported in Azure Table Storage and it cannot be serialized with JSON.NET.");
}
toPersist[name] = new EntityProperty(sw.ToString());
toPersist[name] = new EntityProperty(stringWriter.ToString());
}
}
}
Expand Down Expand Up @@ -254,12 +253,6 @@ public static TableQuery<DictionaryTableEntity> BuildWherePropertyQuery<TSagaDat

static readonly ConcurrentDictionary<Type, IReadOnlyCollection<PropertyAccessor>> propertyAccessorCache = new ConcurrentDictionary<Type, IReadOnlyCollection<PropertyAccessor>>();

static readonly JsonSerializer jsonSerializer = JsonSerializer.Create();
static readonly JsonSerializer jsonSerializerWithNonAbstractDefaultContractResolver = new JsonSerializer
{
ContractResolver = new NonAbstractDefaultContractResolver(),
};

private static readonly DateTime StorageTableMinDateTime = new DateTime(1601, 1, 1);

sealed class PropertyAccessor
Expand Down Expand Up @@ -307,17 +300,5 @@ private static Action<object, object> GenerateSetter(PropertyInfo propertyInfo)
public string Name { get; }
public Type PropertyType { get; }
}

class NonAbstractDefaultContractResolver : DefaultContractResolver
{
protected override JsonObjectContract CreateObjectContract(Type objectType)
{
if (objectType.IsAbstract || objectType.IsInterface)
{
throw new ArgumentException("Cannot serialize an abstract class/interface", nameof(objectType));
}
return base.CreateObjectContract(objectType);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ public interface IProvidePartitionKeyFromSagaId
{
/// <summary>
/// Sets the partition key based on the
/// - SagaId header if present -or-
/// - The SagaId on the secondary index derived from the correlation property information when the migration mode is enabled -or-
/// - The deterministic SagaId precalculated by using the correlation property information
/// - The saga ID header if present -or-
/// - The saga ID on the secondary index derived from the specified <paramref name="correlationProperty"/> when compatibility mode is enabled -or-
/// - The saga ID calculated using the specified <paramref name="correlationProperty"/>.
/// </summary>
/// <param name="context">The logical message handler context.</param>
/// <param name="correlationProperty">The correlation property information derived from the logical message.</param>
/// <exception cref="Exception">When the saga id could not be determined by the saga id header and <see cref="SagaCorrelationProperty.None"/> is passed.</exception>
/// <exception cref="Exception">When the specified <paramref name="correlationProperty"/> is <see cref="SagaCorrelationProperty.None"/> and the saga ID header is not present.</exception>
Task SetPartitionKey<TSagaData>(IIncomingLogicalMessageContext context,
SagaCorrelationProperty correlationProperty)
where TSagaData : IContainSagaData;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
namespace NServiceBus.Persistence.AzureTable
{
using System;
using System.IO;
using Features;
using Logging;
using Sagas;
using Migration;
using Newtonsoft.Json;

class SagaStorage : Feature
{
Expand All @@ -15,6 +18,9 @@ internal SagaStorage()
s.SetDefault(WellKnownConfigurationKeys.SagaStorageAssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey, AzureStorageSagaDefaults.AssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey);
s.SetDefault(WellKnownConfigurationKeys.SagaStorageCompatibilityMode, AzureStorageSagaDefaults.CompatibilityModeEnabled);
s.SetDefault(WellKnownConfigurationKeys.SagaStorageConventionalTablePrefix, AzureStorageSagaDefaults.ConventionalTablePrefix);
s.SetDefault(WellKnownConfigurationKeys.SagaJsonSerializer, JsonSerializer.Create());
s.SetDefault(WellKnownConfigurationKeys.SagaReaderCreator, (Func<TextReader, JsonReader>)(reader => new JsonTextReader(reader)));
s.SetDefault(WellKnownConfigurationKeys.SagaWriterCreator, (Func<TextWriter, JsonWriter>)(writer => new JsonTextWriter(writer)));
s.EnableFeatureByDefault<SynchronizedStorage>();
s.SetDefault<ISagaIdGenerator>(new SagaIdGenerator());
Expand Down Expand Up @@ -49,7 +55,20 @@ protected override void Setup(FeatureConfigurationContext context)
builder.Build<TableHolderResolver>(), secondaryIndices, compatibilityModeEnabled, conventionalTablePrefix), DependencyLifecycle.SingleInstance);

var installerSettings = context.Settings.Get<SynchronizedStorageInstallerSettings>();
context.Container.ConfigureComponent<ISagaPersister>(builder => new AzureSagaPersister(builder.Build<IProvideCloudTableClient>(), installerSettings.Disabled, compatibilityModeEnabled, secondaryIndices, conventionalTablePrefix), DependencyLifecycle.SingleInstance);
var jsonSerializer = context.Settings.Get<JsonSerializer>(WellKnownConfigurationKeys.SagaJsonSerializer);
var readerCreator = context.Settings.Get<Func<TextReader, JsonReader>>(WellKnownConfigurationKeys.SagaReaderCreator);
var writerCreator = context.Settings.Get<Func<TextWriter, JsonWriter>>(WellKnownConfigurationKeys.SagaWriterCreator);

context.Container.ConfigureComponent<ISagaPersister>(builder =>
new AzureSagaPersister(builder.Build<IProvideCloudTableClient>(),
installerSettings.Disabled,
compatibilityModeEnabled,
secondaryIndices,
conventionalTablePrefix,
jsonSerializer,
readerCreator,
writerCreator),
DependencyLifecycle.SingleInstance);
}

static readonly ILog Logger = LogManager.GetLogger<SagaStorage>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
using Persistence.AzureTable;
using Microsoft.Azure.Cosmos.Table;
using Extensibility;
using Newtonsoft.Json;
using JsonSerializer = Newtonsoft.Json.JsonSerializer;

public partial class PersistenceTestsConfiguration : IProvideCloudTableClient
{
Expand Down Expand Up @@ -39,7 +41,15 @@ public Task Configure()
SagaIdGenerator = new SagaIdGenerator();
var resolver = new TableHolderResolver(this, new TableInformation(SetupFixture.TableName));
var secondaryIndices = new SecondaryIndex();
SagaStorage = new AzureSagaPersister(this, true, false, secondaryIndices, null);
SagaStorage = new AzureSagaPersister(
this,
true,
false,
secondaryIndices,
null,
JsonSerializer.Create(),
reader => new JsonTextReader(reader),
writer => new JsonTextWriter(writer));
SynchronizedStorage = new StorageSessionFactory(resolver, null);
SynchronizedStorageAdapter = new StorageSessionAdapter(null);
OutboxStorage = new OutboxPersister(resolver);
Expand Down
Loading

0 comments on commit 6f80606

Please sign in to comment.