Skip to content

Commit

Permalink
Client optimizations. (#37)
Browse files Browse the repository at this point in the history
* Client optimizations.

Signed-off-by: Yury-Fridlyand <[email protected]>

* minor cleanup.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Optimize building a command.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor rename.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Clean up Redis close connection

Signed-off-by: Andrew Carbonetto <[email protected]>

* Clean up Redis close connection

Signed-off-by: Andrew Carbonetto <[email protected]>

* Minor changes.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add todos to closeConnection()

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Andrew Carbonetto <[email protected]>
Co-authored-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
Yury-Fridlyand and acarbonetto authored Nov 16, 2023
1 parent a70c907 commit fbea007
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 25 deletions.
6 changes: 3 additions & 3 deletions java/benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ java {
application {
// Define the main class for the application.
mainClass = 'javababushka.benchmarks.BenchmarkingApp'
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release"
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug"
}

tasks.withType(Test) {
tasks.withType(Test) {
testLogging {
exceptionFormat "full"
events "started", "skipped", "passed", "failed"
showStandardStreams true
}
jvmArgs "-Djava.library.path=${projectDir}/../target/debug"
jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug"
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void connectToRedis() {

@Override
public void connectToRedis(ConnectionSettings connectionSettings) {
waitForResult(asyncConnectToRedis(connectionSettings));
waitForResult(asyncConnectToRedis(connectionSettings));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,12 @@ public static void printResults(
public static void testClientSetGet(
Supplier<Client> clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) {
for (int concurrentNum : config.concurrentTasks) {
int iterations = 100000;
Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX);
int iterations =
Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX);
for (int clientCount : config.clientCount) {
for (int dataSize : config.dataSize) {
System.out.printf(
"%n =====> %s <===== %d clients %d concurrent %d data %n%n",
clientCreator.get().getName(), clientCount, concurrentNum, dataSize);
AtomicInteger iterationCounter = new AtomicInteger(0);
// Collections.synchronizedList

Map<ChosenAction, List<Long>> actionResults =
Map.of(
ChosenAction.GET_EXISTING, new ArrayList<>(),
Expand All @@ -172,6 +169,12 @@ public static void testClientSetGet(
clients.add(newClient);
}

String clientName = clients.get(0).getName();

System.out.printf(
"%n =====> %s <===== %d clients %d concurrent %d data %n%n",
clientName, clientCount, concurrentNum, dataSize);

for (int taskNum = 0; taskNum < concurrentNum; taskNum++) {
final int taskNumDebugging = taskNum;
tasks.add(
Expand Down Expand Up @@ -214,7 +217,7 @@ public static void testClientSetGet(
});
}
if (config.debugLogging) {
System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName());
System.out.printf("%s client Benchmarking: %n", clientName);
System.out.printf(
"===> concurrentNum = %d, clientNum = %d, tasks = %d%n",
concurrentNum, clientCount, tasks.size());
Expand Down Expand Up @@ -257,7 +260,7 @@ public static void testClientSetGet(
calculatedResults,
config.resultsFile.get(),
dataSize,
clientCreator.get().getName(),
clientName,
clientCount,
concurrentNum,
iterations / ((after - before) / TPS_NORMALIZATION));
Expand Down
20 changes: 16 additions & 4 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@ dependencies {

tasks.register('protobuf', Exec) {
doFirst {
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/org/babushka/javababushka/generated').toString())
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString())
}
commandLine 'protoc',
'-Iprotobuf=babushka-core/src/protobuf/',
'--java_out=java/client/src/main/java/org/babushka/javababushka/generated',
'--java_out=java/client/src/main/java/javababushka/generated',
'babushka-core/src/protobuf/connection_request.proto',
'babushka-core/src/protobuf/redis_request.proto',
'babushka-core/src/protobuf/response.proto'
workingDir Paths.get(project.rootDir.path, '..').toFile()
}

tasks.register('cleanProtobuf') {
doFirst {
project.delete(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString())
}
}

tasks.register('buildRust', Exec) {
commandLine 'cargo', 'build', '--release'
workingDir project.rootDir
Expand All @@ -54,7 +60,13 @@ tasks.register('buildAll') {
}

compileJava.dependsOn('protobuf')
clean.dependsOn('cleanProtobuf')

test {
systemProperty("java.library.path", "${projectDir}/../target/release")
tasks.withType(Test) {
testLogging {
exceptionFormat "full"
events "started", "skipped", "passed", "failed"
showStandardStreams true
}
jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug"
}
30 changes: 21 additions & 9 deletions java/client/src/main/java/javababushka/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,28 +227,40 @@ public void run() {
}

public void closeConnection() {
try {
channel.flush();

// flush and close the channel
channel.flush();
channel.close();
// TODO: check that the channel is closed

// shutdown the event loop group gracefully by waiting for the remaining response
// and then shutting down the connection
try {
long waitStarted = System.nanoTime();
long waitUntil =
waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos
for (var future : responses) {
if (future == null || future.isDone()) {
for (var responseFuture : responses) {
if (responseFuture == null || responseFuture.isDone()) {
continue;
}
try {
future.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS);
responseFuture.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException ignored) {
// TODO: print warning
} catch (TimeoutException e) {
future.cancel(true);
// TODO cancel the rest
responseFuture.cancel(true);
// TODO: cancel the rest
break;
}
}
} finally {
// channel.closeFuture().sync()
group.shutdownGracefully();
var shuttingDown = group.shutdownGracefully();
try {
shuttingDown.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
assert group.isShutdown() : "Redis connection did not shutdown gracefully";
}
}

Expand Down

0 comments on commit fbea007

Please sign in to comment.