Skip to content

Commit

Permalink
Merge pull request #25 from ahdde/connection_pooling
Browse files Browse the repository at this point in the history
Carbon connection pooling
  • Loading branch information
zivillian authored Jun 10, 2020
2 parents 4ad75b1 + c51eba9 commit ab4db53
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 41 deletions.
110 changes: 110 additions & 0 deletions Graphite.Test/GraphiteClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ahd.Graphite.Base;
using Newtonsoft.Json;
Expand Down Expand Up @@ -73,12 +74,89 @@ public async Task CanSendMetric()
var recvTask = ReceiveMetric(server);
var sendTask = client.SendAsync("usage.unittest.cpu.count", Environment.ProcessorCount);
await sendTask;
new CarbonConnectionPool("localhost", new PickleGraphiteFormatter(33225)).ClearPool();
var metric = await recvTask;
Assert.Contains("usage.unittest.cpu.count", metric);
server.Stop();
Console.WriteLine(metric);
}

[Fact]
public async Task CanReusePooledConnectionMetric()
{
var server = new TcpListener(new IPEndPoint(IPAddress.Loopback, 33225));
server.Start();
var client = new GraphiteClient("localhost", new PlaintextGraphiteFormatter(33225));
var recvTask = ReceiveMetric(server);
await client.SendAsync("usage.unittest.cpu.count", Environment.ProcessorCount);

client = new GraphiteClient("localhost", new PlaintextGraphiteFormatter(33225));
await client.SendAsync("usage.unittest.ram.count", Environment.ProcessorCount);
//"kill" all existing tcp connections
new CarbonConnectionPool("localhost", new PickleGraphiteFormatter(33225)).ClearPool();

var metric = await recvTask;
Assert.Contains("usage.unittest.cpu.count", metric);
Assert.Contains("usage.unittest.ram.count", metric);
server.Stop();
Console.WriteLine(metric);
}

[Fact]
public async Task BrokenPooledConnectionIsDetected()
{
var server = new TcpListener(new IPEndPoint(IPAddress.Loopback, 33225));
server.Start();
var client = new GraphiteClient("localhost", new PlaintextGraphiteFormatter(33225));

var send = client.SendAsync("usage.unittest.cpu.count", Environment.ProcessorCount);
//accept and dispose client connection
using (var conn = await server.AcceptTcpClientAsync())
using (var stream = conn.GetStream())
using (var reader = new StreamReader(stream))
{
var receive =reader.ReadLineAsync();
await send;
await receive;
}
client = new GraphiteClient("localhost", new PlaintextGraphiteFormatter(33225));
var recvTask = ReceiveMetric(server);
await client.SendAsync("usage.unittest.cpu.count", Environment.ProcessorCount);
new CarbonConnectionPool("localhost", new PickleGraphiteFormatter(33225)).ClearPool();

var metric = await recvTask;
Assert.Contains("usage.unittest.cpu.count", metric);
server.Stop();
Console.WriteLine(metric);
}

[Fact]
public async Task BrokenPooledConnectionIsDetectedForPickle()
{
var server = new TcpListener(new IPEndPoint(IPAddress.Loopback, 33225));
server.Start();
var client = new GraphiteClient("localhost", new PickleGraphiteFormatter(33225));

var send = client.SendAsync("usage.unittest.cpu.count", Environment.ProcessorCount);
//accept and dispose client connection
using (var conn = await server.AcceptTcpClientAsync())
using (var stream = conn.GetStream())
using (var reader = new StreamReader(stream))
{
var receive =reader.ReadAsync(new char[1], 0, 1);
await send;
await receive;
}
client = new GraphiteClient("localhost", new PickleGraphiteFormatter(33225));
var recvTask = ReceiveMetric(server);
await client.SendAsync("usage.unittest.cpu.count", Environment.ProcessorCount);
new CarbonConnectionPool("localhost", new PickleGraphiteFormatter(33225)).ClearPool();

var metric = await recvTask;
server.Stop();
Console.WriteLine(metric);
}

[Fact]
public void DataPointConversion()
{
Expand Down Expand Up @@ -132,6 +210,38 @@ public async Task CanSendToV4OnlyHost()
client.Send("usage.unittest.cpu.count", 1);
}

[Fact]
[Trait("Category", "Integration")]
public void CanSendManyMetrics()
{
var random = new Random();
while (random.Next() > 0)
{
var client = new GraphiteClient(GraphiteHost, new PickleGraphiteFormatter());
client.Send("test.client.random1", random.NextDouble() * 100);
client = new GraphiteClient(GraphiteHost, new PlaintextGraphiteFormatter());
client.Send("test.client.random2", random.NextDouble() * 100);
Thread.Sleep(500);
}
CarbonConnectionPool.ClearAllPools();
}

[Fact]
[Trait("Category", "Integration")]
public async Task CanSendManyMetricsAsync()
{
var random = new Random();
while (random.Next() > 0)
{
var client = new GraphiteClient(GraphiteHost, new PickleGraphiteFormatter());
await client.SendAsync("test.client.random1", random.NextDouble() * 100);
client = new GraphiteClient(GraphiteHost, new PlaintextGraphiteFormatter());
await client.SendAsync("test.client.random2", random.NextDouble() * 100);
await Task.Delay(500);
}
CarbonConnectionPool.ClearAllPools();
}

