Skip to content

Commit

Permalink
deliver messages asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
prog-supdex committed Sep 21, 2023
1 parent bef401d commit d686343
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 0 deletions.
1 change: 1 addition & 0 deletions graphql-anycable.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "anyway_config", ">= 1.3", "< 3"
spec.add_dependency "graphql", ">= 1.11", "< 3"
spec.add_dependency "redis", ">= 4.2.0"
spec.add_dependency "activejob", ">= 5.0.0"

spec.add_development_dependency "anycable-rails"
spec.add_development_dependency "bundler", "~> 2.0"
Expand Down
1 change: 1 addition & 0 deletions lib/graphql-anycable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require_relative "graphql/anycable/cleaner"
require_relative "graphql/anycable/config"
require_relative "graphql/anycable/railtie" if defined?(Rails)
require_relative "graphql/subscriptions/adapters/base_job"
require_relative "graphql/anycable/stats"
require_relative "graphql/subscriptions/anycable_subscriptions"

Expand Down
3 changes: 3 additions & 0 deletions lib/graphql/anycable/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ class Config < Anyway::Config
attr_config use_redis_object_on_cleanup: true
attr_config use_client_provided_uniq_id: true
attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side.

attr_config use_async_broadcasting: true
attr_config async_broadcasting: { queue: "broadcasting", class: "GraphQL::Adapters::BaseJob" }
end
end
end
15 changes: 15 additions & 0 deletions lib/graphql/subscriptions/adapters/base_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

module GraphQL
module Adapters
class BaseJob < ActiveJob::Base
DEFAULT_QUEUE_NAME = :default

queue_as { GraphQL::AnyCable.config.async_broadcasting["queue"] || DEFAULT_QUEUE_NAME }

def perform(subscription_object, execute_method, event, object)
Marshal.load(subscription_object).public_send(execute_method, Marshal.load(event), Marshal.load(object))
end
end
end
end
26 changes: 26 additions & 0 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class AnyCableSubscriptions < GraphQL::Subscriptions
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
EXECUTOR_METHOD_NAME = "execute_synchronically" # method, who execute the main logic

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
Expand All @@ -73,6 +74,13 @@ def execute_all(event, object)
fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1)
return if fingerprints.empty?

perform(event, object)
end

def execute_synchronically(event, object)
fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1)
return if fingerprints.empty?

fingerprint_subscription_ids = Hash[fingerprints.zip(
redis.pipelined do |pipeline|
fingerprints.map do |fingerprint|
Expand Down Expand Up @@ -243,6 +251,24 @@ def fetch_channel_istate(channel)
def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end

def executor_class_job
custom_class = config.async_broadcasting["class"]

return Adapters::BaseJob unless custom_class

Object.const_get(config.async_broadcasting["class"])
end

def perform(event, object)
unless config.use_async_broadcasting
return public_send(EXECUTOR_METHOD_NAME, event, object)
end

args = [Marshal.dump(self), EXECUTOR_METHOD_NAME, Marshal.dump(event), Marshal.dump(object)]

executor_class_job.perform_later(*args)
end
end
end
end
Expand Down

0 comments on commit d686343

Please sign in to comment.