Skip to content

Commit

Permalink
Akka.Remote: ensure RemoteActorRef are serialized with correct `Add…
Browse files Browse the repository at this point in the history
…ress` when using multiple transports (#7393)

* harden specs for #7378

Need to ensure we use `RemoteActorRef` instead of `ActorSelection` in order to properly reproduce bug

* changed up `ActorSystem` names to make it easier to debug

* fixed outbound address serialization

close #7378

* fixed compilation issue
  • Loading branch information
Aaronontheweb authored Nov 19, 2024
1 parent e412b58 commit 4ae4792
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.Remote;
using Akka.Remote.Serialization;
using Akka.Remote.Transport;
using Akka.Serialization;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Loggers;
using Google.Protobuf;
Expand All @@ -40,6 +41,7 @@ public class AkkaPduCodecBenchmark

private Address _addr1;
private Address _addr2;
private Information _addr2Info;
private AkkaPduProtobuffCodec _recvCodec;
private AkkaPduProtobuffCodec _sendCodec;

Expand All @@ -62,6 +64,7 @@ public async Task Setup()
_rarp = RARP.For(_sys1).Provider;
_addr1 = _rarp.DefaultAddress;
_addr2 = RARP.For(_sys2).Provider.DefaultAddress;
_addr2Info = new Information(_addr2, _sys2);

_senderActorRef =
_sys2.ActorOf(act => { act.ReceiveAny((_, context) => context.Sender.Tell(context.Sender)); },
Expand Down Expand Up @@ -188,7 +191,7 @@ public void DeserializePayloadOnly()
private ByteString CreatePayloadPdu()
{
return _sendCodec.ConstructPayload(_sendCodec.ConstructMessage(_remoteReceiveRef.LocalAddressToUse, _remoteReceiveRef,
MessageSerializer.Serialize(_sys2, _addr2, _message), _senderActorRef, null, _lastAck));
MessageSerializer.Serialize(_sys2, _addr2Info, _message), _senderActorRef, null, _lastAck));
}
}
}
3 changes: 2 additions & 1 deletion src/core/Akka.Remote.Tests/Serialization/BugFix5062Spec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public void Failed_serialization_should_give_proper_exception_message()
true);

var node1 = new Address("akka.tcp", "Sys", "localhost", 2551);
var serialized = MessageSerializer.Serialize((ExtendedActorSystem)Sys, node1, message);
var info = new Information(node1, Sys);
var serialized = MessageSerializer.Serialize((ExtendedActorSystem)Sys, info, message);

var o = new object();
o.Invoking(_ => MessageSerializer.Deserialize((ExtendedActorSystem)Sys, serialized)).Should()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ namespace Akka.Remote.Tests.Transport;
/// <summary>
/// Added this spec to prove the existence of https://github.com/akkadotnet/akka.net/issues/7378
/// </summary>
public class MultiTransportAddressingSpec : AkkaSpec
public class MultiTransportAddressingSpec : TestKit.Xunit2.TestKit
{
public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(Sys1Port1, Sys1Port2), output)
public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(Sys1Port1, Sys1Port2), "MultiTransportSpec", output)
{
}

