New features
Activation repartitioning
Video (ActivationRepartitioning.mp4): a demonstration of activation repartitioning in action. The red lines represent cross-silo communication. As the partitioning algorithm eliminates the red lines, throughput improves to over 2x the initial throughput.
@ledjon-behluli and @ReubenBond implemented activation repartitioning in #8877. When enabled, activation repartitioning collocates grains based on observed communication patterns to improve performance while keeping load balanced across your cluster. In initial benchmarks, we observed throughput improvements in the range of 30% to 110%. More background and implementation details follow the setup instructions below, for those who are interested.

The feature is currently experimental. To enable it, you need to opt in on every silo in your cluster using the ISiloBuilder.AddActivationRepartitioner() extension method, suppressing the experimental feature warning:
#pragma warning disable ORLEANSEXP001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
siloBuilder.AddActivationRepartitioner();
#pragma warning restore ORLEANSEXP001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
The fastest and cheapest grain calls are those which don't cross process boundaries. These calls do not need to be serialized and incur no network transmission costs. For that reason, collocating related grains within the same host can significantly improve the performance of your application. On the other hand, if all grains were placed in a single host, that host may become overloaded and crash, and you would not be able to scale your application across multiple hosts. How can we maximize collocation of related grains while keeping load across your hosts balanced? Before describing our solution, we need to provide some background.
Grain placement in Orleans is flexible: Orleans executes a user-defined function when deciding where in a cluster to place each grain, providing your function with a list of the compatible silos in your cluster, that is, the silos which support the grain type and interface version which triggered placement. Grain calls are location-transparent, so callers do not need to know where a grain is located, allowing grains to be placed anywhere across your cluster of hosts. Each grain's current location is stored in a distributed directory and lookups to the directory are cached for performance.
Resource-optimized placement was implemented by @ledjon-behluli in #8815. Resource-optimized placement uses runtime statistics such as total and available memory, CPU usage, and grain count, collected from all hosts in the cluster, smooths them, and combines them to calculate a load score. It selects the least-loaded silo from a subset of hosts to balance load evenly across the cluster [1]. If the load score of the local silo is within some configured range of the best candidate's load score, the local silo is chosen preferentially. This improves grain locality by leveraging the knowledge that the local silo initiated a call to the grain and therefore has some relation to that grain.
Ledjon wrote more about Resource-optimized placement in this blog post.
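The placement decision described above can be sketched in a few lines. This is only an illustration, not the Orleans implementation: the SiloLoad record, the weights in Score, the exponential smoothing in Smooth, and the localMargin and subsetSize parameters are all assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, already-smoothed statistics for one silo; every value is normalized to 0..1.
public sealed record SiloLoad(string Silo, double Cpu, double Memory, double Activations);

public static class LoadScoreSketch
{
    // Exponential smoothing, used here as a stand-in for whatever filter the runtime applies.
    public static double Smooth(double previous, double sample, double alpha)
        => alpha * sample + (1 - alpha) * previous;

    // Illustrative weights (an assumption); a lower score means a less loaded silo.
    public static double Score(SiloLoad s)
        => 0.4 * s.Cpu + 0.4 * s.Memory + 0.2 * s.Activations;

    public static string Pick(
        IReadOnlyList<SiloLoad> silos, string localSilo,
        double localMargin, int subsetSize, Random random)
    {
        if (silos.Count == 0) throw new ArgumentException("No compatible silos.", nameof(silos));

        // Score only a random subset of hosts instead of the whole cluster.
        var subset = silos.OrderBy(_ => random.Next())
                          .Take(Math.Min(subsetSize, silos.Count))
                          .ToList();
        var best = subset.MinBy(s => Score(s))!;

        // Prefer the calling silo when it is nearly as good as the best candidate.
        var local = silos.FirstOrDefault(s => s.Silo == localSilo);
        if (local is not null && Score(local) - Score(best) <= localMargin)
        {
            return local.Silo;
        }

        return best.Silo;
    }
}
```

With a margin of zero the least-loaded sampled silo always wins; a larger margin trades some balance for locality.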
Originally, there was no straightforward way to move an active grain from one host to another without needing to fully deactivate the grain, unregister it from the grain directory, contend with concurrent callers on where to place the new activation, and reload its state from the database when the new activation is created. Live grain migration was introduced in #8452, allowing grains to transparently migrate from one silo to another on-demand without needing to reload state from the database, and without affecting pending requests. Live grain migration introduced two new lifecycle stages: dehydration and rehydration. The grain's in-memory state (application state, enqueued messages, metadata) is dehydrated into a migration packet which is sent to the destination silo where it's rehydrated. Live grain migration provided the mechanism for grains to migrate across hosts, but did not provide any out-of-the-box policies to automate migration. Users trigger grain migration by calling this.MigrateOnIdle() from within a grain, optionally providing a placement hint which the grain's configured placement director can use to select a destination host for the grain activation.
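As a minimal sketch of triggering migration manually, a grain can call MigrateOnIdle() from one of its own methods. The IPlayerGrain interface and the JoinMatch method are hypothetical names used only for illustration, and the example does not pass a placement hint.

```csharp
using System;
using System.Threading.Tasks;
using Orleans;

// Hypothetical grain interface, for illustration only.
public interface IPlayerGrain : IGrainWithGuidKey
{
    Task JoinMatch(Guid matchId);
}

public sealed class PlayerGrain : Grain, IPlayerGrain
{
    public Task JoinMatch(Guid matchId)
    {
        // ... application logic for joining the match would go here ...

        // Ask the runtime to migrate this activation once it becomes idle.
        // The grain's configured placement director selects the destination silo.
        this.MigrateOnIdle();
        return Task.CompletedTask;
    }
}
```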
Finally, we have the pieces in place for activation repartitioning: grain activations are load-balanced across the cluster, and they are able to migrate from host to host quickly. While live grain migration gives developers a mechanism to migrate grain activations from one host to another, it does not provide any automated policy to do so. Remember, we want grains to be balanced across the cluster and collocated with related grains to reduce networking and serialization cost. This is a difficult challenge since:
- An application can have millions of in-memory grains spread across tens or hundreds of silos.
- Each grain can message any other grain.
- The set of grains which each grain communicates with can change from minute to minute. For example, in an online game, player grains may join one match and communicate with each other for some time and then join a different match with an entirely different set of players afterwards.
- Computing a balanced partitioning with the minimum edge-cut for an arbitrary graph is NP-hard.
- No single host has full knowledge of which grains are hosted on which other host and which grains they communicate with: the graph is distributed across the cluster and changes dynamically.
- Storing the entire communication graph in memory could be prohibitively expensive.
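To make the objective concrete, the quantity being minimized can be thought of as the total weight of communication edges whose endpoints live on different silos. The sketch below computes it for a small, fully known graph; the EdgeCutSketch name and the tuple shape are assumptions for illustration, and the real system never has the whole graph in one place.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EdgeCutSketch
{
    // Sums the message counts of all edges that cross a silo boundary.
    public static long CrossSiloWeight(
        IEnumerable<(string From, string To, long Messages)> edges,
        IReadOnlyDictionary<string, string> placement)
        => edges.Where(e => placement[e.From] != placement[e.To])
                .Sum(e => e.Messages);
}
```

Moving a grain changes the placement map, and the repartitioner is looking for moves that reduce this sum without unbalancing the silos.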
Folks at Microsoft Research studied this problem and proposed a solution in a paper titled Optimizing Distributed Actor Systems for Dynamic Interactive Services. The paper, dubbed ActOp, proposes a decentralized approximate solution which achieves good results in their benchmarks. Their implementation was never merged into Orleans and we were unable to find the original implementation on Microsoft's internal network. So, after first implementing resource-optimized placement, community contributor @ledjon-behluli set out to implement activation repartitioning from scratch based on the ActOp paper. The following paragraphs describe the algorithm and the enhancements we made along the way.
The activation repartitioning algorithm involves pair-wise exchange of grains between two hosts at a time. A silo computes a candidate set of grains to send to a peer; the peer computes its own candidate set in the same way and then uses a greedy algorithm to determine a final exchange set which minimizes cost while keeping silos balanced.
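The greedy selection step can be sketched as follows. This is a simplified, single-pass illustration rather than the algorithm from the ActOp paper or the Orleans code: the Candidate record, the static Benefit value, and the maxImbalance parameter are assumptions, and a real implementation would update benefits as grains are moved.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical candidate: Benefit is the estimated drop in cross-silo messages if the grain moves.
public sealed record Candidate(string Grain, bool FromLocal, long Benefit);

public static class PairwiseExchangeSketch
{
    public static List<Candidate> Select(
        IEnumerable<Candidate> localCandidates,
        IEnumerable<Candidate> remoteCandidates,
        int localCount, int remoteCount, int maxImbalance)
    {
        var accepted = new List<Candidate>();

        // Consider the most beneficial moves first, regardless of direction.
        var ordered = localCandidates.Concat(remoteCandidates)
            .Where(c => c.Benefit > 0)
            .OrderByDescending(c => c.Benefit);

        foreach (var c in ordered)
        {
            var delta = c.FromLocal ? -1 : 1;
            var newLocal = localCount + delta;
            var newRemote = remoteCount - delta;
            var before = Math.Abs(localCount - remoteCount);
            var after = Math.Abs(newLocal - newRemote);

            // Skip moves that push the pair outside the tolerance without improving balance.
            if (after > maxImbalance && after >= before) continue;

            localCount = newLocal;
            remoteCount = newRemote;
            accepted.Add(c);
        }

        return accepted;
    }
}
```

Because candidates flow in both directions, a move from the remote silo can restore balance that an earlier move from the local silo used up.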
To compute the candidate sets, silos track which grains communicate with which other grains and how frequently. The whole graph would be unwieldy, so we only maintain the top-K communication edges using a variant of the Space-Saving [2] algorithm. Messages are sampled via a multi-producer, single-consumer ring buffer which drops messages if the partition is full. They are then processed by a single thread, which yields frequently to give other threads CPU time. When the distribution has low skew and the K parameter is fairly small, Space-Saving can require a lot of costly shuffling at the bottom of its max-heap (we use the heap variant to reduce memory). To address this, we use Filtered Space-Saving [3] instead of Space-Saving. Filtered Space-Saving involves putting a 'sketch' data structure at the bottom of the max heap for the lower end of the distribution, which can greatly reduce churn at the bottom and improve performance by up to ~2x in our tests.
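For readers unfamiliar with Space-Saving, here is a minimal sketch of the basic algorithm. It is the plain dictionary-based form, not the heap variant or the Filtered Space-Saving variant mentioned above, and the SpaceSavingCounter name is an assumption for illustration. Eviction here scans all K entries, which is exactly the cost the heap variant avoids.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class SpaceSavingCounter<T> where T : notnull
{
    private readonly int _capacity;
    private readonly Dictionary<T, long> _counts = new();

    public SpaceSavingCounter(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public void Add(T item)
    {
        if (_counts.TryGetValue(item, out var count))
        {
            // Already tracked: increment its counter.
            _counts[item] = count + 1;
            return;
        }

        if (_counts.Count < _capacity)
        {
            // Free slot available: start tracking the item.
            _counts[item] = 1;
            return;
        }

        // Table is full: evict the smallest counter and let the new item
        // inherit its count plus one, so counts may be over-estimates.
        var min = _counts.MinBy(kv => kv.Value);
        _counts.Remove(min.Key);
        _counts[item] = min.Value + 1;
    }

    // Returns up to n tracked items, highest estimated count first.
    public IReadOnlyList<KeyValuePair<T, long>> Top(int n)
        => _counts.OrderByDescending(kv => kv.Value).Take(n).ToList();
}
```

An edge between two grains could be represented as a tuple or string key; memory stays bounded by the capacity no matter how many distinct edges are observed.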
If the top-K communication edges are all internal (e.g., because the algorithm has already optimized partitioning somewhat), silos won't find many good transfer candidates. We need to track internal edges to work out which grains should or shouldn't be transferred (cost vs. benefit). To address this, we introduced a Bloom filter to track grains where the cost of movement is greater than the benefit, removing them from the top-K data structure. From our experiments, this works very well with even a 10x smaller K. This performance improvement comes with a reduced ability to handle dynamic graphs, so in the future we may need to implement a decay strategy to address this as the Bloom filter becomes saturated. To improve lookup performance, @ledjon-behluli implemented a blocked Bloom filter [4], which is used instead of a classic Bloom filter.
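The idea behind a blocked Bloom filter is that all bits for a given key fall inside one cache-line-sized block, so a lookup touches a single cache line. The following is a minimal sketch under stated assumptions, not the Orleans implementation: the class name, the 512-bit block size, the default of four probes, and the splitmix64-style mixing function are all choices made for the example, and keys are assumed to be 64-bit hashes of grain identifiers.

```csharp
using System;

public sealed class BlockedBloomFilter
{
    private const int WordsPerBlock = 8;               // 8 x 64 bits = 512 bits per block
    private const int BitsPerBlock = WordsPerBlock * 64;

    private readonly ulong[] _words;
    private readonly int _blockCount;
    private readonly int _hashCount;

    public BlockedBloomFilter(int blockCount, int hashCount = 4)
    {
        if (blockCount <= 0) throw new ArgumentOutOfRangeException(nameof(blockCount));
        if (hashCount <= 0) throw new ArgumentOutOfRangeException(nameof(hashCount));
        _blockCount = blockCount;
        _hashCount = hashCount;
        _words = new ulong[blockCount * WordsPerBlock];
    }

    public void Add(ulong key) => Visit(key, set: true);

    // False means definitely absent; true means possibly present.
    public bool MightContain(ulong key) => Visit(key, set: false);

    private bool Visit(ulong key, bool set)
    {
        unchecked
        {
            var h = Mix(key);
            // Every probe for this key lands in the same block.
            var blockStart = (int)(h % (ulong)_blockCount) * WordsPerBlock;
            var h1 = Mix(h);
            var h2 = Mix(h1) | 1UL;

            for (var i = 0; i < _hashCount; i++)
            {
                var bit = (int)((h1 + (ulong)i * h2) % BitsPerBlock);
                ref var word = ref _words[blockStart + (bit >> 6)];
                var mask = 1UL << (bit & 63);
                if (set) word |= mask;
                else if ((word & mask) == 0) return false;
            }

            return true;
        }
    }

    // splitmix64-style finalizer used to spread the key's bits.
    private static ulong Mix(ulong x)
    {
        unchecked
        {
            x += 0x9E3779B97F4A7C15UL;
            x = (x ^ (x >> 30)) * 0xBF58476D1CE4E5B9UL;
            x = (x ^ (x >> 27)) * 0x94D049BB133111EBUL;
            return x ^ (x >> 31);
        }
    }
}
```

As with any Bloom filter, there are no false negatives, and entries cannot be removed, which is why saturation over time is a concern.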
Enhancements to grain timers
Orleans v8.2.0 introduces a new API, RegisterGrainTimer, for managing grain timers. For compatibility, the existing RegisterTimer API is still available, but it is marked [Obsolete] and developers should migrate to the new grain timer API, RegisterGrainTimer.
Grain timers have been a common source of confusion for new and experienced developers because grain timer callbacks can execute concurrently with other grain calls, rather than being executed one-by-one like grain calls are. That is, timer callbacks interleave with grain calls. With v8.2.0, this issue can be avoided by using the new RegisterGrainTimer API. The new API has the following advantages:
- Grain timers can be updated using the Change(TimeSpan, TimeSpan) method on the returned IGrainTimer instance.
- Callbacks do not interleave by default. Interleaving can be enabled by setting Interleave to true on GrainTimerCreationOptions.
- Callbacks can keep the grain active, preventing it from being collected if the timer period is relatively short. This can be enabled by setting KeepAlive to true on GrainTimerCreationOptions.
- Callbacks can receive a CancellationToken which is canceled when the timer is disposed or the grain starts to deactivate.
- Callbacks can dispose the grain timer which fired them.
- Callbacks are now subject to grain call filters.
- Callbacks are visible in distributed tracing, when distributed tracing is enabled.
- POCO grains (grain classes which do not inherit from Grain) can register grain timers using the RegisterGrainTimer extension method.
The core API is as follows:
public static IGrainTimer RegisterGrainTimer<TState>(this IGrainBase grain, Func<TState, CancellationToken, Task> callback, TState state, GrainTimerCreationOptions options);
There are various overloads for convenience:
public static IGrainTimer RegisterGrainTimer<TState>(this IGrainBase grain, Func<TState, CancellationToken, Task> callback, TState state, GrainTimerCreationOptions options);
public static IGrainTimer RegisterGrainTimer(this IGrainBase grain, Func<CancellationToken, Task> callback, GrainTimerCreationOptions options);
public static IGrainTimer RegisterGrainTimer(this IGrainBase grain, Func<Task> callback, GrainTimerCreationOptions options);
public static IGrainTimer RegisterGrainTimer<TState>(this IGrainBase grain, Func<TState, Task> callback, TState state, GrainTimerCreationOptions options);
public static IGrainTimer RegisterGrainTimer(this IGrainBase grain, Func<Task> callback, TimeSpan dueTime, TimeSpan period);
public static IGrainTimer RegisterGrainTimer(this IGrainBase grain, Func<CancellationToken, Task> callback, TimeSpan dueTime, TimeSpan period);
public static IGrainTimer RegisterGrainTimer<TState>(this IGrainBase grain, Func<TState, Task> callback, TState state, TimeSpan dueTime, TimeSpan period);
public static IGrainTimer RegisterGrainTimer<TState>(this IGrainBase grain, Func<TState, CancellationToken, Task> callback, TState state, TimeSpan dueTime, TimeSpan period);
The RegisterGrainTimer API returns instances of IGrainTimer instead of IDisposable:
/// <summary>
/// Represents a timer belonging to a grain.
/// </summary>
public interface IGrainTimer : IDisposable
{
    /// <summary>Changes the start time and the interval between method invocations for a timer, using <see cref="TimeSpan"/> values to measure time intervals.</summary>
    /// <param name="dueTime">
    /// A <see cref="TimeSpan"/> representing the amount of time to delay before invoking the callback method specified when the <see cref="IGrainTimer"/> was constructed.
    /// Specify <see cref="Timeout.InfiniteTimeSpan"/> to prevent the timer from restarting.
    /// Specify <see cref="TimeSpan.Zero"/> to restart the timer immediately.
    /// </param>
    /// <param name="period">
    /// The time interval between invocations of the callback method specified when the timer was constructed.
    /// Specify <see cref="Timeout.InfiniteTimeSpan"/> to disable periodic signaling.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">The <paramref name="dueTime"/> or <paramref name="period"/> parameter, in milliseconds, is less than -1 or greater than 4294967294.</exception>
    void Change(TimeSpan dueTime, TimeSpan period);
}
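Putting the pieces together, a grain might register a timer, adjust it later, and dispose it, roughly as sketched below. The IHeartbeatGrain interface and its members are hypothetical names for illustration. The example uses the dueTime and period overload listed above; Interleave and KeepAlive would instead be set through the overloads that take GrainTimerCreationOptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Runtime;

// Hypothetical grain interface, for illustration only.
public interface IHeartbeatGrain : IGrainWithStringKey
{
    Task SlowDown();
    Task Stop();
}

public sealed class HeartbeatGrain : Grain, IHeartbeatGrain
{
    private IGrainTimer? _timer;
    private int _ticks;

    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        // First callback after 1 second, then every 5 seconds.
        // The grain itself is passed as state so the lambda can stay static.
        _timer = this.RegisterGrainTimer(
            static (grain, ct) => grain.OnTickAsync(ct),
            this,
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(5));
        return Task.CompletedTask;
    }

    private Task OnTickAsync(CancellationToken cancellationToken)
    {
        // The token is canceled when the timer is disposed or the grain starts to deactivate.
        if (!cancellationToken.IsCancellationRequested)
        {
            _ticks++;
        }

        return Task.CompletedTask;
    }

    public Task SlowDown()
    {
        // Reschedule the existing timer instead of disposing and re-registering it.
        _timer?.Change(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));
        return Task.CompletedTask;
    }

    public Task Stop()
    {
        _timer?.Dispose();
        _timer = null;
        return Task.CompletedTask;
    }
}
```

Because callbacks do not interleave by default, OnTickAsync can update _ticks without racing against SlowDown or Stop.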
MessagePack serialization support
@n-sidorov implemented support for serializing messages using MessagePack.
To use MessagePack for message serialization, you will need to install Microsoft.Orleans.Serialization.MessagePack by inserting the following into your project files:
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Serialization.MessagePack" Version="8.2.0" />
</ItemGroup>
Enable MessagePack serialization by calling ISerializerBuilder.AddMessagePackSerializer() on your clients and silos:
builder.AddSerializer(serializer => serializer.AddMessagePackSerializer());
Upon doing so, Orleans will be able to serialize types with the [MessagePackObject] attribute, for example:
[MessagePackObject]
public sealed record MyMessagePackClass
{
    [Key(0)]
    public int IntProperty { get; init; }

    [Key(1)]
    public string StringProperty { get; init; }

    [Key(2)]
    public MyMessagePackSubClass SubClass { get; init; }
}
Cassandra clustering provider
@rkargMsft implemented support for using Cassandra as a backing store for clustering in #8925.
To use Cassandra for clustering, you will need to install Microsoft.Orleans.Clustering.Cassandra by inserting the following into your project files:
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Clustering.Cassandra" Version="8.2.0" />
</ItemGroup>
On your clients and silos, you can enable Cassandra clustering by calling:
builder.UseCassandraClustering(connectionString);
ADO.NET Streaming Provider (alpha)
@JorgeCandeias implemented an ADO.NET streams provider in #8974.
The ADO.NET streaming provider is currently in alpha. To use it, add the following to your project files:
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Streaming.AdoNet" Version="8.2.0-alpha.1" />
</ItemGroup>
Configure ADO.NET streaming like so:
builder.AddAdoNetStreams("provider name", options =>
{
    options.Invariant = "ADO.NET invariant name";
    options.ConnectionString = "Connection string";
});
What's Changed
- Fix potential grain timer deadlock during disposal by @ReubenBond in #8950
- Add missing description node to XML docs by @scottaddie in #8959
- Clean up SafeTimer usage, replace with PeriodicTimer where possible by @ReubenBond in #8953
- Fix capitalization of 'MachineName' structured logging parameter by @ReubenBond in #8980
- Ensure PeriodicTimer period >= 1ms by @ReubenBond in #8981
- Ensure reminder table is initialized before access by @ReubenBond in #8982
- Update Npgsql by @ReubenBond in #8994
- Fix serialization of types inheriting from Dictionary<K,V> which add values in their constructor by @ReubenBond in #8993
- Prevent generated types from appearing in IDE by @ReubenBond in #8987
- Use dotnet-public instead of nuget.org by @benjaminpetit in #8931
- Add Orleans.Runtime to implicit usings by @ReubenBond in #8996
- Stop watchdog when container is disposed by @ReubenBond in #8998
- Stop silo on Dispose by @ReubenBond in #9000
- Dispose all activations when host is disposed by @ReubenBond in #9001
- Dispose cluster & silo health monitors are disposed when host is disposed, and clean up code by @ReubenBond in #8999
- Unsubscribe ConsistentRingProvider & VirtualBucketsRingProvider from ISiloStatusOracle on shutdown by @ReubenBond in #8997
- Avoid unnecessary Interlocked.Or in SingleWaiterAutoResetEvent by @ReubenBond in #9003
- test(codegen): add derived from list by @claylaut in #8858
- Add serialization support for types derived from List<T> and HashSet<T> by @ReubenBond in #9005
- Use PeriodicTimer instead of GrainTimer in LeaseBasedQueueBalancer by @ReubenBond in #9002
- Updatable grain timers by @ReubenBond in #8954
- Fix streaming config validator registration by @benjaminpetit in #8876
- Update samples README.md to point to samples repo & explorer by @ReubenBond in #9010
- [CodeGen] Always specify grain extension interface for grain extension calls by @ReubenBond in #9009
- Fix silo shutdown logging when silo is already shutting down. by @ReubenBond in #9013
- Fix perf of PooledBufferTests by @ReubenBond in #9015
- Fix termination condition in ActivationMigrationManager.AcceptMigratingGrains by @ReubenBond in #9017
- Improve ActivationData shutdown process by @ReubenBond in #9018
- Exclude explicitly implemented interface methods from proxy by @alrz in #8992
- Consider interface method accessibility when generating the invoker by @alrz in #9019
- ADO.NET Streaming Provider by @JorgeCandeias in #8974
- Brings back support for StringData in ATS provider by @ledjon-behluli in #8965
- Avoid changing soon-to-be-deprecated RegisterTimer method return type by @ReubenBond in #9020
- Mark Orleans.Streaming.AdoNet as alpha by @benjaminpetit in #9022
- Use Entra ID auth in Azure tests when possible by @benjaminpetit in #9027
- Update Microsoft.Build dependency by @benjaminpetit in #9029
- Added #pragma warning disable CS1591 to supress warnings in the source generated code by @m3nax in #8940
- ResourceOptimizedPlacement: normalize MaxAvailableMemory, add ActivationCount factor by @ReubenBond in #9028
- Add support to JSON codecs to serialize their generic JSON types by @ReubenBond in #9033
- Remove Orleans.Streaming.GCP by @benjaminpetit in #9031
- Allow custom back-off providers for PersistentStreamPullingAgent by @ledjon-behluli in #9035
- Fix nightly package publishing NuGet path by @ReubenBond in #9040
- Adds support to use MayInterleave with StatelessWorker grains by @ledjon-behluli in #9050
- Avoid logging error monitoring k8s pods on restart by @gp-jorge in #9048
- Non-reentrant grain timers by @ReubenBond in #8955
- Activation Repartitioning by @ledjon-behluli in #8877
- Fix leaking timed-out callbacks in InsideRuntimeClient by @krasin-ga in #9041
- Fix table storage Reminders and GrainDirectory providers registration by @Costo in #9045
- Adding codec to serialize F# Unit by @gfix in #9039
- Cassandra Clustering implementation by @rkargMsft in #8925
- Fix for Recorded flag not propagated between activities by @Costo in #9016
- Support implementing multiple IConverter<,> in single converter class by @ccorsano in #8881
- [Orleans.EventSourcing] Fix double-increment in RemoveStaleConditionalUpdates by @zbarrier in #8623
- Fix flaky Activation Repartitioning test by @ReubenBond in #9058
- Implement incoming grain call filters for observers by @ReubenBond in #9054
- Activation repartitioner: use null for no-op message observer to avoid interface call by @ReubenBond in #9056
- MessagePack codec by @n-sidorov in #8546
- Replace custom GetHashCode implementations with HashCode.Combine by @ReubenBond in #9059
- Coordinate shutdown of AdaptiveDirectoryCacheMaintainer with LocalGrainDirectory by @ReubenBond in #9061
- Promptly terminate AdaptiveDirectoryCacheMaintainer by @ReubenBond in #9062
- ActivationData: get IGrainActivator from shared components consistently by @ReubenBond in #9063
- StatelessWorker: pump work item queue consistently by @ReubenBond in #9064
- Allow GrainTimers to dispose themselves from their own callback by @ReubenBond in #9065
New Contributors
- @scottaddie made their first contribution in #8959
- @alrz made their first contribution in #8992
- @gp-jorge made their first contribution in #9048
- @krasin-ga made their first contribution in #9041
- @rkargMsft made their first contribution in #8925
- @ccorsano made their first contribution in #8881
- @zbarrier made their first contribution in #8623
- @n-sidorov made their first contribution in #8546
Full Changelog: v8.1.0...v8.2.0
1. The Power of Two Choices in Randomized Load Balancing by Michael David Mitzenmacher
2. Efficient Computation of Frequent and Top-k Elements in Data Streams by Metwally, Agrawal, and Abbadi
3. Finding top-k elements in data streams by Nuno Homem & Joao Paulo Carvalho
4. Cache-, Hash- and Space-Efficient Bloom Filters by Felix Putze, Peter Sanders and Johannes Singler