Skip to content

Commit

Permalink
remove using Marshal, correct serialized objects
Browse files Browse the repository at this point in the history
  • Loading branch information
prog-supdex committed Sep 22, 2023
1 parent f08db8e commit 646d680
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 27 deletions.
3 changes: 2 additions & 1 deletion graphql-anycable.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ Gem::Specification.new do |spec|
spec.add_dependency "anyway_config", ">= 1.3", "< 3"
spec.add_dependency "graphql", ">= 1.11", "< 3"
spec.add_dependency "redis", ">= 4.2.0"
spec.add_dependency "activejob", ">= 5.0.0"
spec.add_dependency "activejob", ">= 5.2.0"

spec.add_development_dependency "anycable-rails"
spec.add_development_dependency "bundler", "~> 2.0"
spec.add_development_dependency "rack"
spec.add_development_dependency "railties"
spec.add_development_dependency "rake", ">= 12.3.3"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "rspec-rails", "~> 5.0"
end
2 changes: 1 addition & 1 deletion lib/graphql-anycable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
require_relative "graphql/anycable/cleaner"
require_relative "graphql/anycable/config"
require_relative "graphql/anycable/railtie" if defined?(Rails)
require_relative "graphql/subscriptions/adapters/base_job"
require_relative "graphql/adapters/base_job"
require_relative "graphql/anycable/stats"
require_relative "graphql/subscriptions/anycable_subscriptions"

Expand Down
30 changes: 30 additions & 0 deletions lib/graphql/adapters/base_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

require "active_job"

module GraphQL
module Adapters
class BaseJob < ActiveJob::Base
DEFAULT_QUEUE_NAME = :default

queue_as { GraphQL::AnyCable.config.async_broadcasting["queue"] || DEFAULT_QUEUE_NAME }

def perform(serialized_payload, execute_method, event_name, args = {}, object = nil, options = {})
schema = schema_parse(serialized_payload)

schema.public_send(execute_method, event_name, args, object, **options)
end

private

def schema_parse(serialized_payload)
payload = JSON.parse(serialized_payload)

payload["schema"] = payload["schema"].constantize
payload["serializer"] = payload["serializer"].constantize

GraphQL::Subscriptions::AnyCableSubscriptions.new(**payload.transform_keys(&:to_sym))
end
end
end
end
17 changes: 0 additions & 17 deletions lib/graphql/subscriptions/adapters/base_job.rb

This file was deleted.

39 changes: 31 additions & 8 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,20 @@ class AnyCableSubscriptions < GraphQL::Subscriptions
extend Forwardable

def_delegators :"GraphQL::AnyCable", :redis, :config
alias_method :trigger_sync, :trigger

SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, …
FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic
SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint
CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup
EXECUTOR_METHOD_NAME = "execute_synchronically" # method, who execute the main logic
EXECUTOR_METHOD_NAME = "trigger_sync" # method, who executes the sync method "trigger"

# @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)`
def initialize(serializer: Serialize, **rest)
@serializer = serializer

@serialized_arguments = serialize_arguments(serializer, rest)

super
end

Expand All @@ -74,13 +78,6 @@ def execute_all(event, object)
fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1)
return if fingerprints.empty?

perform(event, object)
end

def execute_synchronically(event, object)
fingerprints = redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1)
return if fingerprints.empty?

fingerprint_subscription_ids = Hash[fingerprints.zip(
redis.pipelined do |pipeline|
fingerprints.map do |fingerprint|
Expand Down Expand Up @@ -214,8 +211,25 @@ def delete_channel_subscriptions(channel_or_id)
redis.del(redis_key(CHANNEL_PREFIX) + channel_id)
end

def trigger(event_name, args, object, **elements)
unless config.use_async_broadcasting
return trigger_sync(event_name, args, object, **elements)
end

executor_class_job.perform_later(
serialized_arguments,
EXECUTOR_METHOD_NAME,
event_name,
args,
object,
elements
)
end

private

attr_reader :serialized_arguments

def anycable
@anycable ||= ::AnyCable.broadcast_adapter
end
Expand Down Expand Up @@ -269,6 +283,15 @@ def perform(event, object)

executor_class_job.perform_later(*args)
end

def serialize_arguments(serializer, payload)
payload = payload.dup

payload[:serializer] = serializer.to_s
payload[:schema] = payload[:schema].to_s

JSON.dump(payload)
end
end
end
end
Expand Down
100 changes: 100 additions & 0 deletions spec/adapters/base_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# frozen_string_literal: true

require "active_job"

RSpec.describe GraphQL::Adapters::BaseJob, type: :job do
ActiveJob::Base.queue_adapter = :inline

subject(:job) { described_class.perform_later(*job_payload) }
subject(:broadcast_changes) { AnycableSchema.subscriptions.trigger(*trigger_sync_arguments) }

before do
AnycableSchema.execute(
query: query,
context: { channel: channel, subscription_id: subscription_id },
variables: {},
operation_name: "SomeSubscription",
)
end

let(:trigger_sync_arguments) do
[
:product_updated,
{},
{id: 1, title: "foo"}
]
end

let(:job_payload) do
[
"{\"schema\":\"AnycableSchema\",\"serializer\":\"GraphQL::Subscriptions::Serialize\"}",
"trigger_sync",
*trigger_sync_arguments
]
end

let(:query) do
<<~GRAPHQL
subscription SomeSubscription { productUpdated { id } }
GRAPHQL
end

let(:channel) do
socket = double("Socket", istate: AnyCable::Socket::State.new({}))
connection = double("Connection", anycable_socket: socket)
double("Channel", id: "legacy_id", params: { "channelId" => "legacy_id" }, stream_from: nil, connection: connection)
end

let(:subscription_id) do
"some-truly-random-number"
end

context "when config.use_async_broadcasting is true" do
around do |ex|
GraphQL::AnyCable.config.use_async_broadcasting = true
ex.run
GraphQL::AnyCable.config.use_async_broadcasting = false
end

it "executes AnyCableSubscriptions" do
ActiveJob::Base.queue_adapter = :inline

expect_any_instance_of(GraphQL::Subscriptions::AnyCableSubscriptions)
.to receive(:trigger_sync).with(*trigger_sync_arguments)

expect_any_instance_of(GraphQL::Adapters::BaseJob).to receive(:perform).and_call_original

broadcast_changes
end

it "adds BaseJob to enqueued_jobs" do
ActiveJob::Base.queue_adapter = :test

expect { broadcast_changes }.to change(ActiveJob::Base.queue_adapter.enqueued_jobs, :size).by(1)
end
end

context "when config.use_async_broadcasting is false" do
before do
GraphQL::AnyCable.config.use_async_broadcasting = false
end

it "executes AnyCableSubscriptions" do
ActiveJob::Base.queue_adapter = :inline

expect_any_instance_of(GraphQL::Subscriptions::AnyCableSubscriptions)
.to receive(:trigger_sync).with(*trigger_sync_arguments)

expect_any_instance_of(GraphQL::Adapters::BaseJob).to_not receive(:perform)

broadcast_changes
end

it "does not add BaseJob to enqueued_jobs" do
ActiveJob::Base.queue_adapter = :test

expect { broadcast_changes }.to change(ActiveJob::Base.queue_adapter.enqueued_jobs, :size).by(0)
end
end
end

0 comments on commit 646d680

Please sign in to comment.