Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update DataBusProperty serialization #6894

Merged
merged 6 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using NServiceBus.MessageMutator;
using MessageMutator;
using NUnit.Framework;

public class When_sending_databus_properties_with_systemjsonserializer : NServiceBusAcceptanceTest
public class When_sending_databus_properties_with_systemjson_message_serializer : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_receive_messages_with_largepayload_correctly()
Expand Down Expand Up @@ -78,17 +78,17 @@ public Task Handle(MyMessageWithLargePayload messageWithLargePayload, IMessageHa

Context testContext;
}
}

public class Mutator : IMutateIncomingTransportMessages
{
public Task MutateIncoming(MutateIncomingTransportMessageContext context)
public class Mutator : IMutateIncomingTransportMessages
{
if (context.Body.Length > PayloadSize)
public Task MutateIncoming(MutateIncomingTransportMessageContext context)
{
throw new Exception("The message body is too large, which means the DataBus was not used to transfer the payload.");
if (context.Body.Length > PayloadSize)
{
throw new Exception("The message body is too large, which means the DataBus was not used to transfer the payload.");
}
return Task.CompletedTask;
}
return Task.CompletedTask;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using MessageMutator;
using NUnit.Framework;

public class When_sending_databus_properties : NServiceBusAcceptanceTest
public class When_sending_databus_properties_with_xml_message_serializer : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_receive_messages_with_largepayload_correctly()
Expand Down Expand Up @@ -43,7 +43,7 @@ public Sender()
{
var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);

builder.UseSerialization<XmlSerializer>();
builder.ConfigureRouting().RouteToEndpoint(typeof(MyMessageWithLargePayload), typeof(Receiver));
});
}
Expand All @@ -57,6 +57,7 @@ public Receiver()
{
var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);
builder.UseSerialization<XmlSerializer>();
builder.RegisterMessageMutator(new Mutator());
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
namespace NServiceBus.AcceptanceTests.DataBus
{
using System;
using System.IO;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using MessageMutator;
using NUnit.Framework;

public class When_sending_unobtrusive_databus_properties_with_systemjson_message_serializer : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_receive_messages_with_largepayload_correctly()
{
var payloadToSend = new byte[PayloadSize];

var context = await Scenario.Define<Context>()
.WithEndpoint<Sender>(b => b.When(session => session.Send(new MyMessageWithLargePayload
{
Payload = payloadToSend
})))
.WithEndpoint<Receiver>()
.Done(c => c.ReceivedPayload != null)
.Run();

Assert.AreEqual(payloadToSend, context.ReceivedPayload, "The large payload should be marshalled correctly using the databus");
}

const int PayloadSize = 500;

public class Context : ScenarioContext
{
public byte[] ReceivedPayload { get; set; }
}

public class Sender : EndpointConfigurationBuilder
{
public Sender()
{
EndpointSetup<DefaultServer>(builder =>
{
builder.Conventions()
.DefiningCommandsAs(t => t.Namespace != null && t.FullName == typeof(MyMessageWithLargePayload).FullName)
.DefiningDataBusPropertiesAs(t => t.Name.Contains("Payload"));

var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);
builder.UseSerialization<SystemJsonSerializer>();
builder.ConfigureRouting().RouteToEndpoint(typeof(MyMessageWithLargePayload), typeof(Receiver));
}).ExcludeType<MyMessageWithLargePayload>(); // remove that type from assembly scanning to simulate what would happen with true unobtrusive mode
}
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(builder =>
{
builder.Conventions()
.DefiningCommandsAs(t => t.Namespace != null && t.FullName == typeof(MyMessageWithLargePayload).FullName)
.DefiningDataBusPropertiesAs(t => t.Name.Contains("Payload"));

var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);
builder.UseSerialization<SystemJsonSerializer>();
builder.RegisterMessageMutator(new Mutator());
});
}

public class MyMessageHandler : IHandleMessages<MyMessageWithLargePayload>
{
public MyMessageHandler(Context context)
{
testContext = context;
}

public Task Handle(MyMessageWithLargePayload messageWithLargePayload, IMessageHandlerContext context)
{
testContext.ReceivedPayload = messageWithLargePayload.Payload;

return Task.CompletedTask;
}

Context testContext;
}

public class Mutator : IMutateIncomingTransportMessages
{
public Task MutateIncoming(MutateIncomingTransportMessageContext context)
{
if (context.Body.Length > PayloadSize)
{
throw new Exception("The message body is too large, which means the DataBus was not used to transfer the payload.");
}
return Task.CompletedTask;
}
}
}

public class MyMessageWithLargePayload
{
public byte[] Payload { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using MessageMutator;
using NUnit.Framework;

public class When_sending_databus_properties_with_unobtrusive : NServiceBusAcceptanceTest
public class When_sending_unobtrusive_databus_properties_with_xml_message_serializer : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_receive_messages_with_largepayload_correctly()
Expand Down Expand Up @@ -47,7 +47,7 @@ public Sender()

var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);

builder.UseSerialization<XmlSerializer>();
builder.ConfigureRouting().RouteToEndpoint(typeof(MyMessageWithLargePayload), typeof(Receiver));
}).ExcludeType<MyMessageWithLargePayload>(); // remove that type from assembly scanning to simulate what would happen with true unobtrusive mode
}
Expand All @@ -65,6 +65,7 @@ public Receiver()

