diff --git a/lib/pubsub.dart b/lib/pubsub.dart index ccda884..3f11671 100644 --- a/lib/pubsub.dart +++ b/lib/pubsub.dart @@ -1,13 +1,13 @@ part of redis; -class _WarrningPubSubInProgress extends RedisConnection { +class _WarningPubSubInProgress extends RedisConnection { RedisConnection _connection; - _WarrningPubSubInProgress(this._connection) {} + _WarningPubSubInProgress(this._connection) {} - _err() => throw "PubSub on this connaction in progress" + _err() => throw "PubSub on this connection in progress" "It is not allowed to issue commands trough this handler"; - // swap this relevant methods in Conenction with exception + // swap this relevant methods in connection with exception // ignore: unused_element Future _sendraw(Parser parser, List data) => _err(); @@ -23,39 +23,43 @@ class _WarrningPubSubInProgress extends RedisConnection { class PubSub { late Command _command; - StreamController _stream_controler = StreamController(); + Map, void Function(dynamic, String?)> _onMessageListeners = {}; + + bool streamAlreadyListened = false; + + StreamController _streamController = StreamController(); PubSub(Command command) { _command = Command.from(command); command.send_nothing()!.then((_) { - //override socket with warrning - command._connection = _WarrningPubSubInProgress(_command._connection); + //override socket with warning + command._connection = _WarningPubSubInProgress(_command._connection); // listen and process forever return Future.doWhile(() { return _command._connection ._senddummy(_command.parser) .then((var data) { try { - _stream_controler.add(data); + _streamController.add(data); return true; // run doWhile more } catch (e) { try { - _stream_controler.addError(e); + _streamController.addError(e); } catch (_) { - // we could not notfy stream that we have eror + // we could not notify stream that we have error } // stop doWhile() - _stream_controler.close(); + _streamController.close(); return false; } }).catchError((e) { try { - _stream_controler.addError(e); + _streamController.addError(e); } catch (_) { - // we could not notfy stream that we have eror + // we could not notify stream that we have error } // stop doWhile() - _stream_controler.close(); + _streamController.close(); return false; }); }); @@ -63,26 +67,74 @@ class PubSub { } Stream getStream() { - return _stream_controler.stream; + return _streamController.stream; + } + + /// Subscribes the client to the specified channels. + /// ``` + /// subscriber.subscribe(['chat']); + /// subscriber.subscribe(['chat'], + /// onMessage: (dynamic message, String? channel) { + /// print(message); + /// print(channel); + /// }, + /// ); + /// ``` + /// If you would like to handle on message via Stream, + /// onMessage callback can be optional + void subscribe(List s, + {void Function(dynamic message, String? channel)? onMessage}) { + _sendCommandAndList("SUBSCRIBE", s); + if (onMessage != null) { + /// register onMessage callback to `_onMessageListeners` + _onMessageListeners[s] = onMessage; + _listenForNewMessage(); + } + } + + /// handle new message via stream and + /// return result to registered onMessage callbacks. + void _listenForNewMessage() { + if (streamAlreadyListened) return; + streamAlreadyListened = true; + getStream().listen((msg) { + var kind = msg[0]; + if (kind != 'message') return; + var channel = msg[1]; + var message = msg[2]; + Function(dynamic, String?)? onMessageCallback = + _findOnMessageCallback(channel); + if (onMessageCallback != null) { + onMessageCallback(message, channel); + } + }); } - void subscribe(List s) { - _sendcmd_and_list("SUBSCRIBE", s); + /// get onMessage callback related to the cannel + Function(dynamic, String?)? _findOnMessageCallback(String? channel) { + List> channelsLists = _onMessageListeners.keys.toList(); + channelsLists = + channelsLists.where((element) => element.contains(channel)).toList(); + if (channelsLists.isNotEmpty) { + List? channels = channelsLists.first; + return _onMessageListeners[channels]; + } + return null; } void psubscribe(List s) { - _sendcmd_and_list("PSUBSCRIBE", s); + _sendCommandAndList("PSUBSCRIBE", s); } void unsubscribe(List s) { - _sendcmd_and_list("UNSUBSCRIBE", s); + _sendCommandAndList("UNSUBSCRIBE", s); } void punsubscribe(List s) { - _sendcmd_and_list("PUNSUBSCRIBE", s); + _sendCommandAndList("PUNSUBSCRIBE", s); } - void _sendcmd_and_list(String cmd, List s) { + void _sendCommandAndList(String cmd, List s) { List list = [cmd]; list.addAll(s); _command._connection._socket.add(_command.serializer.serialize(list)); diff --git a/pubspec.yaml b/pubspec.yaml index 9a69a23..912ae35 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -10,3 +10,4 @@ environment: dev_dependencies: test: ^1.6.5 + mockito: ^5.2.0 diff --git a/test/pubsub_callback_test.dart b/test/pubsub_callback_test.dart new file mode 100644 index 0000000..5329e4a --- /dev/null +++ b/test/pubsub_callback_test.dart @@ -0,0 +1,40 @@ +import 'package:mockito/mockito.dart'; +import 'package:redis/redis.dart'; +import 'package:test/test.dart'; + +import 'main.dart'; + +class MockOnNewMessageCallback extends Mock { + void call(dynamic message, String? channel); +} + +void main() async { + Command cmdP = await generate_connect(); + Command cmdS = await generate_connect(); + + group("Test Redis Pub-Sub subscribe with onMessage callback", () { + PubSub subscriber = PubSub(cmdS); + + test("Subscribe to channel and listen via callback", () async { + final mockOnNewMessageCallback = MockOnNewMessageCallback(); + + subscriber.subscribe( + ["chat_room"], + onMessage: mockOnNewMessageCallback, + ); + subscriber.subscribe( + ["chat_room2"], + onMessage: mockOnNewMessageCallback, + ); + + await cmdP.send_object(["PUBLISH", "chat_room", "goodbye"]); + await cmdP.send_object(["PUBLISH", "chat_room2", "hello"]); + + // wait for the callback is triggered completely + await Future.delayed(Duration(milliseconds: 500)); + + verify(mockOnNewMessageCallback("goodbye", "chat_room")).called(1); + verify(mockOnNewMessageCallback("hello", "chat_room2")).called(1); + }); + }); +} diff --git a/test/pubsub_test.dart b/test/pubsub_test.dart index 2faaa69..d7140fa 100644 --- a/test/pubsub_test.dart +++ b/test/pubsub_test.dart @@ -7,8 +7,7 @@ void main() async { Command cmdP = await generate_connect(); Command cmdS = await generate_connect(); - group("Test Redis Pub-Sub", () { - + group("Test Redis Pub-Sub", () { PubSub subscriber = PubSub(cmdS); test("Publishing to channel before subscription", () { @@ -26,7 +25,7 @@ void main() async { expect( () => cmdS.send_object("PING"), - throwsA(equals("PubSub on this connaction in progress" + throwsA(equals("PubSub on this connection in progress" "It is not allowed to issue commands trough this handler")), reason: "After subscription, command should not be able to send"); }); @@ -58,18 +57,17 @@ void main() async { "Publishing a message after unsubscribe should be received by zero clients."); // TODO: Multiple channels, Pattern (un)subscribe - }); - - test("Test close", () async { - // test that we can close connection - // creates new connection as prevously used in test - // does not expect errors - Command cmdClose = await generate_connect(); - PubSub ps_c = PubSub(cmdClose); - cmdClose.get_connection().close(); - expect(ps_c.getStream(), - emitsError(anything), // todo catch CloseError - reason: "Number of subscribers should be 0 after unsubscribe"); - }); + }); + + test("Test close", () async { + // test that we can close connection + // creates new connection as prevously used in test + // does not expect errors + Command cmdClose = await generate_connect(); + PubSub ps_c = PubSub(cmdClose); + cmdClose.get_connection().close(); + expect(ps_c.getStream(), emitsError(anything), // todo catch CloseError + reason: "Number of subscribers should be 0 after unsubscribe"); + }); }); }