Skip to content

Commit

Permalink
Added pool for HttpClients to improve performance (see: https://githu…
Browse files Browse the repository at this point in the history
  • Loading branch information
sweetlandj committed Sep 6, 2016
1 parent 5143b20 commit 8a163bd
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 26 deletions.
4 changes: 2 additions & 2 deletions Source/Platibus.IntegrationTests/LoadTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ public async Task When_Sending_2000_Test_Messages_2000_Replies_Should_Be_Handled

[Test]
[Explicit]
public async Task When_Sending_10000_Test_Messages_10000_Replies_Should_Be_Handled_Within_120s()
public async Task When_Sending_10000_Test_Messages_10000_Replies_Should_Be_Handled_Within_20s()
{
var elapsed = await RunTest(10000);
Assert.That(elapsed, Is.LessThan(TimeSpan.FromSeconds(120)));
Assert.That(elapsed, Is.LessThan(TimeSpan.FromSeconds(20)));
}

private static async Task<TimeSpan> RunTest(int messageCount, bool durable = false)
Expand Down
107 changes: 107 additions & 0 deletions Source/Platibus/Http/HttpClientPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Platibus.Http
{
internal class HttpClientPool
{
private readonly SemaphoreSlim _poolSync = new SemaphoreSlim(1);
private readonly IDictionary<PoolKey, HttpClientHandler> _pool = new Dictionary<PoolKey, HttpClientHandler>();

public async Task<HttpClient> GetClient(Uri uri, IEndpointCredentials credentials, CancellationToken cancellationToken)
{
var key = new PoolKey(uri, credentials);
HttpClientHandler clientHandler;
if (_pool.TryGetValue(key, out clientHandler))
{
return CreateClient(clientHandler, uri);
}

await _poolSync.WaitAsync(cancellationToken);
try
{
if (!_pool.TryGetValue(key, out clientHandler))
{
clientHandler = new HttpClientHandler
{
AllowAutoRedirect = true,
UseProxy = false
};

if (credentials != null)
{
credentials.Accept(new HttpEndpointCredentialsVisitor(clientHandler));
}

_pool[key] = clientHandler;

// Make sure DNS TTL is honored
var sp = ServicePointManager.FindServicePoint(uri);
sp.ConnectionLeaseTimeout = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
}
}
finally
{
_poolSync.Release();
}

return CreateClient(clientHandler, uri);
}

private HttpClient CreateClient(HttpClientHandler clientHandler, Uri baseAddress)
{
return new HttpClient(clientHandler, false)
{
BaseAddress = baseAddress
};
}

private class PoolKey : IEquatable<PoolKey>
{
private readonly Uri _uri;
private readonly IEndpointCredentials _credentials;

public PoolKey(Uri uri, IEndpointCredentials credentials)
{
_uri = uri;
_credentials = credentials;
}

public bool Equals(PoolKey other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Equals(_uri, other._uri) && Equals(_credentials, other._credentials);
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
return obj.GetType() == GetType() && Equals((PoolKey)obj);
}

public override int GetHashCode()
{
unchecked
{
return ((_uri != null ? _uri.GetHashCode() : 0) * 397) ^ (_credentials != null ? _credentials.GetHashCode() : 0);
}
}

public static bool operator ==(PoolKey left, PoolKey right)
{
return Equals(left, right);
}

public static bool operator !=(PoolKey left, PoolKey right)
{
return !Equals(left, right);
}
}
}
}
31 changes: 7 additions & 24 deletions Source/Platibus/Http/HttpTransportService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class HttpTransportService : ITransportService, IQueueListener
private readonly Func<Message, CancellationToken, Task> _handleMessage;
private readonly QueueName _outboundQueueName;

private readonly HttpClientPool _clientPool;

/// <summary>
/// Initializes a new <see cref="HttpTransportService"/>
/// </summary>
Expand Down Expand Up @@ -91,6 +93,8 @@ public HttpTransportService(Uri baseUri, IEndpointCollection endpoints, IMessage
_bypassTransportLocalDestination = bypassTransportLocalDestination;
_handleMessage = handleMessage;
_outboundQueueName = "Outbound";

_clientPool = new HttpClientPool();
}

/// <summary>
Expand Down Expand Up @@ -199,28 +203,7 @@ public async Task PublishMessage(Message message, TopicName topicName, Cancellat

await Task.WhenAll(transportTasks);
}

private static HttpClient GetClient(Uri uri, IEndpointCredentials credentials)
{
var clientHandler = new HttpClientHandler
{
AllowAutoRedirect = true,
UseProxy = false
};

if (credentials != null)
{
credentials.Accept(new HttpEndpointCredentialsVisitor(clientHandler));
}

var httpClient = new HttpClient(clientHandler)
{
BaseAddress = uri
};

return httpClient;
}


private async Task TransportMessage(Message message, IEndpointCredentials credentials, CancellationToken cancellationToken = default(CancellationToken))
{
HttpClient httpClient = null;
Expand All @@ -237,7 +220,7 @@ private static HttpClient GetClient(Uri uri, IEndpointCredentials credentials)
var httpContent = new StringContent(message.Content);
WriteHttpContentHeaders(message, httpContent);

httpClient = GetClient(endpointBaseUri, credentials);
httpClient = await _clientPool.GetClient(endpointBaseUri, credentials, cancellationToken);
var messageId = message.Headers.MessageId;
var urlEncodedMessageId = HttpUtility.UrlEncode(messageId);
var relativeUri = string.Format("message/{0}", urlEncodedMessageId);
Expand Down Expand Up @@ -395,7 +378,7 @@ private async Task SendSubscriptionRequest(IEndpoint endpoint, TopicName topic,
try
{
var endpointBaseUri = endpoint.Address.WithTrailingSlash();
httpClient = GetClient(endpointBaseUri, endpoint.Credentials);
httpClient = await _clientPool.GetClient(endpointBaseUri, endpoint.Credentials, cancellationToken);

var urlSafeTopicName = HttpUtility.UrlEncode(topic);
var relativeUri = string.Format("topic/{0}/subscriber?uri={1}", urlSafeTopicName, _baseUri);
Expand Down
1 change: 1 addition & 0 deletions Source/Platibus/Platibus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
<Compile Include="Http\AuthenticationSchemeElement.cs" />
<Compile Include="Http\AuthenticationSchemesElementCollection.cs" />
<Compile Include="Http\BasicAuthorizationService.cs" />
<Compile Include="Http\HttpClientPool.cs" />
<Compile Include="ResourceNotFoundException.cs" />
<Compile Include="Security\IAuthorizationService.cs" />
<Compile Include="Http\HttpServerConfigurationManager.cs" />
Expand Down

0 comments on commit 8a163bd

Please sign in to comment.