Skip to content

Commit

Permalink
fix: fix thread(coroutines) bug
Browse files Browse the repository at this point in the history
  • Loading branch information
RTAkland committed Sep 19, 2024
1 parent 50b3984 commit dc0364a
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 16 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official

libVersion=1.8.6
libVersion=1.8.7
4 changes: 2 additions & 2 deletions src/main/kotlin/cn/rtast/rob/util/ob/MessageHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ object MessageHandler {
return@forEach
}
}
commandManager.handleGroup(listener, msg)
listener.onGroupMessage(msg, message)
commandManager.handleGroup(listener, msg)
}

MessageType.private -> {
Expand All @@ -69,8 +69,8 @@ object MessageHandler {
return@forEach
}
}
commandManager.handlePrivate(listener, msg)
listener.onPrivateMessage(msg, message)
commandManager.handlePrivate(listener, msg)
}

null -> listener.onMessage(message)
Expand Down
15 changes: 13 additions & 2 deletions src/main/kotlin/cn/rtast/rob/util/ws/WsClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ internal class WsClient(
private val reconnectInterval = 5000L
private var isConnected = false
private val coroutineScope = CoroutineScope(Dispatchers.IO)
private val channelCoroutineScope = CoroutineScope(Dispatchers.IO)
private val messageChannel = Channel<String>(messageQueueLimit)
private val scheduler = Executors.newScheduledThreadPool(1)

Expand All @@ -44,8 +45,11 @@ internal class WsClient(
}
}

/**
* 每次接收到消息时都会向channel中发送数据等待消费
*/
override fun onMessage(message: String) {
coroutineScope.launch {
channelCoroutineScope.launch {
messageChannel.send(message)
}
}
Expand Down Expand Up @@ -75,10 +79,17 @@ internal class WsClient(
}, reconnectInterval, TimeUnit.MILLISECONDS)
}

/**
* 启动一个线程用于消费管道(Channel)内的消息
* 每次消费消息都会开一个线程用于处理这条消息
* 消费完成之后线程会自动回到线程池等下下次启动
*/
private fun processMessages() {
coroutineScope.launch {
for (message in messageChannel) {
MessageHandler.onMessage(listener, message)
coroutineScope.launch {
MessageHandler.onMessage(listener, message)
}
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions src/main/kotlin/cn/rtast/rob/util/ws/WsServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal class WsServer(
) : WebSocketServer(InetSocketAddress(port)) {

private val coroutineScope = CoroutineScope(Dispatchers.IO)
private val channelCoroutineScope = CoroutineScope(Dispatchers.IO)
private val messageChannel = Channel<String>(messageQueueLimit)

init {
Expand Down Expand Up @@ -55,8 +56,8 @@ internal class WsServer(
}

override fun onMessage(conn: WebSocket, message: String) {
coroutineScope.launch {
MessageHandler.onMessage(listener, message)
channelCoroutineScope.launch {
messageChannel.send(message)
}
}

Expand All @@ -72,14 +73,12 @@ internal class WsServer(
}
}

/**
* launch a coroutine to process queued messages
*/
private fun processMessages() {
coroutineScope.launch {
messageChannel.cancel()
for (message in messageChannel) {
MessageHandler.onMessage(listener, message)
coroutineScope.launch {
MessageHandler.onMessage(listener, message)
}
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/test/kotlin/Common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import cn.rtast.rob.entity.GroupMessage
import cn.rtast.rob.util.BaseCommand
import cn.rtast.rob.util.ob.CQMessageChain
import cn.rtast.rob.util.ob.OneBotListener
import kotlinx.coroutines.delay

class EchoCommand : BaseCommand() {
// A simple echo message command
Expand All @@ -21,4 +22,14 @@ class EchoCommand : BaseCommand() {
.build()
listener.sendGroupMessage(message.groupId, msg)
}
}

class DelayCommand : BaseCommand() {
override val commandNames = listOf("d", "/d")

override suspend fun executeGroup(listener: OneBotListener, message: GroupMessage, args: List<String>) {
delay(3000L)
message.reply("延迟3秒")
println("延迟3秒")
}
}
5 changes: 1 addition & 4 deletions src/test/kotlin/TestClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ fun main() {
override suspend fun onGroupMessage(message: GroupMessage, json: String) {
message.sender.ban(1)
}

override suspend fun onGroupFileUpload(event: FileEvent) {
event.saveTo(".")
}
})
rob.commandManager.register(EchoCommand()) // not a suspend function
rob.commandManager.register(DelayCommand()) // not a suspend function
// rob.action.sendGroupMessage(114514, "1919810") // send a message in global scope
rob.addListeningGroups(985927054, 114514) // set listening groups, set empty to listen all groups' event
}

0 comments on commit dc0364a

Please sign in to comment.