Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Distributed tracing: Adds a sample to collect activities and events using custom listener #4021

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CFPullModelLatestVersionMod
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry", "OpenTelemetry\OpenTelemetry.csproj", "{C6EF6948-C085-4013-A21F-99303ECBA7A9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ApplicationInsights", "ApplicationInsights\ApplicationInsights.csproj", "{55149A3C-A263-4EE5-AD2D-02FE9AC4D291}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ApplicationInsights", "ApplicationInsights\ApplicationInsights.csproj", "{55149A3C-A263-4EE5-AD2D-02FE9AC4D291}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CustomDiagnosticAndEventListener", "CustomDiagnosticAndEventListener\CustomDiagnosticAndEventListener.csproj", "{9BE3551E-31A1-4186-9D2F-DC325411A39D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -165,6 +167,10 @@ Global
{55149A3C-A263-4EE5-AD2D-02FE9AC4D291}.Debug|Any CPU.Build.0 = Debug|Any CPU
{55149A3C-A263-4EE5-AD2D-02FE9AC4D291}.Release|Any CPU.ActiveCfg = Release|Any CPU
{55149A3C-A263-4EE5-AD2D-02FE9AC4D291}.Release|Any CPU.Build.0 = Release|Any CPU
{9BE3551E-31A1-4186-9D2F-DC325411A39D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9BE3551E-31A1-4186-9D2F-DC325411A39D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9BE3551E-31A1-4186-9D2F-DC325411A39D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9BE3551E-31A1-4186-9D2F-DC325411A39D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
namespace Sample.Listeners
{
using System.Diagnostics.Tracing;
using System.Diagnostics;
using System.Collections.Concurrent;

/// <summary>
/// This listener can cover following aspects:
/// 1. Write its own monitoring library with the custom implementation of aggregation or whatever you want to do with this data.
/// 2. Support an APM tool which is not open telemetry compliant.
/// </summary>
/// <remarks>It is a simple sample. Anybody can get as creative as they want to make it better in terms of usability and performance.</remarks>
internal class CustomDiagnosticAndEventListener :
EventListener, // Override Event Listener to capture Event source events
IObserver<KeyValuePair<string, object>>, // Override IObserver to capture Activity events
IObserver<DiagnosticListener>,
IDisposable
{
private readonly string diagnosticSourceName;
private readonly string eventSourceName;

private ConcurrentBag<IDisposable>? Subscriptions = new();
private ConcurrentBag<Activity> Activities { get; } = new();

public CustomDiagnosticAndEventListener(string diagnosticSourceName, string eventSourceName)
{
this.diagnosticSourceName = diagnosticSourceName;
this.eventSourceName = eventSourceName;

DiagnosticListener.AllListeners.Subscribe(this);
}

/// <summary>
/// IObserver Override
/// </summary>
public void OnCompleted() {
Console.WriteLine("OnCompleted");
}

/// <summary>
/// IObserver Override
/// </summary>
public void OnError(Exception error) {
Console.WriteLine($"OnError : {error}");
}

/// <summary>
/// IObserver Override
/// </summary>
public void OnNext(KeyValuePair<string, object> value)
{
lock (this.Activities)
{
// Check for disposal
if (this.Subscriptions == null) return;

string startSuffix = ".Start";
string stopSuffix = ".Stop";
string exceptionSuffix = ".Exception";

if (Activity.Current == null)
{
return;
}

if (value.Key.EndsWith(startSuffix))
{
this.Activities.Add(Activity.Current);
}
else if (value.Key.EndsWith(stopSuffix) || value.Key.EndsWith(exceptionSuffix))
{
foreach (Activity activity in this.Activities)
{
if (activity.Id == Activity.Current.Id)
{
Console.WriteLine($" Activity Name: {activity.DisplayName}");
Console.WriteLine($" Activity Operation Name: {activity.OperationName}");
foreach (KeyValuePair<string, string?> actualTag in activity.Tags)
{
Console.WriteLine($" {actualTag.Key} ==> {actualTag.Value}");
}
Console.WriteLine();
return;
}
}
}
}
}

/// <summary>
/// IObserver Override
/// </summary>
public void OnNext(DiagnosticListener value)
{
if (value.Name == this.diagnosticSourceName && this.Subscriptions != null)
{
Console.WriteLine($"CustomDiagnosticAndEventListener : OnNext : {value.Name}");
lock (this.Activities)
{
this.Subscriptions?.Add(value.Subscribe(this));
}
}
}

/// <summary>
/// EventListener Override
/// </summary>
protected override void OnEventSourceCreated(EventSource eventSource)
{
if (eventSource != null && eventSource.Name.Equals(this.eventSourceName))
{
Console.WriteLine($"CustomDiagnosticAndEventListener : OnEventSourceCreated : {eventSource.Name}");
this.EnableEvents(eventSource, EventLevel.Informational); // Enable information level events
}
}

/// <summary>
/// EventListener Override
/// </summary>
protected override void OnEventWritten(EventWrittenEventArgs eventData)
{
Console.WriteLine($" Event Name: {eventData.EventName}");
Console.WriteLine($" Event Level: {eventData.Level}");
if(eventData.Payload != null)
{
int counter = 0;
foreach (object? payload in eventData.Payload)
{
Console.WriteLine($" Event Payload {counter++}: {payload}");
}
}
else
{
Console.WriteLine($" Event Payload: NULL");
}
Console.WriteLine();
}

public override void Dispose()
{
Console.WriteLine("CustomDiagnosticAndEventListener : Dispose");
base.Dispose();

if (this.Subscriptions == null)
{
return;
}

ConcurrentBag<IDisposable> subscriptions;
lock (this.Activities)
{
subscriptions = this.Subscriptions;
this.Subscriptions = null;
}

foreach (IDisposable subscription in subscriptions)
{
subscription.Dispose(); // Dispose of DiagnosticListener subscription
}

foreach (Activity activity in this.Activities)
{
activity.Dispose(); // Dispose of Activity
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.35.2-preview" />
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.0" />
</ItemGroup>
<ItemGroup>
<None Include="..\AppSettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
namespace Cosmos.Samples.ApplicationInsights
{
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Sample.Listeners;

internal class Program
{
private static readonly string databaseName = "samples";
private static readonly string containerName = "custom-listener-sample";

static async Task Main()
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("AppSettings.json")
.Build();

string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid CosmosDBEndPointUrl in the appSettings.json");
}

string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid CosmosDBAuthorizationKey in the appSettings.json");
}

using CustomDiagnosticAndEventListener listener
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved
= new CustomDiagnosticAndEventListener(
diagnosticSourceName: "Azure.Cosmos.Operation",
eventSourceName: "Azure-Cosmos-Operation-Request-Diagnostics");

CosmosClientOptions options = new CosmosClientOptions()
{
IsDistributedTracingEnabled = true // Defaults to true, set to false to disable
};
using (CosmosClient client = new CosmosClient(endpoint, authKey, options))
{
Console.WriteLine($"Getting container reference for {containerName}.");

ContainerProperties properties = new ContainerProperties(containerName, partitionKeyPath: "/id");

await client.CreateDatabaseIfNotExistsAsync(databaseName);
Container container = await client.GetDatabase(databaseName).CreateContainerIfNotExistsAsync(properties);

await Program.RunCrudDemo(container);
}
}

public static async Task RunCrudDemo(Container container)
{
// Any operations will automatically generate telemetry

for (int i = 1; i <= 5; i++)
{
await container.CreateItemAsync(new Item { Id = $"{i}", Status = "new" }, new PartitionKey($"{i}"));
Console.WriteLine($"Created document with id: {i}");
}

for (int i = 1; i <= 5; i++)
{
await container.ReadItemAsync<Item>($"{i}", new PartitionKey($"{i}"));
Console.WriteLine($"Read document with id: {i}");
}

for (int i = 1; i <= 5; i++)
{
await container.ReplaceItemAsync(new Item { Id = $"{i}", Status = "updated" }, $"{i}", new PartitionKey($"{i}"));
Console.WriteLine($"Updated document with id: {i}");
}

for (int i = 1; i <= 5; i++)
{
await container.DeleteItemAsync<Item>($"{i}", new PartitionKey($"{i}"));
Console.WriteLine($"Deleted document with id: {i}");
}
}
}

internal class Item
{
[JsonProperty("id")]
public string Id { get; set; }

public string Status { get; set; }
}
}
Loading