Skip to content

Commit

Permalink
local transport: drop inheritDispatcher and add tests for unlimited b…
Browse files Browse the repository at this point in the history
…uffer
  • Loading branch information
whyoleg committed Mar 12, 2024
1 parent 80b9e5a commit d3a12ed
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ public sealed interface LocalClientTransportBuilder :
RSocketTransportBuilder<String, LocalClientTarget, LocalClientTransport> {

public fun dispatcher(context: CoroutineContext)
public fun inheritDispatcher(): Unit = dispatcher(EmptyCoroutineContext)
}

private class LocalClientTransportBuilderImpl : LocalClientTransportBuilder {
private var dispatcher: CoroutineContext = Dispatchers.Default
private var dispatcher: CoroutineContext = EmptyCoroutineContext

override fun dispatcher(context: CoroutineContext) {
check(context[Job] == null) { "Dispatcher shouldn't contain job" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,13 @@ public sealed interface LocalServerTransportBuilder : RSocketTransportBuilder<
LocalServerTransport> {

public fun dispatcher(context: CoroutineContext)
public fun inheritDispatcher(): Unit = dispatcher(EmptyCoroutineContext)

public fun sequential(connectionBufferCapacity: Int = 64)
public fun multiplexed(connectionStreamsQueueCapacity: Int = 64, streamBufferCapacity: Int = 64)
}

private class LocalServerTransportBuilderImpl : LocalServerTransportBuilder {
private var dispatcher: CoroutineContext = Dispatchers.Default
private var dispatcher: CoroutineContext = EmptyCoroutineContext
private var connector: LocalServerConnector? = null

override fun dispatcher(context: CoroutineContext) {
Expand All @@ -70,7 +69,7 @@ private class LocalServerTransportBuilderImpl : LocalServerTransportBuilder {
@RSocketTransportApi
override fun buildTransport(context: CoroutineContext): LocalServerTransport = LocalServerTransportImpl(
coroutineContext = context.supervisorContext() + dispatcher,
connector = connector ?: LocalServerConnector.Sequential(64) // TODO: what is default?
connector = connector ?: LocalServerConnector.Sequential(64)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ class SequentialLocalTransportTest : TransportTest() {
}
}

class SequentialUnlimitedLocalTransportTest : TransportTest() {
override suspend fun before() {
val server = startServer(LocalServerTransport(testContext) {
sequential(Int.MAX_VALUE)
}.target())
client = connectClient(LocalClientTransport(testContext).target(server))
}
}

class MultiplexedLocalTransportTest : TransportTest() {
override suspend fun before() {
val server = startServer(LocalServerTransport(testContext) {
Expand All @@ -42,3 +51,12 @@ class MultiplexedLocalTransportTest : TransportTest() {
client = connectClient(LocalClientTransport(testContext).target(server))
}
}

class MultiplexedUnlimitedLocalTransportTest : TransportTest() {
override suspend fun before() {
val server = startServer(LocalServerTransport(testContext) {
multiplexed(Int.MAX_VALUE, Int.MAX_VALUE)
}.target())
client = connectClient(LocalClientTransport(testContext).target(server))
}
}

0 comments on commit d3a12ed

Please sign in to comment.