Skip to content

Commit

Permalink
shape outbound traffic on endpoints other than round/play
Browse files Browse the repository at this point in the history
  • Loading branch information
schlawg committed Sep 23, 2024
1 parent 023819a commit e0b99d7
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 53 deletions.
12 changes: 10 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ study.mongo.uri = ${mongo.uri}
yolo.mongo.uri = ${mongo.uri}
redis.uri = "redis://127.0.0.1"
csrf.origin = "http://localhost:9663"
netty.native = true
netty.threads = 0 # auto
cookie.name = "lila2"

socialGraph {
Expand All @@ -29,6 +27,16 @@ reactivemongo {
}
}

netty {
threads = 0 # auto

flush {
step = 100 # target number of messages to flush per interval
interval = 1.millis # interval between flush cycles
max-delay = 500.millis # exceed step when this threshold is passed
}
}

storm.secret = "somethingElseInProd"
oauth.secret = "somethingElseInProd"

Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/Controller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ object Controller:
val behavior: ClientEmit => ClientBehavior,
val rateLimit: RateLimit,
val header: RequestHeader,
val emitCounter: kamon.metric.Counter
val emitCounter: kamon.metric.Counter,
val name: String
)
def endpoint(
name: String,
Expand All @@ -320,7 +321,8 @@ object Controller:
name = name
),
header,
Monitor.clientInCounter(name)
Monitor.clientInCounter(name),
name
)

type ResponseSync = Either[HttpResponseStatus, Endpoint]
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/LilaWs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ object LilaWs extends App:
lazy val router = wire[Router]
lazy val seenAt = wire[SeenAtUpdate]
lazy val auth = wire[Auth]
lazy val workerGroup = wire[netty.WorkerLoop]
lazy val nettyServer = wire[netty.NettyServer]
lazy val monitor = wire[Monitor]

Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ final class Monitor(

val version = System.getProperty("java.version")
val memory = Runtime.getRuntime.maxMemory() / 1024 / 1024
val native = config.getBoolean("netty.native")
val useKamon = config.getString("kamon.influxdb.hostname").nonEmpty

logger.info(s"lila-ws 3.0 netty native=$native kamon=$useKamon")
logger.info(s"lila-ws 3.0 netty kamon=$useKamon")
logger.info(s"Java version: $version, memory: ${memory}MB")

if useKamon then kamon.Kamon.init()
Expand Down
32 changes: 21 additions & 11 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import org.apache.pekko.actor.typed.ActorRef
import lila.ws.Controller.Endpoint
import lila.ws.netty.ProtocolHandler.key

final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(using Executor):
final private class ActorChannelConnector(clients: ActorRef[Clients.Control], loop: WorkerLoop)(using
Executor
):

def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
channel.attr(key.client).set(clientPromise.future)
val channelEmit = emitToChannel(channel)
val channelEmit: ClientEmit =
emitToChannel(channel, withFlush = endpoint.name == "round/play")
val monitoredEmit: ClientEmit = (msg: ipc.ClientIn) =>
endpoint.emitCounter.increment()
channelEmit(msg)
Expand All @@ -27,12 +30,19 @@ final private class ActorChannelConnector(clients: ActorRef[Clients.Control])(us
clients ! Clients.Control.Stop(client)
}

private def emitToChannel(channel: Channel): ClientEmit =
case ipc.ClientIn.Disconnect(reason) =>
channel
.writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason)))
.addListener(ChannelFutureListener.CLOSE)
case ipc.ClientIn.RoundPingFrameNoFlush =>
channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
case in =>
channel.writeAndFlush(TextWebSocketFrame(in.write))
private inline def emitDisconnect(inline channel: Channel, inline reason: String): Unit =
channel
.writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason)))
.addListener(ChannelFutureListener.CLOSE)

private inline def emitPingFrame(inline channel: Channel): Unit =
channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }

private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit =
msg =>
msg.match
case ipc.ClientIn.Disconnect(reason) => emitDisconnect(channel, reason)
case ipc.ClientIn.RoundPingFrameNoFlush => emitPingFrame(channel)
case in =>
if withFlush then channel.writeAndFlush(TextWebSocketFrame(in.write))
else loop.writeShaped(channel, TextWebSocketFrame(in.write))
44 changes: 8 additions & 36 deletions src/main/scala/netty/NettyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,36 @@ package netty
import com.typesafe.config.Config
import com.typesafe.scalalogging.Logger
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel }
import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel }
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.{ Channel, ChannelInitializer }
import io.netty.handler.codec.http.*

