Skip to content

Commit

Permalink
Expose the possibility of setting the json settings and more advanced…
Browse files Browse the repository at this point in the history
… scenarios when needing to override the readers and writers

Apply suggestions from code review

Co-authored-by: Sean Feldman <[email protected]>

Customize the serializer to make it throw
  • Loading branch information
danielmarbach committed Feb 26, 2021
1 parent 9f8c1f4 commit b7ae858
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ static class WellKnownConfigurationKeys
public const string SagaStorageAssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey = "AzureSagaStorage.SagaStorageAssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey";
public const string SagaStorageCompatibilityMode = "AzureSagaStorage.EnableCompatibilityMode";
public const string SagaStorageConventionalTablePrefix = "AzureSagaStorage.ConventionalTablePrefix";
public const string SagaJsonSerializer = "AzureSagaStorage.JsonSerializerSettings";
public const string SagaReaderCreator = "AzureSagaStorage.ReaderCreator";
public const string SagaWriterCreator = "AzureSagaStorage.WriterCreator";

public const string SubscriptionStorageTableName = "AzureSubscriptionStorage.TableName";
public const string SubscriptionStorageCacheFor = "AzureSubscriptionStorage.CacheFor";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,29 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Extensibility;
using Microsoft.Azure.Cosmos.Table;
using Newtonsoft.Json;
using Sagas;

class AzureSagaPersister : ISagaPersister
{
public AzureSagaPersister(IProvideCloudTableClient tableClientProvider, bool disableTableCreation, bool compatibilityMode, SecondaryIndex secondaryIndex, string conventionalTablePrefix)
public AzureSagaPersister(
IProvideCloudTableClient tableClientProvider,
bool disableTableCreation,
bool compatibilityMode,
SecondaryIndex secondaryIndex,
string conventionalTablePrefix,
JsonSerializer jsonSerializer,
Func<TextReader, JsonReader> readerCreator,
Func<TextWriter, JsonWriter> writerCreator)
{
this.writerCreator = writerCreator;
this.readerCreator = readerCreator;
this.jsonSerializer = jsonSerializer;
this.conventionalTablePrefix = conventionalTablePrefix;
this.compatibilityMode = compatibilityMode;
this.disableTableCreation = disableTableCreation;
Expand All @@ -31,7 +44,7 @@ public async Task Save(IContainSagaData sagaData, SagaCorrelationProperty correl
{
PartitionKey = partitionKey.PartitionKey,
RowKey = sagaData.Id.ToString(),
});
}, jsonSerializer, writerCreator);