class TestGraphiteFormatter : PlaintextGraphiteFormatter
{
public TestGraphiteFormatter(ushort port):base(port)
Expand Down
184 changes: 184 additions & 0 deletions Graphite/CarbonConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.ObjectPool;

namespace ahd.Graphite
{
/// <summary>
/// Connection pool for TCP connections to carbon
/// </summary>
public class CarbonConnectionPool
{
private readonly string _hostname;
private readonly IGraphiteFormatter _formatter;
private static readonly ConcurrentDictionary<(string,ushort), ObjectPool<TcpClient>> Cache = new ConcurrentDictionary<(string,ushort), ObjectPool<TcpClient>>();
private readonly ObjectPool<TcpClient> _pool;

/// <summary>
/// Create or reuse a connection pool for the specified endpoint. If there is already an existing connection pool for the specified endpoint it is reused.
/// </summary>
/// <param name="hostname">Graphite hostname</param>
/// <param name="formatter">formatter for sending data to graphite</param>
public CarbonConnectionPool(string hostname, IGraphiteFormatter formatter)
{
_hostname = hostname;
_formatter = formatter;
_pool = Cache.GetOrAdd((hostname, _formatter.Port), CreatePool);
}

/// <summary>
/// clears all connections from all endpoints
/// </summary>
public static void ClearAllPools()
{
foreach (var entry in Cache)
{
if (Cache.TryRemove(entry.Key, out var pool))
{
var disposable = pool as IDisposable;
disposable?.Dispose();
}
}
}

/// <summary>
/// clears all connections from the current pool
/// </summary>
public void ClearPool()
{
Cache.TryRemove((_hostname, _formatter.Port), out _);
var disposable = _pool as IDisposable;
disposable?.Dispose();
}

/// <summary>
/// returns a pooled connection or creates a new connection
/// <remarks>the connection is tested before returning to the caller</remarks>
/// </summary>
/// <returns></returns>
public TcpClient Get()
{
var client = _pool.Get();
if (!client.Connected)
{
client.Dispose();
client = new TcpClient(_hostname, _formatter.Port);
}
else
{
if (!Test(client))
return Get();
}
return client;
}

/// <summary>
/// returns the connection to the pool
/// </summary>
/// <param name="client">the connection to return to the pool</param>
public void Return(TcpClient client)
{
_pool.Return(client);
}

private bool Test(TcpClient client)
{
try
{
_formatter.TestConnection(client.GetStream());
return true;
}
catch (IOException ex) when (ex.InnerException is SocketException se && CheckSocketException(se))
{
client.Dispose();
return false;
}
}

/// <summary>
/// returns a pooled connection or creates a new connection
/// <remarks>the connection is tested before returning to the caller</remarks>
/// </summary>
/// <param name="useDualStack">Use ip dual stack for sending metrics</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns></returns>
public async Task<TcpClient> GetAsync(bool useDualStack, CancellationToken cancellationToken)
{
var client = _pool.Get();
if (!client.Connected)
{
client.Dispose();
if (useDualStack)
{
client = new TcpClient(AddressFamily.InterNetworkV6) {Client = {DualMode = true}};
}
else
{
client = new TcpClient(AddressFamily.InterNetwork);
}
cancellationToken.ThrowIfCancellationRequested();
await client.ConnectAsync(_hostname, _formatter.Port).ConfigureAwait(false);
}
else
{
if (!await TestAsync(client, cancellationToken).ConfigureAwait(false))
return await GetAsync(useDualStack, cancellationToken).ConfigureAwait(false);
}
return client;
}

private async Task<bool> TestAsync(TcpClient client, CancellationToken cancellationToken)
{
try
{
await _formatter.TestConnectionAsync(client.GetStream(), cancellationToken).ConfigureAwait(false);
return true;
}
catch (IOException ex) when (ex.InnerException is SocketException se && CheckSocketException(se))
{
client.Dispose();
return false;
}
}

private bool CheckSocketException(SocketException ex)
{
if (ex is null) return false;
switch (ex.SocketErrorCode)
{
case SocketError.ConnectionAborted:
case SocketError.ConnectionReset:
case SocketError.Disconnecting:
case SocketError.NetworkReset:
case SocketError.NotConnected:
case SocketError.OperationAborted:
case SocketError.Shutdown:
return true;
default:
return false;
}
}

private ObjectPool<TcpClient> CreatePool((string, ushort) endpoint)
{
return new DefaultObjectPoolProvider().Create(new TcpClientPolicy());
}

private class TcpClientPolicy : IPooledObjectPolicy<TcpClient>
{
public TcpClient Create()
{
return new TcpClient();
}

public bool Return(TcpClient obj)
{
return obj.Connected;
}
}
}
}
3 changes: 2 additions & 1 deletion Graphite/Graphite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<RootNamespace>ahd.Graphite</RootNamespace>
<AssemblyName>ahd.Graphite</AssemblyName>
<Title>Graphite client</Title>
<Version>2.2.1</Version>
<Version>2.3.0</Version>
<Authors>ahd GmbH &amp; Co. KG</Authors>
<Company>ahd GmbH &amp; Co. KG</Company>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
Expand All @@ -32,6 +32,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.4" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="Razorvine.Pickle" Version="1.0.0" />
</ItemGroup>
Expand Down
Loading

0 comments on commit ab4db53

Please sign in to comment.