Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for onMessage callback in PubSub subscribe #80

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 73 additions & 21 deletions lib/pubsub.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
part of redis;

class _WarrningPubSubInProgress extends RedisConnection {
class _WarningPubSubInProgress extends RedisConnection {
RedisConnection _connection;
_WarrningPubSubInProgress(this._connection) {}
_WarningPubSubInProgress(this._connection) {}

_err() => throw "PubSub on this connaction in progress"
_err() => throw "PubSub on this connection in progress"
"It is not allowed to issue commands trough this handler";

// swap this relevant methods in Conenction with exception
// swap this relevant methods in connection with exception
// ignore: unused_element
Future _sendraw(Parser parser, List<int> data) => _err();

Expand All @@ -23,66 +23,118 @@ class _WarrningPubSubInProgress extends RedisConnection {

class PubSub {
late Command _command;
StreamController<List> _stream_controler = StreamController<List>();
Map<List<String>, void Function(dynamic, String?)> _onMessageListeners = {};

bool streamAlreadyListened = false;

StreamController<List> _streamController = StreamController<List>();

PubSub(Command command) {
_command = Command.from(command);
command.send_nothing()!.then((_) {
//override socket with warrning
command._connection = _WarrningPubSubInProgress(_command._connection);
//override socket with warning
command._connection = _WarningPubSubInProgress(_command._connection);
// listen and process forever
return Future.doWhile(() {
return _command._connection
._senddummy(_command.parser)
.then<bool>((var data) {
try {
_stream_controler.add(data);
_streamController.add(data);
return true; // run doWhile more
} catch (e) {
try {
_stream_controler.addError(e);
_streamController.addError(e);
} catch (_) {
// we could not notfy stream that we have eror
// we could not notify stream that we have error
}
// stop doWhile()
_stream_controler.close();
_streamController.close();
return false;
}
}).catchError((e) {
try {
_stream_controler.addError(e);
_streamController.addError(e);
} catch (_) {
// we could not notfy stream that we have eror
// we could not notify stream that we have error
}
// stop doWhile()
_stream_controler.close();
_streamController.close();
return false;
});
});
});
}

Stream getStream() {
return _stream_controler.stream;
return _streamController.stream;
}

/// Subscribes the client to the specified channels.
/// ```
/// subscriber.subscribe(['chat']);
/// subscriber.subscribe(['chat'],
/// onMessage: (dynamic message, String? channel) {
/// print(message);
/// print(channel);
/// },
/// );
/// ```
/// If you would like to handle on message via Stream,
/// onMessage callback can be optional
void subscribe(List<String> s,
{void Function(dynamic message, String? channel)? onMessage}) {
_sendCommandAndList("SUBSCRIBE", s);
if (onMessage != null) {
/// register onMessage callback to `_onMessageListeners`
_onMessageListeners[s] = onMessage;
_listenForNewMessage();
}
}

/// handle new message via stream and
/// return result to registered onMessage callbacks.
void _listenForNewMessage() {
if (streamAlreadyListened) return;
streamAlreadyListened = true;
getStream().listen((msg) {
var kind = msg[0];
if (kind != 'message') return;
var channel = msg[1];
var message = msg[2];
Function(dynamic, String?)? onMessageCallback =
_findOnMessageCallback(channel);
if (onMessageCallback != null) {
onMessageCallback(message, channel);
}
});
}

void subscribe(List<String> s) {
_sendcmd_and_list("SUBSCRIBE", s);
/// get onMessage callback related to the cannel
Function(dynamic, String?)? _findOnMessageCallback(String? channel) {
List<List<String>> channelsLists = _onMessageListeners.keys.toList();
channelsLists =
channelsLists.where((element) => element.contains(channel)).toList();
if (channelsLists.isNotEmpty) {
List<String>? channels = channelsLists.first;
return _onMessageListeners[channels];
}
return null;
}

void psubscribe(List<String> s) {
_sendcmd_and_list("PSUBSCRIBE", s);
_sendCommandAndList("PSUBSCRIBE", s);
}

void unsubscribe(List<String> s) {
_sendcmd_and_list("UNSUBSCRIBE", s);
_sendCommandAndList("UNSUBSCRIBE", s);
}

void punsubscribe(List<String> s) {
_sendcmd_and_list("PUNSUBSCRIBE", s);
_sendCommandAndList("PUNSUBSCRIBE", s);
}

void _sendcmd_and_list(String cmd, List<String> s) {
void _sendCommandAndList(String cmd, List<String> s) {
List list = [cmd];
list.addAll(s);
_command._connection._socket.add(_command.serializer.serialize(list));
Expand Down
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ environment:

dev_dependencies:
test: ^1.6.5
mockito: ^5.2.0
40 changes: 40 additions & 0 deletions test/pubsub_callback_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import 'package:mockito/mockito.dart';
import 'package:redis/redis.dart';
import 'package:test/test.dart';

import 'main.dart';

class MockOnNewMessageCallback extends Mock {
void call(dynamic message, String? channel);
}

void main() async {
Command cmdP = await generate_connect();
Command cmdS = await generate_connect();

group("Test Redis Pub-Sub subscribe with onMessage callback", () {
PubSub subscriber = PubSub(cmdS);

test("Subscribe to channel and listen via callback", () async {
final mockOnNewMessageCallback = MockOnNewMessageCallback();

subscriber.subscribe(
["chat_room"],
onMessage: mockOnNewMessageCallback,
);
subscriber.subscribe(
["chat_room2"],
onMessage: mockOnNewMessageCallback,
);

await cmdP.send_object(["PUBLISH", "chat_room", "goodbye"]);
await cmdP.send_object(["PUBLISH", "chat_room2", "hello"]);

// wait for the callback is triggered completely
await Future.delayed(Duration(milliseconds: 500));

verify(mockOnNewMessageCallback("goodbye", "chat_room")).called(1);
verify(mockOnNewMessageCallback("hello", "chat_room2")).called(1);
});
});
}
30 changes: 14 additions & 16 deletions test/pubsub_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ void main() async {
Command cmdP = await generate_connect();
Command cmdS = await generate_connect();

group("Test Redis Pub-Sub", () {

group("Test Redis Pub-Sub", () {
PubSub subscriber = PubSub(cmdS);

test("Publishing to channel before subscription", () {
Expand All @@ -26,7 +25,7 @@ void main() async {

expect(
() => cmdS.send_object("PING"),
throwsA(equals("PubSub on this connaction in progress"
throwsA(equals("PubSub on this connection in progress"
"It is not allowed to issue commands trough this handler")),
reason: "After subscription, command should not be able to send");
});
Expand Down Expand Up @@ -58,18 +57,17 @@ void main() async {
"Publishing a message after unsubscribe should be received by zero clients.");

// TODO: Multiple channels, Pattern (un)subscribe
});

test("Test close", () async {
// test that we can close connection
// creates new connection as prevously used in test
// does not expect errors
Command cmdClose = await generate_connect();
PubSub ps_c = PubSub(cmdClose);
cmdClose.get_connection().close();
expect(ps_c.getStream(),
emitsError(anything), // todo catch CloseError
reason: "Number of subscribers should be 0 after unsubscribe");
});
});

test("Test close", () async {
// test that we can close connection
// creates new connection as prevously used in test
// does not expect errors
Command cmdClose = await generate_connect();
PubSub ps_c = PubSub(cmdClose);
cmdClose.get_connection().close();
expect(ps_c.getStream(), emitsError(anything), // todo catch CloseError
reason: "Number of subscribers should be 0 after unsubscribe");
});
});
}