Skip to content

Commit

Permalink
[DPE-2196] Log Level Config Option (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
deusebio authored Jan 3, 2024
1 parent 2907f4d commit 6323a1c
Show file tree
Hide file tree
Showing 15 changed files with 161 additions and 38 deletions.
4 changes: 4 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,7 @@ options:
description: Config options to add extra-sans to the ones used when requesting server certificates. The extra-sans are specified by comma-separated names to be added when requesting signed certificates. Use "{unit}" as a placeholder to be filled with the unit number, e.g. "worker-{unit}" will be translated as "worker-0" for unit 0 and "worker-1" for unit 1 when requesting the certificate.
type: string
default: ""
log_level:
description: 'Level of logging for the different components operated by the charm. Possible values: ERROR, WARNING, INFO, DEBUG'
type: string
default: "INFO"
5 changes: 4 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ def _on_update_status(self, event: EventBase) -> None:
if not self.health.machine_configured():
self._set_status(Status.SYSCONF_NOT_OPTIMAL)
return
except SnapError:
except SnapError as e:
logger.debug(f"Error: {e}")
self._set_status(Status.SNAP_NOT_RUNNING)
return

Expand Down Expand Up @@ -354,6 +355,8 @@ def _on_config_changed(self, event: EventBase) -> None:
)
self.kafka_config.set_server_properties()

self.kafka_config.set_environment()

if isinstance(event, StorageEvent): # to get new storages
self.on[f"{self.restart.name}"].acquire_lock.emit(
callback_override="_disable_enable_restart"
Expand Down
35 changes: 24 additions & 11 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
AuthMechanism,
Scope,
)
from structured_config import LogLevel
from utils import map_env, safe_get_file, safe_write_to_file, update_env

if TYPE_CHECKING:
Expand All @@ -38,6 +39,8 @@
auto.create.topics.enable=false
"""

SERVER_PROPERTIES_BLACKLIST = ["profile", "log_level", "certificate_extra_sans"]


class Listener:
"""Definition of a listener.
Expand Down Expand Up @@ -111,9 +114,8 @@ def __init__(self, charm):
self.zk_jaas_filepath = f"{self.charm.snap.CONF_PATH}/zookeeper-jaas.cfg"
self.keystore_filepath = f"{self.charm.snap.CONF_PATH}/keystore.p12"
self.truststore_filepath = f"{self.charm.snap.CONF_PATH}/truststore.jks"
self.log4j_properties_filepath = f"{self.charm.snap.CONF_PATH}/log4j.properties"
self.jmx_prometheus_javaagent_filepath = (
f"{self.charm.snap.BINARIES_PATH}/jmx_prometheus_javaagent.jar"
f"{self.charm.snap.BINARIES_PATH}/libs/jmx_prometheus_javaagent.jar"
)
self.jmx_prometheus_config_filepath = f"{self.charm.snap.CONF_PATH}/jmx_prometheus.yaml"

Expand Down Expand Up @@ -194,15 +196,16 @@ def zookeeper_connected(self) -> bool:
return False

@property
def log4j_opts(self) -> str:
"""The Java config options for specifying log4j properties.
def log_level(self) -> str:
"""Return the Java-compliant logging level set by the user.
Returns:
String of Java config options
String with these possible values: DEBUG, INFO, WARN, ERROR
"""
opts = [f"-Dlog4j.configuration=file:{self.log4j_properties_filepath}"]

return f"KAFKA_LOG4J_OPTS='{' '.join(opts)}'"
# Remapping to WARN that is generally used in Java applications based on log4j and logback.
if self.charm.config.log_level == LogLevel.WARNING.value:
return "WARN"
return self.charm.config.log_level

@property
def jmx_opts(self) -> str:
Expand Down Expand Up @@ -263,6 +266,7 @@ def kafka_opts(self) -> str:
"""
opts = [
f"-Djava.security.auth.login.config={self.zk_jaas_filepath}",
f"-Dcharmed.kafka.log.level={self.log_level}",
]

return f"KAFKA_OPTS='{' '.join(opts)}'"
Expand Down Expand Up @@ -489,7 +493,8 @@ def client_properties(self) -> List[str]:
client_properties = [
f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";',
"sasl.mechanism=SCRAM-SHA-512",
f"security.protocol={self.security_protocol}", # FIXME: will need changing once multiple listener auth schemes
f"security.protocol={self.security_protocol}",
# FIXME: security.protocol will need changing once multiple listener auth schemes
f"bootstrap.servers={','.join(self.bootstrap_server)}",
]

Expand Down Expand Up @@ -537,11 +542,20 @@ def server_properties(self) -> List[str]:

return properties

@staticmethod
def _translate_config_key(key):
"""Format config names into server properties, blacklisted property are commented out.
Returns:
String with Kafka configuration name to be placed in the server.properties file
"""
return key.replace("_", ".") if key not in SERVER_PROPERTIES_BLACKLIST else f"# {key}"

