Skip to content

Commit

Permalink
Add more flushing rules and UT.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Nov 3, 2023
1 parent 49c9119 commit dd7413b
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 40 deletions.
18 changes: 12 additions & 6 deletions java/benchmarks/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
plugins {
// Apply the application plugin to add support for building a CLI application in Java.
id 'application'
id 'io.freefair.lombok'
}

repositories {
Expand All @@ -25,14 +26,18 @@ dependencies {

implementation group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final'
// https://github.com/netty/netty/wiki/Native-transports
// Windows is not supported, because babushka does not support windows, because tokio does not support windows, because ... 42
implementation group: 'io.netty', name: 'netty-transport-native-epoll', version: '4.1.100.Final', classifier: 'linux-x86_64'
implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-x86_64'
implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-aarch_64'

//testImplementation group: 'org.slf4j', name: 'slf4j-reload4j', version: '2.0.9'
//testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.9'

compileOnly 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
testCompileOnly 'org.projectlombok:lombok:1.18.30'
testAnnotationProcessor 'org.projectlombok:lombok:1.18.30'
}

// Apply a specific Java toolchain to ease working on different environments.
Expand All @@ -45,14 +50,15 @@ java {
application {
// Define the main class for the application.
mainClass = 'javababushka.benchmarks.BenchmarkingApp'
// mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient'
mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient'
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/debug"
}

tasks.withType(Test) {
testLogging {
exceptionFormat "full"
events "started", "skipped", "passed", "failed"
showStandardStreams true
}
testLogging {
exceptionFormat "full"
events "started", "skipped", "passed", "failed"
showStandardStreams true
}
jvmArgs "-Djava.library.path=${projectDir}/../target/debug"
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,17 @@ public static class RunConfiguration {
public RunConfiguration() {
configuration = "Release";
resultsFile = Optional.of("res_java.json");//Optional.empty();
dataSize = new int[] {100, 4000};
dataSize = new int[] {100};
concurrentTasks = new int[] {100};
clients =
new ClientName[] {
// ClientName.BABUSHKA_ASYNC,
//ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC
ClientName.JNI_NETTY, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC
ClientName.JNI_NETTY//, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC
};
host = "localhost";
port = 6379;
clientCount = new int[] {1, 2};
clientCount = new int[] {2};
tls = false;
}
}
Expand Down
75 changes: 46 additions & 29 deletions java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -7,82 +7,78 @@
import static connection_request.ConnectionRequestOuterClass.AuthenticationInfo;
import static connection_request.ConnectionRequestOuterClass.TlsMode;
import static response.ResponseOuterClass.Response;
import static response.ResponseOuterClass.ConstantResponse;
import static redis_request.RedisRequestOuterClass.Command.ArgsArray;
import static redis_request.RedisRequestOuterClass.Command;
import static redis_request.RedisRequestOuterClass.RequestType;
import static redis_request.RedisRequestOuterClass.RedisRequest;
import static redis_request.RedisRequestOuterClass.SimpleRoutes;
import static redis_request.RedisRequestOuterClass.Routes;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.SimpleUserEventChannelHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.unix.UnixChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import javababushka.benchmarks.clients.AsyncClient;
import javababushka.benchmarks.clients.SyncClient;
import javababushka.benchmarks.utils.ConnectionSettings;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import javababushka.client.RedisClient;
import response.ResponseOuterClass;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

@VisibleForTesting
public class JniNettyClient implements SyncClient, AsyncClient<Response>, AutoCloseable {

public static boolean ALWAYS_FLUSH_ON_WRITE = false;

// https://netty.io/3.6/api/org/jboss/netty/handler/queue/BufferedWriteHandler.html
private final static int AUTO_FLUSH_THRESHOLD = 512;//1024;
private final AtomicInteger nonFlushedCounter = new AtomicInteger(0);
// Flush every N bytes if !ALWAYS_FLUSH_ON_WRITE
public static int AUTO_FLUSH_THRESHOLD_BYTES = 512;//1024;
private final AtomicInteger nonFlushedBytesCounter = new AtomicInteger(0);

// Flush every N writes if !ALWAYS_FLUSH_ON_WRITE
public static int AUTO_FLUSH_THRESHOLD_WRITES = 10;
private final AtomicInteger nonFlushedWritesCounter = new AtomicInteger(0);

private final static int AUTO_FLUSH_TIMEOUT_MILLIS = 100;
// If !ALWAYS_FLUSH_ON_WRITE and a command has no response in N millis, flush (probably it wasn't send)
public static int AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS = 100;
// If !ALWAYS_FLUSH_ON_WRITE flush on timer (like a cron)
public static int AUTO_FLUSH_TIMER_MILLIS = 200;

private final static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000;
public static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000;

// Futures to handle responses. Index is callback id, starting from 1 (0 index is for connection request always).
// TODO clean up completed futures
Expand Down Expand Up @@ -158,7 +154,7 @@ private void createChannel() {
try {
channel = new Bootstrap()
.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(1024 * 1024 * 2 + 10, 1024 * 1024 * 10))
new WriteBufferWaterMark(1024, 4096))
.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
.group(group = isMacOs ? new KQueueEventLoopGroup() : new EpollEventLoopGroup())
.channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class)
Expand Down Expand Up @@ -212,18 +208,22 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
var bytes = (byte[])msg;

boolean needFlush = false;
synchronized (nonFlushedCounter) {
if (nonFlushedCounter.addAndGet(bytes.length) >= AUTO_FLUSH_THRESHOLD) {
nonFlushedCounter.set(0);
needFlush = true;
if (!ALWAYS_FLUSH_ON_WRITE) {
synchronized (nonFlushedBytesCounter) {
if (nonFlushedBytesCounter.addAndGet(bytes.length) >= AUTO_FLUSH_THRESHOLD_BYTES
|| nonFlushedWritesCounter.incrementAndGet() >= AUTO_FLUSH_THRESHOLD_WRITES) {
nonFlushedBytesCounter.set(0);
nonFlushedWritesCounter.set(0);
needFlush = true;
}
}
}
super.write(ctx, Unpooled.copiedBuffer(bytes), promise);
if (needFlush) {
// flush outside the sync block
flush(ctx);
//System.out.println("-- auto flush - buffer");
}
super.write(ctx, Unpooled.copiedBuffer(bytes), promise);
}

@Override
Expand All @@ -250,6 +250,17 @@ protected void eventReceived(ChannelHandlerContext ctx, String evt) throws Excep
System.err.printf("Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage());
e.printStackTrace(System.err);
}

if (!ALWAYS_FLUSH_ON_WRITE) {
new Timer(true).scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
channel.flush();
nonFlushedBytesCounter.set(0);
nonFlushedWritesCounter.set(0);
}
}, 0, AUTO_FLUSH_TIMER_MILLIS);
}
}

@Override
Expand Down Expand Up @@ -439,19 +450,25 @@ private CompletableFuture<Response> submitNewCommand(RequestType command, List<S
.setSimpleRoutes(SimpleRoutes.AllNodes)
.build())
.build();
if (ALWAYS_FLUSH_ON_WRITE) {
channel.writeAndFlush(request.toByteArray());
return responses.get(callbackId);
}
channel.write(request.toByteArray());
return autoFlushFutureWrapper(responses.get(callbackId));
}

private <T> CompletableFuture<T> autoFlushFutureWrapper(Future<T> future) {
return CompletableFuture.supplyAsync(() -> {
try {
return future.get(AUTO_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
return future.get(AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
//System.out.println("-- auto flush - timeout");
channel.flush();
nonFlushedBytesCounter.set(0);
nonFlushedWritesCounter.set(0);
}
try {
return future.get();
Expand Down
Loading

0 comments on commit dd7413b

Please sign in to comment.