final class NettyServer(
clients: ClientSystem,
router: Router,
config: Config
config: Config,
workerLoop: WorkerLoop
)(using Executor):

private val connector = ActorChannelConnector(clients)
private val logger = Logger(getClass)
private val logger = Logger(getClass)

def start(): Unit =

logger.info("Start")

val port = config.getInt("http.port")
val workerThreads = config.getInt("netty.threads")

val (bossGroup, workerGroup, channelClz) =
if !config.getBoolean("netty.native") then
(
NioEventLoopGroup(1),
NioEventLoopGroup(workerThreads),
classOf[NioServerSocketChannel]
)
else if System.getProperty("os.name").toLowerCase.startsWith("mac") then
(
KQueueEventLoopGroup(1),
KQueueEventLoopGroup(workerThreads),
classOf[KQueueServerSocketChannel]
)
else
(
EpollEventLoopGroup(1),
EpollEventLoopGroup(workerThreads),
classOf[EpollServerSocketChannel]
)

val port = config.getInt("http.port")
try
val boot = new ServerBootstrap
boot
.group(bossGroup, workerGroup)
.channel(channelClz)
.group(workerLoop.parentGroup, workerLoop.group)
.channel(workerLoop.channelClass)
.childHandler(
new ChannelInitializer[Channel]:
override def initChannel(ch: Channel): Unit =
val pipeline = ch.pipeline()
pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpObjectAggregator(4096))
pipeline.addLast(RequestHandler(router))
pipeline.addLast(ProtocolHandler(connector))
pipeline.addLast(ProtocolHandler(ActorChannelConnector(clients, workerLoop)))
pipeline.addLast(FrameHandler())
)

Expand All @@ -70,6 +44,4 @@ final class NettyServer(
server.closeFuture().sync()

logger.info(s"Closed $port")
finally
bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully()
finally workerLoop.shutdown()
43 changes: 43 additions & 0 deletions src/main/scala/netty/WorkerLoop.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package lila.ws
package netty

import com.typesafe.config.Config
import io.netty.channel.{ Channel, EventLoopGroup }
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import io.netty.channel.epoll.{ EpollEventLoopGroup, EpollServerSocketChannel }
import io.netty.channel.kqueue.{ KQueueEventLoopGroup, KQueueServerSocketChannel }
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit }

final class WorkerLoop(config: Config)(using Executor):
private val isMacOS = System.getProperty("os.name").toLowerCase.startsWith("mac")
private val step = config.getInt("netty.flush.step")
private val interval: Long = config.getDuration("netty.flush.interval").toNanos
private val maxDelay: Long = config.getDuration("netty.flush.max-delay").toNanos
private val maxDelayFactor: Double = interval.toDouble / maxDelay
private val flushQ = new ConcurrentLinkedQueue[Channel]()

val channelClass = if isMacOS then classOf[KQueueServerSocketChannel] else classOf[EpollServerSocketChannel]
val parentGroup = makeGroup(1)
val group = makeGroup(config.getInt("netty.threads"))

private val f = group.scheduleAtFixedRate(() => flush(), 1_000_000_000L, interval, TimeUnit.NANOSECONDS)

def writeShaped(channel: Channel, frame: TextWebSocketFrame): Unit =
channel.write(frame)
flushQ.add(channel)

def shutdown(): Unit =
f.cancel(false)
parentGroup.shutdownGracefully()
group.shutdownGracefully()

private def flush(): Unit =
val channelsToFlush = step.atLeast((flushQ.size * maxDelayFactor).toInt)
val iterator = flushQ.iterator()
for count <- 0 until channelsToFlush if iterator.hasNext do
iterator.next().flush()
iterator.remove()

private def makeGroup(n: Int): EventLoopGroup =
if isMacOS then new KQueueEventLoopGroup(n)
else new EpollEventLoopGroup(n)

0 comments on commit e0b99d7

Please sign in to comment.