Skip to content

Commit

Permalink
Simplify and enhance dolphin engine insert_all
Browse files Browse the repository at this point in the history
The dolphin engine now does a single bulk insert for multiple jobs. That
improves overall insert speed, reduces calls over the wire, and
matches the approach taken in other engines. The only downside is that
jobs inserted by the dolphin engine lack database-generated values such
as the primary key.

Closes #1220
  • Loading branch information
sorentwo committed Jan 11, 2025
1 parent acf9cce commit fb22494
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 38 deletions.
13 changes: 10 additions & 3 deletions lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -667,11 +667,18 @@ defmodule Oban do
`* (Ecto.InvalidChangesetError) could not perform insert because changeset is invalid.`
> #### 🌟 Unique Jobs and Batching {: .warning}
#### Dolphin Engine and Generated Values
MySQL doesn't return anything on insertion into the database. That means any values generated by
the database, namely the primary key and timestamps, aren't included in the job structs returned
from `insert_all`.
> #### 🌟 Unique Jobs and Batching {: .tip}
>
> Only the [Smart Engine](https://oban.pro/docs/pro/Oban.Pro.Engines.Smart.html) in [Oban
> Pro](https://oban.pro) supports bulk unique jobs and automatic batching. With the basic
> engine, you must use `insert/3` for unique support.
> Pro](https://oban.pro) supports bulk unique jobs, automatic insert batching, and minimizes
> parameters sent over the wire. With the basic engine, you must use `insert/3` to insert unique
> jobs one at a time.
## Options
Expand Down
20 changes: 3 additions & 17 deletions lib/oban/engines/dolphin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,11 @@ defmodule Oban.Engines.Dolphin do

@impl Engine
def insert_all_jobs(%Config{} = conf, changesets, opts) do
# MySQL doesn't return a primary key from a bulk insert, which violates the insert_all_jobs
# contract. Inserting one at a time is far less efficient, but it does what's required.
{:ok, jobs} =
Repo.transaction(conf, fn ->
Enum.map(changesets, fn changeset ->
case insert_job(conf, changeset, opts) do
{:ok, job} ->
job

{:error, %Changeset{} = changeset} ->
raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset
jobs = Enum.map(changesets, &Job.to_map/1)

{:error, reason} ->
raise RuntimeError, inspect(reason)
end
end)
end)
{_count, _jobs} = Repo.insert_all(conf, Job, jobs, opts)

jobs
Enum.map(changesets, &Ecto.Changeset.apply_action!(&1, :insert))
end

@impl Engine
Expand Down
27 changes: 14 additions & 13 deletions test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -659,12 +659,14 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do
test "inserting and executing jobs", %{name: name} do
TelemetryHandler.attach_events()

changesets =
[job_1, job_2, job_3, job_4, job_5] =
~w(OK CANCEL DISCARD ERROR SNOOZE)
|> Enum.with_index(1)
|> Enum.map(fn {act, ref} -> Worker.new(%{action: act, ref: ref}) end)

[job_1, job_2, job_3, job_4, job_5] = Oban.insert_all(name, changesets)
|> Enum.map(fn {act, ref} ->
%{action: act, ref: ref}
|> Worker.new()
|> then(&Oban.insert!(name, &1))
end)

assert_receive {:event, [:fetch_jobs, :stop], _, %{jobs: _}}

Expand All @@ -685,12 +687,14 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do

@tag :capture_log
test "safely executing jobs with any type of exit", %{name: name} do
changesets =
jobs =
~w(EXIT KILL TASK_ERROR TASK_EXIT)
|> Enum.with_index(1)
|> Enum.map(fn {act, ref} -> Worker.new(%{action: act, ref: ref}) end)

jobs = Oban.insert_all(name, changesets)
|> Enum.map(fn {act, ref} ->
%{action: act, ref: ref}
|> Worker.new()
|> then(&Oban.insert!(name, &1))
end)

assert_receive {:exit, 1}
assert_receive {:kill, 2}
Expand Down Expand Up @@ -732,11 +736,8 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do
end

test "discarding jobs that exceed max attempts", %{name: name} do
[job_1, job_2] =
Oban.insert_all(name, [
Worker.new(%{action: "ERROR", ref: 1}, max_attempts: 1),
Worker.new(%{action: "ERROR", ref: 2}, max_attempts: 2)
])
job_1 = Oban.insert!(name, Worker.new(%{action: "ERROR", ref: 1}, max_attempts: 1))
job_2 = Oban.insert!(name, Worker.new(%{action: "ERROR", ref: 2}, max_attempts: 2))

assert_receive {:error, 1}
assert_receive {:error, 2}
Expand Down
10 changes: 5 additions & 5 deletions test/oban/plugins/lifeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ defmodule Oban.Plugins.LifelineTest do
repo: DolphinRepo
)

[job_1, job_2] =
Oban.insert_all(name, [
Worker.new(%{}, state: "executing", attempted_at: seconds_ago(3)),
Worker.new(%{}, state: "executing", attempted_at: seconds_ago(8))
])
job_1 =
Oban.insert!(name, Worker.new(%{}, state: "executing", attempted_at: seconds_ago(3)))

job_2 =
Oban.insert!(name, Worker.new(%{}, state: "executing", attempted_at: seconds_ago(8)))

send_rescue(name)

Expand Down

0 comments on commit fb22494

Please sign in to comment.