diff --git a/src/benchmark/Akka.Benchmarks/Remoting/AkkaPduCodecBenchmark.cs b/src/benchmark/Akka.Benchmarks/Remoting/AkkaPduCodecBenchmark.cs index fc82f619d01..ee6a1b18482 100644 --- a/src/benchmark/Akka.Benchmarks/Remoting/AkkaPduCodecBenchmark.cs +++ b/src/benchmark/Akka.Benchmarks/Remoting/AkkaPduCodecBenchmark.cs @@ -15,6 +15,7 @@ using Akka.Remote; using Akka.Remote.Serialization; using Akka.Remote.Transport; +using Akka.Serialization; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Loggers; using Google.Protobuf; @@ -40,6 +41,7 @@ public class AkkaPduCodecBenchmark private Address _addr1; private Address _addr2; + private Information _addr2Info; private AkkaPduProtobuffCodec _recvCodec; private AkkaPduProtobuffCodec _sendCodec; @@ -62,6 +64,7 @@ public async Task Setup() _rarp = RARP.For(_sys1).Provider; _addr1 = _rarp.DefaultAddress; _addr2 = RARP.For(_sys2).Provider.DefaultAddress; + _addr2Info = new Information(_addr2, _sys2); _senderActorRef = _sys2.ActorOf(act => { act.ReceiveAny((_, context) => context.Sender.Tell(context.Sender)); }, @@ -188,7 +191,7 @@ public void DeserializePayloadOnly() private ByteString CreatePayloadPdu() { return _sendCodec.ConstructPayload(_sendCodec.ConstructMessage(_remoteReceiveRef.LocalAddressToUse, _remoteReceiveRef, - MessageSerializer.Serialize(_sys2, _addr2, _message), _senderActorRef, null, _lastAck)); + MessageSerializer.Serialize(_sys2, _addr2Info, _message), _senderActorRef, null, _lastAck)); } } } diff --git a/src/core/Akka.Remote.Tests/Serialization/BugFix5062Spec.cs b/src/core/Akka.Remote.Tests/Serialization/BugFix5062Spec.cs index 8c1949ac02e..763d79253ed 100644 --- a/src/core/Akka.Remote.Tests/Serialization/BugFix5062Spec.cs +++ b/src/core/Akka.Remote.Tests/Serialization/BugFix5062Spec.cs @@ -53,7 +53,8 @@ public void Failed_serialization_should_give_proper_exception_message() true); var node1 = new Address("akka.tcp", "Sys", "localhost", 2551); - var serialized = MessageSerializer.Serialize((ExtendedActorSystem)Sys, node1, message); + var info = new Information(node1, Sys); + var serialized = MessageSerializer.Serialize((ExtendedActorSystem)Sys, info, message); var o = new object(); o.Invoking(_ => MessageSerializer.Deserialize((ExtendedActorSystem)Sys, serialized)).Should() diff --git a/src/core/Akka.Remote.Tests/Transport/MultiTransportAddressingSpec.cs b/src/core/Akka.Remote.Tests/Transport/MultiTransportAddressingSpec.cs index 4167eda003b..44fba1bc8ef 100644 --- a/src/core/Akka.Remote.Tests/Transport/MultiTransportAddressingSpec.cs +++ b/src/core/Akka.Remote.Tests/Transport/MultiTransportAddressingSpec.cs @@ -18,9 +18,9 @@ namespace Akka.Remote.Tests.Transport; /// /// Added this spec to prove the existence of https://github.com/akkadotnet/akka.net/issues/7378 /// -public class MultiTransportAddressingSpec : AkkaSpec +public class MultiTransportAddressingSpec : TestKit.Xunit2.TestKit { - public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(Sys1Port1, Sys1Port2), output) + public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(Sys1Port1, Sys1Port2), "MultiTransportSpec", output) { } @@ -30,7 +30,7 @@ public MultiTransportAddressingSpec(ITestOutputHelper output) : base(GetConfig(S public const int Sys2Port1 = 9993; public const int Sys2Port2 = 9994; - private static Config GetConfig(int transportPort1, int transportPort2) + private static Config GetConfig(int transportPort1, int transportPort2, string actorSystemName = "MultiTransportSpec") { return $$""" @@ -45,7 +45,7 @@ private static Config GetConfig(int transportPort1, int transportPort2) transport-class = "Akka.Remote.Transport.TestTransport, Akka.Remote" applied-adapters = [] registry-key = aX33k0jWKg - local-address = "test1://MultiTransportSpec@localhost:{{transportPort1}}" + local-address = "test1://{{actorSystemName}}@localhost:{{transportPort1}}" maximum-payload-bytes = 32000b scheme-identifier = test1 } @@ -53,7 +53,7 @@ private static Config GetConfig(int transportPort1, int transportPort2) transport-class = "Akka.Remote.Transport.TestTransport, Akka.Remote" applied-adapters = [] registry-key = aX33k0j11c - local-address = "test2://MultiTransportSpec@localhost:{{transportPort2}}" + local-address = "test2://{{actorSystemName}}@localhost:{{transportPort2}}" maximum-payload-bytes = 32000b scheme-identifier = test2 } @@ -67,7 +67,8 @@ private static Config GetConfig(int transportPort1, int transportPort2) [Fact] public async Task Should_Use_Second_Transport_For_Communication() { - var secondSystem = ActorSystem.Create("MultiTransportSpec", GetConfig(Sys2Port1, Sys2Port2).WithFallback(Sys.Settings.Config)); + const string secondActorSystemName = "MultiTransportSpec2"; + var secondSystem = ActorSystem.Create(secondActorSystemName, GetConfig(Sys2Port1, Sys2Port2, secondActorSystemName).WithFallback(Sys.Settings.Config)); InitializeLogger(secondSystem); var assertProbe = CreateTestProbe(secondSystem); @@ -87,11 +88,19 @@ public async Task Should_Use_Second_Transport_For_Communication() Shutdown(secondSystem); } + return; + async Task PingAndVerify(string scheme, int port) { - var selection = Sys.ActorSelection($"akka.{scheme}://MultiTransportSpec@localhost:{port}/user/echo"); - selection.Tell("ping", TestActor); - + var selection = Sys.ActorSelection($"akka.{scheme}://{secondActorSystemName}@localhost:{port}/user/echo"); + + // important: https://github.com/akkadotnet/akka.net/issues/7378 only occurs with IActorRefs + var actor = await selection.ResolveOne(TimeSpan.FromSeconds(1)); + + // assert that the remote actor is using the correct transport + Assert.Contains(scheme, actor.Path.Address.Protocol); + + actor.Tell("ping"); var reply = await ExpectMsgAsync(TimeSpan.FromSeconds(3)); Assert.Equal("pong", reply); diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index 7c9c7d521fb..3cd9cebc782 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -1043,6 +1043,7 @@ public EndpointWriter( Inbound = handleOrActive != null; _ackDeadline = NewAckDeadline(); _handle = handleOrActive; + _transportInformation = new Information(localAddress, Context.System); _remoteMetrics = RemoteMetricsExtension.Create(Context.System.AsInstanceOf()); if (_handle == null) @@ -1056,6 +1057,7 @@ public EndpointWriter( } private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly Information _transportInformation; private readonly int? _refuseUid; private readonly AkkaPduCodec _codec; private readonly IActorRef _reliableDeliverySupervisor; @@ -1357,7 +1359,7 @@ private SerializedMessage SerializeMessage(object msg) { throw new EndpointException("Internal error: No handle was present during serialization of outbound message."); } - return MessageSerializer.Serialize(_system, _handle.LocalAddress, msg); + return MessageSerializer.Serialize(_system, _transportInformation, msg); } private int _writeCount = 0; diff --git a/src/core/Akka.Remote/EndpointManager.cs b/src/core/Akka.Remote/EndpointManager.cs index 51b7d14a11a..4f101998288 100644 --- a/src/core/Akka.Remote/EndpointManager.cs +++ b/src/core/Akka.Remote/EndpointManager.cs @@ -549,14 +549,14 @@ Directive Hopeless(HopelessAssociation e) { switch (e) { - case HopelessAssociation h when h.Uid != null: - _log.Error(e.InnerException ?? e, "Association to [{0}] with UID [{1}] is irrecoverably failed. Quarantining address.", h.RemoteAddress, h.Uid); + case HopelessAssociation { Uid: not null }: + _log.Error(e.InnerException ?? e, "Association to [{0}] with UID [{1}] is irrecoverably failed. Quarantining address.", e.RemoteAddress, e.Uid); if (_settings.QuarantineDuration.HasValue && _settings.QuarantineDuration != TimeSpan.MaxValue) { // have a finite quarantine duration specified in settings. // If we don't have one specified, don't bother quarantining - it's disabled. - _endpoints.MarkAsQuarantined(h.RemoteAddress, h.Uid.Value, Deadline.Now + _settings.QuarantineDuration); - _eventPublisher.NotifyListeners(new QuarantinedEvent(h.RemoteAddress, h.Uid.Value)); + _endpoints.MarkAsQuarantined(e.RemoteAddress, e.Uid.Value, Deadline.Now + _settings.QuarantineDuration); + _eventPublisher.NotifyListeners(new QuarantinedEvent(e.RemoteAddress, e.Uid.Value)); } return Directive.Stop; @@ -818,9 +818,6 @@ bool MatchesQuarantine(AkkaProtocolHandle handle) Receive(send => { var recipientAddress = send.Recipient.Path.Address; - IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpoint(recipientAddress, - CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse, _transportMapping[send.Recipient.LocalAddressToUse], - _settings, writing: true, handleOption: null), uid: null); // pattern match won't throw a NullReferenceException if one is returned by WritableEndpointWithPolicyFor switch (_endpoints.WritableEndpointWithPolicyFor(recipientAddress)) @@ -841,6 +838,12 @@ IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpo CreateAndRegisterWritingEndpoint().Tell(send); break; } + + return; + + IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpoint(recipientAddress, + CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse, _transportMapping[send.Recipient.LocalAddressToUse], + _settings, writing: true, handleOption: null), uid: null); }); Receive(ia => HandleInboundAssociation(ia, false)); Receive(endpoint => AcceptPendingReader(endpoint.Writer)); @@ -894,8 +897,7 @@ IActorRef CreateAndRegisterWritingEndpoint() => _endpoints.RegisterWritableEndpo { if (result.IsFaulted || result.IsCanceled) { - if (result.Exception != null) - result.Exception.Handle(_ => true); + result.Exception?.Handle(_ => true); return false; } return result.Result.All(x => x); diff --git a/src/core/Akka.Remote/MessageSerializer.cs b/src/core/Akka.Remote/MessageSerializer.cs index 277c93cdcf5..81e32676536 100644 --- a/src/core/Akka.Remote/MessageSerializer.cs +++ b/src/core/Akka.Remote/MessageSerializer.cs @@ -27,7 +27,8 @@ internal static class MessageSerializer /// The system. /// The message protocol. /// System.Object. - public static object Deserialize(ExtendedActorSystem system, SerializedMessage messageProtocol) + public static object Deserialize(ExtendedActorSystem system, + SerializedMessage messageProtocol) { return system.Serialization.Deserialize( messageProtocol.Message.ToByteArray(), @@ -39,19 +40,18 @@ public static object Deserialize(ExtendedActorSystem system, SerializedMessage m /// Serializes the specified message. /// /// The system. - /// TBD + /// The address for the current transport /// The message. /// SerializedMessage. - public static SerializedMessage Serialize(ExtendedActorSystem system, Address address, object message) + public static SerializedMessage Serialize(ExtendedActorSystem system, Information transportInformation, + object message) { var serializer = system.Serialization.FindSerializerFor(message); var oldInfo = Akka.Serialization.Serialization.CurrentTransportInformation; try { - if (oldInfo == null) - Akka.Serialization.Serialization.CurrentTransportInformation = - system.Provider.SerializationInformation; + Akka.Serialization.Serialization.CurrentTransportInformation = transportInformation; var serializedMsg = new SerializedMessage { @@ -81,4 +81,4 @@ public static SerializedMessage Serialize(ExtendedActorSystem system, Address ad } } } -} +} \ No newline at end of file