generated from canonical/template-operator
-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add checks for performance tuning settings (#89)
- Loading branch information
1 parent
b734741
commit b126c3d
Showing
10 changed files
with
545 additions
and
187 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
#!/usr/bin/env python3 | ||
# Copyright 2023 Canonical Ltd. | ||
# See LICENSE file for licensing details. | ||
|
||
"""Manager for handling Kafka machine health.""" | ||
|
||
import json | ||
import logging | ||
import subprocess | ||
from statistics import mean | ||
from typing import TYPE_CHECKING, Tuple | ||
|
||
from ops.framework import Object | ||
|
||
if TYPE_CHECKING: | ||
from charm import KafkaCharm | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class KafkaHealth(Object):
    """Manager for handling Kafka machine health.

    Reads kernel settings and per-process limits on the machine hosting the
    Kafka service, compares them with the thresholds below and logs an
    actionable message for every setting that fails its check.
    """

    def __init__(self, charm: "KafkaCharm") -> None:
        """Registers the manager on the charm under the ``kafka_health`` key.

        Args:
            charm: the charm instance whose ``snap``, ``kafka_config`` and
                ``config`` attributes are read by the checks.
        """
        super().__init__(charm, "kafka_health")
        self.charm: "KafkaCharm" = charm

    @staticmethod
    def _shell_int(command: str) -> int:
        """Runs a shell command and parses its stdout as an integer.

        Args:
            command: command line passed to the shell; pipelines are allowed.

        Returns:
            The integer printed by the command.

        Raises:
            subprocess.CalledProcessError: if the command exits non-zero.
            ValueError: if the output is not an integer.
        """
        return int(
            subprocess.check_output(
                command,
                shell=True,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        )

    @property
    def _service_pid(self) -> int:
        """Gets the Kafka service pid as reported by the charm's snap wrapper."""
        return self.charm.snap.get_service_pid()

    def _get_current_memory_maps(self) -> int:
        """Gets the current number of memory maps for the Kafka process."""
        # one line per mapping in /proc/<pid>/maps
        return self._shell_int(f"cat /proc/{self._service_pid}/maps | wc -l")

    def _get_current_max_files(self) -> int:
        """Gets the current file descriptor limit for the Kafka process."""
        # fifth whitespace-separated field of the matching /proc/<pid>/limits row
        return self._shell_int(
            rf"cat /proc/{self._service_pid}/limits | grep files | awk '{{print $5}}'"
        )

    def _get_max_memory_maps(self) -> int:
        """Gets the current memory map limit for the machine."""
        return self._shell_int("sysctl -n vm.max_map_count")

    def _get_vm_swappiness(self) -> int:
        """Gets the current vm.swappiness configured for the machine."""
        return self._shell_int("sysctl -n vm.swappiness")

    def _get_partitions_size(self) -> Tuple[int, float]:
        """Gets the number of partitions and their average size from the log dirs.

        Returns:
            Tuple of (total partitions, mean partition size). ``(0, 0)`` when
            the command output has no JSON object or lists no partitions.
        """
        log_dirs_command = [
            "--describe",
            f"--bootstrap-server {','.join(self.charm.kafka_config.bootstrap_server)}",
            f"--command-config {self.charm.kafka_config.client_properties_filepath}",
        ]
        log_dirs = self.charm.snap.run_bin_command(
            bin_keyword="log-dirs", bin_args=log_dirs_command
        )

        dirs = {}
        for line in log_dirs.splitlines():
            try:
                parsed = json.loads(line)
            except json.decoder.JSONDecodeError:
                # not JSON at all, keep scanning
                continue

            # a bare number or string is valid JSON too; only an object can be the report
            if isinstance(parsed, dict):
                dirs = parsed
                break

        if not dirs:
            return (0, 0)

        sizes = []
        for broker in dirs["brokers"]:
            for log_dir in broker["logDirs"]:
                for partition in log_dir["partitions"]:
                    sizes.append(int(partition["size"]))

        if not sizes:
            # mean() raises on an empty sequence
            return (0, 0)

        return (len(sizes), mean(sizes))

    def _check_memory_maps(self) -> bool:
        """Checks that the number of used memory maps is not approaching threshold.

        Returns:
            False if the process uses 80% or more of ``vm.max_map_count``.
        """
        max_maps = self._get_max_memory_maps()
        current_maps = self._get_current_memory_maps()

        # eyeballing warning if 80% used, can be changed
        if max_maps * 0.8 <= current_maps:
            logger.warning(
                f"number of Kafka memory maps {current_maps} is approaching limit of {max_maps} - increase /etc/sysctl.conf vm.max_map_count limit and restart machine"
            )
            return False

        return True

    def _check_file_descriptors(self) -> bool:
        """Checks that the number of used file descriptors is not approaching threshold.

        The requirement is estimated as ``partitions * (average partition size
        / log_segment_bytes)``.

        Returns:
            True if no client listeners are configured or the estimate is
            below 80% of the process file limit. Otherwise False.
        """
        if not self.charm.kafka_config.client_listeners:
            return True

        total_partitions, average_partition_size = self._get_partitions_size()
        segment_size = int(self.charm.config["log_segment_bytes"])

        minimum_fd_limit = total_partitions * (average_partition_size / segment_size)
        current_max_files = self._get_current_max_files()

        # eyeballing warning if 80% used, can be changed
        if current_max_files * 0.8 <= minimum_fd_limit:
            logger.warning(
                f"number of required Kafka file descriptors {minimum_fd_limit} is approaching limit of {current_max_files} - increase /etc/security/limits.d/root.conf limit and restart machine"
            )
            return False

        return True

    def _check_vm_swappiness(self) -> bool:
        """Checks that vm.swappiness is configured correctly on the machine.

        Returns:
            False if ``vm.swappiness`` is greater than 1.
        """
        vm_swappiness = self._get_vm_swappiness()

        if vm_swappiness > 1:
            logger.error(
                f"machine vm.swappiness setting of {vm_swappiness} is higher than 1 - set /etc/sysctl.conf vm.swappiness=1 and restart machine"
            )
            return False

        return True

    def machine_configured(self) -> bool:
        """Checks machine configuration for healthy settings.

        Returns:
            True if settings safely configured. Otherwise False
        """
        # built as a list on purpose: every check runs (and logs) even after one fails
        return all(
            [
                self._check_memory_maps(),
                self._check_file_descriptors(),
                self._check_vm_swappiness(),
            ]
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.