Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: dispose registry which contains rate limiters #2438

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions docs/pipelines/resilience-pipeline-registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,114 @@ Both `AddReloadToken(...)` and `OnPipelineDisposed(...)` are used to implement t

Resource disposal occurs when the registry is disposed of or when the pipeline undergoes changes due to [dynamic reloads](#dynamic-reloads). Upon disposal, all callbacks registered through the `OnPipelineDisposed` method are invoked. However, actual resource disposal is deferred until the pipeline completes all outgoing executions. It's vital to note that dispose callbacks are associated only with a specific instance of the pipeline.

### Disposal of encapsulated rate limiters

If one is using custom rate limiters and wants to dispose them on pipeline reload or when a registry is disposed, then one should use the `OnPipelineDisposed` callback.

Consider the following runnable example. It creates a registry with a concurrency strategy and a chained rate limiter strategy (which contains multiple rate limiters):
```cs
using System;
using System.Threading.RateLimiting;
using Polly;
using Polly.RateLimiting;
using Polly.Registry;


public static class Program
{
public static void Main()
{
using var pra = new PipelineRegistryAdapter();
pra.GetOrCreateResiliencePipeline("Pipeline foo", 1, 10, 100, 1000);
pra.GetOrCreateResiliencePipeline("Pipeline bar", 2, 20, 200, 2000);
}
}


public sealed class PipelineRegistryAdapter : IDisposable
{
private bool _disposed;
private readonly ResiliencePipelineRegistry<string> _resiliencePipelineRegistry = new();


public void Dispose()
{
if (_disposed) return;
_resiliencePipelineRegistry.Dispose();
_disposed = true;
}


private PartitionedRateLimiter<ResilienceContext> CreateConcurrencyLimiter(string partitionKey, int permitLimit) =>
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
RateLimitPartition.GetConcurrencyLimiter(
partitionKey: partitionKey,
factory: partitionKey => new ConcurrencyLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0 }));


private PartitionedRateLimiter<ResilienceContext> CreateFixedWindowLimiter(string partitionKey, int permitLimit, TimeSpan window) =>
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
RateLimitPartition.GetFixedWindowLimiter(
partitionKey: partitionKey,
factory: partitionKey => new FixedWindowRateLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0, Window = window }));


public ResiliencePipeline GetOrCreateResiliencePipeline(string partitionKey, int maximumConcurrentThreads, int sendLimitPerSecond, int sendLimitPerHour, int sendLimitPerDay)
{
// return pipeline if exists
if (_resiliencePipelineRegistry.TryGetPipeline(partitionKey, out var pipeline))
return pipeline;

// else create pipeline with multiple strategies
var wasCreated = _resiliencePipelineRegistry.TryAddBuilder(partitionKey, (builder, context) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to take a look at GetOrAddPipeline here. It would make the code simpler.

{
PartitionedRateLimiter<ResilienceContext>? threadLimiter = null;
PartitionedRateLimiter<ResilienceContext>? requestLimiter = null;

// outer strategy: limit threads
builder.AddRateLimiter(new RateLimiterStrategyOptions {
RateLimiter = args => {
threadLimiter = CreateConcurrencyLimiter(partitionKey, maximumConcurrentThreads);
return threadLimiter.AcquireAsync(args.Context, permitCount:1, args.Context.CancellationToken);
}
});

// inner strategy: limit requests (by second, hour, day)
builder.AddRateLimiter(new RateLimiterStrategyOptions {
RateLimiter = args => {
PartitionedRateLimiter<ResilienceContext>[] limiters = [
CreateFixedWindowLimiter(partitionKey, sendLimitPerSecond, TimeSpan.FromSeconds(1)),
CreateFixedWindowLimiter(partitionKey, sendLimitPerHour, TimeSpan.FromHours(1)),
CreateFixedWindowLimiter(partitionKey, sendLimitPerDay, TimeSpan.FromDays(1)),
];
requestLimiter = PartitionedRateLimiter.CreateChained(limiters);
return requestLimiter.AcquireAsync(args.Context, permitCount:1, args.Context.CancellationToken);
}
});

// unlike other strategies, rate limiters disposed manually
context.OnPipelineDisposed(() => {
threadLimiter?.Dispose();
requestLimiter?.Dispose();
Console.WriteLine($"Disposed pipeline '{partitionKey}'");
});

});

if (wasCreated)
Console.WriteLine($"Created pipeline '{partitionKey}'");
else
throw new InvalidOperationException($"Failed to create pipeline '{partitionKey}'");

return _resiliencePipelineRegistry.GetPipeline(partitionKey);
}


}
```

Notice how the rate limiters are disposed manually in the callback.

## Complex registry keys

Though the pipeline registry supports complex keys, we suggest you use them when defining pipelines with the [Dependency Injection](../advanced/dependency-injection.md) (DI) containers. For further information, see the [section on complex pipeline keys](../advanced/dependency-injection.md#complex-pipeline-keys).