Skip to content

Commit

Permalink
improvement: add actor_persister, and use it automatically
Browse files Browse the repository at this point in the history
improvement: authorize?: true always
  • Loading branch information
zachdaniel committed Feb 20, 2024
1 parent 52b9c94 commit 12f4207
Show file tree
Hide file tree
Showing 13 changed files with 379 additions and 130 deletions.
24 changes: 24 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,30 @@ config :spark, :formatter,
"Ash.Registry": [],
"Ash.Resource": []

if Mix.env() == :test do
config :ash_oban, ecto_repos: [AshOban.Test.Repo]

config :ash_oban, :oban,
repo: AshOban.Test.Repo,
prefix: "private",
plugins: [
{Oban.Plugins.Cron, []}
],
queues: [
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10
]

config :ash_oban, :actor_persister, AshOban.Test.ActorPersister

config :ash_oban, AshOban.Test.Repo,
username: "postgres",
database: "ash_oban_test",
hostname: "localhost",
pool: Ecto.Adapters.SQL.Sandbox
end

if Mix.env() == :dev do
config :git_ops,
mix_project: AshOban.MixProject,
Expand Down
33 changes: 33 additions & 0 deletions documentation/tutorials/get-started-with-ash-oban.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,36 @@ and
}
}
```
## Persisting the actor along with a job
Create a module that is responsible for translating the current user to a value that will be JSON encoded, and for turning that encoded value back into an actor.
```elixir
defmodule MyApp.AshObanActorPersister do
use AshOban.PersistActor
def store(%MyApp.User{id: id}), do: %{"type" => "user", "id" => id}
def lookup(%{"type" => "user", "id" => id}), do: MyApp.Accounts.get(MyApp.User, id)
# This allows you to set a default actor
# in cases where no actor was present
# when scheduling.
def lookup(nil), do: {:ok, nil}
end
```
Then, configure this in application config.
```elixir
config :ash_oban, :actor_persister, MyApp.AshObanActorPersister
```
### Considerations
There are a few things that are important to keep in mind:
1. The actor could be deleted or otherwise unavailable when you look it up. You very likely want your `lookup/1` to return an error in that scenario.
2. The actor can have changed between when the job was scheduled and when the trigger is executing. It can even change across retries. If you are trying to authorize access for a given trigger's update action to a given actor, keep in mind that just because the trigger is running for a given action, does *not* mean that the conditions that allowed them to originally *schedule* that action are still true.
13 changes: 13 additions & 0 deletions lib/actor_persister.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule AshOban.ActorPersister do
@type actor_json :: any
@type actor :: any

@callback store(actor :: actor) :: actor_json :: actor_json
@callback lookup(actor_json :: actor_json | nil) :: {:ok, actor | nil} | {:error, Ash.Error.t()}

defmacro __using__(_) do
quote do
@behaviour AshOban.ActorPersister
end
end
end
81 changes: 66 additions & 15 deletions lib/ash_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,14 @@ defmodule AshOban do
queues_not_drained: list(atom)
}

def schedule(resource, trigger) do
@doc """
Schedules all relevant jobs for the provided trigger or scheduled action
## Options
`:actor` - the actor to set on the job. Requires configuring an actor persister.
"""
def schedule(resource, trigger, opts \\ []) do
case trigger do
%AshOban.Trigger{} ->
trigger
Expand All @@ -387,17 +394,54 @@ defmodule AshOban do
|> case do
%AshOban.Schedule{worker: worker} ->
%{}
|> persist_actor(opts[:actor])
|> worker.new()
|> Oban.insert!()

%AshOban.Trigger{scheduler: scheduler} ->
%{}
|> persist_actor(opts[:actor])
|> scheduler.new()
|> Oban.insert!()
end
end

def run_trigger(%resource{} = record, trigger, oban_job_opts \\ []) do
@spec persist_actor(args :: map, actor :: any) :: any
def persist_actor(args, nil), do: args

def persist_actor(args, actor) do
case Application.get_env(:ash_oban, :actor_persister) do
nil ->
args

persister ->
Map.put(args, "actor", persister.store(actor))
end
end

@spec lookup_actor(actor_json :: any) :: any
def lookup_actor(actor_json) do
case Application.get_env(:ash_oban, :actor_persister) do
nil ->
{:ok, nil}

persister ->
persister.lookup(actor_json)
end
end

@doc """
Runs a specific trigger for the record provided.
## Options
- `:actor` - the actor to set on the job. Requires configuring an actor persister.
All other options are passed through to `c:Oban.Worker.new/2`
"""
def run_trigger(%resource{} = record, trigger, opts \\ []) do
{opts, oban_job_opts} = Keyword.split(opts, [:actor])

trigger =
case trigger do
%AshOban.Trigger{} ->
Expand All @@ -419,6 +463,7 @@ defmodule AshOban do
end

%{primary_key: Map.take(record, primary_key), metadata: metadata}
|> AshOban.persist_actor(opts[:actor])
|> trigger.worker.new(oban_job_opts)
|> Oban.insert!()
end
Expand Down Expand Up @@ -620,6 +665,7 @@ defmodule AshOban do
- `queue`, `with_limit`, `with_recursion`, `with_safety`, `with_scheduled` - passed through to `Oban.drain_queue/2`, if it is called
- `scheduled_actions?` - Defaults to false, unless a scheduled action name was explicitly provided. Schedules all applicable scheduled actions.
- `triggers?` - Defaults to true, schedules all applicable scheduled actions.
- `actor` - The actor to schedule and run the triggers with
If the input is:
* a list - each item is passed into `schedule_and_run_triggers/1`, and the results are merged together.
Expand All @@ -641,7 +687,7 @@ defmodule AshOban do

def do_schedule_and_run_triggers(resources_or_apis_or_otp_apps, opts)
when is_list(resources_or_apis_or_otp_apps) do
Enum.reduce(resources_or_apis_or_otp_apps, %{}, fn item, acc ->
Enum.reduce(resources_or_apis_or_otp_apps, default_acc(), fn item, acc ->
item
|> do_schedule_and_run_triggers(opts)
|> merge_results(acc)
Expand All @@ -661,7 +707,7 @@ defmodule AshOban do
end)

Enum.each(triggers, fn trigger ->
AshOban.schedule(resource, trigger)
AshOban.schedule(resource, trigger, actor: opts[:actor])
end)

queues =
Expand Down Expand Up @@ -697,7 +743,7 @@ defmodule AshOban do
end)

Enum.each(triggers, fn trigger ->
AshOban.schedule(resource_or_api_or_otp_app, trigger)
AshOban.schedule(resource_or_api_or_otp_app, trigger, actor: opts[:actor])
end)

queues =
Expand All @@ -712,7 +758,7 @@ defmodule AshOban do
resource_or_api_or_otp_app
|> Application.get_env(:ash_apis, [])
|> List.wrap()
|> Enum.reduce(%{}, fn api, acc ->
|> Enum.reduce(default_acc(), fn api, acc ->
api
|> do_schedule_and_run_triggers(opts)
|> merge_results(acc)
Expand All @@ -722,7 +768,7 @@ defmodule AshOban do

defp drain_queues(queues, opts) do
if opts[:drain_queues?] do
Enum.reduce(queues, %{}, fn queue, acc ->
Enum.reduce(queues, default_acc(), fn queue, acc ->
[queue: queue]
|> Keyword.merge(
Keyword.take(opts, [:queue, :with_limit, :with_recursion, :with_safety, :with_scheduled])
Expand All @@ -732,17 +778,22 @@ defmodule AshOban do
|> merge_results(acc)
end)
else
%{
discard: 0,
cancelled: 0,
success: 0,
failure: 0,
snoozed: 0,
queues_not_drained: Enum.uniq(queues)
}
default_acc()
|> Map.update!(:queues_not_drained, &Enum.uniq(&1 ++ queues))
end
end

defp default_acc() do
%{
discard: 0,
cancelled: 0,
success: 0,
failure: 0,
snoozed: 0,
queues_not_drained: []
}
end

defp merge_results(results, acc) do
Map.merge(results, acc, fn
:queues_not_drained, left, right ->
Expand Down
9 changes: 7 additions & 2 deletions lib/changes/run_oban_trigger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ defmodule AshOban.Changes.RunObanTrigger do

use Ash.Resource.Change

def change(changeset, opts, _context) do
def change(changeset, opts, context) do
trigger = AshOban.Info.oban_trigger(changeset.resource, opts[:trigger])

if !trigger do
raise "No such trigger #{opts[:trigger]} for resource #{inspect(changeset.resource)}"
end

Ash.Changeset.after_action(changeset, fn _changeset, result ->
AshOban.run_trigger(result, trigger, opts[:oban_job_opts])
AshOban.run_trigger(
result,
trigger,
Keyword.put(opts[:oban_job_opts] || [], :actor, context.actor)
)

{:ok, result}
end)
end
Expand Down
Loading

0 comments on commit 12f4207

Please sign in to comment.