Concurrency
One aspect of using immutable data-types like Map, Seq, HashSet, etc. is that in most applications, at some point, you're likely to have some shared reference to one and need to mutate that shared reference. This often requires using synchronisation primitives like lock (which are not composable and are prone to error).
- Atom - atomic updates to a single value
- Ref - Software Transactional Memory: atomic updates of multiple values
- Atomic data-structures
- Source code
With a nod to the atom type in Clojure, language-ext has two types:
- Atom<A>
- Atom<M, A>
These types both wrap a value of A and provide several variants of the method Swap (and the Prelude function swap) for atomically mutating the wrapped value without locking.
var atom = Atom(Set("A", "B", "C"));
swap(atom, old => old.Add("D"));
swap(atom, old => old.Add("E"));
swap(atom, old => old.Add("F"));
Debug.Assert(atom == Set("A", "B", "C", "D", "E", "F"));
NOTE: The above can also be written:
atom.Swap(old => old.Add("D"));
atom.Swap(old => old.Add("E"));
atom.Swap(old => old.Add("F"));
One thing that must be noted is that if another thread mutates the atom whilst you're running Swap then your update will roll back. Swap will then re-run the provided lambda function with the newly updated value (from the other thread) so that it can get a valid updated value to apply. This means that you must be careful to not have side-effects in the Swap function, or at the very least it needs to be reliably repeatable.
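The retry behaviour described above can be sketched with the standard Interlocked.CompareExchange primitive. This is a simplified illustration only, not language-ext's actual implementation: SimpleAtom and SimpleAtomDemo are hypothetical names, and the sketch leaves out validation, change events and value-type support.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for Atom<A>; NOT the library's implementation.
public sealed class SimpleAtom<A> where A : class
{
    A value;

    public SimpleAtom(A initial) => value = initial;

    // Latest published value
    public A Value => Volatile.Read(ref value);

    public A Swap(Func<A, A> f)
    {
        while (true)
        {
            // Read the current value and compute the proposed replacement
            var current = Volatile.Read(ref value);
            var updated = f(current);

            // Publish only if no other thread replaced the value in the meantime.
            // CompareExchange returns the value it found; if that is not 'current'
            // the update is discarded and f is re-run against the newer value.
            if (ReferenceEquals(Interlocked.CompareExchange(ref value, updated, current), current))
                return updated;
        }
    }
}

public static class SimpleAtomDemo
{
    public static string Run()
    {
        var atom = new SimpleAtom<string>("A");

        // Three competing updates; each one appends the same character,
        // so the final value does not depend on which thread wins first.
        Parallel.For(0, 3, _ => atom.Swap(old => old + "!"));

        return atom.Value; // "A!!!"
    }
}
```

Because f may run more than once per call to Swap, any side-effect inside it (logging, I/O, incrementing a counter) may also happen more than once, which is why the lambda should be pure or at least safely repeatable.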
The Atom constructor can take a Func<A, bool> validation lambda. This is run against the initial value and all subsequent values before being swapped. If the validation lambda returns false for the proposed value then false is returned from Swap and no update takes place.
The Atom types both have a Change event:
public event AtomChangedEvent<A> Change;
It is fired after each successful atomic update to the wrapped value. If you're using the LanguageExt.Rx extensions then you can also consume the atom.OnChange() observable.
The Atom<M, A> type, with its M generic argument, takes an additional meta-data argument on construction which can be used to pass through an environment or some sort of context for the Swap functions:
var env = new Env();
var atom = Atom(env, Set("A", "B", "C"));
atom.Swap((nenv, old) => old.Add("D"));
There are also other variants of Swap that can take up to two additional arguments and pass them through to the lambda:
var atom = Atom(Set(1, 2, 3));
atom.Swap(4, 5, (x, y, old) => old.Add(x).Add(y));
Debug.Assert(atom == Set(1, 2, 3, 4, 5));
The wrapped value can be accessed by calling atom.Value or using the implicit operator conversion to A.
Atom is all well and good until you need to update multiple values atomically. This is an entirely different class of problem and can't rely on the built-in Interlocked atomic operations. And so language-ext provides another type called Ref<A>. Ref wraps a value of type A just like Atom<A>, but a Ref can only be updated within a transaction.
Internally language-ext uses a Software Transactional Memory (STM) with Multi-Version Concurrency Control (MVCC).
Let's start with the classic example of a bank-account where we need to atomically debit one account and credit another. Either both operations succeed or neither does.
First the definition of the Account:
public class Account
{
    public readonly int Balance;

    Account(int balance) =>
        Balance = balance;

    public Account SetBalance(int value) =>
        new Account(value);

    public static Ref<Account> New(int balance) =>
        Ref(new Account(balance), Account.Validate);

    public static bool Validate(Account a) =>
        a.Balance >= 0;
}
Note how I've made the constructor private and then made a static method called New which forces the result to be a Ref. It also provides an optional validation function to the Ref. This will be run before any transaction is committed to ensure we have a valid state.
It isn't necessary to wrap up construction like this, but for something like a bank-account type it's clear we'd always want updates to be atomic, and so this is a really nice way of enforcing the rules of the type.
Now we can write a static class to do our balance transfer:
public static class Transfer
{
    public static Unit Do(Ref<Account> from, Ref<Account> to, int amount) =>
        atomic(() =>
        {
            swap(from, a => a.SetBalance(a.Balance - amount));
            swap(to, a => a.SetBalance(a.Balance + amount));
        });
}
NOTE: swap here is a convenience function for updating the value in the Ref. We could do it manually by setting the Value property, but it's not quite as pretty:
from.Value = from.Value.SetBalance(from.Value.Balance - amount);
We can also use the Swap method on Ref:
from.Swap(a => a.SetBalance(a.Balance - amount));
The key here is the atomic function. That wraps the transactional operation. When it starts it will take a snapshot of the Ref world. Then all operations are performed in isolation. On completion of the lambda the transaction system will commit the changes. However, if another transaction has run and modified the same values in the meantime then the whole process starts again and the lambda is re-run with the new state of the Ref world.
atomic covers the ACI of ACID:
- Atomic - means that every change to Refs made within a transaction occurs or none do
- Consistent - means that each new value can be checked with a validator function before allowing the transaction to commit
- Isolated - means that no transaction sees the effects of any other transaction while it is running.
There are two isolation modes, the default being Isolation.Snapshot:
- Isolation.Snapshot - This is the slightly weaker isolation mode. A snapshot transaction will only fail if there are write conflicts between two transactions. This is usually good enough, but can lead to the problem of:
  - Transaction 1: read ref A to get the value to write in ref B
  - Transaction 2: read ref B to get the value to write in ref A
  - These two will never have a write conflict, but the reads conflict with the writes
- Isolation.Serialisable - This is the strictest isolation mode and solves the problem mentioned above. However, you should only use this if you have the problem above, as it's more likely to encounter conflicts and have to re-run the transaction.
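To make the snapshot-isolation problem concrete, here is a small hand-simulation of the two transactions from the list above. No STM is involved: WriteSkewDemo and its methods are hypothetical names used purely for illustration, and the snapshots and commits are modelled with plain local variables.

```csharp
using System;

public static class WriteSkewDemo
{
    // Both transactions run against the same starting snapshot and only a
    // write-conflict check is applied at commit time (snapshot isolation).
    public static (int A, int B) SnapshotOutcome(int a, int b)
    {
        (int A, int B) snapshot1 = (a, b);
        (int A, int B) snapshot2 = (a, b);

        // Transaction 1: read A to get the value to write in B
        int newB = snapshot1.A;

        // Transaction 2: read B to get the value to write in A
        int newA = snapshot2.B;

        // The write sets ({B} and {A}) do not overlap, so both commits succeed
        return (newA, newB);
    }

    // The same two transactions run strictly one after the other,
    // which is the result a serialisable mode has to be equivalent to.
    public static (int A, int B) SerialOutcome(int a, int b, bool transaction1First)
    {
        if (transaction1First)
        {
            b = a; // Transaction 1
            a = b; // Transaction 2 sees Transaction 1's write
        }
        else
        {
            a = b; // Transaction 2
            b = a; // Transaction 1 sees Transaction 2's write
        }
        return (a, b);
    }
}
```

Starting from A = 1 and B = 2, the snapshot simulation ends with (2, 1): the values are swapped. Running the transactions serially gives (1, 1) or (2, 2) depending on the order, never (2, 1). That mismatch is the read/write conflict that only the serialisable mode detects.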
Instead of passing an Isolation parameter to atomic, you can call the convenience functions:
- snapshot - for snapshot isolation
- serial - for serialisable isolation
This is a very good video on Multi-Version Concurrency Control and the two isolation modes listed above
Sometimes you're not looking to modify a value, but you want a consistent snapshot of multiple items. One thing to note is:
- Ref reads never block other readers or writers
- Ref writes never block other readers or writers
And so we can very quickly get a snapshot of a number of Ref values. If we have a couple of Ref<Account> values:
Ref<Account> accountA = Account.New(100);
Ref<Account> accountB = Account.New(400);
var (balanceA, balanceB) = atomic(() => (accountA.Value, accountB.Value));
This will always give you a consistent read. You can read from the Value property outside of an atomic, in which case it will just return the latest version. But reading the Value properties of multiple Ref values outside of an atomic could lead to an inconsistent view.
It is also possible to set the Value property with yourRef.Value = ..., but it must be done within an atomic, otherwise an exception will be thrown.
BIG BIG NOTE: The types that Ref wraps should be immutable. Mutable objects can be modified outside of the STM system, breaking this whole concept. If you do this, you're on your own!
There are some concurrency use cases where you can afford to be a bit more relaxed to gain some performance. For example, imagine you were keeping a transaction log through the day. You might be relaxed about the order of transactions in the log if you knew that the final balance was always correct. Basically if you receive two deposits of £100 and £50 you won’t mind too much if they are logged as £100 then £50 or £50 then £100 because addition is commutative.
Let's add a Deposit method to our Account type:
public class Account
{
    public readonly int Balance;

    Account(int balance) =>
        Balance = balance;

    public Account SetBalance(int value) =>
        new Account(value);

    public Account Deposit(int value) =>
        new Account(Balance + value);

    public static Ref<Account> New(int balance) =>
        Ref(new Account(balance), Account.Validate);

    public static bool Validate(Account a) =>
        a.Balance >= 0;

    public override string ToString() =>
        Balance.ToString();
}
Now, we can write a simple function to deposit:
static void LogDeposit(Ref<Account> account, int amount) =>
    atomic(() => commute(account, a => a.Deposit(amount)));
Then a little test to prove our assumptions:
var bank = Account.New(0);
LogDeposit(bank, 100);
LogDeposit(bank, 50);
Assert.True(bank.Value.Balance == 150);
var bank2 = Account.New(0);
LogDeposit(bank2, 50);
LogDeposit(bank2, 100);
Assert.True(bank2.Value.Balance == 150);
The key function is commute (or Commute if you're calling the method from a Ref<A>). It has the following properties:
- Must be run from within an atomic transaction
- We provide it with a function to run against the Ref
- The result will go into the transaction's state
- When the transaction is committed: the Func is run again with the live Ref value, not the Ref value from the transaction snapshot/state.
The thing to note about commute is that when the function is called it sets the in-transaction value of the Ref, but the actual change is applied at commit time, by running the function passed to commute again with the latest Ref value. This means that the value you calculate in your transaction might not be the value that is finally committed to the Ref. This needs careful thought. You need to be sure that your processing doesn’t rely on seeing the latest value of the Ref.
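The two applications of the function can be traced by hand. The sketch below involves no STM at all: CommuteDemo is a hypothetical name, and the snapshot and live balances are ordinary local variables chosen to match the £100 and £50 deposits used earlier.

```csharp
using System;

public static class CommuteDemo
{
    public static (int InTransaction, int Committed) Run()
    {
        // The function handed to commute: deposit 50
        Func<int, int> deposit = balance => balance + 50;

        // Balance as seen in our transaction's snapshot
        int snapshotBalance = 0;

        // First application: the value our transaction works with
        int inTransaction = deposit(snapshotBalance);   // 50

        // Meanwhile, another transaction commits a deposit of 100
        int liveBalance = 100;

        // Second application at commit time, against the live value
        int committed = deposit(liveBalance);           // 150

        return (inTransaction, committed);
    }
}
```

The transaction saw 50 but 150 was committed. The final balance is still correct because addition is commutative, but any logic inside the transaction that depended on the balance being exactly 50 would have acted on a value that was never committed.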
Although the operation is atomic, it clearly has a weaker consistency story than swap. We need to make sure of the following for this to work out for us:
- The bound Ref value is immutable
- The operation performed on the Ref is commutative
If this isn't the case, then you essentially have a last writer wins situation and you're on your own.
Oftentimes what gets wrapped in an Atom is a data-structure like a HashMap or a Seq, and so to make it easier to work with these atomic values, there are some prebuilt atomic data-structures that have all the guarantees of Atom plus the benefits of the original interface of the data-structure. For example, with Atom<HashMap<K, V>> we'd need to do this to add an item:
var data = Atom(HashMap<string, int>());
data.Swap(d => d.Add("X", 1));
Which is fine, but can get a little repetitive if we're working with the atomic value a lot. It also comes with an allocation cost for the lambda. Instead we can use the atomic data-structure AtomHashMap<K, V>:
var data = AtomHashMap<string, int>();
data.Add("X", 1);
The atomic data-structures mirror the interface of their immutable counterpart exactly. They are optimised for the underlying type too, so no allocation costs for atomic operations (unless the original immutable counterpart allocated), and come with a few extras that make sense in the concurrent world, like Swap:
var data = AtomHashMap<string, int>();
// Do more complex processing atomically here
data.Swap(d => ...);
AtomHashMap<K, V> is functionally equivalent to Atom<HashMap<K, V>> but highly optimised to make the best use of the underlying data structure. Internally the hash-map is immutable, but the reference is atomically mutable. The beauty of that is you can call ToHashMap() to get a snapshot of the data-structure at any time with zero allocation costs and O(1) access time:
var data = AtomHashMap<string, int>();
// Returns an empty HashMap
var snapshot1 = data.ToHashMap();
data.Add("X", 1);
// Returns a HashMap with one item in it, snapshot1 is still empty
var snapshot2 = data.ToHashMap();
This ability to grab perfectly sound snapshots of the data, with practically zero cost, is pretty amazing. When combined with the ability to do atomic operations on the AtomHashMap itself, we have an incredibly powerful shared-atomic-state system.
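The "immutable structure behind an atomically mutable reference" idea is not specific to language-ext. As a rough analogue only, the sketch below uses the .NET base class library's ImmutableDictionary together with ImmutableInterlocked.Update; it is not how AtomHashMap is implemented, and SnapshotDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Immutable;
using System.Threading;

public static class SnapshotDemo
{
    public static (int Before, int After) Run()
    {
        // The dictionary itself is immutable; only this reference changes
        var data = ImmutableDictionary<string, int>.Empty;

        // Taking a snapshot is just copying the reference: nothing is cloned
        var snapshot1 = Volatile.Read(ref data);

        // Atomically replace the reference with an updated dictionary.
        // The transformer may be re-run if another thread wins the race.
        ImmutableInterlocked.Update(ref data, d => d.SetItem("X", 1));

        var snapshot2 = Volatile.Read(ref data);

        // snapshot1 is still empty, snapshot2 has one item
        return (snapshot1.Count, snapshot2.Count);
    }
}
```

A snapshot is cheap because it is only a reference to a structure that can never change, so readers holding an old snapshot are unaffected by later updates.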
AtomSeq<A> is functionally equivalent to Atom<Seq<A>> but highly optimised to make the best use of the underlying data structure. Internally the Seq is immutable, but the reference is atomically mutable. The beauty of that is you can call ToSeq() to get a snapshot of the data-structure at any time with zero allocation costs and O(1) access time:
var data = AtomSeq<string>();
// Returns an empty Seq
var snapshot1 = data.ToSeq();
data.Add("X");
// Returns a Seq with one item in it, snapshot1 is still empty
var snapshot2 = data.ToSeq();
This ability to grab perfectly sound snapshots of the data, with practically zero cost, is pretty amazing. When combined with the ability to do atomic operations on the AtomSeq itself, we have an incredibly powerful shared-atomic-state system.