@property
def config_properties(self) -> List[str]:
"""Configure server properties from config."""
return [
f"{conf_key.replace('_', '.')}={str(value)}"
f"{self._translate_config_key(conf_key)}={str(value)}"
for conf_key, value in self.charm.config.dict().items()
if value is not None
]
Expand Down Expand Up @@ -578,7 +592,6 @@ def set_environment(self) -> None:
updated_env_list = [
self.kafka_opts,
self.jmx_opts,
self.log4j_opts,
self.jvm_performance_opts,
self.heap_opts,
]
Expand Down
2 changes: 1 addition & 1 deletion src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

CHARM_KEY = "kafka"
SNAP_NAME = "charmed-kafka"
CHARMED_KAFKA_SNAP_REVISION = 24
CHARMED_KAFKA_SNAP_REVISION = 30

PEER = "cluster"
ZK = "zookeeper"
Expand Down
21 changes: 15 additions & 6 deletions src/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"""KafkaSnap class and methods."""

import logging
import re
import subprocess
from typing import List

Expand Down Expand Up @@ -140,13 +139,23 @@ def get_service_pid(self) -> int:
Raises:
SnapError if error occurs or if no pid string found in most recent log
"""
last_log = self.kafka.logs(services=[self.SNAP_SERVICE], num_lines=1)
pid_string = re.search(rf"{SNAP_NAME}.{self.SNAP_SERVICE}\[([0-9]+)\]", last_log)
java_processes = subprocess.check_output(
"pidof java", stderr=subprocess.PIPE, universal_newlines=True, shell=True
)

if not pid_string:
raise snap.SnapError("pid not found in snap logs")
logger.debug(f"Java processes: {java_processes}")

return int(pid_string[1])
for pid in java_processes.split():
with open(f"/proc/{pid}/cgroup", "r") as fid:
content = "".join(fid.readlines())

if f"{self.SNAP_NAME}.{self.SNAP_SERVICE}" in content:
logger.debug(
f"Found Snap service {self.SNAP_SERVICE} for {self.SNAP_NAME} with PID {pid}"
)
return int(pid)

raise snap.SnapError(f"Snap {self.SNAP_NAME} pid not found")

@staticmethod
def run_bin_command(bin_keyword: str, bin_args: List[str], opts: List[str] = []) -> str:
Expand Down
22 changes: 22 additions & 0 deletions src/structured_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ class CompressionType(str, Enum):
PRODUCER = "producer"


class LogLevel(str, Enum):
"""Enum for the `log_level` field."""

INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
DEBUG = "DEBUG"


class CharmConfig(BaseConfigModel):
"""Manager for the structured configuration."""

Expand All @@ -63,6 +72,7 @@ class CharmConfig(BaseConfigModel):
zookeeper_ssl_cipher_suites: Optional[str]
profile: str
certificate_extra_sans: Optional[str]
log_level: str

@validator("*", pre=True)
@classmethod
Expand Down Expand Up @@ -214,3 +224,15 @@ def profile_values(cls, value: str) -> Optional[str]:
raise ValueError("Value not one of 'testing', 'staging' or 'production'")

return value

@validator("log_level")
@classmethod
def log_level_values(cls, value: str) -> Optional[str]:
"""Check validity of `log_level` field."""
try:
_log_level = LogLevel(value)
except Exception as e:
raise ValueError(
f"Value out of the accepted values. Could not properly parsed the roles configuration: {e}"
)
return value
2 changes: 2 additions & 0 deletions src/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None:
self.set_unit_failed()
return

self.charm.kafka_config.set_environment()

logger.info(f"{self.charm.unit.name} upgrading service...")
self.charm.snap.restart_snap_service()

Expand Down
8 changes: 6 additions & 2 deletions src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ def get_env() -> dict[str, str]:

def update_env(env: dict[str, str]) -> None:
"""Updates /etc/environment file."""
updated_env = get_env() | env
content = "\n".join([f"{key}={value}" for key, value in updated_env.items()])
current_env = get_env()
if not env or current_env == env:
return

updated_env = current_env | env
content = "\n".join([f"{key}={value}" for key, value in updated_env.items() if value])
safe_write_to_file(content=content, path="/etc/environment", mode="w")
11 changes: 11 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,14 @@ async def set_mtls_client_acls(ops_test: OpsTest, bootstrap_server: str) -> str:
)

return result


def count_lines_with(model_full_name: str, unit: str, file: str, pattern: str) -> int:
result = check_output(
f"JUJU_MODEL={model_full_name} juju ssh {unit} sudo -i 'grep \"{pattern}\" {file} | wc -l'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

return int(result)
39 changes: 39 additions & 0 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
REL_NAME_ADMIN,
ZK_NAME,
check_socket,
count_lines_with,
get_address,
produce_and_check_logs,
run_client_properties,
Expand Down Expand Up @@ -187,6 +188,44 @@ async def test_exporter_endpoints(ops_test: OpsTest):
assert jmx_resp.ok


@pytest.mark.abort_on_fail
async def test_log_level_change(ops_test: OpsTest):

for unit in ops_test.model.applications[APP_NAME].units:
assert (
count_lines_with(
ops_test.model_full_name,
unit.name,
"/var/snap/charmed-kafka/common/var/log/kafka/server.log",
"DEBUG",
)
== 0
)

await ops_test.model.applications[APP_NAME].set_config({"log_level": "DEBUG"})

await ops_test.model.wait_for_idle(
apps=[APP_NAME], status="active", timeout=1000, idle_period=30
)

for unit in ops_test.model.applications[APP_NAME].units:
assert (
count_lines_with(
ops_test.model_full_name,
unit.name,
"/var/snap/charmed-kafka/common/var/log/kafka/server.log",
"DEBUG",
)
> 0
)

await ops_test.model.applications[APP_NAME].set_config({"log_level": "INFO"})

await ops_test.model.wait_for_idle(
apps=[APP_NAME], status="active", timeout=1000, idle_period=30
)


@pytest.mark.abort_on_fail
@pytest.mark.skip # skipping as we can't add storage without losing Juju conn
async def test_logs_write_to_new_storage(ops_test: OpsTest):
Expand Down
11 changes: 7 additions & 4 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ops.testing import Harness
from tenacity.wait import wait_none

import snap
from charm import KafkaCharm
from literals import CHARM_KEY, INTERNAL_USERS, OS_REQUIREMENTS, PEER, REL_NAME, ZK

Expand Down Expand Up @@ -174,10 +175,7 @@ def test_update_status_blocks_if_no_service(harness, zk_data, passwords_data):
harness.update_relation_data(peer_rel_id, CHARM_KEY, passwords_data)

with (
patch(
"snap.snap.Snap.logs",
return_value="2023-04-13T13:11:43+01:00 juju.fetch-oci[840]: /usr/bin/timeout",
),
patch("health.KafkaHealth.machine_configured", side_effect=snap.snap.SnapError()),
patch("charm.KafkaCharm.healthy", return_value=True),
patch("charm.broker_active", return_value=True),
patch("upgrade.KafkaUpgrade.idle", return_value=True),
Expand Down Expand Up @@ -263,6 +261,7 @@ def test_storage_add_disableenables_and_starts(harness, zk_data, passwords_data)
patch("upgrade.KafkaUpgrade.idle", return_value=True),
patch("config.KafkaConfig.set_server_properties"),
patch("config.KafkaConfig.set_client_properties"),
patch("config.KafkaConfig.set_environment"),
patch("charm.safe_get_file", return_value=["gandalf=grey"]),
patch("snap.KafkaSnap.disable_enable") as patched_disable_enable,
patch("snap.KafkaSnap.start_snap_service") as patched_start,
Expand Down Expand Up @@ -295,6 +294,7 @@ def test_storage_detaching_disableenables_and_starts(harness, zk_data, passwords
patch("upgrade.KafkaUpgrade.idle", return_value=True),
patch("config.KafkaConfig.set_server_properties"),
patch("config.KafkaConfig.set_client_properties"),
patch("config.KafkaConfig.set_environment"),
patch("charm.safe_get_file", return_value=["gandalf=grey"]),
patch("snap.KafkaSnap.disable_enable") as patched_disable_enable,
patch("snap.KafkaSnap.start_snap_service") as patched_start,
Expand Down Expand Up @@ -551,6 +551,7 @@ def test_config_changed_updates_server_properties(harness):
patch("charm.safe_get_file", return_value=["gandalf=grey"]),
patch("config.KafkaConfig.set_server_properties") as set_server_properties,
patch("config.KafkaConfig.set_client_properties"),
patch("config.KafkaConfig.set_environment"),
):
harness.charm.on.config_changed.emit()

Expand Down Expand Up @@ -579,6 +580,7 @@ def test_config_changed_updates_client_properties(harness):
patch("charm.safe_get_file", return_value=["gandalf=grey"]),
patch("config.KafkaConfig.set_server_properties"),
patch("config.KafkaConfig.set_client_properties") as set_client_properties,
patch("config.KafkaConfig.set_environment"),
):
harness.charm.on.config_changed.emit()

Expand Down Expand Up @@ -625,6 +627,7 @@ def test_config_changed_restarts(harness):
new_callable=PropertyMock,
return_value=["gandalf=grey"],
),
patch("config.KafkaConfig.set_environment"),
patch("charm.KafkaCharm.ready_to_start", new_callable=PropertyMock, return_value=True),
patch("charm.KafkaCharm.healthy", new_callable=PropertyMock, return_value=True),
patch("charm.safe_get_file", return_value=["gandalf=white"]),
Expand Down
7 changes: 0 additions & 7 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,6 @@ def test_heap_opts(harness, profile, expected):
assert "KAFKA_HEAP_OPTS" in args


def test_log4j_opts(harness):
"""Checks necessary args for KAFKA_LOG4J_OPTS."""
args = harness.charm.kafka_config.log4j_opts
assert "-Dlog4j.configuration=file:" in args
assert "KAFKA_LOG4J_OPTS" in args


def test_jmx_opts(harness):
"""Checks necessary args for KAFKA_JMX_OPTS."""
args = harness.charm.kafka_config.jmx_opts
Expand Down
Loading

0 comments on commit 6323a1c

Please sign in to comment.