Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriber Authentication #14

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.github.crackthecodeabhi.kreds.*
import io.github.crackthecodeabhi.kreds.args.EmptyArgument
import io.github.crackthecodeabhi.kreds.args.createArguments
import io.github.crackthecodeabhi.kreds.args.toArgument
import io.github.crackthecodeabhi.kreds.commands.*
import io.github.crackthecodeabhi.kreds.commands.Command
import io.github.crackthecodeabhi.kreds.commands.CommandExecution
import io.github.crackthecodeabhi.kreds.commands.ConnectionCommand
Expand Down Expand Up @@ -51,7 +52,10 @@ public class KredsPubSubException : KredsException {
internal constructor(message: String, throwable: Throwable) : super(message, throwable)
}

internal enum class PubSubCommand(override val subCommand: Command? = null, commandString: String? = null) : Command {
internal enum class PubSubCommand(
override val subCommand: Command? = null,
commandString: String? = null
) : Command {
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE,

CHANNELS, NUMPAT, NUMSUB, HELP,
Expand Down Expand Up @@ -158,13 +162,21 @@ internal interface PublishCommandExecutor : PublisherCommands, CommandExecutor {
execute(PUBLISH, IntegerCommandProcessor, channel.toArgument(), message.toArgument())

override suspend fun pubsubChannels(pattern: String?): List<String> =
execute(PUBSUB_CHANNELS, ArrayCommandProcessor, *createArguments(pattern)).responseTo("pubsub channels")
execute(
PUBSUB_CHANNELS,
ArrayCommandProcessor,
*createArguments(pattern)
).responseTo("pubsub channels")

override suspend fun pubsubNumpat(): Long =
execute(PUBSUB_NUMPAT, IntegerCommandProcessor)

override suspend fun pubsubNumsub(vararg channels: String): List<Any> =
execute(PUBSUB_NUMSUB, ArrayCommandProcessor, *createArguments(*channels)).responseTo("pubsub numsub")
execute(
PUBSUB_NUMSUB,
ArrayCommandProcessor,
*createArguments(*channels)
).responseTo("pubsub numsub")

override suspend fun pubsubHelp(): List<String> =
execute(PUBSUB_HELP, ArrayCommandProcessor).responseTo("pubsub help")
Expand Down Expand Up @@ -201,39 +213,6 @@ public interface SubscriberCommands {
*/
public suspend fun pUnsubscribe(vararg patterns: String)

/**
* ## `PING [message]`
*
* Returns PONG if no argument is provided, otherwise return a copy of the argument
*
* [Doc](https://redis.io/commands/ping)
* @since 1.0.0
* @return PONG or message
*/
public suspend fun ping(message: String? = null): String?

/**
* ### RESET
*
* This command performs a full reset of the connection's server-side context, mimicking the effect of disconnecting and reconnecting again.
*
* [Doc](https://redis.io/commands/reset)
* @since 6.2
* @return RESET
*/
public suspend fun reset(): String

/**
* ### QUIT
*
* Ask the server to close the connection. The connection is closed as soon as all pending replies have been written to the client.
*
* [Doc](https://redis.io/commands/quit)
* @since 1.0.0
* @return OK
*/
public suspend fun quit(): String

/**
* ### `UNSUBSCRIBE [channel [channel ...]]`
*
Expand All @@ -245,7 +224,7 @@ public interface SubscriberCommands {
public suspend fun unsubscribe(vararg channels: String)
}

public interface KredsSubscriberClient : AutoCloseable, SubscriberCommands {
public interface KredsSubscriberClient : AutoCloseable, SubscriberCommands, ConnectionCommands {

}

Expand All @@ -260,7 +239,8 @@ internal class DefaultKredsSubscriberClient(
context: CoroutineContext,
kredsSubscriber: KredsSubscriber,
config: KredsClientConfig
) : KredsSubscriberClient, AbstractKredsClient(endpoint, eventLoopGroup, config) {
) : KredsSubscriberClient, AbstractKredsClient(endpoint, eventLoopGroup, config),
ConnectionCommandsExecutor {

private val scope = CoroutineScope(context + SupervisorJob(context.job))

Expand Down Expand Up @@ -363,7 +343,8 @@ internal class DefaultKredsSubscriberClient(

private inline fun <reified R> messageOrChannel(reply: List<Any?>): R = reply.getAs(2)

private inline fun <reified R> subscribedChannels(reply: List<Any?>): R = messageOrChannel(reply)
private inline fun <reified R> subscribedChannels(reply: List<Any?>): R =
messageOrChannel(reply)

private inline fun <reified R> pmessage(reply: List<Any?>): R = reply.getAs(3)

Expand Down Expand Up @@ -422,7 +403,8 @@ internal class DefaultKredsSubscriberClient(
}
}

inner class Writer(override val mutex: Mutex, override val key: ReentrantMutexContextKey) : ExclusiveObject {
inner class Writer(override val mutex: Mutex, override val key: ReentrantMutexContextKey) :
ExclusiveObject {
suspend fun write(execution: CommandExecution<*>) = withReentrantLock {
with(execution) {
connectWriteAndFlush(processor.encode(command, *args))
Expand All @@ -432,25 +414,49 @@ internal class DefaultKredsSubscriberClient(

override suspend fun subscribe(vararg channels: String) {
reader.preemptRead {
writer.write(CommandExecution(SUBSCRIBE, ArrayCommandProcessor, *createArguments(*channels)))
writer.write(
CommandExecution(
SUBSCRIBE,
ArrayCommandProcessor,
*createArguments(*channels)
)
)
}
}

override suspend fun unsubscribe(vararg channels: String) {
reader.preemptRead {
writer.write(CommandExecution(UNSUBSCRIBE, ArrayCommandProcessor, *createArguments(*channels)))
writer.write(
CommandExecution(
UNSUBSCRIBE,
ArrayCommandProcessor,
*createArguments(*channels)
)
)
}
}

override suspend fun pSubscribe(vararg patterns: String) {
reader.preemptRead {
writer.write(CommandExecution(PSUBSCRIBE, ArrayCommandProcessor, *createArguments(*patterns)))
writer.write(
CommandExecution(
PSUBSCRIBE,
ArrayCommandProcessor,
*createArguments(*patterns)
)
)
}
}

override suspend fun pUnsubscribe(vararg patterns: String) {
reader.preemptRead {
writer.write(CommandExecution(PUNSUBSCRIBE, ArrayCommandProcessor, *createArguments(*patterns)))
writer.write(
CommandExecution(
PUNSUBSCRIBE,
ArrayCommandProcessor,
*createArguments(*patterns)
)
)
}
}

Expand Down