From fe74879e7f5c738537f4d871b8e5ea6656e074fb Mon Sep 17 00:00:00 2001 From: Gareth Johnston Date: Wed, 9 Oct 2024 11:13:00 +0100 Subject: [PATCH 1/3] Make reference selection deterministic --- .../simulator/tests/cp/CPMapTest.java | 24 +++++++++++-------- .../tests/cp/IAtomicReferenceTest.java | 19 +++++++++------ 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/java/drivers/driver-hazelcast4plus/src/main/java/com/hazelcast/simulator/tests/cp/CPMapTest.java b/java/drivers/driver-hazelcast4plus/src/main/java/com/hazelcast/simulator/tests/cp/CPMapTest.java index 81884e183..aa7269ab2 100644 --- a/java/drivers/driver-hazelcast4plus/src/main/java/com/hazelcast/simulator/tests/cp/CPMapTest.java +++ b/java/drivers/driver-hazelcast4plus/src/main/java/com/hazelcast/simulator/tests/cp/CPMapTest.java @@ -95,25 +95,25 @@ private String[] createCpGroupNames() { @TimeStep(prob = 1) public void set(ThreadState state) { - state.randomMap().set(state.randomKey(), state.randomValue()); + state.getNextMap().set(state.randomKey(), state.randomValue()); state.operationCounter.setCount++; } @TimeStep(prob = 0) public void put(ThreadState state) { - state.randomMap().put(state.randomKey(), state.randomValue()); + state.getNextMap().put(state.randomKey(), state.randomValue()); state.operationCounter.putCount++; } @TimeStep(prob = 0) public void putIfAbsent(ThreadState state) { - state.randomMap().putIfAbsent(state.randomKey(), state.randomValue()); + state.getNextMap().putIfAbsent(state.randomKey(), state.randomValue()); state.operationCounter.putIfAbsentCount++; } @TimeStep(prob = 0) public void get(ThreadState state) { - state.randomMap().get(state.randomKey()); + state.getNextMap().get(state.randomKey()); state.operationCounter.getCount++; } @@ -122,19 +122,19 @@ public void get(ThreadState state) { @TimeStep(prob = 0) public void remove(ThreadState state) { - state.randomMap().remove(state.randomKey()); + state.getNextMap().remove(state.randomKey()); 
state.operationCounter.removeCount++; } @TimeStep(prob = 0) public void delete(ThreadState state) { - state.randomMap().delete(state.randomKey()); + state.getNextMap().delete(state.randomKey()); state.operationCounter.deleteCount++; } @TimeStep(prob = 0) public void cas(ThreadState state) { - CPMap randomMap = state.randomMap(); + CPMap randomMap = state.getNextMap(); Integer key = state.randomKey(); byte[] expectedValue = randomMap.get(key); if (expectedValue != null) { @@ -145,7 +145,7 @@ public void cas(ThreadState state) { @TimeStep(prob = 0) public void setThenDelete(ThreadState state) { - CPMap map = state.randomMap(); + CPMap map = state.getNextMap(); int key = state.randomKey(); map.set(key, state.randomValue()); map.delete(key); @@ -183,6 +183,7 @@ public void verify() { public class ThreadState extends BaseThreadState { + private int currentMapIndex = 0; final CpMapOperationCounter operationCounter = new CpMapOperationCounter(); public int randomKey() { @@ -193,8 +194,11 @@ public byte[] randomValue() { return values[randomInt(valuesCount)]; // [0, values) } - public CPMap randomMap() { - return mapReferences.get(randomInt(maps)); + public CPMap getNextMap() { + if (currentMapIndex == maps) { + currentMapIndex = 0; + } + return mapReferences.get(currentMapIndex++); } } } \ No newline at end of file diff --git a/java/drivers/driver-hazelcast4plus/src/main/java/com/hazelcast/simulator/tests/cp/IAtomicReferenceTest.java b/java/drivers/driver-hazelcast4plus/src/main/java/com/hazelcast/simulator/tests/cp/IAtomicReferenceTest.java index 31d829193..88d799279 100644 --- a/java/drivers/driver-hazelcast4plus/src/main/java/com/hazelcast/simulator/tests/cp/IAtomicReferenceTest.java +++ b/java/drivers/driver-hazelcast4plus/src/main/java/com/hazelcast/simulator/tests/cp/IAtomicReferenceTest.java @@ -62,31 +62,31 @@ public void prepare() { @TimeStep(prob = 0) public String get(ThreadState state) { - IAtomicReference reference = state.randomRef(); + IAtomicReference 
reference = state.getNextAtomicReference(); return reference.get(); } @TimeStep(prob = 1) public void set(ThreadState state) { - IAtomicReference reference = state.randomRef(); + IAtomicReference reference = state.getNextAtomicReference(); reference.set(value); } @TimeStep(prob = 0) public void alter(ThreadState state) { - IAtomicReference reference = state.randomRef(); + IAtomicReference reference = state.getNextAtomicReference(); reference.alter(state.identity); } @TimeStep(prob = 0) public boolean cas(ThreadState state) { - IAtomicReference reference = state.randomRef(); + IAtomicReference reference = state.getNextAtomicReference(); return reference.compareAndSet(value, value); } @TimeStep(prob = 0) public void casOptimisticConcurrencyControl(ThreadState state) { - IAtomicReference reference = state.randomRef(); + IAtomicReference reference = state.getNextAtomicReference(); // todo: what is the point of the loop? It will always succeed because there is just a single value. // So the performance will be exactly the same as the cas timestep method. 
@@ -100,8 +100,13 @@ public void casOptimisticConcurrencyControl(ThreadState state) { public class ThreadState extends BaseThreadState { - public IAtomicReference randomRef() { - return references[randomInt(referenceCount)]; + private int currentReferenceIndex = 0; + + public IAtomicReference getNextAtomicReference() { + if (currentReferenceIndex == referenceCount) { + currentReferenceIndex = 0; + } + return references[currentReferenceIndex++]; } final IFunction identity = s -> s; From d22a9c4be2b8147619cef6ff275024b04ac70460 Mon Sep 17 00:00:00 2001 From: Gareth Johnston Date: Sat, 12 Oct 2024 17:31:50 +0100 Subject: [PATCH 2/3] introduce latency configuration --- README.md | 97 +++++++++++++++++++- src/apply_latencies.py | 201 +++++++++++++++++++++++++++++++++++++++++ src/inventory_cli.py | 27 ++++++ 3 files changed, 324 insertions(+), 1 deletion(-) create mode 100644 src/apply_latencies.py diff --git a/README.md b/README.md index 6b477e304..86e164b10 100644 --- a/README.md +++ b/README.md @@ -1730,8 +1730,103 @@ but on the server side `pps_allowance_exceeded` might show 0 events/s. For any pair of instances A and B, it is advised to run the PPS test for both A and B as the server. This ensure a clear picture of all the PPS limits across instances. +___ +### Injecting latencies -### CP subsystem leader priority +It's often useful to simulate the effects of network latency between nodes and clients, especially in scenarios where you want to test performance across multiple data centers (DCs) or regions. + +Simulator provides a convenience command to inject network latencies between groups of clients and nodes using the inventory inject_latencies command. +This allows you to simulate different network conditions in a simple setup. + +There are two methods to inject latencies: + +1. Basic global assignment: Assign a single latency value that will be applied bi-directionally between all groups. +2. 
Advanced group assignment: Assign different latency values between different groups. + +For both methods you must have created your machines using the `inventory apply` command and edit the generated +`inventory.yaml` in your project root to assign each host to a group. + +```yaml +loadgenerators: + hosts: + 3.121.207.133: + ansible_ssh_private_key_file: key + ansible_user: ec2-user + private_ip: 10.0.55.38 + group: group1 + +nodes: + hosts: + 3.122.199.101: + ansible_ssh_private_key_file: key + ansible_user: ec2-user + private_ip: 10.0.44.25 + group: group2 +``` +In this example, we have two hosts, each belonging to a different group (group1 and group2). Latencies will only be applied between different groups. +If a host is not assigned a group, no direction of latency will be applied to that host. +___ +**Method 1: Basic Global Assignment** + +Once your hosts are grouped, you can inject latencies by running the following command: + +```bash +inventory inject_latencies --latency 10 --interface eth0 --rtt +```` +This command will: + +Apply a round-trip time (RTT) latency of 10ms between hosts in different groups (e.g., between group1 and group2). +No latency will be applied between hosts in the same group (e.g., clients or members within group1 or group2 will communicate without added delay). + +Explanation of Flags: + +`--latency`: The desired latency in milliseconds (in this case, 10ms). + +`--interface`: The network interface where the latency should be applied (default is eth0). + +`--rtt`: When set, this flag indicates the latency is a round-trip time (RTT), meaning 10ms latency will be split into 5ms each way. + +___ +**Method 2: Advanced Group Assignment** + +If you want to assign different latencies between different groups, you can do so by supplying a latency profile in a YAML file. 
+ +Create a YAML file with the following format: + + +```yaml +latency_profiles: + relationships: + - source: group1 + target: group2 + latency: 5 + - source: group2 + target: group1 + latency: 2 + default_latency: 0 +``` + +In this example, we have two groups (group1 and group2) with different latencies between them. +Communication from group1 to group2 will have a 5ms latency, while communication from group2 to group1 will have a 2ms latency. + +The default_latency serves as a fallback value in case no specific latency is defined between two groups. + +To apply these latencies, run the following command: + +```bash +inventory inject_latencies --interface eth0 --profiles latency_profile.yaml +```` + +Explanation of Flags: + +`--interface`: The network interface where the latency should be applied (default is eth0). + +`--profiles`: The path to the YAML file containing the latency profiles. + +Note that if the latency profile file is not provided, the command will default to the basic global assignment method. +___ + -### CP subsystem leader priority +## CP subsystem leader priority It is possible to assign leadership priority to a member or list of members in a CP group(s). This is useful when you want to attribute certain behaviours to an agent in the cluster. 
For example, you may wish to inject a latency diff --git a/src/apply_latencies.py b/src/apply_latencies.py new file mode 100644 index 000000000..fb9e7dae7 --- /dev/null +++ b/src/apply_latencies.py @@ -0,0 +1,201 @@ +import yaml +import paramiko +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + + +def load_yaml(file_path): + with open(file_path, 'r') as stream: + try: + return yaml.safe_load(stream) + except yaml.YAMLError as exc: + print(f"Error loading YAML file: {exc}") + return None + + +def check_package_manager(ssh_client): + stdin, stdout, stderr = ssh_client.exec_command("which yum") + if stdout.read().decode().strip(): + return "yum" + stdin, stdout, stderr = ssh_client.exec_command("which apt-get") + if stdout.read().decode().strip(): + return "apt" + return None + + +def apply_multi_latency(ssh_client, interface, target_ips, latency, band_number): + stdin, stdout, stderr = ssh_client.exec_command("which tc") + if not stdout.read().decode().strip(): + install_command = ( + "sudo yum install -y iproute-tc" if check_package_manager(ssh_client) == "yum" # helper may return None; `in None` would raise TypeError + else "sudo apt-get update && sudo apt-get install -y iproute2" + ) + ssh_client.exec_command(install_command) + time.sleep(2) + + try: + # Clear any existing qdiscs + replace_command = f"sudo tc qdisc replace dev {interface} root handle 1: htb" + ssh_client.exec_command(replace_command) + + # Create a class for the specific latency profile + classid = f"1:{band_number}" + print(f"Adding class {classid} with {latency}ms delay") + class_command = f"sudo tc class add dev {interface} parent 1: classid {classid} htb rate 100mbit" + ssh_client.exec_command(class_command) + + # The profile latency is applied exactly as given; this function performs no RTT halving + applied_latency_string = f"{latency}ms" + print(f"Attaching netem to class {classid} with {applied_latency_string} latency.") + netem_command = f"sudo tc qdisc add dev {interface} parent {classid} handle {band_number}0: netem delay {applied_latency_string}" + 
ssh_client.exec_command(netem_command) + + # Apply u32 filters for each IP + for ip in target_ips: + print(f"Adding filter for {ip} with flowid {classid}") + filter_command = f"sudo tc filter add dev {interface} protocol ip parent 1: prio 1 u32 match ip dst {ip}/32 flowid {classid}" + stdin, stdout, stderr = ssh_client.exec_command(filter_command) # rebind so the check below reads this command's stderr, not the stale `which tc` stream + filter_stderr = stderr.read().decode() + if "Error" in filter_stderr: + print(f"Error adding filter for {ip}: {filter_stderr}. Aborting filter setup for this IP.") + + # Verify the setup + print(f"Verifying qdisc on {interface}") + verify_qdisc_command = f"sudo tc qdisc show dev {interface}" + stdin, stdout, stderr = ssh_client.exec_command(verify_qdisc_command) + print(f"Qdisc verification: {stdout.read().decode()}") + + print(f"Verifying filters on {interface}") + verify_filter_command = f"sudo tc filter show dev {interface}" + stdin, stdout, stderr = ssh_client.exec_command(verify_filter_command) + print(f"Filter verification: {stdout.read().decode()}") + + print("Finished applying latencies.") + except Exception as e: + print(f"An error occurred while applying latency: {str(e)}") + + +def apply_latency_global(ssh_client, interface, target_ips, latency, is_rtt): + stdin, stdout, stderr = ssh_client.exec_command("which tc") + if stdout.read().decode().strip() == "": + ssh_client.exec_command("sudo yum install -y iproute-tc") + time.sleep(2) + + # clear any existing qdiscs + ssh_client.exec_command(f"sudo tc qdisc del dev {interface} root 2> /dev/null") + # adjust latency for round-trip time if needed + + applied_latency = latency / 2 if is_rtt else latency + + # set up the root qdisc and netem + commands = [ + f"sudo tc qdisc add dev {interface} root handle 1: prio", + f"sudo tc qdisc add dev {interface} parent 1:3 handle 30: netem delay {applied_latency}ms" + ] + + # execute the initial commands + for command in commands: + stdin, stdout, stderr = ssh_client.exec_command(command) + stdout.channel.recv_exit_status() # Ensure command 
completes + print(f"Running: {command}") + print(stdout.read().decode()) + print(stderr.read().decode()) + + # apply the filter command for each IP individually + for ip in target_ips: + print(f"Adding latency for {ip}") + filter_command = f"sudo tc filter add dev {interface} protocol ip parent 1:0 prio 1 u32 match ip dst {ip}/32 flowid 1:3" + stdin, stdout, stderr = ssh_client.exec_command(filter_command) + stdout.channel.recv_exit_status() + print(f"Running: {filter_command}") + print(stdout.read().decode()) + print(stderr.read().decode()) + + +def configure_latencies(machine, all_hosts, target_latency, network_interface, rtt=False, latency_profiles=None): + groupId, private_ip, public_ip, user, key, type = machine + print(f"Configuring latencies for {public_ip}") + + target_ips = [] + if latency_profiles and 'relationships' in latency_profiles: + for profile in latency_profiles['relationships']: + if profile['source'] == groupId and profile['target'] != groupId: + target_ips.append((profile['target'], profile['latency'])) + print(f"Adding relationship: {profile['source']} -> {profile['target']} with {profile['latency']}ms latency.") + + if not target_ips: + print(f"No target IPs for {groupId}. 
Skipping latency configuration.") + return + + # Apply the latency only if this machine is the source and has targets + for idx, (target_group, latency) in enumerate(target_ips): + ips = [m[1] for m in all_hosts if m[0] == target_group] + print(f"Target IPs for group {target_group} from {groupId}: {ips}") + if ips: + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + try: + print(f"Connecting to {public_ip} to apply latency.") + ssh_client.connect(public_ip, username=user, key_filename=key) + + band_number = 10 + idx # Use a unique band number for each relationship + print(f"Applying latency between {groupId} and {target_group}: {latency}ms with band {band_number}") + apply_multi_latency(ssh_client, network_interface, ips, latency, band_number) + except Exception as e: + print(f"Error connecting to {public_ip}: {str(e)}") + finally: + ssh_client.close() + else: + print(f"No latency profiles found. Using global latency per group.") + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + target_ips = [] + if type == 'client': + # Clients should only add members in other data centers + target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'member') + elif type == 'member': + # Members should add both clients and other members in different data centers + target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'client') + target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'member') + try: + ssh_client.connect(public_ip, username=user, key_filename=key) + print(f"Applying fallback latency of {target_latency}ms.") + apply_latency_global(ssh_client, network_interface, target_ips, target_latency, rtt) + except Exception as e: + print(f"Error connecting to {public_ip}: {str(e)}") + finally: + ssh_client.close() + + +def inject_latencies(target_latency, network_interface, rtt, profiles_path=None): + yaml_file_path = 
'inventory.yaml' + config = load_yaml(yaml_file_path) + latency_profiles = load_yaml(profiles_path) if profiles_path else None + + if latency_profiles and 'latency_profiles' in latency_profiles: + latency_profiles = latency_profiles['latency_profiles'] + print(f"Loaded latency profiles from {profiles_path}.") + else: + print(f"No latency profiles found at {profiles_path}. Using default latency.") + + if config: + all_hosts = [] + for group, host_type in [('loadgenerators', 'client'), ('nodes', 'member')]: + hosts = config.get(group, {}).get('hosts', {}) + for public_ip, details in hosts.items(): + private_ip = details['private_ip'] + group = details.get('group', 'default') + user = details['ansible_user'] + key = details['ansible_ssh_private_key_file'] + all_hosts.append((group, private_ip, public_ip, user, key, host_type)) + + with ThreadPoolExecutor(max_workers=5) as executor: + futures = [ + executor.submit(configure_latencies, machine, all_hosts, target_latency, network_interface, rtt, latency_profiles) + for machine in all_hosts + ] + for future in as_completed(futures): + try: + future.result() # If any thread throws an exception, it will be raised here + except Exception as e: + print(f"Error occurred during latency configuration: {str(e)}") diff --git a/src/inventory_cli.py b/src/inventory_cli.py index 58cd92d4a..c3c46cd9f 100644 --- a/src/inventory_cli.py +++ b/src/inventory_cli.py @@ -4,6 +4,7 @@ import argparse from os import path +from apply_latencies import inject_latencies from simulator.inventory_terraform import terraform_import, terraform_destroy, terraform_apply from simulator.util import load_yaml_file, exit_with_error, simulator_home, shell, now_seconds from simulator.log import info, log_header @@ -21,6 +22,7 @@ new_key Creates a new public/private keypair shell Executes a shell command on the inventory tune Tunes the environment + inject_latencies Injects latencies between inventory nodes ''' # for git like arg parsing: @@ -416,6 +418,28 @@ 
def __init__(self, argv): new_key(args.name[0]) +class InventoryInjectLatenciesCli: + + def __init__(self, argv): + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, + description='Injects latencies between inventory nodes') + parser.add_argument("--latency", help="The target latency to apply (in ms)", type=int) + parser.add_argument("--interface", default="eth0", help="Network interface to apply latency on (default: eth0)") + parser.add_argument("--rtt", action='store_true', help="Specify this flag to apply round-trip latency") + parser.add_argument("--profiles", help="Path to the latency profiles YAML file") + + args = parser.parse_args(argv) + + self.target_latency = args.latency + self.network_interface = args.interface + self.rtt = args.rtt + self.profiles = args.profiles + + log_header("Injecting Latencies") + inject_latencies(self.target_latency, self.network_interface, self.rtt, self.profiles) + log_header("Injecting Latencies: Done") + + class InventoryCli: def __init__(self): @@ -456,6 +480,9 @@ def new_key(self, argv): def tune(self, argv): TuneCli(argv) + def inject_latencies(self, argv): + InventoryInjectLatenciesCli(argv) + if __name__ == '__main__': InventoryCli() From 5bc22b829d70eaf5313d8da55174bc8216402762 Mon Sep 17 00:00:00 2001 From: Gareth Johnston Date: Mon, 14 Oct 2024 00:09:59 +0100 Subject: [PATCH 3/3] update readme --- README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 86e164b10..9d32b827d 100644 --- a/README.md +++ b/README.md @@ -1740,7 +1740,7 @@ This allows you to simulate different network conditions in a simple setup. There are two methods to inject latencies: -1. Basic global assignment: Assign a single latency value that will be applied bi-directionally between all groups. +1. Basic global assignment: Assign a single latency value that will be applied bi-directionally between groups. 2. 
Advanced group assignment: Assign different latency values between different groups. For both methods you must have created your machines using the `inventory apply` command and edit the generated @@ -1765,6 +1765,8 @@ nodes: ``` In this example, we have two hosts, each belonging to a different group (group1 and group2). Latencies will only be applied between different groups. If a host is not assigned a group, no direction of latency will be applied to that host. + +Note that the latency is applied to the outbound traffic from a source group to the target group. ___ **Method 1: Basic Global Assignment** @@ -1785,6 +1787,7 @@ Explanation of Flags: `--interface`: The network interface where the latency should be applied (default is eth0). `--rtt`: When set, this flag indicates the latency is a round-trip time (RTT), meaning 10ms latency will be split into 5ms each way. + Otherwise, 10ms will be applied in each direction. ___ **Method 2: Advanced Group Assignment** @@ -1811,6 +1814,9 @@ Communication from group1 to group2 will have a 5ms latency, while communication The default_latency serves as a fallback value in case no specific latency is defined between two groups. +`source`: The group of nodes where the latency will be applied to outgoing traffic. +`target`: The group of nodes to which the outgoing traffic from the source group will experience a delay. + To apply these latencies, run the following command: ```bash