Expand All @@ -30,7 +30,7 @@ public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(S
public const int Sys2Port1 = 9993;
public const int Sys2Port2 = 9994;

private static Config GetConfig(int transportPort1, int transportPort2)
private static Config GetConfig(int transportPort1, int transportPort2, string actorSystemName = "MultiTransportSpec")
{
return $$"""
Expand All @@ -45,15 +45,15 @@ private static Config GetConfig(int transportPort1, int transportPort2)
transport-class = "Akka.Remote.Transport.TestTransport, Akka.Remote"
applied-adapters = []
registry-key = aX33k0jWKg
local-address = "test1://MultiTransportSpec@localhost:{{transportPort1}}"
local-address = "test1://{{actorSystemName}}@localhost:{{transportPort1}}"
maximum-payload-bytes = 32000b
scheme-identifier = test1
}
test2 {
transport-class = "Akka.Remote.Transport.TestTransport, Akka.Remote"
applied-adapters = []
registry-key = aX33k0j11c
local-address = "test2://MultiTransportSpec@localhost:{{transportPort2}}"
local-address = "test2://{{actorSystemName}}@localhost:{{transportPort2}}"
maximum-payload-bytes = 32000b
scheme-identifier = test2
}
Expand All @@ -67,7 +67,8 @@ private static Config GetConfig(int transportPort1, int transportPort2)
[Fact]
public async Task Should_Use_Second_Transport_For_Communication()
{
var secondSystem = ActorSystem.Create("MultiTransportSpec", GetConfig(Sys2Port1, Sys2Port2).WithFallback(Sys.Settings.Config));
const string secondActorSystemName = "MultiTransportSpec2";
var secondSystem = ActorSystem.Create(secondActorSystemName, GetConfig(Sys2Port1, Sys2Port2, secondActorSystemName).WithFallback(Sys.Settings.Config));
InitializeLogger(secondSystem);
var assertProbe = CreateTestProbe(secondSystem);

Expand All @@ -87,11 +88,19 @@ public async Task Should_Use_Second_Transport_For_Communication()
Shutdown(secondSystem);
}

return;

async Task PingAndVerify(string scheme, int port)
{
var selection = Sys.ActorSelection($"akka.{scheme}://MultiTransportSpec@localhost:{port}/user/echo");
selection.Tell("ping", TestActor);

var selection = Sys.ActorSelection($"akka.{scheme}://{secondActorSystemName}@localhost:{port}/user/echo");

// important: https://github.com/akkadotnet/akka.net/issues/7378 only occurs with IActorRefs
var actor = await selection.ResolveOne(TimeSpan.FromSeconds(1));

// assert that the remote actor is using the correct transport
Assert.Contains(scheme, actor.Path.Address.Protocol);

actor.Tell("ping");
var reply = await ExpectMsgAsync<string>(TimeSpan.FromSeconds(3));
Assert.Equal("pong", reply);

Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ public EndpointWriter(
Inbound = handleOrActive != null;
_ackDeadline = NewAckDeadline();
_handle = handleOrActive;
_transportInformation = new Information(localAddress, Context.System);
_remoteMetrics = RemoteMetricsExtension.Create(Context.System.AsInstanceOf<ExtendedActorSystem>());

if (_handle == null)
Expand All @@ -1056,6 +1057,7 @@ public EndpointWriter(
}

private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Information _transportInformation;
private readonly int? _refuseUid;
private readonly AkkaPduCodec _codec;
private readonly IActorRef _reliableDeliverySupervisor;
Expand Down Expand Up @@ -1357,7 +1359,7 @@ private SerializedMessage SerializeMessage(object msg)
{
throw new EndpointException("Internal error: No handle was present during serialization of outbound message.");
}
return MessageSerializer.Serialize(_system, _handle.LocalAddress, msg);
return MessageSerializer.Serialize(_system, _transportInformation, msg);
}

private int _writeCount = 0;
Expand Down
20 changes: 11 additions & 9 deletions src/core/Akka.Remote/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -549,14 +549,14 @@ Directive Hopeless(HopelessAssociation e)
{
switch (e)
{
case HopelessAssociation h when h.Uid != null:
_log.Error(e.InnerException ?? e, "Association to [{0}] with UID [{1}] is irrecoverably failed. Quarantining address.", h.RemoteAddress, h.Uid);
case HopelessAssociation { Uid: not null }:
_log.Error(e.InnerException ?? e, "Association to [{0}] with UID [{1}] is irrecoverably failed. Quarantining address.", e.RemoteAddress, e.Uid);
if (_settings.QuarantineDuration.HasValue && _settings.QuarantineDuration != TimeSpan.MaxValue)
{
// have a finite quarantine duration specified in settings.
// If we don't have one specified, don't bother quarantining - it's disabled.
_endpoints.MarkAsQuarantined(h.RemoteAddress, h.Uid.Value, Deadline.Now + _settings.QuarantineDuration);
_eventPublisher.NotifyListeners(new QuarantinedEvent(h.RemoteAddress, h.Uid.Value));
_endpoints.MarkAsQuarantined(e.RemoteAddress, e.Uid.Value, Deadline.Now + _settings.QuarantineDuration);
_eventPublisher.NotifyListeners(new QuarantinedEvent(e.RemoteAddress, e.Uid.Value));
}

return Directive.Stop;
Expand Down Expand Up @@ -818,9 +818,6 @@ bool MatchesQuarantine(AkkaProtocolHandle handle)
Receive<Send>(send =>
{
var recipientAddress = send.Recipient.Path.Address;
IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpoint(recipientAddress,
CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse, _transportMapping[send.Recipient.LocalAddressToUse],
_settings, writing: true, handleOption: null), uid: null);

// pattern match won't throw a NullReferenceException if one is returned by WritableEndpointWithPolicyFor
switch (_endpoints.WritableEndpointWithPolicyFor(recipientAddress))
Expand All @@ -841,6 +838,12 @@ IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpo
CreateAndRegisterWritingEndpoint().Tell(send);
break;
}

return;

IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpoint(recipientAddress,
CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse, _transportMapping[send.Recipient.LocalAddressToUse],
_settings, writing: true, handleOption: null), uid: null);
});
Receive<InboundAssociation>(ia => HandleInboundAssociation(ia, false));
Receive<EndpointWriter.StoppedReading>(endpoint => AcceptPendingReader(endpoint.Writer));
Expand Down Expand Up @@ -894,8 +897,7 @@ IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpo
{
if (result.IsFaulted || result.IsCanceled)
{
if (result.Exception != null)
result.Exception.Handle(_ => true);
result.Exception?.Handle(_ => true);
return false;
}
return result.Result.All(x => x);
Expand Down
14 changes: 7 additions & 7 deletions src/core/Akka.Remote/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ internal static class MessageSerializer
/// <param name="system">The system.</param>
/// <param name="messageProtocol">The message protocol.</param>
/// <returns>System.Object.</returns>
public static object Deserialize(ExtendedActorSystem system, SerializedMessage messageProtocol)
public static object Deserialize(ExtendedActorSystem system,
SerializedMessage messageProtocol)
{
return system.Serialization.Deserialize(
messageProtocol.Message.ToByteArray(),
Expand All @@ -39,19 +40,18 @@ public static object Deserialize(ExtendedActorSystem system, SerializedMessage m
/// Serializes the specified message.
/// </summary>
/// <param name="system">The system.</param>
/// <param name="address">TBD</param>
/// <param name="transportInformation">The address for the current transport</param>
/// <param name="message">The message.</param>
/// <returns>SerializedMessage.</returns>
public static SerializedMessage Serialize(ExtendedActorSystem system, Address address, object message)
public static SerializedMessage Serialize(ExtendedActorSystem system, Information transportInformation,
object message)
{
var serializer = system.Serialization.FindSerializerFor(message);

var oldInfo = Akka.Serialization.Serialization.CurrentTransportInformation;
try
{
if (oldInfo == null)
Akka.Serialization.Serialization.CurrentTransportInformation =
system.Provider.SerializationInformation;
Akka.Serialization.Serialization.CurrentTransportInformation = transportInformation;

var serializedMsg = new SerializedMessage
{
Expand Down Expand Up @@ -81,4 +81,4 @@ public static SerializedMessage Serialize(ExtendedActorSystem system, Address ad
}
}
}
}
}

0 comments on commit 4ae4792

Please sign in to comment.