Skip to content

Commit

Permalink
Merge pull request #380 from rbino/trigger-engine-prefetch-count
Browse files Browse the repository at this point in the history
trigger_engine: add AMQP prefetch count
  • Loading branch information
drf authored May 14, 2020
2 parents 6de679d + 9fd8a1f commit 777c073
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added
- [data_updater_plant] Add `DATA_UPDATER_PLANT_AMQP_DATA_QUEUE_TOTAL_COUNT` environment variable,
this must be equal to the total number of queues in the Astarte instance.
- [trigger_engine] Add `TRIGGER_ENGINE_AMQP_PREFETCH_COUNT` environment variable to set the
prefetech count of AMQPEventsConsumer, avoiding excessive memory usage.

### Fixed
- Wait for schema_version agreement before applying any schema change (such as creating tables or a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ See the moduledoc for `Conform.Schema.Validator` for more details and examples.
hidden: false,
to: "astarte_trigger_engine.amqp_consumer_options.virtual_host"
],
"amqp_consumer_options.virtual_host": [
commented: true,
datatype: :integer,
default: 300,
env_var: "TRIGGER_ENGINE_AMQP_PREFETCH_COUNT",
doc: "Prefetch count for the AMQP consumer",
hidden: false,
to: "astarte_trigger_engine.amqp_prefetch_count"
],
amqp_events_queue_name: [
commented: true,
datatype: :binary,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ defmodule Astarte.TriggerEngine.AMQPEventsConsumer do
defp connect() do
with {:ok, conn} <- Connection.open(Config.amqp_consumer_options()),
{:ok, chan} <- Channel.open(conn),
:ok <- Basic.qos(chan, prefetch_count: Config.amqp_prefetch_count()),
:ok <- Exchange.declare(chan, Config.events_exchange_name(), :direct, durable: true),
{:ok, _queue} <- Queue.declare(chan, Config.events_queue_name(), durable: true),
:ok <-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ defmodule Astarte.TriggerEngine.Config do
Application.get_env(:astarte_trigger_engine, :amqp_consumer_options, [])
end

@doc """
Returns the AMQP prefetch count
"""
def amqp_prefetch_count do
Application.get_env(:astarte_trigger_engine, :amqp_prefetch_count, 300)
end

@doc """
Returns the events name of the exchange on which events are published
"""
Expand Down

0 comments on commit 777c073

Please sign in to comment.