var table = await GetTableAndCreateIfNotExists(storageSession, sagaDataType)
.ConfigureAwait(false);
Expand All @@ -52,7 +65,7 @@ public Task Update(IContainSagaData sagaData, SynchronizedStorageSession session
var meta = context.GetOrCreate<SagaInstanceMetadata>();
var sagaDataEntityToUpdate = meta.Entities[sagaData];

var sagaAsDictionaryTableEntity = DictionaryTableEntityExtensions.ToDictionaryTableEntity(sagaData, sagaDataEntityToUpdate);
var sagaAsDictionaryTableEntity = DictionaryTableEntityExtensions.ToDictionaryTableEntity(sagaData, sagaDataEntityToUpdate, jsonSerializer, writerCreator);

storageSession.Add(new SagaUpdate(partitionKey, sagaAsDictionaryTableEntity));

Expand Down Expand Up @@ -84,7 +97,7 @@ public async Task<TSagaData> Get<TSagaData>(Guid sagaId, SynchronizedStorageSess

readSagaDataEntity.Table = tableToReadFrom;

var sagaData = DictionaryTableEntityExtensions.ToSagaData<TSagaData>(readSagaDataEntity);
var sagaData = DictionaryTableEntityExtensions.ToSagaData<TSagaData>(readSagaDataEntity, jsonSerializer, readerCreator);
var meta = context.GetOrCreate<SagaInstanceMetadata>();
meta.Entities[sagaData] = readSagaDataEntity;
return sagaData;
Expand Down Expand Up @@ -199,13 +212,16 @@ static TableEntityPartitionKey GetPartitionKey(ContextBag context, Guid sagaData
return partitionKey;
}

bool disableTableCreation;
CloudTableClient client;
SecondaryIndex secondaryIndex;
readonly bool disableTableCreation;
readonly CloudTableClient client;
readonly SecondaryIndex secondaryIndex;
const string SecondaryIndexIndicatorProperty = "NServiceBus_2ndIndexKey";
static ConcurrentDictionary<string, bool> tableCreated = new ConcurrentDictionary<string, bool>();
static readonly ConcurrentDictionary<string, bool> tableCreated = new ConcurrentDictionary<string, bool>();
private readonly bool compatibilityMode;
private readonly string conventionalTablePrefix;
readonly JsonSerializer jsonSerializer;
readonly Func<TextReader, JsonReader> readerCreator;
readonly Func<TextWriter, JsonWriter> writerCreator;

/// <summary>
/// Holds saga instance related metadata in a scope of a <see cref="ContextBag" />.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
namespace NServiceBus
{
using System;
using System.IO;
using Configuration.AdvancedExtensibility;
using Microsoft.Azure.Cosmos.Table;
using Newtonsoft.Json;
using Persistence.AzureTable;

/// <summary>
Expand Down Expand Up @@ -46,6 +49,50 @@ public static partial class ConfigureAzureSagaStorage
return config;
}

/// <summary>
/// Overrides the default settings used by the saga property serializer used for complex property types serialization that is not supported by default with tables.
/// </summary>
public static PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> JsonSettings(this PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> config, JsonSerializerSettings jsonSerializerSettings)
{
Guard.AgainstNull(nameof(config), config);
Guard.AgainstNull(nameof(jsonSerializerSettings), jsonSerializerSettings);

var settings = config.GetSettings();
settings.Set(WellKnownConfigurationKeys.SagaJsonSerializer, Newtonsoft.Json.JsonSerializer.Create(jsonSerializerSettings));

return config;
}

/// <summary>
/// Overrides the reader creator to customize data deserialization.
/// </summary>
public static PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> ReaderCreator(this PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> config, Func<TextReader, JsonReader> readerCreator)
{
Guard.AgainstNull(nameof(config), config);
Guard.AgainstNull(nameof(readerCreator), readerCreator);

var settings = config.GetSettings();

settings.Set(WellKnownConfigurationKeys.SagaReaderCreator, readerCreator);

return config;
}

/// <summary>
/// Overrides the writer creator to customize data serialization.
/// </summary>
public static PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> WriterCreator(this PersistenceExtensions<AzureTablePersistence, StorageType.Sagas> config, Func<StringWriter, JsonWriter> writerCreator)
{
Guard.AgainstNull(nameof(config), config);
Guard.AgainstNull(nameof(writerCreator), writerCreator);

var settings = config.GetSettings();

settings.Set(WellKnownConfigurationKeys.SagaWriterCreator, writerCreator);

return config;
}

/// <summary>
/// Configures the backward compatibility specific settings.
/// </summary>
Expand All @@ -56,4 +103,4 @@ public static CompatibilitySettings Compatibility(this PersistenceExtensions<Azu
return new CompatibilitySettings(config.GetSettings());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ namespace NServiceBus.Persistence.AzureTable
using System.Collections.Concurrent;
using System.Linq.Expressions;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

static class DictionaryTableEntityExtensions
{
public static TEntity ToSagaData<TEntity>(DictionaryTableEntity entity)
where TEntity : IContainSagaData
{
return (TEntity)ToSagaData(typeof(TEntity), entity);
}
public static TEntity ToSagaData<TEntity>(DictionaryTableEntity entity, JsonSerializer jsonSerializer, Func<TextReader, JsonReader> readerCreator)
where TEntity : IContainSagaData =>
(TEntity)ToSagaData(typeof(TEntity), entity, jsonSerializer, readerCreator);

private static object ToSagaData(Type sagaDataType, DictionaryTableEntity entity)
static object ToSagaData(Type sagaDataType, DictionaryTableEntity entity, JsonSerializer jsonSerializer, Func<TextReader, JsonReader> readerCreator)
{
var toCreate = Activator.CreateInstance(sagaDataType);
foreach (var accessor in GetPropertyAccessors(sagaDataType))
Expand Down Expand Up @@ -51,9 +48,10 @@ private static object ToSagaData(Type sagaDataType, DictionaryTableEntity entity
// possibly serialized JSON.NET value
try
{
using (var stringReader = new StringReader(value.StringValue))
using (var reader = new StringReader(value.StringValue))
using (var jsonReader = readerCreator(reader))
{
var deserialized = jsonSerializer.Deserialize(stringReader, type);
var deserialized = jsonSerializer.Deserialize(jsonReader, type);
accessor.Setter(toCreate, deserialized);
}
}
Expand All @@ -71,7 +69,7 @@ private static object ToSagaData(Type sagaDataType, DictionaryTableEntity entity
return toCreate;
}

public static DictionaryTableEntity ToDictionaryTableEntity(object sagaData, DictionaryTableEntity toPersist)
public static DictionaryTableEntity ToDictionaryTableEntity(object sagaData, DictionaryTableEntity toPersist, JsonSerializer jsonSerializer, Func<TextWriter, JsonWriter> writerCreator)
{
foreach (var accessor in GetPropertyAccessors(sagaData.GetType()))
{
Expand Down Expand Up @@ -118,17 +116,18 @@ public static DictionaryTableEntity ToDictionaryTableEntity(object sagaData, Dic
}
else
{
using (var sw = new StringWriter())
using (var stringWriter = new StringWriter())
using (var writer = writerCreator(stringWriter))
{
try
{
jsonSerializerWithNonAbstractDefaultContractResolver.Serialize(sw, value, type);
jsonSerializer.Serialize(writer, value, type);
}
catch (Exception)
{
throw new NotSupportedException($"The property type '{type.Name}' is not supported in Azure Table Storage and it cannot be serialized with JSON.NET.");
}
toPersist[name] = new EntityProperty(sw.ToString());
toPersist[name] = new EntityProperty(stringWriter.ToString());
}
}
}
Expand Down Expand Up @@ -254,12 +253,6 @@ public static TableQuery<DictionaryTableEntity> BuildWherePropertyQuery<TSagaDat

static readonly ConcurrentDictionary<Type, IReadOnlyCollection<PropertyAccessor>> propertyAccessorCache = new ConcurrentDictionary<Type, IReadOnlyCollection<PropertyAccessor>>();

static readonly JsonSerializer jsonSerializer = JsonSerializer.Create();
static readonly JsonSerializer jsonSerializerWithNonAbstractDefaultContractResolver = new JsonSerializer
{
ContractResolver = new NonAbstractDefaultContractResolver(),
};

private static readonly DateTime StorageTableMinDateTime = new DateTime(1601, 1, 1);

sealed class PropertyAccessor
Expand Down Expand Up @@ -307,17 +300,5 @@ private static Action<object, object> GenerateSetter(PropertyInfo propertyInfo)
public string Name { get; }
public Type PropertyType { get; }
}

class NonAbstractDefaultContractResolver : DefaultContractResolver
{
protected override JsonObjectContract CreateObjectContract(Type objectType)
{
if (objectType.IsAbstract || objectType.IsInterface)
{
throw new ArgumentException("Cannot serialize an abstract class/interface", nameof(objectType));
}
return base.CreateObjectContract(objectType);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
namespace NServiceBus.Persistence.AzureTable
{
using System;
using System.IO;
using Features;
using Logging;
using Sagas;
using Migration;
using Newtonsoft.Json;

class SagaStorage : Feature
{
Expand All @@ -15,6 +18,9 @@ internal SagaStorage()
s.SetDefault(WellKnownConfigurationKeys.SagaStorageAssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey, AzureStorageSagaDefaults.AssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey);
s.SetDefault(WellKnownConfigurationKeys.SagaStorageCompatibilityMode, AzureStorageSagaDefaults.CompatibilityModeEnabled);
s.SetDefault(WellKnownConfigurationKeys.SagaStorageConventionalTablePrefix, AzureStorageSagaDefaults.ConventionalTablePrefix);
s.SetDefault(WellKnownConfigurationKeys.SagaJsonSerializer, JsonSerializer.Create());
s.SetDefault(WellKnownConfigurationKeys.SagaReaderCreator, (Func<TextReader, JsonReader>)(reader => new JsonTextReader(reader)));
s.SetDefault(WellKnownConfigurationKeys.SagaWriterCreator, (Func<TextWriter, JsonWriter>)(writer => new JsonTextWriter(writer)));
s.EnableFeatureByDefault<SynchronizedStorage>();
s.SetDefault<ISagaIdGenerator>(new SagaIdGenerator());
Expand Down Expand Up @@ -49,7 +55,20 @@ protected override void Setup(FeatureConfigurationContext context)
builder.Build<TableHolderResolver>(), secondaryIndices, compatibilityModeEnabled, conventionalTablePrefix), DependencyLifecycle.SingleInstance);

var installerSettings = context.Settings.Get<SynchronizedStorageInstallerSettings>();
context.Container.ConfigureComponent<ISagaPersister>(builder => new AzureSagaPersister(builder.Build<IProvideCloudTableClient>(), installerSettings.Disabled, compatibilityModeEnabled, secondaryIndices, conventionalTablePrefix), DependencyLifecycle.SingleInstance);
var jsonSerializer = context.Settings.Get<JsonSerializer>(WellKnownConfigurationKeys.SagaJsonSerializer);
var readerCreator = context.Settings.Get<Func<TextReader, JsonReader>>(WellKnownConfigurationKeys.SagaReaderCreator);
var writerCreator = context.Settings.Get<Func<TextWriter, JsonWriter>>(WellKnownConfigurationKeys.SagaWriterCreator);

context.Container.ConfigureComponent<ISagaPersister>(builder =>
new AzureSagaPersister(builder.Build<IProvideCloudTableClient>(),
installerSettings.Disabled,
compatibilityModeEnabled,
secondaryIndices,
conventionalTablePrefix,
jsonSerializer,
readerCreator,
writerCreator),
DependencyLifecycle.SingleInstance);
}

static readonly ILog Logger = LogManager.GetLogger<SagaStorage>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
using Persistence.AzureTable;
using Microsoft.Azure.Cosmos.Table;
using Extensibility;
using Newtonsoft.Json;
using JsonSerializer = Newtonsoft.Json.JsonSerializer;

public partial class PersistenceTestsConfiguration : IProvideCloudTableClient
{
Expand Down Expand Up @@ -39,7 +41,15 @@ public Task Configure()
SagaIdGenerator = new SagaIdGenerator();
var resolver = new TableHolderResolver(this, new TableInformation(SetupFixture.TableName));
var secondaryIndices = new SecondaryIndex();
SagaStorage = new AzureSagaPersister(this, true, false, secondaryIndices, null);
SagaStorage = new AzureSagaPersister(
this,
true,
false,
secondaryIndices,
null,
JsonSerializer.Create(),
reader => new JsonTextReader(reader),
writer => new JsonTextWriter(writer));
SynchronizedStorage = new StorageSessionFactory(resolver, null);
SynchronizedStorageAdapter = new StorageSessionAdapter(null);
OutboxStorage = new OutboxPersister(resolver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ namespace NServiceBus.AcceptanceTests
using System;
using System.Linq;
using System.Threading.Tasks;
using global::Newtonsoft.Json;
using global::Newtonsoft.Json.Serialization;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Support;
Expand Down Expand Up @@ -38,7 +40,24 @@ public class EndpointWithNonSerializableSaga : EndpointConfigurationBuilder
{
public EndpointWithNonSerializableSaga()
{
EndpointSetup<DefaultServer>();
EndpointSetup<DefaultServer>(b =>
{
var sagaPersistence = b.UsePersistence<AzureTablePersistence, StorageType.Sagas>();
var customSettings = new JsonSerializerSettings { ContractResolver = new NonAbstractDefaultContractResolver() };
sagaPersistence.JsonSettings(customSettings);
});
}

class NonAbstractDefaultContractResolver : DefaultContractResolver
{
protected override JsonObjectContract CreateObjectContract(Type objectType)
{
if (objectType.IsAbstract || objectType.IsInterface)
{
throw new ArgumentException("Cannot serialize an abstract class/interface", nameof(objectType));
}
return base.CreateObjectContract(objectType);
}
}

public class NonSerializableSaga : Saga<NonSerializableSagaData>, IAmStartedByMessages<StartSagaMessage>
Expand Down
Loading

0 comments on commit b7ae858

Please sign in to comment.