From 12f420719341db9a270e1a9e1f0840e03c29e8b5 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Tue, 20 Feb 2024 13:02:43 -0500 Subject: [PATCH] improvement: add `actor_persister`, and use it automatically improvement: authorize?: true always --- config/config.exs | 24 ++ .../tutorials/get-started-with-ash-oban.md | 33 +++ lib/actor_persister.ex | 13 + lib/ash_oban.ex | 81 ++++-- lib/changes/run_oban_trigger.ex | 9 +- lib/transformers/define_schedulers.ex | 241 ++++++++++-------- mix.exs | 18 +- mix.lock | 7 +- test/ash_oban_test.exs | 33 ++- test/support/actor_persister.ex | 17 ++ test/support/repo.ex | 3 + test/support/triggered.ex | 19 ++ .../20240220165645_install_oban.exs | 11 + 13 files changed, 379 insertions(+), 130 deletions(-) create mode 100644 lib/actor_persister.ex create mode 100644 test/support/actor_persister.ex create mode 100644 test/support/repo.ex create mode 100644 test_migrations/20240220165645_install_oban.exs diff --git a/config/config.exs b/config/config.exs index fa4b52f..153935b 100644 --- a/config/config.exs +++ b/config/config.exs @@ -6,6 +6,30 @@ config :spark, :formatter, "Ash.Registry": [], "Ash.Resource": [] +if Mix.env() == :test do + config :ash_oban, ecto_repos: [AshOban.Test.Repo] + + config :ash_oban, :oban, + repo: AshOban.Test.Repo, + prefix: "private", + plugins: [ + {Oban.Plugins.Cron, []} + ], + queues: [ + triggered_process: 10, + triggered_process_2: 10, + triggered_say_hello: 10 + ] + + config :ash_oban, :actor_persister, AshOban.Test.ActorPersister + + config :ash_oban, AshOban.Test.Repo, + username: "postgres", + database: "ash_oban_test", + hostname: "localhost", + pool: Ecto.Adapters.SQL.Sandbox +end + if Mix.env() == :dev do config :git_ops, mix_project: AshOban.MixProject, diff --git a/documentation/tutorials/get-started-with-ash-oban.md b/documentation/tutorials/get-started-with-ash-oban.md index 7690511..3a416ad 100644 --- a/documentation/tutorials/get-started-with-ash-oban.md +++ b/documentation/tutorials/get-started-with-ash-oban.md @@ -113,3 +113,36 @@ and } } ``` + +## Persisting the actor along with a job + +Create a module that is responsible for translating the current user to a value that will be JSON encoded, and for turning that encoded value back into an actor. + +```elixir +defmodule MyApp.AshObanActorPersister do + use AshOban.PersistActor + + def store(%MyApp.User{id: id}), do: %{"type" => "user", "id" => id} + + def lookup(%{"type" => "user", "id" => id}), do: MyApp.Accounts.get(MyApp.User, id) + + # This allows you to set a default actor + # in cases where no actor was present + # when scheduling. + def lookup(nil), do: {:ok, nil} +end +``` + +Then, configure this in application config. + +```elixir +config :ash_oban, :actor_persister, MyApp.AshObanActorPersister +``` + +### Considerations + +There are a few things that are important to keep in mind: + +1. The actor could be deleted or otherwise unavailable when you look it up. You very likely want your `lookup/1` to return an error in that scenario. + +2. The actor can have changed between when the job was scheduled and when the trigger is executing. It can even change across retries. If you are trying to authorize access for a given trigger's update action to a given actor, keep in mind that just because the trigger is running for a given action, does *not* mean that the conditions that allowed them to originally *schedule* that action are still true. \ No newline at end of file diff --git a/lib/actor_persister.ex b/lib/actor_persister.ex new file mode 100644 index 0000000..4c28ddb --- /dev/null +++ b/lib/actor_persister.ex @@ -0,0 +1,13 @@ +defmodule AshOban.ActorPersister do + @type actor_json :: any + @type actor :: any + + @callback store(actor :: actor) :: actor_json :: actor_json + @callback lookup(actor_json :: actor_json | nil) :: {:ok, actor | nil} | {:error, Ash.Error.t()} + + defmacro __using__(_) do + quote do + @behaviour AshOban.ActorPersister + end + end +end diff --git a/lib/ash_oban.ex b/lib/ash_oban.ex index 95b7c32..ab24e10 100644 --- a/lib/ash_oban.ex +++ b/lib/ash_oban.ex @@ -372,7 +372,14 @@ defmodule AshOban do queues_not_drained: list(atom) } - def schedule(resource, trigger) do + @doc """ + Schedules all relevant jobs for the provided trigger or scheduled action + + ## Options + + `:actor` - the actor to set on the job. Requires configuring an actor persister. + """ + def schedule(resource, trigger, opts \\ []) do case trigger do %AshOban.Trigger{} -> trigger @@ -387,17 +394,54 @@ defmodule AshOban do |> case do %AshOban.Schedule{worker: worker} -> %{} + |> persist_actor(opts[:actor]) |> worker.new() |> Oban.insert!() %AshOban.Trigger{scheduler: scheduler} -> %{} + |> persist_actor(opts[:actor]) |> scheduler.new() |> Oban.insert!() end end - def run_trigger(%resource{} = record, trigger, oban_job_opts \\ []) do + @spec persist_actor(args :: map, actor :: any) :: any + def persist_actor(args, nil), do: args + + def persist_actor(args, actor) do + case Application.get_env(:ash_oban, :actor_persister) do + nil -> + args + + persister -> + Map.put(args, "actor", persister.store(actor)) + end + end + + @spec lookup_actor(actor_json :: any) :: any + def lookup_actor(actor_json) do + case Application.get_env(:ash_oban, :actor_persister) do + nil -> + {:ok, nil} + + persister -> + persister.lookup(actor_json) + end + end + + @doc """ + Runs a specific trigger for the record provided. + + ## Options + + - `:actor` - the actor to set on the job. Requires configuring an actor persister. + + All other options are passed through to `c:Oban.Worker.new/2` + """ + def run_trigger(%resource{} = record, trigger, opts \\ []) do + {opts, oban_job_opts} = Keyword.split(opts, [:actor]) + trigger = case trigger do %AshOban.Trigger{} -> @@ -419,6 +463,7 @@ defmodule AshOban do end %{primary_key: Map.take(record, primary_key), metadata: metadata} + |> AshOban.persist_actor(opts[:actor]) |> trigger.worker.new(oban_job_opts) |> Oban.insert!() end @@ -620,6 +665,7 @@ defmodule AshOban do - `queue`, `with_limit`, `with_recursion`, `with_safety`, `with_scheduled` - passed through to `Oban.drain_queue/2`, if it is called - `scheduled_actions?` - Defaults to false, unless a scheduled action name was explicitly provided. Schedules all applicable scheduled actions. - `triggers?` - Defaults to true, schedules all applicable scheduled actions. + - `actor` - The actor to schedule and run the triggers with If the input is: * a list - each item is passed into `schedule_and_run_triggers/1`, and the results are merged together. @@ -641,7 +687,7 @@ defmodule AshOban do def do_schedule_and_run_triggers(resources_or_apis_or_otp_apps, opts) when is_list(resources_or_apis_or_otp_apps) do - Enum.reduce(resources_or_apis_or_otp_apps, %{}, fn item, acc -> + Enum.reduce(resources_or_apis_or_otp_apps, default_acc(), fn item, acc -> item |> do_schedule_and_run_triggers(opts) |> merge_results(acc) @@ -661,7 +707,7 @@ defmodule AshOban do end) Enum.each(triggers, fn trigger -> - AshOban.schedule(resource, trigger) + AshOban.schedule(resource, trigger, actor: opts[:actor]) end) queues = @@ -697,7 +743,7 @@ defmodule AshOban do end) Enum.each(triggers, fn trigger -> - AshOban.schedule(resource_or_api_or_otp_app, trigger) + AshOban.schedule(resource_or_api_or_otp_app, trigger, actor: opts[:actor]) end) queues = @@ -712,7 +758,7 @@ defmodule AshOban do resource_or_api_or_otp_app |> Application.get_env(:ash_apis, []) |> List.wrap() - |> Enum.reduce(%{}, fn api, acc -> + |> Enum.reduce(default_acc(), fn api, acc -> api |> do_schedule_and_run_triggers(opts) |> merge_results(acc) @@ -722,7 +768,7 @@ defmodule AshOban do defp drain_queues(queues, opts) do if opts[:drain_queues?] do - Enum.reduce(queues, %{}, fn queue, acc -> + Enum.reduce(queues, default_acc(), fn queue, acc -> [queue: queue] |> Keyword.merge( Keyword.take(opts, [:queue, :with_limit, :with_recursion, :with_safety, :with_scheduled]) @@ -732,17 +778,22 @@ defmodule AshOban do |> merge_results(acc) end) else - %{ - discard: 0, - cancelled: 0, - success: 0, - failure: 0, - snoozed: 0, - queues_not_drained: Enum.uniq(queues) - } + default_acc() + |> Map.update!(:queues_not_drained, &Enum.uniq(&1 ++ queues)) end end + defp default_acc() do + %{ + discard: 0, + cancelled: 0, + success: 0, + failure: 0, + snoozed: 0, + queues_not_drained: [] + } + end + defp merge_results(results, acc) do Map.merge(results, acc, fn :queues_not_drained, left, right -> diff --git a/lib/changes/run_oban_trigger.ex b/lib/changes/run_oban_trigger.ex index ee240de..1bd770e 100644 --- a/lib/changes/run_oban_trigger.ex +++ b/lib/changes/run_oban_trigger.ex @@ -5,7 +5,7 @@ defmodule AshOban.Changes.RunObanTrigger do use Ash.Resource.Change - def change(changeset, opts, _context) do + def change(changeset, opts, context) do trigger = AshOban.Info.oban_trigger(changeset.resource, opts[:trigger]) if !trigger do @@ -13,7 +13,12 @@ defmodule AshOban.Changes.RunObanTrigger do end Ash.Changeset.after_action(changeset, fn _changeset, result -> - AshOban.run_trigger(result, trigger, opts[:oban_job_opts]) + AshOban.run_trigger( + result, + trigger, + Keyword.put(opts[:oban_job_opts] || [], :actor, context.actor) + ) + {:ok, result} end) end diff --git a/lib/transformers/define_schedulers.ex b/lib/transformers/define_schedulers.ex index 4f19b52..7f0c9bc 100644 --- a/lib/transformers/define_schedulers.ex +++ b/lib/transformers/define_schedulers.ex @@ -94,24 +94,30 @@ defmodule AshOban.Transformers.DefineSchedulers do stream = if is_nil(trigger.where) do quote location: :keep do - def stream(resource) do + def stream(resource, actor) do resource |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) |> Ash.Query.select(unquote(primary_key)) |> limit_stream() - |> Ash.Query.for_read(unquote(trigger.read_action)) + |> Ash.Query.for_read(unquote(trigger.read_action), %{}, + authorize?: true, + actor: actor + ) |> unquote(api).stream!(unquote(batch_opts)) end end else quote location: :keep do - def stream(resource) do + def stream(resource, actor) do resource |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) |> Ash.Query.select(unquote(primary_key)) |> limit_stream() |> filter() - |> Ash.Query.for_read(unquote(trigger.read_action)) + |> Ash.Query.for_read(unquote(trigger.read_action), %{}, + authorize?: true, + actor: actor + ) |> unquote(api).stream!() end end @@ -193,7 +199,7 @@ defmodule AshOban.Transformers.DefineSchedulers do {:discard, unquote(trigger.state)} end else - def unquote(function_name)(%Oban.Job{}) do + def unquote(function_name)(%Oban.Job{args: args}) do metadata = case AshOban.Info.oban_trigger(unquote(resource), unquote(trigger.name)) do %{read_metadata: read_metadata} when is_function(read_metadata) -> @@ -203,15 +209,22 @@ defmodule AshOban.Transformers.DefineSchedulers do fn _ -> %{} end end - unquote(resource) - |> stream() - |> Stream.map(fn record -> - unquote(worker_module_name).new(%{ - primary_key: Map.take(record, unquote(primary_key)), - metadata: metadata.(record) - }) - end) - |> insert() + case AshOban.lookup_actor(args["actor"]) do + {:ok, actor} -> + unquote(resource) + |> stream(actor) + |> Stream.map(fn record -> + unquote(worker_module_name).new(%{ + primary_key: Map.take(record, unquote(primary_key)), + metadata: metadata.(record), + actor: args["actor"] + }) + end) + |> insert() + + {:error, e} -> + raise Ash.Error.to_ash_error(e) + end rescue e -> Logger.error( @@ -414,63 +427,69 @@ defmodule AshOban.Transformers.DefineSchedulers do end def handle_error( - %{max_attempts: max_attempts, attempt: max_attempts} = job, + %{max_attempts: max_attempts, attempt: max_attempts, args: args} = job, error, primary_key, stacktrace ) do - query() - |> Ash.Query.do_filter(primary_key) - |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) - |> Ash.Query.for_read(unquote(read_action)) - |> unquote(api).read_one() - |> case do - {:error, error} -> - AshOban.debug( - """ - Record with primary key #{inspect(primary_key)} encountered an error in #{unquote(inspect(resource))}#{unquote(trigger.name)} - - #{Exception.format(:error, error, AshOban.stacktrace(error))} - """, - unquote(trigger.debug?) - ) - - {:error, error} - - {:ok, nil} -> - AshOban.debug( - "Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}", - unquote(trigger.debug?) - ) - - {:discard, :trigger_no_longer_applies} - - {:ok, record} -> - unquote(log_final_error) - - record - |> Ash.Changeset.new() - |> prepare_error(primary_key) - |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}}) - |> Ash.Changeset.for_action(unquote(trigger.on_error), %{error: error}) - |> AshOban.update_or_destroy(unquote(api)) + case AshOban.lookup_actor(args["actor"]) do + {:ok, actor} -> + query() + |> Ash.Query.do_filter(primary_key) + |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) + |> Ash.Query.for_read(unquote(read_action), %{}, authorize?: true, actor: actor) + |> unquote(api).read_one() |> case do - :ok -> - :ok - - {:ok, result} -> - :ok - {:error, error} -> - error = Ash.Error.to_ash_error(error, stacktrace) - - Logger.error(""" - Error handler failed for #{inspect(unquote(resource))}: #{inspect(primary_key)}! - - #{inspect(Exception.format(:error, error, AshOban.stacktrace(error)))} - """) - - reraise error, stacktrace + AshOban.debug( + """ + Record with primary key #{inspect(primary_key)} encountered an error in #{unquote(inspect(resource))}#{unquote(trigger.name)} + + #{Exception.format(:error, error, AshOban.stacktrace(error))} + """, + unquote(trigger.debug?) + ) + + {:error, error} + + {:ok, nil} -> + AshOban.debug( + "Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}", + unquote(trigger.debug?) + ) + + {:discard, :trigger_no_longer_applies} + + {:ok, record} -> + unquote(log_final_error) + + record + |> Ash.Changeset.new() + |> prepare_error(primary_key) + |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}}) + |> Ash.Changeset.for_action(unquote(trigger.on_error), %{error: error}, + authorize?: true, + actor: actor + ) + |> AshOban.update_or_destroy(unquote(api)) + |> case do + :ok -> + :ok + + {:ok, result} -> + :ok + + {:error, error} -> + error = Ash.Error.to_ash_error(error, stacktrace) + + Logger.error(""" + Error handler failed for #{inspect(unquote(resource))}: #{inspect(primary_key)}! + + #{inspect(Exception.format(:error, error, AshOban.stacktrace(error)))} + """) + + reraise error, stacktrace + end end end end @@ -525,51 +544,59 @@ defmodule AshOban.Transformers.DefineSchedulers do unquote(trigger.debug?) ) - query() - |> Ash.Query.do_filter(primary_key) - |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) - |> Ash.Query.for_read(unquote(read_action)) - |> unquote(api).read_one() - |> case do - {:ok, nil} -> - AshOban.debug( - "Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}", - unquote(trigger.debug?) - ) - - {:discard, :trigger_no_longer_applies} - - {:ok, record} -> - args = - if unquote(is_nil(trigger.read_metadata)) do - %{} - else - %{metadata: args["metadata"]} - end - - record - |> Ash.Changeset.new() - |> prepare(primary_key) - |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}}) - |> Ash.Changeset.for_action( - unquote(trigger.action), - Map.merge(unquote(Macro.escape(trigger.action_input || %{})), args) - ) - |> AshOban.update_or_destroy(unquote(api)) + case AshOban.lookup_actor(args["actor"]) do + {:ok, actor} -> + query() + |> Ash.Query.do_filter(primary_key) + |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) + |> Ash.Query.for_read(unquote(read_action), %{}, authorize?: true, actor: actor) + |> unquote(api).read_one() |> case do - :ok -> - :ok - - {:ok, result} -> - {:ok, result} - - {:error, error} -> - raise Ash.Error.to_error_class(error) + {:ok, nil} -> + AshOban.debug( + "Record with primary key #{inspect(primary_key)} no longer applies to trigger #{unquote(inspect(resource))}#{unquote(trigger.name)}", + unquote(trigger.debug?) + ) + + {:discard, :trigger_no_longer_applies} + + {:ok, record} -> + args = + if unquote(is_nil(trigger.read_metadata)) do + %{} + else + %{metadata: args["metadata"]} + end + + record + |> Ash.Changeset.new() + |> prepare(primary_key) + |> Ash.Changeset.set_context(%{private: %{ash_oban?: true}}) + |> Ash.Changeset.for_action( + unquote(trigger.action), + Map.merge(unquote(Macro.escape(trigger.action_input || %{})), args), + authorize?: true, + actor: actor + ) + |> AshOban.update_or_destroy(unquote(api)) + |> case do + :ok -> + :ok + + {:ok, result} -> + {:ok, result} + + {:error, error} -> + raise Ash.Error.to_error_class(error) + end + + # we don't have the record here, so we can't do the `on_error` behavior + other -> + other end - # we don't have the record here, so we can't do the `on_error` behavior - other -> - other + {:error, error} -> + raise Ash.Error.to_error_class(error) end rescue error -> diff --git a/mix.exs b/mix.exs index e1b1cc1..51f2af9 100644 --- a/mix.exs +++ b/mix.exs @@ -25,6 +25,16 @@ defmodule AshOban.MixProject do ] end + def cli do + [ + preferred_envs: [ + "test.gen.migration": :test, + "test.migrate": :test, + "test.create": :test + ] + ] + end + defp package do [ name: :ash_oban, @@ -112,7 +122,8 @@ defmodule AshOban.MixProject do {:dialyxir, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:sobelow, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:git_ops, "~> 2.5", only: [:dev, :test]}, - {:excoveralls, "~> 0.13", only: [:dev, :test]} + {:excoveralls, "~> 0.13", only: [:dev, :test]}, + {:postgrex, "~> 0.17.4"} ] end @@ -128,7 +139,10 @@ defmodule AshOban.MixProject do ], "spark.formatter": "spark.formatter --extensions AshOban", "spark.cheat_sheets": "spark.cheat_sheets --extensions AshOban", - "spark.cheat_sheets_in_search": "spark.cheat_sheets_in_search --extensions AshOban" + "spark.cheat_sheets_in_search": "spark.cheat_sheets_in_search --extensions AshOban", + "test.gen.migration": "ecto.gen.migration --migrations-path=test_migrations", + "test.migrate": "ecto.migrate --migrations-path=test_migrations", + "test.create": "ecto.create" ] end end diff --git a/mix.lock b/mix.lock index a2998df..2f9424d 100644 --- a/mix.lock +++ b/mix.lock @@ -3,12 +3,12 @@ "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "comparable": {:hex, :comparable, "1.0.0", "bb669e91cedd14ae9937053e5bcbc3c52bb2f22422611f43b6e38367d94a495f", [:mix], [{:typable, "~> 0.1", [hex: :typable, repo: "hexpm", optional: false]}], "hexpm", "277c11eeb1cd726e7cd41c6c199e7e52fa16ee6830b45ad4cdc62e51f62eb60c"}, "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"}, - "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, + "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.4.1", "a22ed1e7bd3a3e3f197b68d806ef66acb61ee8f57b3ac85fc5d57354c5482a93", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "84b795d6d7796297cca5a3118444b80c7d94f7ce247d49886e7c291e1ae49801"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, - "ecto": {:hex, :ecto, "3.10.3", "eb2ae2eecd210b4eb8bece1217b297ad4ff824b4384c0e3fdd28aaf96edd6135", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "44bec74e2364d491d70f7e42cd0d690922659d329f6465e89feb8a34e8cd3433"}, - "ecto_sql": {:hex, :ecto_sql, "3.10.2", "6b98b46534b5c2f8b8b5f03f126e75e2a73c64f3c071149d32987a5378b0fdbd", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.10.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 0.17.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "68c018debca57cb9235e3889affdaec7a10616a4e3a80c99fa1d01fdafaa9007"}, + "ecto": {:hex, :ecto, "3.11.1", "4b4972b717e7ca83d30121b12998f5fcdc62ba0ed4f20fd390f16f3270d85c3e", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ebd3d3772cd0dfcd8d772659e41ed527c28b2a8bde4b00fe03e0463da0f1983b"}, + "ecto_sql": {:hex, :ecto_sql, "3.11.1", "e9abf28ae27ef3916b43545f9578b4750956ccea444853606472089e7d169470", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 0.17.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ce14063ab3514424276e7e360108ad6c2308f6d88164a076aac8a387e1fea634"}, "elixir_make": {:hex, :elixir_make, "0.7.7", "7128c60c2476019ed978210c245badf08b03dbec4f24d05790ef791da11aa17c", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5bc19fff950fad52bbe5f211b12db9ec82c6b34a9647da0c2224b8b8464c7e6c"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ets": {:hex, :ets, "0.9.0", "79c6a6c205436780486f72d84230c6cba2f8a9920456750ddd1e47389107d5fd", [:mix], [], "hexpm", "2861fdfb04bcaeff370f1a5904eec864f0a56dcfebe5921ea9aadf2a481c822b"}, @@ -27,6 +27,7 @@ "oban": {:hex, :oban, "2.15.4", "d49ab4ffb7153010e32f80fe9e56f592706238149ec579eb50f8a4e41d218856", [:mix], [{:ecto_sql, "~> 3.6", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5fce611fdfffb13e9148df883116e5201adf1e731eb302cc88cde0588510079c"}, "oban_pro": {:hex, :oban_pro, "1.0.2", "fa25f84f24c665f0f812321a35ef37eaa5e6e929782727226becb9ea2cf05eb3", [:mix], [{:ecto_sql, "~> 3.8", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:libgraph, "~> 0.13", [hex: :libgraph, repo: "hexpm", optional: true]}, {:oban, "~> 2.15.4", [hex: :oban, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}], "oban", "65b87d47f08d43c39ad5c026e2c86392296d29c028011d92670baf7032e93760"}, "picosat_elixir": {:hex, :picosat_elixir, "0.2.3", "bf326d0f179fbb3b706bb2c15fbc367dacfa2517157d090fdfc32edae004c597", [:make, :mix], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "f76c9db2dec9d2561ffaa9be35f65403d53e984e8cd99c832383b7ab78c16c66"}, + "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, "sourceror": {:hex, :sourceror, "0.14.1", "c6fb848d55bd34362880da671debc56e77fd722fa13b4dcbeac89a8998fc8b09", [:mix], [], "hexpm", "8b488a219e4c4d7d9ff29d16346fd4a5858085ccdd010e509101e226bbfd8efc"}, "spark": {:hex, :spark, "1.1.54", "54dac39403a2960f738ba5d60678d20b30de7381fb51b787b6bcb6aeabb73d9d", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.5 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:sourceror, "~> 0.1", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "abc9a67cfb60a97d2f3c7e270fa968a2ace94f389e2741d406239d237ec6dbb1"}, diff --git a/test/ash_oban_test.exs b/test/ash_oban_test.exs index cc9ccd4..d5cd71f 100644 --- a/test/ash_oban_test.exs +++ b/test/ash_oban_test.exs @@ -1,10 +1,41 @@ defmodule AshObanTest do - use ExUnit.Case + use ExUnit.Case, async: false doctest AshOban alias AshOban.Test.Api alias AshOban.Test.Triggered + setup_all do + AshOban.Test.Repo.start_link() + Oban.start_link(AshOban.config([Api], Application.get_env(:ash_oban, :oban))) + + :ok + end + + test "nothing happens if no records exist" do + assert %{success: 1} = AshOban.Test.schedule_and_run_triggers(Triggered) + end + + test "if a record exists, it is processed" do + Triggered + |> Ash.Changeset.for_create(:create, %{}) + |> Api.create!() + + assert %{success: 2} = + AshOban.Test.schedule_and_run_triggers(Triggered, + actor: %AshOban.Test.ActorPersister.FakeActor{id: 1} + ) + end + + test "if an actor is not set, it is nil when executing the job" do + Triggered + |> Ash.Changeset.for_create(:create) + |> Api.create!() + + assert %{success: 1, failure: 1} = + AshOban.Test.schedule_and_run_triggers(Triggered) + end + test "dsl introspection" do assert [ %AshOban.Trigger{action: :process}, diff --git a/test/support/actor_persister.ex b/test/support/actor_persister.ex new file mode 100644 index 0000000..cbf80d5 --- /dev/null +++ b/test/support/actor_persister.ex @@ -0,0 +1,17 @@ +defmodule AshOban.Test.ActorPersister do + use AshOban.ActorPersister + + defmodule FakeActor do + defstruct id: nil + end + + def store(%FakeActor{id: id}) do + %{"id" => id} + end + + def lookup(%{"id" => id}) do + {:ok, %FakeActor{id: id}} + end + + def lookup(nil), do: {:ok, nil} +end diff --git a/test/support/repo.ex b/test/support/repo.ex new file mode 100644 index 0000000..e8c9fa9 --- /dev/null +++ b/test/support/repo.ex @@ -0,0 +1,3 @@ +defmodule AshOban.Test.Repo do + use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :ash_oban +end diff --git a/test/support/triggered.ex b/test/support/triggered.ex index 1bb17e4..25bfb38 100644 --- a/test/support/triggered.ex +++ b/test/support/triggered.ex @@ -2,6 +2,7 @@ defmodule AshOban.Test.Triggered do @moduledoc false use Ash.Resource, data_layer: Ash.DataLayer.Ets, + authorizers: [Ash.Policy.Authorizer], extensions: [AshOban] oban do @@ -11,12 +12,14 @@ defmodule AshOban.Test.Triggered do trigger :process do action :process where expr(processed != true) + max_attempts 2 worker_read_action(:read) end trigger :process_2 do action :process where expr(processed != true) + max_attempts 2 worker_read_action(:read) scheduler_cron false end @@ -27,6 +30,16 @@ defmodule AshOban.Test.Triggered do end end + policies do + policy action(:process) do + authorize_if actor_present() + end + + policy always() do + authorize_if always() + end + end + actions do defaults [:create] @@ -37,6 +50,11 @@ defmodule AshOban.Test.Triggered do update :process do change set_attribute(:processed, true) + + change fn changeset, context -> + send(self(), {:actor, context.actor}) + changeset + end end action :say_hello, :string do @@ -52,5 +70,6 @@ defmodule AshOban.Test.Triggered do attributes do uuid_primary_key :id + attribute :processed, :boolean, default: false, allow_nil?: false end end diff --git a/test_migrations/20240220165645_install_oban.exs b/test_migrations/20240220165645_install_oban.exs new file mode 100644 index 0000000..600285d --- /dev/null +++ b/test_migrations/20240220165645_install_oban.exs @@ -0,0 +1,11 @@ +defmodule AshOban.Test.Repo.Migrations.InstallOban do + use Ecto.Migration + + def up do + Oban.Migrations.up(prefix: "private") + end + + def down do + Oban.Migrations.down(prefix: "private") + end +end