From ac09d1efd51f6d8bc00adddd2e7720c2cb3ff481 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 25 Sep 2024 08:23:44 -0700 Subject: [PATCH] update script --- .../tests/Integration/IntegrationTestBase.cs | 2 +- go/integTest/glide_test_suite_test.go | 4 +- java/integTest/build.gradle | 4 +- utils/cluster_manager.py | 179 +++++++++--------- 4 files changed, 96 insertions(+), 93 deletions(-) diff --git a/csharp/tests/Integration/IntegrationTestBase.cs b/csharp/tests/Integration/IntegrationTestBase.cs index 10d9872c4f..6df910175c 100644 --- a/csharp/tests/Integration/IntegrationTestBase.cs +++ b/csharp/tests/Integration/IntegrationTestBase.cs @@ -73,7 +73,7 @@ internal List StartRedis(bool cluster, bool tls = false, string? name = nu /// internal void StopRedis(bool keepLogs, string? name = null) { - string cmd = $"stop --prefix {name ?? "redis-cluster"} {(keepLogs ? "--keep-folder" : "")}"; + string cmd = $"stop --prefix {name ?? "cluster"} {(keepLogs ? "--keep-folder" : "")}"; _ = RunClusterManager(cmd, true); } diff --git a/go/integTest/glide_test_suite_test.go b/go/integTest/glide_test_suite_test.go index dddfd3d297..d7dae0a604 100644 --- a/go/integTest/glide_test_suite_test.go +++ b/go/integTest/glide_test_suite_test.go @@ -29,7 +29,7 @@ type GlideTestSuite struct { func (suite *GlideTestSuite) SetupSuite() { // Stop cluster in case previous test run was interrupted or crashed and didn't stop. // If an error occurs, we ignore it in case the servers actually were stopped before running this. - runClusterManager(suite, []string{"stop", "--prefix", "redis-cluster"}, true) + runClusterManager(suite, []string{"stop", "--prefix", "cluster"}, true) // Delete dirs if stop failed due to https://github.com/valkey-io/valkey-glide/issues/849 err := os.RemoveAll("../../utils/clusters") @@ -122,7 +122,7 @@ func TestGlideTestSuite(t *testing.T) { } func (suite *GlideTestSuite) TearDownSuite() { - runClusterManager(suite, []string{"stop", "--prefix", "redis-cluster", "--keep-folder"}, false) + runClusterManager(suite, []string{"stop", "--prefix", "cluster", "--keep-folder"}, false) } func (suite *GlideTestSuite) TearDownTest() { diff --git a/java/integTest/build.gradle b/java/integTest/build.gradle index 2b56978a08..0fd09dc4f8 100644 --- a/java/integTest/build.gradle +++ b/java/integTest/build.gradle @@ -50,7 +50,7 @@ ext { tasks.register('stopAllAfterTests', Exec) { workingDir "${project.rootDir}/../utils" - commandLine 'python3', 'cluster_manager.py', 'stop', '--prefix', 'redis-cluster', '--keep-folder' + commandLine 'python3', 'cluster_manager.py', 'stop', '--prefix', 'cluster', '--keep-folder' } // We need to call for stop before and after the test, but gradle doesn't support executing a task @@ -58,7 +58,7 @@ tasks.register('stopAllAfterTests', Exec) { // We need to call for stop in case if previous test run was interrupted/crashed and didn't stop. tasks.register('stopAllBeforeTests', Exec) { workingDir "${project.rootDir}/../utils" - commandLine 'python3', 'cluster_manager.py', 'stop', '--prefix', 'redis-cluster' + commandLine 'python3', 'cluster_manager.py', 'stop', '--prefix', 'cluster' ignoreExitValue true // ignore fail if servers are stopped before } diff --git a/utils/cluster_manager.py b/utils/cluster_manager.py index 03adcaba00..7251c7d7ba 100644 --- a/utils/cluster_manager.py +++ b/utils/cluster_manager.py @@ -23,15 +23,33 @@ "debug": logging.DEBUG, } -GLIDE_HOME_DIR = os.getenv("GLIDE_HOME_DIR") or f"{__file__}/../.." +GLIDE_HOME_DIR = os.getenv("GLIDE_HOME_DIR") or f"{__file__}/.." CLUSTERS_FOLDER = os.getenv("CLUSTERS_FOLDER") or os.path.abspath( - f"{GLIDE_HOME_DIR}/utils/clusters" + f"{GLIDE_HOME_DIR}/clusters" ) -TLS_FOLDER = os.path.abspath(f"{GLIDE_HOME_DIR}/utils/tls_crts") +TLS_FOLDER = os.path.abspath(f"{GLIDE_HOME_DIR}/tls_crts") CA_CRT = f"{TLS_FOLDER}/ca.crt" -REDIS_CRT = f"{TLS_FOLDER}/redis.crt" -REDIS_KEY = f"{TLS_FOLDER}/redis.key" +SERVER_CRT = f"{TLS_FOLDER}/server.crt" +SERVER_KEY = f"{TLS_FOLDER}/server.key" +def get_command(commands: List[str]) -> str: + for command in commands: + try: + result = subprocess.run( + ["which", command], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + if result.returncode == 0: + return command + except Exception as e: + logging.error(f"Error checking {command}: {e}") + raise Exception(f"Neither ${'nor'.join(command)} found in the system.") + +# Determine which server to use by checking `valkey-server` and `redis-server` +SERVER_COMMAND = get_command(["valkey-server", "redis-server"]) +CLI_COMMAND = get_command(["valkey-cli", "redis-cli"]) def init_logger(logfile: str): print(f"LOG_FILE={logfile}") @@ -64,7 +82,7 @@ def should_generate_new_tls_certs() -> bool: try: Path(TLS_FOLDER).mkdir(exist_ok=False) except FileExistsError: - files_list = [CA_CRT, REDIS_KEY, REDIS_CRT] + files_list = [CA_CRT, SERVER_KEY, SERVER_CRT] for file in files_list: if check_if_tls_cert_exist(file) and check_if_tls_cert_is_valid(file): return False @@ -72,8 +90,8 @@ def should_generate_new_tls_certs() -> bool: def generate_tls_certs(): - # Based on shell script in redis's server tests - # https://github.com/redis/redis/blob/8c291b97b95f2e011977b522acf77ead23e26f55/utils/gen-test-certs.sh + # Based on shell script in valkey's server tests + # https://github.com/valkey-io/valkey/blob/0d2ba9b94d28d4022ea475a2b83157830982c941/utils/gen-test-certs.sh logging.debug("## Generating TLS certificates") tic = time.perf_counter() ca_key = f"{TLS_FOLDER}/ca.key" @@ -106,8 +124,8 @@ def make_key(name: str, size: int): # Build CA key make_key(ca_key, 4096) - # Build redis key - make_key(REDIS_KEY, 2048) + # Build server key + make_key(SERVER_KEY, 2048) # Build CA Cert p = subprocess.Popen( @@ -123,7 +141,7 @@ def make_key(name: str, size: int): "-days", "3650", "-subj", - "/O=Redis Test/CN=Certificate Authority", + "/O=Valkey GLIDE Test/CN=Certificate Authority", "-out", CA_CRT, ], @@ -137,7 +155,7 @@ def make_key(name: str, size: int): f"Failed to make create CA cert. Executed: {str(p.args)}:\n{err}" ) - # Read Redis key + # Read server key p1 = subprocess.Popen( [ "openssl", @@ -145,19 +163,19 @@ def make_key(name: str, size: int): "-new", "-sha256", "-subj", - "/O=Redis Test/CN=Generic-cert", + "/O=Valkey GLIDE Test/CN=Generic-cert", "-key", - REDIS_KEY, + SERVER_KEY, ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) - _redis_key_output, err = p.communicate(timeout=10) + _key_output, err = p.communicate(timeout=10) if p.returncode != 0: - raise Exception(f"Failed to read Redis key. Executed: {str(p.args)}:\n{err}") + raise Exception(f"Failed to read server key. Executed: {str(p.args)}:\n{err}") - # Build redis cert + # Build server cert p = subprocess.Popen( [ "openssl", @@ -176,7 +194,7 @@ def make_key(name: str, size: int): "-extfile", ext_file, "-out", - REDIS_CRT, + SERVER_CRT, ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -185,22 +203,22 @@ def make_key(name: str, size: int): ) output, err = p.communicate(timeout=10) if p.returncode != 0: - raise Exception(f"Failed to create redis cert. Executed: {str(p.args)}:\n{err}") + raise Exception(f"Failed to create server cert. Executed: {str(p.args)}:\n{err}") toc = time.perf_counter() logging.debug(f"generate_tls_certs() Elapsed time: {toc - tic:0.4f}") - logging.debug(f"TLS files= {REDIS_CRT}, {REDIS_KEY}, {CA_CRT}") + logging.debug(f"TLS files= {SERVER_CRT}, {SERVER_KEY}, {CA_CRT}") -def get_redis_cli_option_args( +def get_cli_option_args( cluster_folder: str, use_tls: bool, auth: Optional[str] = None ) -> List[str]: args = ( [ "--tls", "--cert", - REDIS_CRT, + SERVER_CRT, "--key", - REDIS_KEY, + SERVER_KEY, "--cacert", CA_CRT, ] @@ -218,7 +236,7 @@ def get_random_string(length): return result_str -class RedisServer: +class Server: def __init__(self, host: str, port: int) -> None: self.host = host self.port = port @@ -269,7 +287,7 @@ def create_cluster_folder(path: str, prefix: str) -> str: return cluster_folder -def start_redis_server( +def start_server( host: str, port: Optional[int], cluster_folder: str, @@ -277,7 +295,7 @@ def start_redis_server( tls_args: List[str], cluster_mode: bool, load_module: Optional[List[str]] = None, -) -> Tuple[RedisServer, str]: +) -> Tuple[Server, str]: port = port if port else next_free_port() logging.debug(f"Creating server {host}:{port}") @@ -285,26 +303,9 @@ def start_redis_server( node_folder = f"{cluster_folder}/{port}" Path(node_folder).mkdir(exist_ok=True) - # Determine which server to use by checking `valkey-server` and `redis-server` - def get_server_command() -> str: - for server in ["valkey-server", "redis-server"]: - try: - result = subprocess.run( - ["which", server], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - if result.returncode == 0: - return server - except Exception as e: - logging.error(f"Error checking {server}: {e}") - raise Exception("Neither valkey-server nor redis-server found in the system.") - - server_name = get_server_command() # Define command arguments cmd_args = [ - server_name, + SERVER_COMMAND, f"{'--tls-port' if tls else '--port'}", str(port), "--cluster-enabled", @@ -314,7 +315,9 @@ def get_server_command() -> str: "--daemonize", "yes", "--logfile", - f"{node_folder}/redis.log", + f"{node_folder}/server.log", + "--protected-mode", + "no" ] if load_module: if len(load_module) == 0: @@ -336,7 +339,7 @@ def get_server_command() -> str: f"Failed to execute command: {str(p.args)}\n Return code: {p.returncode}\n Error: {err}" ) - server = RedisServer(host, port) + server = Server(host, port) return server, node_folder @@ -349,10 +352,10 @@ def create_servers( tls: bool, cluster_mode: bool, load_module: Optional[List[str]] = None, -) -> List[RedisServer]: +) -> List[Server]: tic = time.perf_counter() logging.debug("## Creating servers") - ready_servers: List[RedisServer] = [] + ready_servers: List[Server] = [] nodes_count = shard_count * (1 + replica_count) tls_args = [] if tls is True: @@ -362,9 +365,9 @@ def create_servers( "--tls-cluster", "yes", "--tls-cert-file", - REDIS_CRT, + SERVER_CRT, "--tls-key-file", - REDIS_KEY, + SERVER_KEY, "--tls-ca-cert-file", CA_CRT, "--tls-auth-clients", # Make it so client doesn't have to send cert @@ -382,7 +385,7 @@ def create_servers( for i in range(nodes_count): port = ports[i] if ports else None servers_to_check.add( - start_redis_server( + start_server( host, port, cluster_folder, tls, tls_args, cluster_mode, load_module ) ) @@ -390,16 +393,16 @@ def create_servers( while len(servers_to_check) > 0: server, node_folder = servers_to_check.pop() logging.debug(f"Checking server {server.host}:{server.port}") - if is_address_already_in_use(server, f"{node_folder}/redis.log"): + if is_address_already_in_use(server, f"{node_folder}/server.log"): remove_folder(node_folder) if ports is not None: # The user passed a taken port, exit with an error raise Exception( - f"Couldn't start redis on {server.host}:{server.port}, address already in use" + f"Couldn't start server on {server.host}:{server.port}, address already in use" ) # The port was already taken, try to find a new free one servers_to_check.add( - start_redis_server( + start_server( server.host, None, cluster_folder, @@ -413,7 +416,7 @@ def create_servers( if not wait_for_server(server, cluster_folder, tls): raise Exception( f"Waiting for server {server.host}:{server.port} to start exceeded timeout.\n" - f"See {node_folder}/redis.log for more information" + f"See {node_folder}/server.log for more information" ) ready_servers.append(server) logging.debug("All servers are up!") @@ -423,7 +426,7 @@ def create_servers( def create_cluster( - servers: List[RedisServer], + servers: List[Server], shard_count: int, replica_count: int, cluster_folder: str, @@ -434,8 +437,8 @@ def create_cluster( logging.debug("## Starting cluster creation...") p = subprocess.Popen( [ - "redis-cli", - *get_redis_cli_option_args(cluster_folder, use_tls), + CLI_COMMAND, + *get_cli_option_args(cluster_folder, use_tls), "--cluster", "create", *servers_tuple, @@ -451,7 +454,7 @@ def create_cluster( if err or "[OK] All 16384 slots covered." not in output: raise Exception(f"Failed to create cluster: {err if err else output}") - wait_for_a_message_in_redis_logs(cluster_folder, "Cluster state changed: ok") + wait_for_a_message_in_logs(cluster_folder, "Cluster state changed: ok") wait_for_all_topology_views(servers, cluster_folder, use_tls) logging.debug("The cluster was successfully created!") toc = time.perf_counter() @@ -459,11 +462,11 @@ def create_cluster( def create_standalone_replication( - servers: List[RedisServer], + servers: List[Server], cluster_folder: str, use_tls: bool, ): - # Sets up replication among Redis servers, making them replicas of the primary server. + # Sets up replication among servers, making them replicas of the primary server. tic = time.perf_counter() primary_server = servers[0] @@ -473,8 +476,8 @@ def create_standalone_replication( if i == 0: continue # Skip the primary server replica_of_command = [ - "redis-cli", - *get_redis_cli_option_args(cluster_folder, use_tls), + CLI_COMMAND, + *get_cli_option_args(cluster_folder, use_tls), "-h", str(server.host), "-p", @@ -495,7 +498,7 @@ def create_standalone_replication( f"Failed to set up replication for server {server}: {err if err else output}" ) servers_ports = [str(server.port) for server in servers] - wait_for_a_message_in_redis_logs( + wait_for_a_message_in_logs( cluster_folder, "sync: Finished with success", servers_ports[1:], @@ -508,7 +511,7 @@ def create_standalone_replication( logging.debug(f"create_replication Elapsed time: {toc - tic:0.4f}") -def wait_for_a_message_in_redis_logs( +def wait_for_a_message_in_logs( cluster_folder: str, message: str, server_ports: Optional[List[str]] = None, @@ -516,19 +519,19 @@ def wait_for_a_message_in_redis_logs( for dir in Path(cluster_folder).rglob("*"): if not dir.is_dir(): continue - log_file = f"{dir}/redis.log" + log_file = f"{dir}/server.log" if server_ports and os.path.basename(os.path.normpath(dir)) not in server_ports: continue if not wait_for_message(log_file, message, 10): raise Exception( f"During the timeout duration, the server logs associated with port {dir} did not contain the message:{message}." - f"See {dir}/redis.log for more information" + f"See {dir}/server.log for more information" ) def wait_for_all_topology_views( - servers: List[RedisServer], cluster_folder: str, use_tls: bool + servers: List[Server], cluster_folder: str, use_tls: bool ): """ Wait for each of the nodes to have a topology view that contains all nodes. @@ -536,12 +539,12 @@ def wait_for_all_topology_views( """ for server in servers: cmd_args = [ - "redis-cli", + CLI_COMMAND, "-h", server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls), + *get_cli_option_args(cluster_folder, use_tls), "cluster", "slots", ] @@ -580,7 +583,7 @@ def wait_for_all_topology_views( def wait_for_server( - server: RedisServer, + server: Server, cluster_folder: str, use_tls: bool, timeout: int = 10, @@ -590,12 +593,12 @@ def wait_for_server( while time.time() < timeout_start + timeout: p = subprocess.Popen( [ - "redis-cli", + CLI_COMMAND, "-h", server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls), + *get_cli_option_args(cluster_folder, use_tls), "PING", ], stdout=subprocess.PIPE, @@ -627,8 +630,8 @@ def wait_for_message( timeout_start = time.time() while time.time() < timeout_start + timeout: with open(log_file, "r") as f: - redis_log = f.read() - if message in redis_log: + server_log = f.read() + if message in server_log: return True else: time.sleep(0.1) @@ -638,7 +641,7 @@ def wait_for_message( def is_address_already_in_use( - server: RedisServer, + server: Server, log_file: str, timeout: int = 5, ): @@ -646,11 +649,11 @@ def is_address_already_in_use( timeout_start = time.time() while time.time() < timeout_start + timeout: with open(log_file, "r") as f: - redis_log = f.read() - if "Address already in use" in redis_log: + server_log = f.read() + if "Address already in use" in server_log: logging.debug(f"Address is already bind for server {server}") return True - elif "Ready" in redis_log: + elif "Ready" in server_log: logging.debug(f"Address is free for server {server}!") return False else: @@ -674,15 +677,15 @@ def dir_path(path: str): raise NotADirectoryError(path) -def stop_server(server: RedisServer, cluster_folder: str, use_tls: bool, auth: str): +def stop_server(server: Server, cluster_folder: str, use_tls: bool, auth: str): logging.debug(f"Stopping server {server}") cmd_args = [ - "redis-cli", + CLI_COMMAND, "-h", server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls, auth), + *get_cli_option_args(cluster_folder, use_tls, auth), "shutdown", "nosave", ] @@ -720,7 +723,7 @@ def stop_server(server: RedisServer, cluster_folder: str, use_tls: bool, auth: s def wait_for_server_shutdown( - server: RedisServer, + server: Server, cluster_folder: str, use_tls: bool, auth: str, @@ -732,12 +735,12 @@ def wait_for_server_shutdown( while time.time() < timeout_start + timeout: p = subprocess.Popen( [ - "redis-cli", + CLI_COMMAND, "-h", server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls, auth), + *get_cli_option_args(cluster_folder, use_tls, auth), "PING", ], stdout=subprocess.PIPE, @@ -818,7 +821,7 @@ def stop_cluster( for it in os.scandir(cluster_folder): if it.is_dir() and it.name.isdigit(): port = it.name - stop_server(RedisServer(host, int(port)), cluster_folder, use_tls, auth) + stop_server(Server(host, int(port)), cluster_folder, use_tls, auth) logging.debug("All hosts were stopped") if not keep_folder: remove_folder(cluster_folder) @@ -916,14 +919,14 @@ def main(): type=str, help="Prefix to be used for the cluster folder name " "(default without TLS: %(default)s, default with TLS: tls-%(default)s)", - default="redis-cluster", + default="cluster", required=False, ) parser_start.add_argument( "--load-module", action="append", - help="The paths of the redis modules to load.", + help="The paths of the server modules to load.", required=False, )