Added latency injection functionality #2230

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 102 additions & 1 deletion README.md
@@ -1730,8 +1730,109 @@ but on the server side `pps_allowance_exceeded` might show 0 events/s.

For any pair of instances A and B, it is advised to run the PPS test for both A and B as the server.
This ensures a clear picture of all the PPS limits across instances.
___
### Injecting latencies

It's often useful to simulate the effects of network latency between nodes and clients, especially in scenarios where you want to test performance across multiple data centers (DCs) or regions.

Simulator provides the `inventory inject_latencies` command to inject network latencies between groups of clients and nodes.
This lets you simulate different network conditions with a simple setup.

There are two methods to inject latencies:

1. Basic global assignment: Assign a single latency value that will be applied bi-directionally between groups.
2. Advanced group assignment: Assign different latency values between different groups.

For both methods you must first have created your machines using the `inventory apply` command and edited the generated
`inventory.yaml` in your project root to assign each host to a group.

```yaml
loadgenerators:
hosts:
3.121.207.133:
ansible_ssh_private_key_file: key
ansible_user: ec2-user
private_ip: 10.0.55.38
group: group1

nodes:
hosts:
3.122.199.101:
ansible_ssh_private_key_file: key
ansible_user: ec2-user
private_ip: 10.0.44.25
group: group2
```
In this example, we have two hosts, each belonging to a different group (group1 and group2). Latencies will only be applied between different groups.
If a host is not assigned a group, no latency will be applied to or from that host.

Note that the latency is applied to the outbound traffic from a source group to the target group.
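Under the hood, the global method shells out to Linux `tc`/netem on each source host. The sketch below mirrors the command sequence built by `apply_latency_global` in this PR; the helper name and structure here are ours, for illustration only:

```python
def build_global_latency_commands(interface, target_ips, latency_ms, is_rtt):
    """Build the tc commands that delay outbound traffic to target_ips.

    With is_rtt, the requested latency is split in half per direction so
    the observed round trip matches the requested value.
    """
    applied = latency_ms / 2 if is_rtt else latency_ms
    commands = [
        # Wipe any existing root qdisc, then install a prio qdisc with a
        # netem delay attached to band 1:3.
        f"sudo tc qdisc del dev {interface} root 2> /dev/null",
        f"sudo tc qdisc add dev {interface} root handle 1: prio",
        f"sudo tc qdisc add dev {interface} parent 1:3 handle 30: netem delay {applied}ms",
    ]
    # One u32 filter per destination IP steers matching traffic into band 1:3,
    # so only traffic to the other group's hosts is delayed.
    for ip in target_ips:
        commands.append(
            f"sudo tc filter add dev {interface} protocol ip parent 1:0 "
            f"prio 1 u32 match ip dst {ip}/32 flowid 1:3"
        )
    return commands

for cmd in build_global_latency_commands("eth0", ["10.0.44.25"], 10, is_rtt=True):
    print(cmd)
```

This also explains why the delay is outbound-only: the netem qdisc sits on the source host's egress interface, and the filters match on destination IP.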
___
**Method 1: Basic Global Assignment**

Once your hosts are grouped, you can inject latencies by running the following command:

```bash
inventory inject_latencies --latency 10 --interface eth0 --rtt
```

> **Reviewer (Contributor):** there are 2 possible approaches to defining the latencies: globally (independent of the test) and as part of the test setup (in tests.yaml / test case definition). Both have pros and cons. I am curious why you chose this approach?
>
> **@gareth-johnston (Author), Oct 14, 2024:** Mainly because it was the simplest approach. In most cases, if we are testing a use case with latencies applied, I feel they are going to be fairly static arrangements. If you want to test with different latencies, you can just run another test suite with different latencies applied to the machines.
>
> That said, I thought about the benefit of having it per test case definition; it is a superior implementation if we can do both, but I don't have the time to allocate to this as it isn't scheduled work. It is definitely a possible improvement though.
>
> **Reviewer:** simplicity of implementation and limited time allocated are a perfectly good explanation :)
>
> I was wondering which way is more convenient in practice. Definition in the test case makes it easily repeatable, but will we actually use it, or will that become something we only copy-paste over and over again?
>
> **Author:** Perhaps we can extend it in the future if we see such value? In practice I see myself using this per test class, not per case. But, for example, it might be more useful in the future once work on the automated performance tests commences.
>
> **Reviewer:** how can I disable the latency injection when no longer needed, without recreating the environment? `inventory inject_latencies --latency 0`, or something more complicated? Maybe it is worth adding a dedicated command or option?
>
> **Author:** Yes, you can do just as you say. The same applies to overwriting an existing configuration. But I agree, a dedicated command to wipe the state would be useful.
This command will:

- Apply a round-trip time (RTT) latency of 10ms between hosts in different groups (e.g., between group1 and group2).
- Apply no latency between hosts in the same group (e.g., clients or members within group1 or group2 will communicate without added delay).

Explanation of Flags:

`--latency`: The desired latency in milliseconds (in this case, 10ms).

`--interface`: The network interface where the latency should be applied (default is eth0).

`--rtt`: When set, this flag indicates the latency is a round-trip time (RTT), meaning 10ms latency will be split into 5ms each way.
Otherwise, 10ms will be applied in each direction.
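The effect of the `--rtt` flag can be summed up in one line (a trivial illustration, not part of the tool's code):

```python
def one_way_delay_ms(latency_ms, rtt):
    """Delay netem applies in each direction for a requested latency.

    With --rtt the requested value is a round trip, so each direction
    gets half; otherwise the full value is applied per direction,
    doubling the observed round-trip time.
    """
    return latency_ms / 2 if rtt else latency_ms
```

So `--latency 10 --rtt` yields 5ms each way (10ms observed round trip), while `--latency 10` alone yields 10ms each way (20ms observed round trip).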

___
**Method 2: Advanced Group Assignment**

If you want to assign different latencies between different groups, you can do so by supplying a latency profile in a YAML file.

Create a YAML file with the following format:


```yaml
latency_profiles:
relationships:
- source: group1
target: group2
latency: 5
- source: group2
target: group1
latency: 2
default_latency: 0
```

In this example, we have two groups (group1 and group2) with different latencies between them.
Communication from group1 to group2 will have a 5ms latency, while communication from group2 to group1 will have a 2ms latency.

The `default_latency` serves as a fallback value in case no specific latency is defined between two groups.
> **Reviewer (Contributor):** Is `default_latency` required, or can it be optionally omitted?

`source`: The group of hosts whose outgoing traffic the latency is applied to.
`target`: The group of hosts that the delayed traffic from the source group is destined for.
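The lookup semantics can be sketched as follows (an illustrative helper, not the tool's actual code; the function name is ours):

```python
def resolve_latency(profiles, source, target):
    """Return the one-way latency (ms) from source group to target group.

    Falls back to default_latency when no relationship matches.
    """
    for rel in profiles.get("relationships", []):
        if rel["source"] == source and rel["target"] == target:
            return rel["latency"]
    return profiles.get("default_latency", 0)

# The profile from the example above
profiles = {
    "relationships": [
        {"source": "group1", "target": "group2", "latency": 5},
        {"source": "group2", "target": "group1", "latency": 2},
    ],
    "default_latency": 0,
}
```

With this profile, `resolve_latency(profiles, "group1", "group2")` returns 5, `resolve_latency(profiles, "group2", "group1")` returns 2, and an undefined pair such as `("group1", "group3")` falls back to the default of 0.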

To apply these latencies, run the following command:

```bash
inventory inject_latencies --interface eth0 --profiles latency_profile.yaml
```

Explanation of Flags:

`--interface`: The network interface where the latency should be applied (default is eth0).

`--profiles`: The path to the YAML file containing the latency profiles.

Note that if the latency profile file is not provided, the command will default to the basic global assignment method.
---

## CP subsystem leader priority

It is possible to assign leadership priority to one or more members of a CP group. This is useful when
you want to attribute certain behaviours to an agent in the cluster. For example, you may wish to inject a latency
201 changes: 201 additions & 0 deletions src/apply_latencies.py
@@ -0,0 +1,201 @@
import yaml
import paramiko
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def load_yaml(file_path):
    with open(file_path, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(f"Error loading YAML file: {exc}")
            return None


def check_package_manager(ssh_client):
    stdin, stdout, stderr = ssh_client.exec_command("which yum")
    if stdout.read().decode().strip():
        return "yum"
    stdin, stdout, stderr = ssh_client.exec_command("which apt-get")
    if stdout.read().decode().strip():
        return "apt"
    return None


def apply_multi_latency(ssh_client, interface, target_ips, latency, band_number):
    stdin, stdout, stderr = ssh_client.exec_command("which tc")
    if not stdout.read().decode().strip():
        # Pick the install command based on the available package manager;
        # check_package_manager may return None, so guard against that
        package_manager = check_package_manager(ssh_client)
        if package_manager == "yum":
            install_command = "sudo yum install -y iproute-tc"
        elif package_manager == "apt":
            install_command = "sudo apt-get update && sudo apt-get install -y iproute2"
        else:
            print("No supported package manager found; cannot install tc.")
            return
        ssh_client.exec_command(install_command)
        time.sleep(2)

    try:
        # Clear any existing qdiscs
        replace_command = f"sudo tc qdisc replace dev {interface} root handle 1: htb"
        ssh_client.exec_command(replace_command)

        # Create a class for the specific latency profile
        classid = f"1:{band_number}"
        print(f"Adding class {classid} with {latency}ms delay")
        class_command = f"sudo tc class add dev {interface} parent 1: classid {classid} htb rate 100mbit"
        ssh_client.exec_command(class_command)

        # Attach a netem qdisc carrying the delay to the class
        applied_latency_string = f"{latency}ms"
        print(f"Attaching netem to class {classid} with {applied_latency_string} latency.")
        netem_command = f"sudo tc qdisc add dev {interface} parent {classid} handle {band_number}0: netem delay {applied_latency_string}"
        ssh_client.exec_command(netem_command)

        # Apply u32 filters for each IP, checking stderr of each filter command
        for ip in target_ips:
            print(f"Adding filter for {ip} with flowid {classid}")
            filter_command = f"sudo tc filter add dev {interface} protocol ip parent 1: prio 1 u32 match ip dst {ip}/32 flowid {classid}"
            stdin, stdout, stderr = ssh_client.exec_command(filter_command)
            filter_stderr = stderr.read().decode()
            if "Error" in filter_stderr:
                print(f"Error adding filter for {ip}: {filter_stderr}. Aborting filter setup for this IP.")

        # Verify the setup
        print(f"Verifying qdisc on {interface}")
        verify_qdisc_command = f"sudo tc qdisc show dev {interface}"
        stdin, stdout, stderr = ssh_client.exec_command(verify_qdisc_command)
        print(f"Qdisc verification: {stdout.read().decode()}")

        print(f"Verifying filters on {interface}")
        verify_filter_command = f"sudo tc filter show dev {interface}"
        stdin, stdout, stderr = ssh_client.exec_command(verify_filter_command)
        print(f"Filter verification: {stdout.read().decode()}")

        print("Finished applying latencies.")
    except Exception as e:
        print(f"An error occurred while applying latency: {str(e)}")


def apply_latency_global(ssh_client, interface, target_ips, latency, is_rtt):
    stdin, stdout, stderr = ssh_client.exec_command("which tc")
    if stdout.read().decode().strip() == "":
        ssh_client.exec_command("sudo yum install -y iproute-tc")
        time.sleep(2)

    # clear any existing qdiscs
    ssh_client.exec_command(f"sudo tc qdisc del dev {interface} root 2> /dev/null")

    # adjust latency for round-trip time if needed
    applied_latency = latency / 2 if is_rtt else latency

    # set up the root qdisc and netem
    commands = [
        f"sudo tc qdisc add dev {interface} root handle 1: prio",
        f"sudo tc qdisc add dev {interface} parent 1:3 handle 30: netem delay {applied_latency}ms"
    ]

    # execute the initial commands
    for command in commands:
        stdin, stdout, stderr = ssh_client.exec_command(command)
        stdout.channel.recv_exit_status()  # Ensure command completes
        print(f"Running: {command}")
        print(stdout.read().decode())
        print(stderr.read().decode())

    # apply the filter command for each IP individually
    for ip in target_ips:
        print(f"Adding latency for {ip}")
        filter_command = f"sudo tc filter add dev {interface} protocol ip parent 1:0 prio 1 u32 match ip dst {ip}/32 flowid 1:3"
        stdin, stdout, stderr = ssh_client.exec_command(filter_command)
        stdout.channel.recv_exit_status()
        print(f"Running: {filter_command}")
        print(stdout.read().decode())
        print(stderr.read().decode())

> **Reviewer (Contributor):** this should be easier and cleaner with ansible (similar to inventory tune). why not use it?
>
> **Author:** Didn't consider it, wasn't really aware of the option if I am honest :)
>
> **Author:** Not much of a functional difference; it is definitely cleaner, but both achieve the same result, and I think in its current form this commit adds enough value by reducing setup time and human error that it is worth going ahead and migrating it later. If you think it wouldn't take too much time to change it to use ansible, I am more than happy to look into it during a cool down (along with your other comment).
>
> **Reviewer:** using Ansible will make this more consistent with the rest of simulator, more robust and easier to maintain. Ansible has some learning curve, but IMO it is worth learning. With ansible you will have some challenges translating the custom yaml to node-specific configs/scripts, but Ansible has extensive templating and programming capabilities.
>
> **Reviewer:** dependency installation should be very easy with ansible (we already have playbooks that do it). The configuration commands are a simple loop over some config: should be rather easy in ansible.
>
> **Author:** Sounds good. Let's leave this here for now; I will look into ansible and translate it across. Thank you for the valuable input 👍


def configure_latencies(machine, all_hosts, target_latency, network_interface, rtt=False, latency_profiles=None):
    groupId, private_ip, public_ip, user, key, host_type = machine
    print(f"Configuring latencies for {public_ip}")

    target_ips = []
    if latency_profiles and 'relationships' in latency_profiles:
        for profile in latency_profiles['relationships']:
            if profile['source'] == groupId and profile['target'] != groupId:
                target_ips.append((profile['target'], profile['latency']))
                print(f"Adding relationship: {profile['source']} -> {profile['target']} with {profile['latency']}ms latency.")

        if not target_ips:
            print(f"No target IPs for {groupId}. Skipping latency configuration.")
            return

        # Apply the latency only if this machine is the source and has targets
        for idx, (target_group, latency) in enumerate(target_ips):
            ips = [m[1] for m in all_hosts if m[0] == target_group]
            print(f"Target IPs for group {target_group} from {groupId}: {ips}")
            if ips:
                ssh_client = paramiko.SSHClient()
                ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                try:
                    print(f"Connecting to {public_ip} to apply latency.")
                    ssh_client.connect(public_ip, username=user, key_filename=key)

                    band_number = 10 + idx  # Use a unique band number for each relationship
                    print(f"Applying latency between {groupId} and {target_group}: {latency}ms with band {band_number}")
                    apply_multi_latency(ssh_client, network_interface, ips, latency, band_number)
                except Exception as e:
                    print(f"Error connecting to {public_ip}: {str(e)}")
                finally:
                    ssh_client.close()
    else:
        print("No latency profiles found. Using global latency per group.")
        ssh_client = paramiko.SSHClient()
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        target_ips = []
        if host_type == 'client':
            # Clients should only add members in other data centers
            target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'member')
        elif host_type == 'member':
            # Members should add both clients and other members in different data centers
            target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'client')
            target_ips.extend(m[1] for m in all_hosts if m[0] != groupId and m[5] == 'member')
        try:
            ssh_client.connect(public_ip, username=user, key_filename=key)
            print(f"Applying fallback latency of {target_latency}ms.")
            apply_latency_global(ssh_client, network_interface, target_ips, target_latency, rtt)
        except Exception as e:
            print(f"Error connecting to {public_ip}: {str(e)}")
        finally:
            ssh_client.close()


def inject_latencies(target_latency, network_interface, rtt, profiles_path=None):
    yaml_file_path = 'inventory.yaml'
    config = load_yaml(yaml_file_path)
    latency_profiles = load_yaml(profiles_path) if profiles_path else None

    if latency_profiles and 'latency_profiles' in latency_profiles:
        latency_profiles = latency_profiles['latency_profiles']
        print(f"Loaded latency profiles from {profiles_path}.")
    else:
        print(f"No latency profiles found at {profiles_path}. Using default latency.")

    if config:
        all_hosts = []
        for group_name, host_type in [('loadgenerators', 'client'), ('nodes', 'member')]:
            hosts = config.get(group_name, {}).get('hosts', {})
            for public_ip, details in hosts.items():
                private_ip = details['private_ip']
                group = details.get('group', 'default')
                user = details['ansible_user']
                key = details['ansible_ssh_private_key_file']
                all_hosts.append((group, private_ip, public_ip, user, key, host_type))

        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(configure_latencies, machine, all_hosts, target_latency, network_interface, rtt, latency_profiles)
                for machine in all_hosts
            ]
            for future in as_completed(futures):
                try:
                    future.result()  # If any thread throws an exception, it will be raised here
                except Exception as e:
                    print(f"Error occurred during latency configuration: {str(e)}")
27 changes: 27 additions & 0 deletions src/inventory_cli.py
@@ -4,6 +4,7 @@
import argparse
from os import path

from apply_latencies import inject_latencies
from simulator.inventory_terraform import terraform_import, terraform_destroy, terraform_apply
from simulator.util import load_yaml_file, exit_with_error, simulator_home, shell, now_seconds
from simulator.log import info, log_header
@@ -21,6 +22,7 @@
new_key Creates a new public/private keypair
shell Executes a shell command on the inventory
tune Tunes the environment
inject_latencies Injects latencies between inventory nodes
'''

# for git like arg parsing:
@@ -416,6 +418,28 @@ def __init__(self, argv):
new_key(args.name[0])


class InventoryInjectLatenciesCli:

    def __init__(self, argv):
        parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                         description='Injects latencies between inventory nodes')
        parser.add_argument("--latency", help="The target latency to apply (in ms)", type=int)
        parser.add_argument("--interface", default="eth0", help="Network interface to apply latency on (default: eth0)")
        parser.add_argument("--rtt", action='store_true', help="Specify this flag to apply round-trip latency")
        parser.add_argument("--profiles", help="Path to the latency profiles YAML file")

        args = parser.parse_args(argv)

        # Without either flag there is no latency value to apply
        if args.latency is None and not args.profiles:
            parser.error("either --latency or --profiles must be provided")

        self.target_latency = args.latency
        self.network_interface = args.interface
        self.rtt = args.rtt
        self.profiles = args.profiles

        log_header("Injecting Latencies")
        inject_latencies(self.target_latency, self.network_interface, self.rtt, self.profiles)
        log_header("Injecting Latencies: Done")


class InventoryCli:

    def __init__(self):

@@ -456,6 +480,9 @@ def new_key(self, argv):

    def tune(self, argv):
        TuneCli(argv)

    def inject_latencies(self, argv):
        InventoryInjectLatenciesCli(argv)


if __name__ == '__main__':
    InventoryCli()