Mix.install([
  {:jason, "~> 1.4"},
  {:kino, "~> 0.9", override: true},
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"},
  {:poolboy, "~> 1.5"}
])
Process bottlenecks occur when we send too many messages to a single process. A GenServer handles the messages in its mailbox one at a time, so if our application routes all of its work through a single GenServer, every request waits in line. Handling messages sequentially makes it easier to reason about the behavior of a GenServer, but it limits the concurrency in our system.
For example, if an expensive operation takes 500ms and three clients make simultaneous requests, the last client has to wait 1.5 seconds.
sequenceDiagram
participant Client 3
participant Client 2
participant Client 1
participant GenServer
Client 1->>GenServer: request
Client 2->>GenServer: request
Client 3->>GenServer: request
GenServer->>Client 1: response (500ms)
GenServer->>Client 2: response (1s)
GenServer->>Client 3: response (1.5s)
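The timeline above can be reproduced with a small sketch. SlowServer below is a hypothetical GenServer (not part of this reading's code) whose single call takes 500ms. Three tasks call it at roughly the same time, and :timer.tc/1 measures how long all three take to finish. Because the calls are handled one at a time, the total should come out at around 1.5 seconds or slightly more.

```elixir
# Hypothetical SlowServer: every call takes 500ms, and the GenServer
# handles its calls one at a time.
defmodule SlowServer do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call(:expensive, _from, state) do
    # Simulate an expensive 500ms operation.
    Process.sleep(500)
    {:reply, :done, state}
  end
end

{:ok, server} = SlowServer.start_link([])

# Three clients call the same GenServer at (roughly) the same time.
{microseconds, results} =
  :timer.tc(fn ->
    1..3
    |> Enum.map(fn _ -> Task.async(fn -> GenServer.call(server, :expensive) end) end)
    |> Task.await_many()
  end)

IO.inspect(results, label: "results")
IO.inspect(div(microseconds, 1000), label: "elapsed ms")
```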
A worker pool is a group of processes that handle incoming tasks. When a new task arrives, one of the idle worker processes is assigned to handle it. Spreading the workload across several workers lets tasks run concurrently instead of queuing behind a single process, which improves the throughput and scalability of the system.
flowchart LR
Caller --> P
subgraph Worker Pool
P[Pool Manager]
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
P --> W1
P --> W2
P --> W3
end
Worker pools typically have a finite number of workers. A bounded pool prevents unbounded concurrency, which could overload our system with too many simultaneous tasks. In addition, for CPU-bound work, once we have enough workers to keep every CPU core busy, additional concurrency brings little benefit.
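One lightweight way to bound concurrency, sketched here as a stand-in rather than a true pool of long-lived workers, is Task.async_stream/3 with the :max_concurrency option. It spawns a new task per item but never runs more than max_concurrency of them at once (the default is System.schedulers_online()). The numbers below are arbitrary: eight half-second jobs limited to two at a time should take roughly two seconds.

```elixir
# Eight half-second jobs, but at most two run at the same time.
max_workers = 2

{microseconds, results} =
  :timer.tc(fn ->
    1..8
    |> Task.async_stream(
      fn n ->
        Process.sleep(500)
        n * 2
      end,
      max_concurrency: max_workers
    )
    # async_stream yields {:ok, result} tuples, in input order by default.
    |> Enum.map(fn {:ok, result} -> result end)
  end)

# 8 jobs, 2 at a time, 500ms each: about 2 seconds in total.
IO.inspect(results, label: "results")
IO.inspect(div(microseconds, 1000), label: "elapsed ms")
```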
We use named processes in this reading material. If a named process is already started and you re-evaluate a code example, you may see the following error.
** (MatchError) no match of right hand side value: {:error, {:already_started, #PID<0.265.0>}}
    reading/worker_pools.livemd#cell:oc2mtzukw3gd7z2s5oyguox4qdpegr7a:3: PoolManager.start_link/1
    /home/brook/dockyard/curriculum/reading/worker_pools.livemd#cell:ynvusqt7eiocyatfupox6d2pqqpiwwf4:2: (file)
    /home/brook/dockyard/curriculum/reading/worker_pools.livemd#cell:ynvusqt7eiocyatfupox6d2pqqpiwwf4:1: (file)
To resolve this issue, reconnect the current runtime.
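If you would rather not reconnect, a possible alternative is to treat an already-running named process as success. The sketch below assumes a hypothetical :sketch_named_agent name and matches on the {:error, {:already_started, pid}} tuple that start_link returns when the name is taken, reusing the existing pid instead of raising a MatchError.

```elixir
# Hypothetical helper: start a named Agent, or reuse it if it is already running.
start_or_reuse = fn ->
  case Agent.start_link(fn -> 0 end, name: :sketch_named_agent) do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end
end

first = start_or_reuse.()
# Evaluating the same code again returns the existing pid instead of raising.
second = start_or_reuse.()
IO.inspect(first == second, label: "same process?")
```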
Throughout this reading, you will see several examples of module names without a module definition.
Registry.start_link(name: ModuleName, keys: :duplicate)
Keep in mind that module names are just atoms. For these examples, we can use atoms and module names interchangeably. We often see this when providing the name for a process.
Registry.start_link(name: :my_module_name, keys: :duplicate)
Module names are just syntax sugar for atoms whose name begins with the Elixir. prefix.
MyModule == :"Elixir.MyModule"
Module names even work with functions from the Atom module.
Atom.to_string(MyModule)
We use Kino.Process in this material to provide sequence diagrams of processes. We'll often use the Kino.Process.render_seq_trace/2 function to display the messages sent between processes as a diagram.
Kino.Process.render_seq_trace(fn ->
  parent = self()
  child = spawn(fn -> send(parent, :message) end)

  receive do
    :message -> "parent #{inspect(parent)} received a message from child #{inspect(child)}"
  end
end)
Registry is a built-in module that provides a way to register and look up processes by name on the local node. Think of it as a directory service: it lets you associate a process with a key, making it easier to locate and communicate with that process throughout your application.
flowchart
Registry
p1[Process]
p2[Process]
p3[Process]
Registry --> p1
Registry --> p2
Registry --> p3
A unique Registry allows only a single process to be registered under each key.
flowchart
R[Registry]
K1[Key1]
K2[Key2]
K3[Key3]
P1[Process1]
P2[Process2]
P3[Process3]
R --> K1 --> P1
R --> K2 --> P2
R --> K3 --> P3
{:ok, _} = Registry.start_link(keys: :unique, name: :my_unique_registry)
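A process registers itself with the Registry by calling Registry.register/3, which registers the calling process under the given key. Here, we start a simple Agent process that registers itself from inside its initialization function.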
{:ok, agent_pid} =
  Agent.start_link(fn ->
    Registry.register(:my_unique_registry, :agent1, nil)
    0
  end)
Then we can find the :agent1 process in the Registry using Registry.lookup/2.
Registry.lookup(:my_unique_registry, :agent1)
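Because the Registry is unique, a second process cannot claim a key that is already taken. The following sketch uses its own hypothetical registry, :sketch_unique_registry, so it doesn't interfere with the examples above. The current process registers under :taken, then a spawned process tries the same key; Registry.register/3 should return {:error, {:already_registered, pid}}, where pid is the process that already owns the key.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: :sketch_unique_registry)

# The current process claims the :taken key first.
{:ok, _owner} = Registry.register(:sketch_unique_registry, :taken, nil)
me = self()

# A second process tries to register under the same key and sends back the result.
spawn(fn ->
  send(me, {:result, Registry.register(:sketch_unique_registry, :taken, nil)})
end)

result =
  receive do
    {:result, result} -> result
  end

IO.inspect(result, label: "second registration")
```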
For a unique Registry, we can also register a process without calling Registry.register/3 by starting it with a :via tuple in the format {:via, Registry, {registry, key}}.
name = {:via, Registry, {:my_unique_registry, :agent2}}
{:ok, agent_pid} = Agent.start_link(fn -> 0 end, name: name)
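A :via tuple isn't limited to starting a process; it can be passed anywhere a process name is accepted. In this sketch, which assumes a separate hypothetical :sketch_via_registry and a :counter key, we update and read an Agent purely through its via name.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: :sketch_via_registry)

name = {:via, Registry, {:sketch_via_registry, :counter}}
{:ok, _pid} = Agent.start_link(fn -> 0 end, name: name)

# Agent functions resolve the via name through the Registry on every call.
:ok = Agent.update(name, fn count -> count + 1 end)
count = Agent.get(name, fn count -> count end)
IO.inspect(count, label: "count")
```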
Our Registry now contains both :agent1 and :agent2.
[{agent1, _}] = Registry.lookup(:my_unique_registry, :agent1)
[{agent2, _}] = Registry.lookup(:my_unique_registry, :agent2)
{agent1, agent2}
Instead of using a unique key for each process, a Registry started with keys: :duplicate can group many processes under a single key.
flowchart
R[Registry]
K1[Key1]
K2[Key2]
P1[Process]
P2[Process]
P3[Process]
P4[Process]
P5[Process]
P6[Process]
R --> K1
K1 --> P1
K1 --> P2
K1 --> P3
R --> K2
K2 --> P4
K2 --> P5
K2 --> P6
{:ok, _} = Registry.start_link(keys: :duplicate, name: :my_duplicate_registry)
# Register Multiple Processes Under The Same Key In The Registry
Agent.start_link(fn ->
  Registry.register(:my_duplicate_registry, :my_key, nil)
  0
end)

Agent.start_link(fn ->
  Registry.register(:my_duplicate_registry, :my_key, nil)
  0
end)

Agent.start_link(fn ->
  Registry.register(:my_duplicate_registry, :my_key, nil)
  0
end)
# Lookup All Three Processes
Registry.lookup(:my_duplicate_registry, :my_key)
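The third argument to Registry.register/3, which we've set to nil so far, is a value stored alongside each registration, and Registry.lookup/2 returns it in each {pid, value} tuple. The sketch below uses a hypothetical :sketch_value_registry and stores a small map per process (the :worker_number key is only for illustration).

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: :sketch_value_registry)

# Each Agent registers itself under :my_key and stores a map as its value.
for n <- 1..3 do
  {:ok, _pid} =
    Agent.start_link(fn ->
      Registry.register(:sketch_value_registry, :my_key, %{worker_number: n})
      n
    end)
end

# lookup/2 returns {pid, value} tuples; we only keep the values here.
numbers =
  :sketch_value_registry
  |> Registry.lookup(:my_key)
  |> Enum.map(fn {_pid, value} -> value.worker_number end)
  |> Enum.sort()

IO.inspect(numbers, label: "worker numbers")
```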
Now that we understand Registry, we can use it to build our own worker pool.
Our Worker will be a simple GenServer whose job waits for one second and then returns a response.
When a Worker process starts, it registers itself under the PoolManager registry by default. While not strictly necessary, we can pass a different Registry through the :registry option so we can reuse the same Worker module across multiple registry examples.
defmodule Worker do
  use GenServer

  def start_link(opts) do
    # Only :name is a GenServer option; the full opts are passed to init/1.
    GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
  end

  @impl true
  def init(opts) do
    # Register the Worker under the PoolManager registry by default.
    # We've made the Registry configurable.
    # While not necessary, this makes our worker reusable throughout our registry examples.
    registry = opts[:registry] || PoolManager
    Registry.register(registry, :workers, nil)
    {:ok, 0}
  end

  def perform_job(pid) do
    # Print the name of the worker or its pid
    IO.inspect(Process.info(pid)[:registered_name] || pid, label: "starting job")
    GenServer.call(pid, :perform_job)
  end

  @impl true
  def handle_call(:perform_job, _from, state) do
    # Simulate a job that takes one second.
    Process.sleep(1000)
    {:reply, "response", state}
  end
end
We'll also create a PoolManager, which starts a Registry and three Worker processes.
defmodule PoolManager do
  def start_link(_opts) do
    {:ok, _} = Registry.start_link(name: __MODULE__, keys: :duplicate)

    # Start three workers. We've given them names for the diagrams below.
    for n <- 1..3 do
      {:ok, _pid} = Worker.start_link(name: :"worker#{n}")
    end
  end

  def schedule_job do
    workers = Registry.lookup(__MODULE__, :workers)

    # We grab a random worker to perform the job.
    # While not ideal, this is a very simple scheduling implementation.
    {pid, _value} = Enum.random(workers)
    Worker.perform_job(pid)
  end
end
We'll use Kino.Process.render_seq_trace/2 to demonstrate how the PoolManager starts the three registered Worker processes.
The PoolManager.PIDPartition0 process is used under the hood by Registry. See Registry.start_link/1 for more.
Kino.Process.render_seq_trace(fn ->
  PoolManager.start_link([])
end)
When a process calls PoolManager.schedule_job/0, our PoolManager randomly selects a worker.
Kino.Process.render_seq_trace(fn ->
  PoolManager.schedule_job()
end)
PoolManager.schedule_job/0 is synchronous for the calling process, but when there are multiple callers (such as many clients making requests to a Phoenix server), we see the full benefits of the worker pool.
flowchart LR
C1[Client]
C2[Client]
C1 --> P
C2 --> P
subgraph Worker Pool
P[Pool Manager]
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
P --Client 1 Routed to--> W1
P --Client 2 Routed to --> W2
P --inactive--> W3
end
To simulate multiple callers, we'll use two tasks. Most of the time, this takes only one second to run. However, with three workers there is a one-in-three chance that both tasks select the same worker, in which case the second job waits for the first and the run takes two seconds.
task1 = Task.async(fn -> PoolManager.schedule_job() end)
task2 = Task.async(fn -> PoolManager.schedule_job() end)
Task.await_many([task1, task2])
Random selection is usually fine because, across many jobs, the work spreads roughly evenly across the workers. More sophisticated scheduling strategies exist, such as round-robin, where jobs are assigned to workers in a fixed rotation.
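A round-robin scheduler could be sketched as follows. RoundRobinScheduler is a hypothetical module, not part of the PoolManager above: it keeps a shared counter in :atomics, looks up the workers registered under :workers, sorts their pids so every caller sees the same order, and picks the next one in rotation. Agents stand in for real workers, and the registry name :sketch_rr_registry is an assumption for this example.

```elixir
# Hypothetical round-robin scheduler over processes registered under :workers.
defmodule RoundRobinScheduler do
  # A single shared counter; :atomics can be updated safely from many processes.
  def new_counter, do: :atomics.new(1, [])

  # Assumes at least one worker is registered (rem/2 by zero would raise).
  def next_worker(registry, counter) do
    # Sort the pids so every caller sees the workers in the same order.
    workers =
      registry
      |> Registry.lookup(:workers)
      |> Enum.map(fn {pid, _value} -> pid end)
      |> Enum.sort()

    # add_get/3 increments and returns the new value atomically,
    # so concurrent callers each get a distinct turn.
    turn = :atomics.add_get(counter, 1, 1)
    Enum.at(workers, rem(turn - 1, length(workers)))
  end
end

{:ok, _} = Registry.start_link(keys: :duplicate, name: :sketch_rr_registry)

# Three stand-in workers (Agents) register themselves under :workers.
worker_pids =
  for _ <- 1..3 do
    {:ok, pid} =
      Agent.start_link(fn ->
        Registry.register(:sketch_rr_registry, :workers, nil)
        0
      end)

    pid
  end

counter = RoundRobinScheduler.new_counter()
picks = for _ <- 1..6, do: RoundRobinScheduler.next_worker(:sketch_rr_registry, counter)
IO.inspect(picks, label: "picks")
```

With three workers and six picks, each worker should be chosen exactly twice, in the same order both times. Looking up and sorting on every call is fine for a handful of workers; a larger pool might cache the worker list instead.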
To supervise our worker pool, we put the Registry process and the Worker processes under a supervisor.
Here is a minimal example to demonstrate the concept. We start a Registry called SupervisedPoolManager under a Supervisor, along with three Worker processes (named :super_worker1, :super_worker2, and :super_worker3) that register themselves in that Registry.
children = [
  {Registry, name: SupervisedPoolManager, keys: :duplicate},
  %{
    id: :worker1,
    start: {Worker, :start_link, [[name: :super_worker1, registry: SupervisedPoolManager]]}
  },
  %{
    id: :worker2,
    start: {Worker, :start_link, [[name: :super_worker2, registry: SupervisedPoolManager]]}
  },
  %{
    id: :worker3,
    start: {Worker, :start_link, [[name: :super_worker3, registry: SupervisedPoolManager]]}
  }
]

opts = [strategy: :one_for_one, name: :registry_supervisor]
{:ok, supervisor_pid} = Supervisor.start_link(children, opts)
We can look up the three Worker processes registered in the SupervisedPoolManager registry to ensure they have started.
Registry.lookup(SupervisedPoolManager, :workers)
We'll use Kino.Process.sup_tree/2 to visualize the supervision tree above.
Kino.Process.sup_tree(supervisor_pid)
Now that we've seen a minimal example, we'll make a more complete implementation that uses a module-based supervisor to encapsulate our supervised Registry and its workers.
defmodule SupervisedPool do
  use Supervisor

  def start_link(_opts) do
    # The supervisor is registered under the module name, so only one SupervisedPool runs at a time.
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # System.schedulers_online() returns the number of
    # available schedulers on the current machine.
    child_specs =
      Enum.map(1..System.schedulers_online(), fn n ->
        %{
          id: :"supervised_worker_#{n}",
          start: {Worker, :start_link, [[registry: SupervisedPool.Registry]]}
        }
      end)

    # The Registry is listed first so it starts before the workers register with it.
    children =
      [
        {Registry, name: SupervisedPool.Registry, keys: :duplicate}
      ] ++ child_specs

    Supervisor.init(children, strategy: :one_for_one)
  end
end
This module starts as many workers as there are schedulers online on the current machine.
System.schedulers_online()
We'll start the SupervisedPool and visualize it in a Kino supervision tree diagram. Alongside the Registry, you'll see as many Worker processes under the SupervisedPool as you have schedulers online.
{:ok, supervisor_pid} = SupervisedPool.start_link([])
Kino.Process.sup_tree(supervisor_pid)
Worker pools help applications handle many tasks concurrently by managing a bounded set of worker processes, which lets us scale our system without running into unbounded concurrency issues.
There are numerous approaches to designing and implementing worker pools. The custom worker pool we built from scratch is one viable solution, but well-tested libraries such as Poolboy are often a better choice.
Create a mix application with a supervised worker pool that allows you to configure the number of workers started in the worker pool.
For example:
MyApp.SupervisedPool.start_link(name: :example_pool, size: 4)
Start the supervised worker pool as a child of your application supervisor.
Poolboy is a reliable process pool library that saves us the time and effort of implementing a custom pool.
We can specify the following configuration for the process pool.
- :name: The name of the pool. The scope can be one of :local, :global, or :via.
- :worker_module: The module used to create workers for the pool.
- :size: The maximum number of workers allowed in the pool.
- :max_overflow (optional): The maximum number of temporary workers that can be created when the pool is empty.
- :strategy (optional): Determines whether workers that return to the pool are placed first or last in the line of available workers. The two possible values are :lifo and :fifo. The default is :lifo.
poolboy_config = [
  name: {:local, :worker},
  worker_module: Worker,
  size: 4,
  # Disallow temporary overflow workers, so at most 4 jobs run at once.
  max_overflow: 0
]
Poolboy provides a :poolboy.child_spec/2 function we can use to start a Poolboy manager in the supervision tree of our application.
children = [
  :poolboy.child_spec(:worker, poolboy_config)
]

opts = [strategy: :one_for_one, name: :my_pool_supervisor]
{:ok, supervisor_pid} = Supervisor.start_link(children, opts)
We can use the :poolboy.transaction/3 function to check out one of the workers in our worker pool and perform a job.
:poolboy.transaction(:worker, fn pid -> Worker.perform_job(pid) end, 5000)
Because of the size: 4 configuration above, at most four workers perform jobs concurrently in the worker pool. Notice that even though we trigger 10 tasks, only four jobs run at the same time, so the ten one-second jobs take about three seconds in total.
tasks =
  Enum.map(1..10, fn _ ->
    Task.async(fn ->
      :poolboy.transaction(:worker, fn pid -> Worker.perform_job(pid) end, 5000)
    end)
  end)

Task.await_many(tasks)
Here, we'll use Kino.Process.render_seq_trace/2 to visualize three tasks checking out workers from the Poolboy pool.
Notice there are three :perform_job calls. There are also several processes being used under the hood by Poolboy.
Kino.Process.render_seq_trace(fn ->
  tasks =
    Enum.map(1..3, fn _ ->
      Task.async(fn ->
        :poolboy.transaction(:worker, fn pid -> Worker.perform_job(pid) end, 5000)
      end)
    end)

  Task.await_many(tasks)
end)
Add the Poolboy dependency to a mix project and start a worker pool under your application's supervision tree.
Consider the following resource(s) to deepen your understanding of the topic.
- Process pools with Elixir's Registry by Andrea Leopardi
- Managing processes with Poolboy in Elixir by Akash Manohar
DockYard Academy now recommends you use the latest Release rather than forking or cloning our repository.
Run git status to ensure there are no undesirable changes.
Then run the following in your command line from the curriculum folder to commit your progress.
$ git add .
$ git commit -m "finish Worker Pools reading"
$ git push
We're proud to offer our open-source curriculum free of charge for anyone to learn from at their own pace.
We also offer a paid course where you can learn from an instructor alongside a cohort of your peers. We will accept applications for the June-August 2023 cohort soon.