Skip to content

Commit

Permalink
Drop handling of client-provided channel identifiers (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
Envek authored May 7, 2024
1 parent 23c67ff commit d75e7a4
Show file tree
Hide file tree
Showing 13 changed files with 19 additions and 166 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:

jobs:
test:
name: "GraphQL-Ruby ${{ matrix.graphql }} AnyCable ${{ matrix.anycable }} on Ruby ${{ matrix.ruby }} (use_client_id: ${{ matrix.client_id }}) Redis ${{ matrix.redis_version }}"
name: "GraphQL-Ruby ${{ matrix.graphql }} AnyCable ${{ matrix.anycable }} on Ruby ${{ matrix.ruby }} Redis ${{ matrix.redis_version }}"
# Skip running tests for local pull requests (use push event instead), run only for foreign ones
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.owner.login != github.event.pull_request.base.repo.owner.login
runs-on: ubuntu-latest
Expand All @@ -20,17 +20,14 @@ jobs:
include:
- ruby: "3.3"
graphql: '~> 2.3'
client_id: 'false'
anycable: '~> 1.5'
redis_version: latest
- ruby: "3.2"
graphql: '~> 2.2.0'
client_id: 'false'
anycable: '~> 1.4.0'
redis_version: '7.2'
- ruby: "3.1"
graphql: '~> 2.0.0'
client_id: 'false'
anycable: '~> 1.3.0'
redis_version: '6.2'
env:
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Removed

- Handling of client-provided channel identifiers. **BREAKING CHANGE**

Please make sure that you have changed your channel `disconnected` method to pass channel instance to GraphQL-AnyCable's `delete_channel_subscriptions` method.
See [release notes for version 1.1.0](https://github.com/anycable/graphql-anycable/releases/tag/v1.1.0) for details.

- Handling of pre-1.0 subscriptions data.

If you're still using version 0.5 or below, please upgrade to 1.0 or 1.1 first with `handle_legacy_subscriptions` setting enabled.
Expand Down
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ GraphQL-AnyCable uses [anyway_config] to configure itself. There are several pos
```.env
GRAPHQL_ANYCABLE_SUBSCRIPTION_EXPIRATION_SECONDS=604800
GRAPHQL_ANYCABLE_USE_REDIS_OBJECT_ON_CLEANUP=true
GRAPHQL_ANYCABLE_USE_CLIENT_PROVIDED_UNIQ_ID=false
GRAPHQL_ANYCABLE_REDIS_PREFIX=graphql
```
Expand All @@ -151,7 +150,6 @@ GraphQL-AnyCable uses [anyway_config] to configure itself. There are several pos
production:
subscription_expiration_seconds: 300 # 5 minutes
use_redis_object_on_cleanup: false # For restricted redis installations
use_client_provided_uniq_id: false # To avoid problems with non-uniqueness of Apollo channel identifiers
redis_prefix: graphql # You can configure redis_prefix for anycable-graphql subscription prefixes. Default value "graphql"
```
Expand Down Expand Up @@ -232,7 +230,7 @@ As in AnyCable there is no place to store subscription data in-memory, it should
}
```

4. Channel subscriptions: `graphql-channel:#{channel_id}` set containing identifiers for subscriptions created in ActionCable channel to delete them on client disconnect.
4. Channel subscriptions: `graphql-channel:#{subscription_id}` set containing identifiers for subscriptions created in ActionCable channel to delete them on client disconnect.

```
SMEMBERS graphql-channel:17420c6ed9e
Expand Down
6 changes: 0 additions & 6 deletions lib/graphql-anycable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@
module GraphQL
module AnyCable
def self.use(schema, **options)
if config.use_client_provided_uniq_id?
warn "[Deprecated] Using client provided channel uniq IDs could lead to unexpected behaviour, " \
"please, set GraphQL::AnyCable.config.use_client_provided_uniq_id = false or GRAPHQL_ANYCABLE_USE_CLIENT_PROVIDED_UNIQ_ID=false, " \
"and update the `#unsubscribed` callback code according to the latest docs."
end

schema.use GraphQL::Subscriptions::AnyCableSubscriptions, **options
end

Expand Down
1 change: 0 additions & 1 deletion lib/graphql/anycable/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ class Config < Anyway::Config

attr_config subscription_expiration_seconds: nil
attr_config use_redis_object_on_cleanup: true
attr_config use_client_provided_uniq_id: true
attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side.
end
end
Expand Down
16 changes: 7 additions & 9 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,8 @@ def write_subscription(query, events)

raise GraphQL::AnyCable::ChannelConfigurationError unless channel

channel_uniq_id = config.use_client_provided_uniq_id? ? channel.params["channelId"] : subscription_id

# Store subscription_id in the channel state to cleanup on disconnect
write_subscription_id(channel, channel_uniq_id)

write_subscription_id(channel, subscription_id)

events.each do |event|
channel.stream_from(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint)
Expand All @@ -145,14 +142,14 @@ def write_subscription(query, events)
}

redis.multi do |pipeline|
pipeline.sadd(redis_key(CHANNEL_PREFIX) + channel_uniq_id, [subscription_id])
pipeline.sadd(redis_key(CHANNEL_PREFIX) + subscription_id, [subscription_id])
pipeline.mapped_hmset(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, data)
events.each do |event|
pipeline.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint)
pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id])
end
next unless config.subscription_expiration_seconds
pipeline.expire(redis_key(CHANNEL_PREFIX) + channel_uniq_id, config.subscription_expiration_seconds)
pipeline.expire(redis_key(CHANNEL_PREFIX) + subscription_id, config.subscription_expiration_seconds)
pipeline.expire(redis_key(SUBSCRIPTION_PREFIX) + subscription_id, config.subscription_expiration_seconds)
end
end
Expand Down Expand Up @@ -193,9 +190,10 @@ def delete_subscription(subscription_id)
end

