Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deliver broadcast messages asynchronously #41

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,34 @@ Or you can use `collect`
end
```

## Async delivering messages

If you use `Rails` application or you use `ActiveJob`, you can deliver messages using `ActiveJob`

### Configuration

You have the next configuration

```ruby
GraphQL::AnyCable.configure do |config|
# ... other configurations
config.delivery_method = "inline" # the default value "inline", also can be "active_job"
config.queue = "default" # the name of ActiveJob queue
config.job_class = "GraphQL::Jobs::TriggerJob" # the name executor job
end
```

`delivery_method` can be either `inline` or `active_job`.
`inline` means that delivering messaging will work sync.
`active_job` - It will add delivering messages operations to `ActiveJob` with queue `default` and using job `GraphQL::Jobs::TriggerJob`

You can change the queue or job_class by changing it in the configuration

Or you can run code

```ruby
GraphQL::AnyCable.delivery_method = "active_job", { queue: "broadcasting", job_class: "GraphQL::Jobs::TriggerJob" }
```

## Testing applications which use `graphql-anycable`

Expand Down
1 change: 1 addition & 0 deletions graphql-anycable.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "railties"
spec.add_development_dependency "rake", ">= 12.3.3"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "activejob", "~> 6.0"
end
14 changes: 14 additions & 0 deletions lib/graphql-anycable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require_relative "graphql/anycable/config"
require_relative "graphql/anycable/railtie" if defined?(Rails)
require_relative "graphql/anycable/stats"
require_relative "graphql/anycable/delivery_adapter"
require_relative "graphql/subscriptions/anycable_subscriptions"

module GraphQL
Expand All @@ -25,6 +26,19 @@ def self.stats(**options)
Stats.new(**options).collect
end

def self.delivery_method=(args)
method_name, options = Array(args)
options ||= {}

config.delivery_method = method_name
config.queue = options[:queue] if options[:queue]
config.job_class = options[:job_class] if options[:job_class]
end

def self.delivery_adapter(object)
DeliveryAdapter.lookup(executor_object: object)
end

module_function

def redis
Expand Down
25 changes: 25 additions & 0 deletions lib/graphql/adapters/active_job_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class ActiveJobAdapter < BaseAdapter
def trigger(...)
executor_class_job.set(queue: config.queue).perform_later(
executor_object,
executor_method,
...
)
end

private

def executor_class_job
config.job_class.constantize
end

def config
GraphQL::AnyCable.config
end
end
end
end
18 changes: 18 additions & 0 deletions lib/graphql/adapters/base_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class BaseAdapter
attr_reader :executor_object, :executor_method

def initialize(executor_object:)
@executor_object = executor_object
@executor_method = executor_object.class::EXECUTOR_METHOD_NAME
end

def trigger
raise NoMethodError, "#{__method__} method should be implemented in concrete class"
end
end
end
end
11 changes: 11 additions & 0 deletions lib/graphql/adapters/inline_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class InlineAdapter < BaseAdapter
def trigger(...)
executor_object.public_send(executor_method, ...)
end
end
end
end
32 changes: 32 additions & 0 deletions lib/graphql/anycable/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,38 @@ class Config < Anyway::Config
attr_config use_redis_object_on_cleanup: true
attr_config use_client_provided_uniq_id: true
attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side.

attr_config delivery_method: "inline", queue: "default", job_class: "GraphQL::Jobs::TriggerJob"

def job_class=(value)
ensure_value_is_not_blank!("job_class", value)

super
end

def queue=(value)
ensure_value_is_not_blank!("queue", value)

super
end

def delivery_method=(value)
ensure_value_is_not_blank!("delivery_method", value)

super
end

private

def empty_value?(value)
value.nil? || value == ""
end

def ensure_value_is_not_blank!(name, value)
return unless empty_value?(value)

raise_validation_error("#{name} can not be blank")
end
end
end
end
25 changes: 25 additions & 0 deletions lib/graphql/anycable/delivery_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

require "graphql/adapters/base_adapter"
require "graphql/adapters/inline_adapter"
require "graphql/adapters/active_job_adapter"

module GraphQL
module AnyCable
class DeliveryAdapter
class << self
def lookup(options)
adapter_class_name = config.delivery_method.to_s.split("_").map(&:capitalize).join

Adapters.const_get("#{adapter_class_name}Adapter").new(**(options || {}))
rescue NameError => e
raise e.class, "Delivery adapter :#{config.delivery_method} haven't been found", e.backtrace
end

def config
GraphQL::AnyCable.config
end
end
end
end
end
9 changes: 9 additions & 0 deletions lib/graphql/anycable/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
module GraphQL
module AnyCable
class Railtie < ::Rails::Railtie
initializer "graphql_anycable.load_trigger_job" do
ActiveSupport.on_load(:active_job) do
require "graphql/jobs/trigger_job"
require "graphql/serializers/anycable_subscription_serializer"

ActiveJob::Serializers.add_serializers(GraphQL::Serializers::AnyCableSubscriptionSerializer)
end
end

rake_tasks do
path = File.expand_path(__dir__)
Dir.glob("#{path}/tasks/**/*.rake").each { |f| load f }
Expand Down
11 changes: 11 additions & 0 deletions lib/graphql/jobs/trigger_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module GraphQL
module Jobs
class TriggerJob < ActiveJob::Base
def perform(executor_object, execute_method, event_name, args = {}, object = nil, options = {})
executor_object.public_send(execute_method, event_name, args, object, **options)
end
end
end
end
19 changes: 19 additions & 0 deletions lib/graphql/serializers/anycable_subscription_serializer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module GraphQL
module Serializers
class AnyCableSubscriptionSerializer < ActiveJob::Serializers::ObjectSerializer
def serialize?(argument)
argument.kind_of?(GraphQL::Subscriptions::AnyCableSubscriptions)
end

def serialize(subscription)
super(subscription.collected_arguments)
end

def deserialize(payload)
GraphQL::Subscriptions::AnyCableSubscriptions.new(**payload)
end
end
end
end
10 changes: 10 additions & 0 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,20 @@ class AnyCableSubscriptions < GraphQL::Subscriptions

def_delegators :"GraphQL::AnyCable", :redis, :config

attr_reader :collected_arguments
alias_method :trigger_sync, :trigger

SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
EXECUTOR_METHOD_NAME = "trigger_sync" # method, who executes the sync method "trigger"

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
@serializer = serializer
@collected_arguments = { serializer: serializer, **rest }

super
end

Expand Down Expand Up @@ -206,6 +212,10 @@ def delete_channel_subscriptions(channel_or_id)
redis.del(redis_key(CHANNEL_PREFIX) + channel_id)
end

def trigger(...)
AnyCable.delivery_adapter(self).trigger(...)
end

private

def anycable
Expand Down
90 changes: 89 additions & 1 deletion spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# frozen_string_literal: true

require "active_job"
require "graphql/jobs/trigger_job"

RSpec.describe GraphQL::AnyCable do
subject do
AnycableSchema.execute(
Expand Down Expand Up @@ -263,9 +266,94 @@

describe ".stats" do
it "calls Graphql::AnyCable::Stats" do
allow_any_instance_of(GraphQL::AnyCable::Stats).to receive(:collect)
expect_any_instance_of(GraphQL::AnyCable::Stats).to receive(:collect)

described_class.stats
end
end

describe ".delivery_adapter" do
context "when config.delivery_method is inline" do
around do |ex|
old_value = GraphQL::AnyCable.config.delivery_method
GraphQL::AnyCable.config.delivery_method = "inline"

ex.run

GraphQL::AnyCable.config.delivery_method = old_value
end

it "calls InlineAdapter" do
expect(GraphQL::Adapters::InlineAdapter).to receive(:new).with(executor_object: "any_object")

described_class.delivery_adapter("any_object")
end
end

context "when config.delivery_method is active_job" do
around do |ex|
old_value = GraphQL::AnyCable.config.delivery_method
GraphQL::AnyCable.config.delivery_method = "active_job"

ex.run

GraphQL::AnyCable.config.delivery_method = old_value
end

it "calls ActiveJobAdapter" do
expect(GraphQL::Adapters::ActiveJobAdapter).to receive(:new).with(executor_object: "any_object")

described_class.delivery_adapter("any_object")
end
end
end

describe ".delivery_method" do
let(:config) { GraphQL::AnyCable.config }

after do
config.delivery_method = "inline"
config.queue = "default"
config.job_class = "GraphQL::Jobs::TriggerJob"
end

it "changes config" do
expect(config.delivery_method).to eq("inline")
expect(config.queue).to eq("default")
expect(config.job_class).to eq("GraphQL::Jobs::TriggerJob")

described_class.delivery_method = :active_job, { queue: "test", job_class: "CustomJob" }

expect(config.delivery_method).to eq(:active_job)
expect(config.queue).to eq("test")
expect(config.job_class).to eq("CustomJob")
end

context "when entered empty delivery_method" do
it "raises an error" do
expect { described_class.delivery_method = nil }.to raise_error(
Anyway::Config::ValidationError,
/delivery_method can not be blank/,
)
end
end

context "when entered invalid queue" do
it "raises an error" do
expect { described_class.delivery_method = "inline", { queue: "" } }.to raise_error(
Anyway::Config::ValidationError,
/queue can not be blank/,
)
end
end

context "when entered invalid job_class" do
it "raises an error" do
expect { described_class.delivery_method = "inline", { job_class: "" } }.to raise_error(
Anyway::Config::ValidationError,
/job_class can not be blank/,
)
end
end
end
end
Loading