diff --git a/README.md b/README.md index 6b477e304..9d32b827d 100644 --- a/README.md +++ b/README.md @@ -1730,8 +1730,109 @@ but on the server side `pps_allowance_exceeded` might show 0 events/s. For any pair of instances A and B, it is advised to run the PPS test for both A and B as the server. This ensures a clear picture of all the PPS limits across instances. +___ +### Injecting latencies -### CP subsystem leader priority +It's often useful to simulate the effects of network latency between nodes and clients, especially in scenarios where you want to test performance across multiple data centers (DCs) or regions. + +Simulator provides a convenience command to inject network latencies between groups of clients and nodes using the inventory inject_latencies command. +This allows you to simulate different network conditions in a simple setup. + +There are two methods to inject latencies: + +1. Basic global assignment: Assign a single latency value that will be applied bi-directionally between groups. +2. Advanced group assignment: Assign different latency values between different groups. + +For both methods you must have created your machines using the `inventory apply` command and edit the generated +`inventory.yaml` in your project root to assign each host to a group. + +```yaml +loadgenerators: + hosts: + 3.121.207.133: + ansible_ssh_private_key_file: key + ansible_user: ec2-user + private_ip: 10.0.55.38 + group: group1 + +nodes: + hosts: + 3.122.199.101: + ansible_ssh_private_key_file: key + ansible_user: ec2-user + private_ip: 10.0.44.25 + group: group2 +``` +In this example, we have two hosts, each belonging to a different group (group1 and group2). Latencies will only be applied between different groups. +If a host is not assigned a group, no latency will be applied to or from that host. + +Note that the latency is applied to the outbound traffic from a source group to the target group. 
+___ +**Method 1: Basic Global Assignment** + +Once your hosts are grouped, you can inject latencies by running the following command: + +```bash +inventory inject_latencies --latency 10 --interface eth0 --rtt +``` +This command will: + +Apply a round-trip time (RTT) latency of 10ms between hosts in different groups (e.g., between group1 and group2). +No latency will be applied between hosts in the same group (e.g., clients or members within group1 or group2 will communicate without added delay). + +Explanation of Flags: + +`--latency`: The desired latency in milliseconds (in this case, 10ms). + +`--interface`: The network interface where the latency should be applied (default is eth0). + +`--rtt`: When set, this flag indicates the latency is a round-trip time (RTT), meaning 10ms latency will be split into 5ms each way. + Otherwise, 10ms will be applied in each direction. + +___ +**Method 2: Advanced Group Assignment** + +If you want to assign different latencies between different groups, you can do so by supplying a latency profile in a YAML file. + +Create a YAML file with the following format: + + +```yaml +latency_profiles: + relationships: + - source: group1 + target: group2 + latency: 5 + - source: group2 + target: group1 + latency: 2 + default_latency: 0 +``` + +In this example, we have two groups (group1 and group2) with different latencies between them. +Communication from group1 to group2 will have a 5ms latency, while communication from group2 to group1 will have a 2ms latency. + +The default_latency serves as a fallback value in case no specific latency is defined between two groups. + +`source`: The group of nodes whose outgoing traffic will be delayed. +`target`: The destination group; traffic sent from the source group to this group is delayed. 
+ +To apply these latencies, run the following command: + +```bash +inventory inject_latencies --interface eth0 --profiles latency_profile.yaml +```` + +Explanation of Flags: + +`--interface`: The network interface where the latency should be applied (default is eth0). + +`--profiles`: The path to the YAML file containing the latency profiles. + +Note that if the latency profile file is not provided, the command will default to the basic global assignment method. +--- + +## CP subsystem leader priority It is possible to assign leadership priority to a member or list of members in a CP group(s). This is useful when you want to attribute certain behaviours to an agent in the cluster. For example, you may wish to inject a latency diff --git a/src/apply_latencies.py b/src/apply_latencies.py new file mode 100644 index 000000000..fb9e7dae7 --- /dev/null +++ b/src/apply_latencies.py @@ -0,0 +1,201 @@ +import yaml +import paramiko +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + + +def load_yaml(file_path): + with open(file_path, 'r') as stream: + try: + return yaml.safe_load(stream) + except yaml.YAMLError as exc: + print(f"Error loading YAML file: {exc}") + return None + + +def check_package_manager(ssh_client): + stdin, stdout, stderr = ssh_client.exec_command("which yum") + if stdout.read().decode().strip(): + return "yum" + stdin, stdout, stderr = ssh_client.exec_command("which apt-get") + if stdout.read().decode().strip(): + return "apt" + return None + + +def apply_multi_latency(ssh_client, interface, target_ips, latency, band_number): + stdin, stdout, stderr = ssh_client.exec_command("which tc") + if not stdout.read().decode().strip(): + install_command = ( + "sudo yum install -y iproute-tc" if "yum" in check_package_manager(ssh_client) + else "sudo apt-get update && sudo apt-get install -y iproute2" + ) + ssh_client.exec_command(install_command) + time.sleep(2) + + try: + # Clear any existing qdiscs + replace_command = f"sudo tc 
qdisc replace dev {interface} root handle 1: htb" + ssh_client.exec_command(replace_command) + + # Create a class for the specific latency profile + classid = f"1:{band_number}" + print(f"Adding class {classid} with {latency}ms delay") + class_command = f"sudo tc class add dev {interface} parent 1: classid {classid} htb rate 100mbit" + ssh_client.exec_command(class_command) + + # Adjust latency for round-trip time if needed + applied_latency_string = f"{latency}ms" + print(f"Attaching netem to class {classid} with {applied_latency_string} latency.") + netem_command = f"sudo tc qdisc add dev {interface} parent {classid} handle {band_number}0: netem delay {applied_latency_string}" + ssh_client.exec_command(netem_command) + + # Apply u32 filters for each IP + for ip in target_ips: + print(f"Adding filter for {ip} with flowid {classid}") + filter_command = f"sudo tc filter add dev {interface} protocol ip parent 1: prio 1 u32 match ip dst {ip}/32 flowid {classid}" + ssh_client.exec_command(filter_command) + filter_stderr = stderr.read().decode() + if "Error" in filter_stderr: + print(f"Error adding filter for {ip}: {filter_stderr}. 
Aborting filter setup for this IP.") + + # Verify the setup + print(f"Verifying qdisc on {interface}") + verify_qdisc_command = f"sudo tc qdisc show dev {interface}" + stdin, stdout, stderr = ssh_client.exec_command(verify_qdisc_command) + print(f"Qdisc verification: {stdout.read().decode()}") + + print(f"Verifying filters on {interface}") + verify_filter_command = f"sudo tc filter show dev {interface}" + stdin, stdout, stderr = ssh_client.exec_command(verify_filter_command) + print(f"Filter verification: {stdout.read().decode()}") + + print("Finished applying latencies.") + except Exception as e: + print(f"An error occurred while applying latency: {str(e)}") + + +def apply_latency_global(ssh_client, interface, target_ips, latency, is_rtt): + stdin, stdout, stderr = ssh_client.exec_command("which tc") + if stdout.read().decode().strip() == "": + ssh_client.exec_command("sudo yum install -y iproute-tc") + time.sleep(2) + + # clear any existing qdiscs + ssh_client.exec_command(f"sudo tc qdisc del dev {interface} root 2> /dev/null") + # adjust latency for round-trip time if needed + + applied_latency = latency / 2 if is_rtt else latency + + # set up the root qdisc and netem + commands = [ + f"sudo tc qdisc add dev {interface} root handle 1: prio", + f"sudo tc qdisc add dev {interface} parent 1:3 handle 30: netem delay {applied_latency}ms" + ] + + # execute the initial commands + for command in commands: + stdin, stdout, stderr = ssh_client.exec_command(command) + stdout.channel.recv_exit_status() # Ensure command completes + print(f"Running: {command}") + print(stdout.read().decode()) + print(stderr.read().decode()) + + # apply the filter command for each IP individually + for ip in target_ips: + print(f"Adding latency for {ip}") + filter_command = f"sudo tc filter add dev {interface} protocol ip parent 1:0 prio 1 u32 match ip dst {ip}/32 flowid 1:3" + stdin, stdout, stderr = ssh_client.exec_command(filter_command) + stdout.channel.recv_exit_status() + 
print(f"Running: {filter_command}") + print(stdout.read().decode()) + print(stderr.read().decode()) + + +def configure_latencies(machine, all_hosts, target_latency, network_interface, rtt=False, latency_profiles=None): + groupId, private_ip, public_ip, user, key, type = machine + print(f"Configuring latencies for {public_ip}") + + target_ips = [] + if latency_profiles and 'relationships' in latency_profiles: + for profile in latency_profiles['relationships']: + if profile['source'] == groupId and profile['target'] != groupId: + target_ips.append((profile['target'], profile['latency'])) + print(f"Adding relationship: {profile['source']} -> {profile['target']} with {profile['latency']}ms latency.") + + if not target_ips: + print(f"No target IPs for {groupId}. Skipping latency configuration.") + return + + # Apply the latency only if this machine is the source and has targets + for idx, (target_group, latency) in enumerate(target_ips): + ips = [m[1] for m in all_hosts if m[0] == target_group] + print(f"Target IPs for group {target_group} from {groupId}: {ips}") + if ips: + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + try: + print(f"Connecting to {public_ip} to apply latency.") + ssh_client.connect(public_ip, username=user, key_filename=key) + + band_number = 10 + idx # Use a unique band number for each relationship + print(f"Applying latency between {groupId} and {target_group}: {latency}ms with band {band_number}") + apply_multi_latency(ssh_client, network_interface, ips, latency, band_number) + except Exception as e: + print(f"Error connecting to {public_ip}: {str(e)}") + finally: + ssh_client.close() + else: + print(f"No latency profiles found. 
Using global latency per group.") + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + target_ips = [] + if type == 'client': + # Clients should only add members in other data centers + target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'member') + elif type == 'member': + # Members should add both clients and other members in different data centers + target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'client') + target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'member') + try: + ssh_client.connect(public_ip, username=user, key_filename=key) + print(f"Applying fallback latency of {target_latency}ms.") + apply_latency_global(ssh_client, network_interface, target_ips, target_latency, rtt) + except Exception as e: + print(f"Error connecting to {public_ip}: {str(e)}") + finally: + ssh_client.close() + + +def inject_latencies(target_latency, network_interface, rtt, profiles_path=None): + yaml_file_path = 'inventory.yaml' + config = load_yaml(yaml_file_path) + latency_profiles = load_yaml(profiles_path) if profiles_path else None + + if latency_profiles and 'latency_profiles' in latency_profiles: + latency_profiles = latency_profiles['latency_profiles'] + print(f"Loaded latency profiles from {profiles_path}.") + else: + print(f"No latency profiles found at {profiles_path}. 
Using default latency.") + + if config: + all_hosts = [] + for group, host_type in [('loadgenerators', 'client'), ('nodes', 'member')]: + hosts = config.get(group, {}).get('hosts', {}) + for public_ip, details in hosts.items(): + private_ip = details['private_ip'] + group = details.get('group', 'default') + user = details['ansible_user'] + key = details['ansible_ssh_private_key_file'] + all_hosts.append((group, private_ip, public_ip, user, key, host_type)) + + with ThreadPoolExecutor(max_workers=5) as executor: + futures = [ + executor.submit(configure_latencies, machine, all_hosts, target_latency, network_interface, rtt, latency_profiles) + for machine in all_hosts + ] + for future in as_completed(futures): + try: + future.result() # If any thread throws an exception, it will be raised here + except Exception as e: + print(f"Error occurred during latency configuration: {str(e)}") diff --git a/src/inventory_cli.py b/src/inventory_cli.py index 58cd92d4a..c3c46cd9f 100644 --- a/src/inventory_cli.py +++ b/src/inventory_cli.py @@ -4,6 +4,7 @@ import argparse from os import path +from apply_latencies import inject_latencies from simulator.inventory_terraform import terraform_import, terraform_destroy, terraform_apply from simulator.util import load_yaml_file, exit_with_error, simulator_home, shell, now_seconds from simulator.log import info, log_header @@ -21,6 +22,7 @@ new_key Creates a new public/private keypair shell Executes a shell command on the inventory tune Tunes the environment + inject_latencies Injects latencies between inventory nodes ''' # for git like arg parsing: @@ -416,6 +418,28 @@ def __init__(self, argv): new_key(args.name[0]) +class InventoryInjectLatenciesCli: + + def __init__(self, argv): + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, + description='Injects latencies between inventory nodes') + parser.add_argument("--latency", help="The target latency to apply (in ms)", type=int) + 
parser.add_argument("--interface", default="eth0", help="Network interface to apply latency on (default: eth0)") + parser.add_argument("--rtt", action='store_true', help="Specify this flag to apply round-trip latency") + parser.add_argument("--profiles", help="Path to the latency profiles YAML file") + + args = parser.parse_args(argv) + + self.target_latency = args.latency + self.network_interface = args.interface + self.rtt = args.rtt + self.profiles = args.profiles + + log_header("Injecting Latencies") + inject_latencies(self.target_latency, self.network_interface, self.rtt, self.profiles) + log_header("Injecting Latencies: Done") + + class InventoryCli: def __init__(self): @@ -456,6 +480,9 @@ def new_key(self, argv): def tune(self, argv): TuneCli(argv) + def inject_latencies(self, argv): + InventoryInjectLatenciesCli(argv) + if __name__ == '__main__': InventoryCli()