# The channel was closed, forget about it and its subscriptions
def delete_channel_subscriptions(channel_or_id)
# For backward compatibility
channel_id = channel_or_id.is_a?(String) ? channel_or_id : read_subscription_id(channel_or_id)
def delete_channel_subscriptions(channel)
raise(ArgumentError, "Please pass channel instance to #{__method__} in your #unsubscribed method") if channel.is_a?(String)

channel_id = read_subscription_id(channel)

# Missing in case disconnect happens before #execute
return unless channel_id
Expand Down
43 changes: 0 additions & 43 deletions spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,6 @@

describe ".delete_channel_subscriptions" do
context "with default config.redis-prefix" do
around do |ex|
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
ex.run
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
end

before do
AnycableSchema.execute(
query: query,
Expand Down Expand Up @@ -148,12 +142,10 @@
context "with different config.redis-prefix" do
around do |ex|
old_redis_prefix = GraphQL::AnyCable.config.redis_prefix
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
GraphQL::AnyCable.config.redis_prefix = "graphql-test"

ex.run

GraphQL::AnyCable.config.use_client_provided_uniq_id = false
GraphQL::AnyCable.config.redis_prefix = old_redis_prefix
end

Expand Down Expand Up @@ -184,41 +176,6 @@
end
end

describe "legacy .delete_channel_subscriptions" do
before do
GraphQL::AnyCable.config.use_client_provided_uniq_id = true
end

before do
AnycableSchema.execute(
query: query,
context: { channel: channel, subscription_id: subscription_id },
variables: {},
operation_name: "SomeSubscription",
)
end

after do
GraphQL::AnyCable.config.use_client_provided_uniq_id = false
end

let(:redis) { AnycableSchema.subscriptions.redis }

subject do
AnycableSchema.subscriptions.delete_channel_subscriptions(channel.id)
end

it "removes subscription from redis" do
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be true
expect(redis.exists?("graphql-channel:legacy_id")).to be true
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be true
subject
expect(redis.exists?("graphql-channel:legacy_id")).to be false
expect(redis.exists?("graphql-fingerprints::productUpdated:")).to be false
expect(redis.exists?("graphql-subscription:some-truly-random-number")).to be false
end
end

describe "with missing channel instance in execution context" do
subject do
AnycableSchema.execute(
Expand Down
1 change: 0 additions & 1 deletion spec/graphql/broadcast_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def subscribe(query)

before do
allow(channel).to receive(:stream_from)
allow(channel).to receive(:params).and_return("channelId" => "ohmycables")
allow(anycable).to receive(:broadcast)
end

Expand Down
3 changes: 1 addition & 2 deletions spec/integration_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
let(:schema) { nil }
let(:identifiers) { {current_user: "john", schema: schema.to_s} }
let(:channel_class) { "GraphqlChannel" }
let(:channel_params) { {channelId: rand(1000).to_s} }
let(:channel_identifier) { {channel: channel_class}.merge(channel_params) }
let(:channel_identifier) { {channel: channel_class} }
let(:channel_id) { channel_identifier.to_json }

let(:handler) { AnyCable::RPC::Handler.new }
Expand Down
48 changes: 2 additions & 46 deletions spec/integrations/broadcastable_subscriptions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,15 @@

stream_name = subject.streams.first

# update request context and channelId
# update request context
request.connection_identifiers = identifiers.merge(current_user: "alice").to_json
request.identifier = channel_identifier.merge(channelId: rand(1000).to_s).to_json

response = handler.handle(:command, request)
expect(response).to be_success
expect(response.streams).to eq([stream_name])

# now update the query param
request.data = data.merge(variables: {id: "b"}).to_json
request.identifier = channel_identifier.merge(channelId: rand(1000).to_s).to_json

response = handler.handle(:command, request)
expect(response).to be_success
Expand Down Expand Up @@ -114,9 +112,8 @@

request_2 = request.dup

# update request context and channelId
# update request context
request_2.connection_identifiers = identifiers.merge(current_user: "alice").to_json
request_2.identifier = channel_identifier.merge(channelId: rand(1000).to_s).to_json

response_2 = handler.handle(:command, request_2)

Expand Down Expand Up @@ -157,47 +154,6 @@
expect(AnyCable.broadcast_adapter).to have_received(:broadcast).twice
end

context "with similar ChannelId and not using ID from client" do
let(:config) { GraphQL::AnyCable.config }

around do |ex|
config.use_client_provided_uniq_id.tap do |was_val|
config.use_client_provided_uniq_id = false
ex.run
config.use_client_provided_uniq_id = was_val
end
end

specify "creates an entry for each subscription" do
# first, subscribe to obtain the connection state
subscribe_response = handler.handle(:command, request)
expect(subscribe_response).to be_success

expect(redis.keys("graphql-subscription:*").size).to eq(1)
expect(redis.keys("graphql-subscriptions:*").size).to eq(1)

# update request context
request.connection_identifiers = identifiers.merge(current_user: "alice").to_json

response = handler.handle(:command, request)

expect(redis.keys("graphql-subscription:*").size).to eq(2)
expect(redis.keys("graphql-subscriptions:*").size).to eq(1)

istate = response.istate

request.command = "unsubscribe"
request.data = ""
request.istate = istate

response = handler.handle(:command, request)
expect(response).to be_success

expect(redis.keys("graphql-subscription:*").size).to eq(1)
expect(redis.keys("graphql-subscriptions:*").size).to eq(1)
end
end

context "without subscription" do
let(:data) { nil }
let(:command) { "unsubscribe" }
Expand Down
48 changes: 1 addition & 47 deletions spec/integrations/per_client_subscriptions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@

all_streams = Set.new(subject.streams)

# update request channelId
request.identifier = channel_identifier.merge(channelId: rand(1000).to_s).to_json

response = handler.handle(:command, request)
expect(response).to be_success
expect(response.streams.size).to eq 1
Expand All @@ -55,7 +52,6 @@

# now update the query param
request.data = data.merge(variables: {id: "b"}).to_json
request.identifier = channel_identifier.merge(channelId: rand(1000).to_s).to_json

response = handler.handle(:command, request)
expect(response).to be_success
Expand Down Expand Up @@ -98,9 +94,8 @@

request_2 = request.dup

# update request context and channelId
# update request context
request_2.connection_identifiers = identifiers.merge(current_user: "alice").to_json
request_2.identifier = channel_identifier.merge(channelId: rand(1000).to_s).to_json

response_2 = handler.handle(:command, request_2)

Expand Down Expand Up @@ -141,47 +136,6 @@
expect(AnyCable.broadcast_adapter).to have_received(:broadcast).thrice
end

context "with similar ChannelId and not using ID from client" do
let(:config) { GraphQL::AnyCable.config }

around do |ex|
config.use_client_provided_uniq_id.tap do |was_val|
config.use_client_provided_uniq_id = false
ex.run
config.use_client_provided_uniq_id = was_val
end
end

specify "creates an entry for each subscription" do
# first, subscribe to obtain the connection state
subscribe_response = handler.handle(:command, request)
expect(subscribe_response).to be_success

expect(redis.keys("graphql-subscription:*").size).to eq(1)
expect(redis.keys("graphql-subscriptions:*").size).to eq(1)

# update request context
request.connection_identifiers = identifiers.merge(current_user: "alice").to_json

response = handler.handle(:command, request)

expect(redis.keys("graphql-subscription:*").size).to eq(2)
expect(redis.keys("graphql-subscriptions:*").size).to eq(2)

istate = response.istate

request.command = "unsubscribe"
request.data = ""
request.istate = istate

response = handler.handle(:command, request)
expect(response).to be_success

expect(redis.keys("graphql-subscription:*").size).to eq(1)
expect(redis.keys("graphql-subscriptions:*").size).to eq(1)
end
end

context "without subscription" do
let(:data) { nil }
let(:command) { "unsubscribe" }
Expand Down
3 changes: 1 addition & 2 deletions spec/integrations/rails_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,8 @@ def context

request_2 = request.dup

# update request context and channelId
# update request context
request_2.connection_identifiers = identifiers.merge(current_user: "alice").to_json
request_2.identifier = channel_identifier.merge(channelId: rand(1000).to_s).to_json

response_2 = handler.handle(:command, request_2)

Expand Down
2 changes: 0 additions & 2 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

ENV["GRAPHQL_ANYCABLE_USE_CLIENT_PROVIDED_UNIQ_ID"] ||= "false"

require "bundler/setup"
require "graphql/anycable"
require "pry"
Expand Down

0 comments on commit d75e7a4

Please sign in to comment.