var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender");
builder.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>().BasePath(basePath);
builder.UseSerialization<XmlSerializer>();
builder.RegisterMessageMutator(new Mutator());
});
}
Expand All @@ -85,17 +86,17 @@ public Task Handle(MyMessageWithLargePayload messageWithLargePayload, IMessageHa

Context testContext;
}
}

public class Mutator : IMutateIncomingTransportMessages
{
public Task MutateIncoming(MutateIncomingTransportMessageContext context)
public class Mutator : IMutateIncomingTransportMessages
{
if (context.Body.Length > PayloadSize)
public Task MutateIncoming(MutateIncomingTransportMessageContext context)
{
throw new Exception("The message body is too large, which means the DataBus was not used to transfer the payload.");
if (context.Body.Length > PayloadSize)
{
throw new Exception("The message body is too large, which means the DataBus was not used to transfer the payload.");
}
return Task.CompletedTask;
}
return Task.CompletedTask;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,13 @@ namespace NServiceBus
{
public DataBusProperty() { }
public DataBusProperty(T value) { }
protected DataBusProperty(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public bool HasValue { get; set; }
public string Key { get; set; }
[System.Text.Json.Serialization.JsonIgnore]
public System.Type Type { get; }
[System.Text.Json.Serialization.JsonIgnore]
[System.Xml.Serialization.XmlIgnore]
public T Value { get; }
public void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public object GetValue() { }
public void SetValue(object valueToSet) { }
}
Expand Down
70 changes: 11 additions & 59 deletions src/NServiceBus.Core/DataBus/DataBusProperty.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
namespace NServiceBus
{
using System;
using System.Runtime.Serialization;
using System.Security;
using System.Text.Json.Serialization;
using System.Xml.Serialization;

/// <summary>
/// Default implementation for <see cref="IDataBusProperty" />.
Expand All @@ -12,53 +11,28 @@
public class DataBusProperty<T> : IDataBusProperty where T : class
{
/// <summary>
/// initializes a <see cref="DataBusProperty{T}" /> with no value set.
/// Initializes a <see cref="DataBusProperty{T}" /> with no value set.
/// </summary>
public DataBusProperty()
{
Type = typeof(T);
}
public DataBusProperty() { }

/// <summary>
/// initializes a <see cref="DataBusProperty{T}" /> with the <paramref name="value" />.
/// Initializes a <see cref="DataBusProperty{T}" /> with the <paramref name="value" />.
/// </summary>
/// <param name="value">The value to initialize with.</param>
public DataBusProperty(T value)
{
if (value != null)
{
Type = typeof(T);
}

SetValue(value);
}

/// <summary>
/// For serialization purposes.
/// </summary>
/// <param name="info">The <see cref="SerializationInfo" /> to populate with data. </param>
/// <param name="context">The destination (see <see cref="StreamingContext" />) for this serialization. </param>
/// <exception cref="SecurityException">The caller does not have the required permission. </exception>
protected DataBusProperty(SerializationInfo info, StreamingContext context)
{
ArgumentNullException.ThrowIfNull(info);
Key = info.GetString("Key");
HasValue = info.GetBoolean("HasValue");
}
public DataBusProperty(T value) => SetValue(value);

/// <summary>
/// The value.
/// </summary>
#pragma warning disable IDE0032 // Use auto property - Value will be serialized into the message body if it is an auto property
[JsonIgnore]
public T Value => value;
#pragma warning restore IDE0032 // Use auto property
[XmlIgnore]
public T Value { get; private set; }

/// <summary>
/// The property <see cref="Type" />.
/// </summary>
[JsonIgnore]
public Type Type { get; }
public Type Type { get; } = typeof(T);

/// <summary>
/// The <see cref="IDataBusProperty" /> key.
Expand All @@ -76,36 +50,14 @@ protected DataBusProperty(SerializationInfo info, StreamingContext context)
/// <param name="valueToSet">The value to set.</param>
public void SetValue(object valueToSet)
{
value = valueToSet as T;
HasValue = value != null;
Value = valueToSet as T;
HasValue = Value != null;
}

/// <summary>
/// Gets the value of the <see cref="IDataBusProperty" />.
/// </summary>
/// <returns>The value.</returns>
public object GetValue()
{
return Value;
}


/// <summary>
/// Populates a <see cref="SerializationInfo" /> with the data needed to serialize the target object.
/// </summary>
/// <param name="info">The <see cref="SerializationInfo" /> to populate with data. </param>
/// <param name="context">The destination (see <see cref="StreamingContext" />) for this serialization. </param>
/// <exception cref="T:System.Security.SecurityException">The caller does not have the required permission. </exception>
public void GetObjectData(SerializationInfo info, StreamingContext context)
{
ArgumentNullException.ThrowIfNull(info);
info.AddValue("Key", Key);
info.AddValue("HasValue", HasValue);
}

#pragma warning disable IDE0032 // Use auto property - value will be serialized into the message body if it is an auto property
T value;
#pragma warning restore IDE0032 // Use auto property
public object GetValue() => Value;
}

}