Skip to content

Commit

Permalink
change branch to implicitt receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
rtc11 committed Mar 8, 2024
1 parent 5ce1a8d commit 5769453
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class BranchedKStream<T : Any> internal constructor(

fun branch(
predicate: (T) -> Boolean,
consumed: (ConsumedStream<T>) -> Unit,
consumed: ConsumedStream<T>.() -> Unit,
): BranchedKStream<T> {
val namedBranch = "-branch-$nextBranchNumber"
val internalPredicate = internalPredicate(predicate)
Expand All @@ -23,7 +23,7 @@ class BranchedKStream<T : Any> internal constructor(
return this
}

fun default(consumed: (ConsumedStream<T>) -> Unit) {
fun default(consumed: ConsumedStream<T>.() -> Unit) {
val namedBranch = "-branch-default"
val internalBranch = internalBranch(consumed, namedBranch) { "via$namedBranch-${namedSupplier()}" }
stream.defaultBranch(internalBranch)
Expand All @@ -49,7 +49,7 @@ class BranchedMappedKStream<T : Any> internal constructor(

fun branch(
predicate: (T) -> Boolean,
consumed: (MappedStream<T>) -> Unit,
consumed: MappedStream<T>.() -> Unit,
): BranchedMappedKStream<T> {
val namedBranch = "-branch-$nextBranchNumber"
val internalPredicate = internalPredicate(predicate)
Expand All @@ -58,7 +58,7 @@ class BranchedMappedKStream<T : Any> internal constructor(
return this
}

fun default(consumed: (MappedStream<T>) -> Unit) {
fun default(consumed: MappedStream<T>.() -> Unit) {
val namedBranch = "-branch-default"
val internalBranch = internalBranch(consumed, namedBranch) { "via$namedBranch-${namedSupplier()}" }
stream.defaultBranch(internalBranch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class ConsumedStream<T : Any> internal constructor(
return JoinedStream(topic.name, joinedStream, named)
}

fun branch(predicate: (T) -> Boolean, consumed: (ConsumedStream<T>) -> Unit): BranchedKStream<T> {
fun branch(predicate: (T) -> Boolean, consumed: ConsumedStream<T>.() -> Unit): BranchedKStream<T> {
val splittedStream = stream.split(Named.`as`("split-${namedSupplier()}"))
return BranchedKStream(topic, splittedStream, namedSupplier).branch(predicate, consumed)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class JoinedStream<L : Any, R> internal constructor(

fun branch(
predicate: (StreamsPair<L, R>) -> Boolean,
consumed: (MappedStream<StreamsPair<L, R>>) -> Unit,
consumed: MappedStream<StreamsPair<L, R>>.() -> Unit,
): BranchedMappedKStream<StreamsPair<L, R>> {
val branchedStream = stream.split(Named.`as`("split-${namedSupplier()}"))
return BranchedMappedKStream(sourceTopicName, branchedStream, namedSupplier).branch(predicate, consumed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class MappedStream<T : Any> internal constructor(
return MappedStream(sourceTopicName, filteredStream, namedSupplier)
}

fun branch(predicate: (T) -> Boolean, consumed: (MappedStream<T>) -> Unit): BranchedMappedKStream<T> {
fun branch(predicate: (T) -> Boolean, consumed: MappedStream<T>.() -> Unit): BranchedMappedKStream<T> {
val named = Named.`as`("split-${namedSupplier()}")
val branchedStream = stream.split(named)
return BranchedMappedKStream(sourceTopicName, branchedStream, namedSupplier).branch(predicate, consumed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ internal class BranchedStreamTest {
val kafka = StreamsMock.withTopology {
consume(Topics.A)
.branch({ v -> v == "lol" }, {
it.produce(Topics.C)
produce(Topics.C)
})
.branch({ v -> v != "lol" }, {
it.produce(Topics.B)
produce(Topics.B)
})
}

Expand All @@ -36,10 +36,10 @@ internal class BranchedStreamTest {
val kafka = StreamsMock.withTopology {
consume(Topics.A)
.branch({ v -> v == "lol" }, {
it.produce(Topics.C)
produce(Topics.C)
})
.default {
it.produce(Topics.B)
produce(Topics.B)
}
}

Expand All @@ -59,10 +59,10 @@ internal class BranchedStreamTest {
consume(Topics.A)
.map { i -> i }
.branch({ v -> v == "lol" }, {
it.produce(Topics.C)
produce(Topics.C)
})
.branch({ v -> v != "lol" }, {
it.produce(Topics.B)
produce(Topics.B)
})
}

Expand All @@ -82,12 +82,12 @@ internal class BranchedStreamTest {
consume(Topics.A)
.map { i -> i }
.branch({ v -> v == "lol" }, {
it
.branch({ true }) { b -> b.produce(Topics.C) }
.branch({ false }) { b -> b.produce(Topics.B) }
this
.branch({ true }) { produce(Topics.C) }
.branch({ false }) { produce(Topics.B) }
})
.branch({ v -> v != "lol" }, {
it.produce(Topics.B)
produce(Topics.B)
})
}

Expand All @@ -107,10 +107,10 @@ internal class BranchedStreamTest {
consume(Topics.A)
.map { i -> i }
.branch({ v -> v == "lol" }, {
it.produce(Topics.C)
produce(Topics.C)
})
.default {
it.produce(Topics.B)
produce(Topics.B)
}
}

Expand All @@ -132,13 +132,13 @@ internal class BranchedStreamTest {
.joinWith(tableB)
.branch({ (left, _) -> left == "lol" }, {

it.map { (left, right) -> left + right }
map { (left, right) -> left + right }
.produce(Topics.C)

})
.branch({ (_, right) -> right == "lol" }, {

it.map { (_, right) -> right + right }
map { (_, right) -> right + right }
.produce(Topics.D)

})
Expand All @@ -163,11 +163,11 @@ internal class BranchedStreamTest {
consume(Topics.A)
.joinWith(tableB)
.branch({ (left, _) -> left == "lol" }, {
it.map { (left, right) -> left + right }.produce(Topics.C)
map { (left, right) -> left + right }.produce(Topics.C)

})
.default {
it.map { (_, right) -> right + right }.produce(Topics.D)
map { (_, right) -> right + right }.produce(Topics.D)
}
}

Expand All @@ -190,16 +190,10 @@ internal class BranchedStreamTest {
consume(Topics.A)
.leftJoinWith(tableB)
.branch({ (left, _) -> left == "lol" }, {

it.map { (left, right) -> left + right }
.produce(Topics.C)

map { (left, right) -> left + right }.produce(Topics.C)
})
.branch({ (_, right) -> right == "lol" }, {

it.map { (_, right) -> right + right }
.produce(Topics.D)

map { (_, right) -> right + right }.produce(Topics.D)
})
}

Expand All @@ -222,11 +216,10 @@ internal class BranchedStreamTest {
consume(Topics.A)
.leftJoinWith(tableB)
.branch({ (left, _) -> left == "lol" }, {
it.map { (left, right) -> left + right }.produce(Topics.C)

map { (left, right) -> left + right }.produce(Topics.C)
})
.default {
it.map { (_, right) -> right + right }.produce(Topics.D)
map { (_, right) -> right + right }.produce(Topics.D)
}
}

Expand Down

0 comments on commit 5769453

Please sign in to comment.