From 8a163bd705e5a5d9338103b08de68b16d86291b0 Mon Sep 17 00:00:00 2001 From: Jesse Sweetland Date: Tue, 6 Sep 2016 12:19:25 -0500 Subject: [PATCH] Added pool for HttpClients to improve performance (see: https://github.com/mspnp/performance-optimization/blob/master/ImproperInstantiation/docs/ImproperInstantiation.md) --- Source/Platibus.IntegrationTests/LoadTests.cs | 4 +- Source/Platibus/Http/HttpClientPool.cs | 107 ++++++++++++++++++ Source/Platibus/Http/HttpTransportService.cs | 31 ++--- Source/Platibus/Platibus.csproj | 1 + 4 files changed, 117 insertions(+), 26 deletions(-) create mode 100644 Source/Platibus/Http/HttpClientPool.cs diff --git a/Source/Platibus.IntegrationTests/LoadTests.cs b/Source/Platibus.IntegrationTests/LoadTests.cs index bf7c2884..80a7da52 100644 --- a/Source/Platibus.IntegrationTests/LoadTests.cs +++ b/Source/Platibus.IntegrationTests/LoadTests.cs @@ -78,10 +78,10 @@ public async Task When_Sending_2000_Test_Messages_2000_Replies_Should_Be_Handled [Test] [Explicit] - public async Task When_Sending_10000_Test_Messages_10000_Replies_Should_Be_Handled_Within_120s() + public async Task When_Sending_10000_Test_Messages_10000_Replies_Should_Be_Handled_Within_20s() { var elapsed = await RunTest(10000); - Assert.That(elapsed, Is.LessThan(TimeSpan.FromSeconds(120))); + Assert.That(elapsed, Is.LessThan(TimeSpan.FromSeconds(20))); } private static async Task RunTest(int messageCount, bool durable = false) diff --git a/Source/Platibus/Http/HttpClientPool.cs b/Source/Platibus/Http/HttpClientPool.cs new file mode 100644 index 00000000..63863876 --- /dev/null +++ b/Source/Platibus/Http/HttpClientPool.cs @@ -0,0 +1,107 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; + +namespace Platibus.Http +{ + internal class HttpClientPool + { + private readonly SemaphoreSlim _poolSync = new SemaphoreSlim(1); + private readonly IDictionary _pool = new Dictionary(); + + public async Task GetClient(Uri uri, IEndpointCredentials credentials, CancellationToken cancellationToken) + { + var key = new PoolKey(uri, credentials); + HttpClientHandler clientHandler; + if (_pool.TryGetValue(key, out clientHandler)) + { + return CreateClient(clientHandler, uri); + } + + await _poolSync.WaitAsync(cancellationToken); + try + { + if (!_pool.TryGetValue(key, out clientHandler)) + { + clientHandler = new HttpClientHandler + { + AllowAutoRedirect = true, + UseProxy = false + }; + + if (credentials != null) + { + credentials.Accept(new HttpEndpointCredentialsVisitor(clientHandler)); + } + + _pool[key] = clientHandler; + + // Make sure DNS TTL is honored + var sp = ServicePointManager.FindServicePoint(uri); + sp.ConnectionLeaseTimeout = (int)TimeSpan.FromMinutes(5).TotalMilliseconds; + } + } + finally + { + _poolSync.Release(); + } + + return CreateClient(clientHandler, uri); + } + + private HttpClient CreateClient(HttpClientHandler clientHandler, Uri baseAddress) + { + return new HttpClient(clientHandler, false) + { + BaseAddress = baseAddress + }; + } + + private class PoolKey : IEquatable + { + private readonly Uri _uri; + private readonly IEndpointCredentials _credentials; + + public PoolKey(Uri uri, IEndpointCredentials credentials) + { + _uri = uri; + _credentials = credentials; + } + + public bool Equals(PoolKey other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return Equals(_uri, other._uri) && Equals(_credentials, other._credentials); + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + return obj.GetType() == GetType() && Equals((PoolKey)obj); + } + + public override int GetHashCode() + { + unchecked + { + return ((_uri != null ? _uri.GetHashCode() : 0) * 397) ^ (_credentials != null ? _credentials.GetHashCode() : 0); + } + } + + public static bool operator ==(PoolKey left, PoolKey right) + { + return Equals(left, right); + } + + public static bool operator !=(PoolKey left, PoolKey right) + { + return !Equals(left, right); + } + } + } +} diff --git a/Source/Platibus/Http/HttpTransportService.cs b/Source/Platibus/Http/HttpTransportService.cs index 600d8eee..d7d41ea8 100644 --- a/Source/Platibus/Http/HttpTransportService.cs +++ b/Source/Platibus/Http/HttpTransportService.cs @@ -51,6 +51,8 @@ public class HttpTransportService : ITransportService, IQueueListener private readonly Func _handleMessage; private readonly QueueName _outboundQueueName; + private readonly HttpClientPool _clientPool; + /// /// Initializes a new /// @@ -91,6 +93,8 @@ public HttpTransportService(Uri baseUri, IEndpointCollection endpoints, IMessage _bypassTransportLocalDestination = bypassTransportLocalDestination; _handleMessage = handleMessage; _outboundQueueName = "Outbound"; + + _clientPool = new HttpClientPool(); } /// @@ -199,28 +203,7 @@ public async Task PublishMessage(Message message, TopicName topicName, Cancellat await Task.WhenAll(transportTasks); } - - private static HttpClient GetClient(Uri uri, IEndpointCredentials credentials) - { - var clientHandler = new HttpClientHandler - { - AllowAutoRedirect = true, - UseProxy = false - }; - - if (credentials != null) - { - credentials.Accept(new HttpEndpointCredentialsVisitor(clientHandler)); - } - - var httpClient = new HttpClient(clientHandler) - { - BaseAddress = uri - }; - - return httpClient; - } - + private async Task TransportMessage(Message message, IEndpointCredentials credentials, CancellationToken cancellationToken = default(CancellationToken)) { HttpClient httpClient = null; @@ -237,7 +220,7 @@ private static HttpClient GetClient(Uri uri, IEndpointCredentials credentials) var httpContent = new StringContent(message.Content); WriteHttpContentHeaders(message, httpContent); - httpClient = GetClient(endpointBaseUri, credentials); + httpClient = await _clientPool.GetClient(endpointBaseUri, credentials, cancellationToken); var messageId = message.Headers.MessageId; var urlEncodedMessageId = HttpUtility.UrlEncode(messageId); var relativeUri = string.Format("message/{0}", urlEncodedMessageId); @@ -395,7 +378,7 @@ private async Task SendSubscriptionRequest(IEndpoint endpoint, TopicName topic, try { var endpointBaseUri = endpoint.Address.WithTrailingSlash(); - httpClient = GetClient(endpointBaseUri, endpoint.Credentials); + httpClient = await _clientPool.GetClient(endpointBaseUri, endpoint.Credentials, cancellationToken); var urlSafeTopicName = HttpUtility.UrlEncode(topic); var relativeUri = string.Format("topic/{0}/subscriber?uri={1}", urlSafeTopicName, _baseUri); diff --git a/Source/Platibus/Platibus.csproj b/Source/Platibus/Platibus.csproj index 2bad521a..d0f27fa4 100644 --- a/Source/Platibus/Platibus.csproj +++ b/Source/Platibus/Platibus.csproj @@ -94,6 +94,7 @@ +