Skip to content

Commit

Permalink
Merge pull request #33 from ahdde/carbon_http
Browse files Browse the repository at this point in the history
add support for go-carbon http receiver
  • Loading branch information
zivillian authored Nov 15, 2022
2 parents d446657 + 3255a95 commit efab8d9
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 68 deletions.
10 changes: 5 additions & 5 deletions Graphite.Test/Graphite.Test.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<RootNamespace>ahd.Graphite.Test</RootNamespace>
<AssemblyName>ahd.Graphite.Test</AssemblyName>
Expand All @@ -20,10 +20,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
<PackageReference Include="System.Text.Json" Version="5.0.2" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
<PackageReference Include="System.Text.Json" Version="6.0.7" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
80 changes: 80 additions & 0 deletions Graphite/AbstractCarbonClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ahd.Graphite
{
/// <summary>
/// Base class for clients for submitting data to carbon
/// </summary>
public abstract class AbstractCarbonClient
{
/// <summary>
/// Send a single datapoint
/// </summary>
/// <param name="series">metric path</param>
/// <param name="value">metric value</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns></returns>
public Task SendAsync(string series, double value, CancellationToken cancellationToken = default(CancellationToken))
{
return SendAsync(series, value, DateTime.Now, cancellationToken);
}

/// <summary>
/// Send a single datapoint
/// </summary>
/// <param name="series">metric path</param>
/// <param name="value">metric value</param>
/// <param name="timestamp">metric timestamp</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns></returns>
public Task SendAsync(string series, double value, DateTime timestamp, CancellationToken cancellationToken = default(CancellationToken))
{
return SendAsync(new []{new Datapoint(series, value, timestamp)}, cancellationToken);
}

/// <summary>
/// Send a list of datapoints
/// </summary>
/// <param name="datapoints"></param>
/// <returns></returns>
public Task SendAsync(params Datapoint[] datapoints)
{
return SendAsync(datapoints, CancellationToken.None);
}

/// <summary>
/// Send a list of datapoints
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <param name="datapoints"></param>
/// <returns></returns>
public Task SendAsync(CancellationToken cancellationToken, params Datapoint[] datapoints)
{
return SendAsync(datapoints, cancellationToken);
}

/// <summary>
/// Send a list of datapoints
/// </summary>
/// <param name="datapoints"></param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns></returns>
public Task SendAsync(Datapoint[] datapoints, CancellationToken cancellationToken)
{
ICollection<Datapoint> points = datapoints;
return SendAsync(points, cancellationToken);
}

/// <summary>
/// Send a list of datapoints
/// </summary>
/// <param name="datapoints"></param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns></returns>
public abstract Task SendAsync(ICollection<Datapoint> datapoints, CancellationToken cancellationToken = default(CancellationToken));

}
}
63 changes: 3 additions & 60 deletions Graphite/CarbonClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace ahd.Graphite
/// <summary>
/// Client for submitting data to carbon
/// </summary>
public class CarbonClient
public class CarbonClient:AbstractCarbonClient
{
private readonly CarbonConnectionPool _carbonPool;

Expand All @@ -27,6 +27,7 @@ public CarbonClient():this("localhost")
public CarbonClient(string host):this(host, new PlaintextGraphiteFormatter())
{
}

/// <summary>
/// Creates a client with the specified host and formatter
/// </summary>
Expand Down Expand Up @@ -63,72 +64,14 @@ public CarbonClient(string host, IGraphiteFormatter formatter)
/// Use ip dual stack for sending metrics. Defaults to true.
/// </summary>
public bool UseDualStack { get; set; }

/// <summary>
/// Send a single datapoint
/// </summary>
/// <param name="series">metric path</param>
/// <param name="value">metric value</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns></returns>
public Task SendAsync(string series, double value, CancellationToken cancellationToken = default(CancellationToken))
{
return SendAsync(series, value, DateTime.Now, cancellationToken);
}

/// <summary>
/// Send a single datapoint
/// </summary>
/// <param name="series">metric path</param>
/// <param name="value">metric value</param>
/// <param name="timestamp">metric timestamp</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns></returns>
public Task SendAsync(string series, double value, DateTime timestamp, CancellationToken cancellationToken = default(CancellationToken))
{
return SendAsync(new []{new Datapoint(series, value, timestamp)}, cancellationToken);
}

/// <summary>
/// Send a list of datapoints in up to <see cref="BatchSize"/> batches
/// </summary>
/// <param name="datapoints"></param>
/// <returns></returns>
public Task SendAsync(params Datapoint[] datapoints)
{
return SendAsync(datapoints, CancellationToken.None);
}

/// <summary>
/// Send a list of datapoints in up to <see cref="BatchSize"/> batches
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <param name="datapoints"></param>
/// <returns></returns>
public Task SendAsync(CancellationToken cancellationToken, params Datapoint[] datapoints)
{
return SendAsync(datapoints, cancellationToken);
}

/// <summary>
/// Send a list of datapoints in up to <see cref="BatchSize"/> batches
/// </summary>
/// <param name="datapoints"></param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns></returns>
public Task SendAsync(Datapoint[] datapoints, CancellationToken cancellationToken)
{
ICollection<Datapoint> points = datapoints;
return SendAsync(points, cancellationToken);
}

/// <summary>
/// Send a list of datapoints in up to <see cref="BatchSize"/> batches
/// </summary>
/// <param name="datapoints"></param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns></returns>
public async Task SendAsync(ICollection<Datapoint> datapoints, CancellationToken cancellationToken = default(CancellationToken))
public override async Task SendAsync(ICollection<Datapoint> datapoints, CancellationToken cancellationToken = default(CancellationToken))
{
if (datapoints == null || datapoints.Count == 0) throw new ArgumentNullException(nameof(datapoints));
var batches = GetBatches(datapoints);
Expand Down
76 changes: 76 additions & 0 deletions Graphite/CarbonHttpClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using ahd.Graphite.Exceptions;
using Razorvine.Pickle;

namespace ahd.Graphite
{
/// <summary>
/// Client for submitting data to carbon via HTTP using the pickle protocol
/// </summary>
public class CarbonHttpClient:AbstractCarbonClient
{
private readonly HttpClient _client;

/// <summary>
/// Creates a client for localhost:2007
/// </summary>
public CarbonHttpClient():this("http://localhost:2007")
{
}

/// <summary>
/// Creates a client with the specified endpoint
/// </summary>
/// <param name="baseAddress">carbon http endpoint</param>
public CarbonHttpClient(string baseAddress):this(new Uri(baseAddress))
{
}

/// <summary>
/// Creates a client with the specified endpoint
/// </summary>
/// <param name="baseAddress">carbon http endpoint</param>
public CarbonHttpClient(Uri baseAddress):this(new HttpClient{BaseAddress = baseAddress})
{
}

/// <summary>
/// Creates a client using the supplied http client
/// </summary>
/// <param name="client">preconfigured http client</param>
public CarbonHttpClient(HttpClient client)
{
_client = client;
}

/// <inheritdoc />
public override async Task SendAsync(ICollection<Datapoint> datapoints, CancellationToken cancellationToken = default(CancellationToken))
{
var response = await _client.PostAsync("/", Serialize(datapoints), cancellationToken).ConfigureAwait(false);
await response.EnsureSuccessStatusCodeAsync().ConfigureAwait(false);
}

private HttpContent Serialize(ICollection<Datapoint> datapoints)
{
using (var pickler = new Pickler())
{
var data = datapoints.Select(x => new object[] { x.Series, new object[] { x.UnixTimestamp, x.Value } });
var pickled = pickler.dumps(data);
return new ByteArrayContent(pickled)
{
Headers =
{
ContentType = new MediaTypeHeaderValue("application/python-pickle")
}
};
}
}
}
}
6 changes: 3 additions & 3 deletions Graphite/Graphite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.8" />
<PackageReference Include="Razorvine.Pickle" Version="1.1.0" />
<PackageReference Include="System.Text.Json" Version="5.0.2" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.11" />
<PackageReference Include="Razorvine.Pickle" Version="1.2.1" />
<PackageReference Include="System.Text.Json" Version="6.0.7" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit efab8d9

Please sign in to comment.