diff --git a/gradle.properties b/gradle.properties index 5688a95..5f13a17 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -libVersion=1.8.6 \ No newline at end of file +libVersion=1.8.7 \ No newline at end of file diff --git a/src/main/kotlin/cn/rtast/rob/util/ob/MessageHandler.kt b/src/main/kotlin/cn/rtast/rob/util/ob/MessageHandler.kt index 0efcf06..8f65544 100644 --- a/src/main/kotlin/cn/rtast/rob/util/ob/MessageHandler.kt +++ b/src/main/kotlin/cn/rtast/rob/util/ob/MessageHandler.kt @@ -57,8 +57,8 @@ object MessageHandler { return@forEach } } - commandManager.handleGroup(listener, msg) listener.onGroupMessage(msg, message) + commandManager.handleGroup(listener, msg) } MessageType.private -> { @@ -69,8 +69,8 @@ object MessageHandler { return@forEach } } - commandManager.handlePrivate(listener, msg) listener.onPrivateMessage(msg, message) + commandManager.handlePrivate(listener, msg) } null -> listener.onMessage(message) diff --git a/src/main/kotlin/cn/rtast/rob/util/ws/WsClient.kt b/src/main/kotlin/cn/rtast/rob/util/ws/WsClient.kt index 6a57e8a..40dce11 100644 --- a/src/main/kotlin/cn/rtast/rob/util/ws/WsClient.kt +++ b/src/main/kotlin/cn/rtast/rob/util/ws/WsClient.kt @@ -30,6 +30,7 @@ internal class WsClient( private val reconnectInterval = 5000L private var isConnected = false private val coroutineScope = CoroutineScope(Dispatchers.IO) + private val channelCoroutineScope = CoroutineScope(Dispatchers.IO) private val messageChannel = Channel(messageQueueLimit) private val scheduler = Executors.newScheduledThreadPool(1) @@ -44,8 +45,11 @@ internal class WsClient( } } + /** + * 每次接收到消息时都会向channel中发送数据等待消费 + */ override fun onMessage(message: String) { - coroutineScope.launch { + channelCoroutineScope.launch { messageChannel.send(message) } } @@ -75,10 +79,17 @@ internal class WsClient( }, reconnectInterval, TimeUnit.MILLISECONDS) } + /** + * 启动一个线程用于消费管道(Channel)内的消息 + * 每次消费消息都会开一个线程用于处理这条消息 + * 消费完成之后线程会自动回到线程池等下下次启动 + */ private fun processMessages() { coroutineScope.launch { for (message in messageChannel) { - MessageHandler.onMessage(listener, message) + coroutineScope.launch { + MessageHandler.onMessage(listener, message) + } } } } diff --git a/src/main/kotlin/cn/rtast/rob/util/ws/WsServer.kt b/src/main/kotlin/cn/rtast/rob/util/ws/WsServer.kt index 1d18d06..1b7d5c1 100644 --- a/src/main/kotlin/cn/rtast/rob/util/ws/WsServer.kt +++ b/src/main/kotlin/cn/rtast/rob/util/ws/WsServer.kt @@ -26,6 +26,7 @@ internal class WsServer( ) : WebSocketServer(InetSocketAddress(port)) { private val coroutineScope = CoroutineScope(Dispatchers.IO) + private val channelCoroutineScope = CoroutineScope(Dispatchers.IO) private val messageChannel = Channel(messageQueueLimit) init { @@ -55,8 +56,8 @@ internal class WsServer( } override fun onMessage(conn: WebSocket, message: String) { - coroutineScope.launch { - MessageHandler.onMessage(listener, message) + channelCoroutineScope.launch { + messageChannel.send(message) } } @@ -72,14 +73,12 @@ internal class WsServer( } } - /** - * launch a coroutine to process queued messages - */ private fun processMessages() { coroutineScope.launch { - messageChannel.cancel() for (message in messageChannel) { - MessageHandler.onMessage(listener, message) + coroutineScope.launch { + MessageHandler.onMessage(listener, message) + } } } } diff --git a/src/test/kotlin/Common.kt b/src/test/kotlin/Common.kt index d432fb3..7f19788 100644 --- a/src/test/kotlin/Common.kt +++ b/src/test/kotlin/Common.kt @@ -9,6 +9,7 @@ import cn.rtast.rob.entity.GroupMessage import cn.rtast.rob.util.BaseCommand import cn.rtast.rob.util.ob.CQMessageChain import cn.rtast.rob.util.ob.OneBotListener +import kotlinx.coroutines.delay class EchoCommand : BaseCommand() { // A simple echo message command @@ -21,4 +22,14 @@ class EchoCommand : BaseCommand() { .build() listener.sendGroupMessage(message.groupId, msg) } +} + +class DelayCommand : BaseCommand() { + override val commandNames = listOf("d", "/d") + + override suspend fun executeGroup(listener: OneBotListener, message: GroupMessage, args: List) { + delay(3000L) + message.reply("延迟3秒") + println("延迟3秒") + } } \ No newline at end of file diff --git a/src/test/kotlin/TestClient.kt b/src/test/kotlin/TestClient.kt index 43b9d9b..f964441 100644 --- a/src/test/kotlin/TestClient.kt +++ b/src/test/kotlin/TestClient.kt @@ -16,12 +16,9 @@ fun main() { override suspend fun onGroupMessage(message: GroupMessage, json: String) { message.sender.ban(1) } - - override suspend fun onGroupFileUpload(event: FileEvent) { - event.saveTo(".") - } }) rob.commandManager.register(EchoCommand()) // not a suspend function + rob.commandManager.register(DelayCommand()) // not a suspend function // rob.action.sendGroupMessage(114514, "1919810") // send a message in global scope rob.addListeningGroups(985927054, 114514) // set listening groups, set empty to listen all groups' event } \ No newline at end of file