From 1bab56a57d2dbd16d680b16b0a340ede146e8f8e Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 5 Oct 2023 14:01:59 -0700 Subject: [PATCH] Add option to run tests on multiple clients in concurrency (#16) * Add option to run tests on multiple clients in concurrency * Common pool of iterations. * Awaiting result from async methods. Signed-off-by: Yury-Fridlyand * minor fix Signed-off-by: Yury-Fridlyand * Change while-loop; Spotless Apply Signed-off-by: acarbonetto --------- Signed-off-by: Yury-Fridlyand Signed-off-by: acarbonetto Co-authored-by: acarbonetto --- .../benchmarks/BenchmarkingApp.java | 93 +++++----- .../benchmarks/utils/Benchmarking.java | 168 ++++++++++++------ 2 files changed, 167 insertions(+), 94 deletions(-) diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java index ad19929d06..66ea1f5029 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java @@ -1,16 +1,18 @@ package javababushka.benchmarks; +import static javababushka.benchmarks.utils.Benchmarking.testClientSetGet; + import java.io.FileWriter; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; import javababushka.benchmarks.jedis.JedisClient; +import javababushka.benchmarks.jedis.JedisPseudoAsyncClient; import javababushka.benchmarks.lettuce.LettuceAsyncClient; import javababushka.benchmarks.lettuce.LettuceClient; -import javababushka.benchmarks.utils.Benchmarking; -import javababushka.benchmarks.jedis.JedisPseudoAsyncClient; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -38,34 +40,17 @@ public static void main(String[] args) { for (ClientName client : runConfiguration.clients) { switch (client) { - case ALL: - testClientSetGet(new JedisClient(), runConfiguration); - testClientSetGet(new LettuceClient(), runConfiguration); - testAsyncClientSetGet(new JedisPseudoAsyncClient(), runConfiguration); - testAsyncClientSetGet(new LettuceAsyncClient(), runConfiguration); - System.out.println("Babushka not yet configured"); - break; - case ALL_ASYNC: - testAsyncClientSetGet(new JedisPseudoAsyncClient(), runConfiguration); - testAsyncClientSetGet(new LettuceAsyncClient(), runConfiguration); - System.out.println("Babushka not yet configured"); - break; - case ALL_SYNC: - testClientSetGet(new JedisClient(), runConfiguration); - testClientSetGet(new LettuceClient(), runConfiguration); - System.out.println("Babushka not yet configured"); - break; case JEDIS: - testClientSetGet(new JedisClient(), runConfiguration); + testClientSetGet(JedisClient::new, runConfiguration, false); break; case JEDIS_ASYNC: - testAsyncClientSetGet(new JedisPseudoAsyncClient(), runConfiguration); + testClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true); break; case LETTUCE: - testClientSetGet(new LettuceClient(), runConfiguration); + testClientSetGet(LettuceClient::new, runConfiguration, false); break; case LETTUCE_ASYNC: - testAsyncClientSetGet(new LettuceAsyncClient(), runConfiguration); + testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); break; case BABUSHKA: System.out.println("Babushka not yet configured"); @@ -90,7 +75,8 @@ private static Options getOptions() { options.addOption("f", "resultsFile", true, "Result filepath []"); options.addOption("d", "dataSize", true, "Data block size [20]"); options.addOption("C", "concurrentTasks", true, "Number of concurrent tasks [1 10 100]"); - options.addOption("l", "clients", true, "one of: all|jedis|lettuce|babushka [all]"); + options.addOption( + "l", "clients", true, "one of: all|jedis|jedis_async|lettuce|lettuce_async|babushka [all]"); options.addOption("h", "host", true, "host url [localhost]"); options.addOption("p", "port", true, "port number [6379]"); options.addOption("n", "clientCount", true, "Client count [1]"); @@ -146,6 +132,30 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce runConfiguration.clients = Arrays.stream(clients) .map(c -> Enum.valueOf(ClientName.class, c.toUpperCase())) + .flatMap( + e -> { + switch (e) { + case ALL: + return Stream.of( + ClientName.JEDIS, + ClientName.JEDIS_ASYNC, + ClientName.BABUSHKA, + ClientName.LETTUCE, + ClientName.LETTUCE_ASYNC); + case ALL_ASYNC: + return Stream.of( + ClientName.JEDIS_ASYNC, + // ClientName.BABUSHKA, + ClientName.LETTUCE_ASYNC); + case ALL_SYNC: + return Stream.of( + ClientName.JEDIS, + // ClientName.BABUSHKA, + ClientName.LETTUCE); + default: + return Stream.of(e); + } + }) .toArray(ClientName[]::new); } @@ -154,7 +164,15 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce } if (line.hasOption("clientCount")) { - runConfiguration.clientCount = Integer.parseInt(line.getOptionValue("clientCount")); + String clientCount = line.getOptionValue("clientCount"); + + // check if it's the correct format + if (!clientCount.matches("\\d+(\\s+\\d+)?")) { + throw new ParseException("Invalid concurrentTasks"); + } + // split the string into a list of integers + runConfiguration.clientCount = + Arrays.stream(clientCount.split("\\s+")).mapToInt(Integer::parseInt).toArray(); } if (line.hasOption("tls")) { @@ -164,18 +182,6 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce return runConfiguration; } - private static void testClientSetGet(Client client, RunConfiguration runConfiguration) { - System.out.printf("%n =====> %s <===== %n%n", client.getName()); - Benchmarking.printResults(Benchmarking.measurePerformance(client, runConfiguration, false)); - System.out.println(); - } - - private static void testAsyncClientSetGet(AsyncClient client, RunConfiguration runConfiguration) { - System.out.printf("%n =====> %s <===== %n%n", client.getName()); - Benchmarking.printResults(Benchmarking.measurePerformance(client, runConfiguration, true)); - System.out.println(); - } - public enum ClientName { JEDIS("Jedis"), JEDIS_ASYNC("Jedis async"), @@ -210,18 +216,23 @@ public static class RunConfiguration { public ClientName[] clients; public String host; public int port; - public int clientCount; + public int[] clientCount; public boolean tls; + public boolean debugLogging = false; public RunConfiguration() { configuration = "Release"; resultsFile = Optional.empty(); dataSize = 20; - concurrentTasks = List.of(1, 10, 100); - clients = new ClientName[] {ClientName.ALL}; + concurrentTasks = List.of(10, 100); + clients = + new ClientName[] { + // ClientName.BABUSHKA, + ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC + }; host = "localhost"; port = 6379; - clientCount = 1; + clientCount = new int[] {1, 2}; tls = false; } } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java index 6a2ec95755..bae34c9373 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -1,24 +1,30 @@ package javababushka.benchmarks.utils; import java.io.FileWriter; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import javababushka.benchmarks.AsyncClient; import javababushka.benchmarks.BenchmarkingApp; import javababushka.benchmarks.Client; import javababushka.benchmarks.SyncClient; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.tuple.Pair; public class Benchmarking { static final double PROB_GET = 0.8; static final double PROB_GET_EXISTING_KEY = 0.8; static final int SIZE_GET_KEYSPACE = 3750000; static final int SIZE_SET_KEYSPACE = 3000000; + static final int ASYNC_OPERATION_TIMEOUT_SEC = 1; private static ChosenAction randomAction() { if (Math.random() > PROB_GET) { @@ -40,31 +46,19 @@ public static String generateKeySet() { } public interface Operation { - void go(); + void go() throws Exception; } - public static Map> getLatencies( - int iterations, Map actions) { - Map> latencies = new HashMap>(); - for (ChosenAction action : actions.keySet()) { - latencies.put(action, new ArrayList()); - } - - for (int i = 0; i < iterations; i++) { - ChosenAction action = randomAction(); - Operation op = actions.get(action); - ArrayList actionLatencies = latencies.get(action); - addLatency(op, actionLatencies); - } - - return latencies; - } - - private static void addLatency(Operation op, ArrayList latencies) { + private static Pair getLatency(Map actions) { + var action = randomAction(); long before = System.nanoTime(); - op.go(); + try { + actions.get(action).go(); + } catch (Exception e) { + // timed out - exception from Future::get + } long after = System.nanoTime(); - latencies.add(after - before); + return Pair.of(action, after - before); } // Assumption: latencies is sorted in ascending order @@ -114,8 +108,7 @@ public static Map calculateResults( } public static void printResults( - Map calculatedResults, Optional resultsFile) - throws IOException { + Map calculatedResults, Optional resultsFile) { if (resultsFile.isPresent()) { printResults(calculatedResults, resultsFile.get()); } else { @@ -124,16 +117,19 @@ public static void printResults( } public static void printResults( - Map resultsMap, FileWriter resultsFile) throws IOException { + Map resultsMap, FileWriter resultsFile) { for (Map.Entry entry : resultsMap.entrySet()) { ChosenAction action = entry.getKey(); LatencyResults results = entry.getValue(); - resultsFile.write("Avg. time in ms per " + action + ": " + results.avgLatency / 1000000.0); - resultsFile.write(action + " p50 latency in ms: " + results.p50Latency / 1000000.0); - resultsFile.write(action + " p90 latency in ms: " + results.p90Latency / 1000000.0); - resultsFile.write(action + " p99 latency in ms: " + results.p99Latency / 1000000.0); - resultsFile.write(action + " std dev in ms: " + results.stdDeviation / 1000000.0); + try { + resultsFile.write("Avg. time in ms per " + action + ": " + results.avgLatency / 1000000.0); + resultsFile.write(action + " p50 latency in ms: " + results.p50Latency / 1000000.0); + resultsFile.write(action + " p90 latency in ms: " + results.p90Latency / 1000000.0); + resultsFile.write(action + " p99 latency in ms: " + results.p99Latency / 1000000.0); + resultsFile.write(action + " std dev in ms: " + results.stdDeviation / 1000000.0); + } catch (Exception ignored) { + } } } @@ -150,41 +146,107 @@ public static void printResults(Map resultsMap) { } } - public static Map measurePerformance( - Client client, BenchmarkingApp.RunConfiguration config, boolean async) { - client.connectToRedis(new ConnectionSettings(config.host, config.port, config.tls)); - - int iterations = 10000; - String value = RandomStringUtils.randomAlphanumeric(config.dataSize); - - if (config.resultsFile.isPresent()) { - try { - config.resultsFile.get().write(client.getName() + " client Benchmarking: "); - } catch (Exception ignored) { + public static void testClientSetGet( + Supplier clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) { + for (int concurrentNum : config.concurrentTasks) { + int iterations = Math.min(Math.max(100000, concurrentNum * 10000), 10000000); + for (int clientNum : config.clientCount) { + System.out.printf( + "%n =====> %s <===== %d clients %d concurrent %n%n", + clientCreator.get().getName(), clientNum, concurrentNum); + AtomicInteger iterationCounter = new AtomicInteger(0); + Map> actionResults = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); + List tasks = new ArrayList<>(); + + for (int i = 0; i < concurrentNum; ) { + for (int j = 0; j < clientNum; j++) { + i++; + int finalI = i; + int finalJ = j; + var client = clientCreator.get(); + client.connectToRedis(new ConnectionSettings(config.host, config.port, config.tls)); + tasks.add( + () -> { + if (config.debugLogging) { + System.out.printf( + "%n concurrent = %d/%d, client# = %d/%d%n", + finalI, concurrentNum, finalJ + 1, clientNum); + } + int iterationIncrement = iterationCounter.getAndIncrement(); + while (iterationIncrement < iterations) { + if (config.debugLogging) { + System.out.printf( + "> iteration = %d/%d, client# = %d/%d%n", + iterationIncrement + 1, iterations, finalJ + 1, clientNum); + } + // operate and calculate tik-tok + Pair result = + measurePerformance(client, config.dataSize, async); + actionResults.get(result.getLeft()).add(result.getRight()); + + iterationIncrement = iterationCounter.getAndIncrement(); + } + }); + } + } + if (config.debugLogging) { + System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName()); + System.out.printf( + "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", + concurrentNum, clientNum, tasks.size()); + } + tasks.stream() + .map(CompletableFuture::runAsync) + .forEach( + f -> { + try { + f.get(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + printResults(calculateResults(actionResults), config.resultsFile); } - } else { - System.out.printf("%s client Benchmarking: %n", client.getName()); } - Map actions = new HashMap<>(); + System.out.println(); + } + + public static Pair measurePerformance( + Client client, int dataSize, boolean async) { + + String value = RandomStringUtils.randomAlphanumeric(dataSize); + Map actions = new HashMap<>(); actions.put( ChosenAction.GET_EXISTING, async - ? () -> ((AsyncClient) client).asyncGet(Benchmarking.generateKeySet()) - : () -> ((SyncClient) client).get(Benchmarking.generateKeySet())); + ? () -> + ((AsyncClient) client) + .asyncGet(generateKeySet()) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).get(generateKeySet())); actions.put( ChosenAction.GET_NON_EXISTING, async - ? () -> ((AsyncClient) client).asyncGet(Benchmarking.generateKeyGet()) - : () -> ((SyncClient) client).get(Benchmarking.generateKeyGet())); + ? () -> + ((AsyncClient) client) + .asyncGet(generateKeyGet()) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).get(generateKeyGet())); actions.put( ChosenAction.SET, async - ? () -> ((AsyncClient) client).asyncSet(Benchmarking.generateKeySet(), value) - : () -> ((SyncClient) client).set(Benchmarking.generateKeySet(), value)); + ? () -> + ((AsyncClient) client) + .asyncSet(generateKeySet(), value) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).set(generateKeySet(), value)); - var results = Benchmarking.calculateResults(Benchmarking.getLatencies(iterations, actions)); - client.closeConnection(); - return results; + return getLatency(actions); } }