From 3be867065b91c96a2e9bdabba885915309e73245 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Tue, 4 Jun 2024 15:01:54 +0300 Subject: [PATCH 01/19] draft elastic setup secure credentials & register repo --- indexer/scripts/elastic-conf.py | 77 +++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index a0402e05..ef4cdc5e 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -6,6 +6,7 @@ import argparse import os +import subprocess import sys from logging import getLogger from typing import Any @@ -54,6 +55,19 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: default=os.environ.get("ELASTICSEARCH_SNAPSHOT_REPO") or "", help="ES snapshot repository name", ) + # S3 Credentials + ap.add_argument( + "--s3-access-key", + dest="s3_access_key", + default=os.environ.get("ELASTICSEARCH_S3_ACCESS_KEY") or "", + help="Elasticsearch S3 access key", + ) + ap.add_argument( + "--s3-secret-key", + dest="s3_secret_key", + default=os.environ.get("ELASTICSEARCH_S3_SECRET_KEY") or "", + help="Elasticsearch S3 secret key", + ) def process_args(self) -> None: super().process_args() @@ -64,6 +78,8 @@ def process_args(self) -> None: ("ilm_max_age", "ELASTICSEARCH_ILM_MAX_AGE"), ("ilm_max_shard_size", "ELASTICSEARCH_ILM_MAX_SHARD_SIZE"), ("es_snapshot_repo", "ELASTICSEARCH_SNAPSHOT_REPO"), + ("s3_access_key", "ELASTICSEARCH_S3_ACCESS_KEY"), + ("s3_secret_key", "ELASTICSEARCH_S3_SECRET_KEY"), ] for arg_name, env_name in required_args: arg_val = getattr(self.args, arg_name) @@ -76,10 +92,15 @@ def process_args(self) -> None: self.ilm_max_age = self.args.ilm_max_age self.ilm_max_shard_size = self.args.ilm_max_shard_size self.es_snapshot_repo = self.args.es_snapshot_repo + self.s3_access_key = self.args.s3_access_key + self.s3_secret_key = self.args.s3_secret_key def main_loop(self) -> None: es = self.elasticsearch_client() assert es.ping(), "Failed to connect to Elasticsearch" + if not self.check_s3_credentials(es): + self.set_s3_credentials(es) + self.reload_secure_settings(es) index_template_created = self.create_index_template(es) ilm_policy_created = self.create_ilm_policy(es) alias_created = self.create_initial_index(es) @@ -96,6 +117,62 @@ def main_loop(self) -> None: logger.error("One or more configurations failed. Check logs for details.") return + def check_s3_credentials(self, es: Elasticsearch) -> bool: + try: + response = es.transport.perform_request("GET", "/_nodes/secure_settings") + nodes = response.get("nodes", {}) + for node_id, node_data in nodes.items(): + settings = node_data.get("secure_settings", {}) + if ( + "s3.client.default.access_key" in settings + and "s3.client.default.secret_key" in settings + ): + return True + return False + except Exception as e: + logger.error(f"Error checking S3 credentials: {e}") + return False + + def set_s3_credentials(self, es: Elasticsearch) -> None: + commands = [ + ("s3.client.default.access_key", self.s3_access_key), + ("s3.client.default.secret_key", self.s3_secret_key), + ] + for key, value in commands: + result = subprocess.run( + ["elasticsearch-keystore", "add", "--force", "--stdin", key], + input=value.encode(), + capture_output=True, + text=True, + ) + if result.returncode != 0: + logger.error(f"Failed to set keystore value for {key}: {result.stderr}") + sys.exit(1) + + def reload_secure_settings(self, es: Elasticsearch) -> None: + response = es.nodes.reload_secure_settings() + if response.get("acknowledged", False): + logger.info("Secure settings reloaded successfully.") + else: + logger.error("Failed to reload secure settings.") + sys.exit(1) + + def check_s3_repository(self, es: Elasticsearch) -> bool: + response = es.snapshot.get_repository() + return self.es_snapshot_repo in response + + def register_s3_repository(self, es: Elasticsearch) -> None: + response = es.snapshot.create_repository( + name=self.es_snapshot_repo, + type="s3", + settings={"bucket": self.es_snapshot_repo, "client": "default"}, + ) + if response and response.get("acknowledged", False): + logger.info("S3 repository registered successfully.") + else: + logger.error("Failed to register S3 repository.") + sys.exit(1) + def create_index_template(self, es: Elasticsearch) -> Any: json_data = self.load_index_template() if not json_data: From 2494791c578363764c3834761ab2c352966d6335 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Tue, 4 Jun 2024 15:08:50 +0300 Subject: [PATCH 02/19] add ES S3 settings --- docker/deploy.sh | 2 ++ docker/docker-compose.yml.j2 | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docker/deploy.sh b/docker/deploy.sh index a0ee8eb9..2830e7f0 100755 --- a/docker/deploy.sh +++ b/docker/deploy.sh @@ -641,6 +641,8 @@ add ELASTICSEARCH_CLUSTER add ELASTICSEARCH_CONFIG_DIR add ELASTICSEARCH_CONTAINERS int add ELASTICSEARCH_HOSTS +add ELASTICSEARCH_S3_ACCESS_KEY # private +add ELASTICSEARCH_S3_SECRET_KEY # private add ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE # NOT bool! add ELASTICSEARCH_SNAPSHOT_REPO allow-empty add ELASTICSEARCH_SHARD_COUNT int diff --git a/docker/docker-compose.yml.j2 b/docker/docker-compose.yml.j2 index e5c3444c..78e4da88 100644 --- a/docker/docker-compose.yml.j2 +++ b/docker/docker-compose.yml.j2 @@ -475,6 +475,8 @@ services: ELASTICSEARCH_SHARD_COUNT : {{elasticsearch_shard_count}} ELASTICSEARCH_SHARD_REPLICAS: {{elasticsearch_shard_replicas}} ELASTICSEARCH_SNAPSHOT_REPO: {{elasticsearch_snapshot_repo}} + ELASTICSEARCH_S3_ACCESS_KEY: {{elasticsearch_s3_access_key}} + ELASTICSEARCH_S3_SECRET_KEY" {{elasticsearch_s3_secret_key}} RUN: elastic-conf {% macro define_volume(prefix, suffix='') %} From 98645304e1653866330b92b75de94f0ffbc78c11 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Tue, 4 Jun 2024 16:47:11 +0300 Subject: [PATCH 03/19] rm setting S3 credetials --- docker/docker-compose.yml.j2 | 2 +- indexer/scripts/elastic-conf.py | 61 --------------------------------- 2 files changed, 1 insertion(+), 62 deletions(-) diff --git a/docker/docker-compose.yml.j2 b/docker/docker-compose.yml.j2 index 78e4da88..a5299381 100644 --- a/docker/docker-compose.yml.j2 +++ b/docker/docker-compose.yml.j2 @@ -476,7 +476,7 @@ services: ELASTICSEARCH_SHARD_REPLICAS: {{elasticsearch_shard_replicas}} ELASTICSEARCH_SNAPSHOT_REPO: {{elasticsearch_snapshot_repo}} ELASTICSEARCH_S3_ACCESS_KEY: {{elasticsearch_s3_access_key}} - ELASTICSEARCH_S3_SECRET_KEY" {{elasticsearch_s3_secret_key}} + ELASTICSEARCH_S3_SECRET_KEY: {{elasticsearch_s3_secret_key}} RUN: elastic-conf {% macro define_volume(prefix, suffix='') %} diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index ef4cdc5e..a1998cc9 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -6,7 +6,6 @@ import argparse import os -import subprocess import sys from logging import getLogger from typing import Any @@ -55,19 +54,6 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: default=os.environ.get("ELASTICSEARCH_SNAPSHOT_REPO") or "", help="ES snapshot repository name", ) - # S3 Credentials - ap.add_argument( - "--s3-access-key", - dest="s3_access_key", - default=os.environ.get("ELASTICSEARCH_S3_ACCESS_KEY") or "", - help="Elasticsearch S3 access key", - ) - ap.add_argument( - "--s3-secret-key", - dest="s3_secret_key", - default=os.environ.get("ELASTICSEARCH_S3_SECRET_KEY") or "", - help="Elasticsearch S3 secret key", - ) def process_args(self) -> None: super().process_args() @@ -78,8 +64,6 @@ def process_args(self) -> None: ("ilm_max_age", "ELASTICSEARCH_ILM_MAX_AGE"), ("ilm_max_shard_size", "ELASTICSEARCH_ILM_MAX_SHARD_SIZE"), ("es_snapshot_repo", "ELASTICSEARCH_SNAPSHOT_REPO"), - ("s3_access_key", "ELASTICSEARCH_S3_ACCESS_KEY"), - ("s3_secret_key", "ELASTICSEARCH_S3_SECRET_KEY"), ] for arg_name, env_name in required_args: arg_val = getattr(self.args, arg_name) @@ -92,15 +76,10 @@ def process_args(self) -> None: self.ilm_max_age = self.args.ilm_max_age self.ilm_max_shard_size = self.args.ilm_max_shard_size self.es_snapshot_repo = self.args.es_snapshot_repo - self.s3_access_key = self.args.s3_access_key - self.s3_secret_key = self.args.s3_secret_key def main_loop(self) -> None: es = self.elasticsearch_client() assert es.ping(), "Failed to connect to Elasticsearch" - if not self.check_s3_credentials(es): - self.set_s3_credentials(es) - self.reload_secure_settings(es) index_template_created = self.create_index_template(es) ilm_policy_created = self.create_ilm_policy(es) alias_created = self.create_initial_index(es) @@ -117,46 +96,6 @@ def main_loop(self) -> None: logger.error("One or more configurations failed. Check logs for details.") return - def check_s3_credentials(self, es: Elasticsearch) -> bool: - try: - response = es.transport.perform_request("GET", "/_nodes/secure_settings") - nodes = response.get("nodes", {}) - for node_id, node_data in nodes.items(): - settings = node_data.get("secure_settings", {}) - if ( - "s3.client.default.access_key" in settings - and "s3.client.default.secret_key" in settings - ): - return True - return False - except Exception as e: - logger.error(f"Error checking S3 credentials: {e}") - return False - - def set_s3_credentials(self, es: Elasticsearch) -> None: - commands = [ - ("s3.client.default.access_key", self.s3_access_key), - ("s3.client.default.secret_key", self.s3_secret_key), - ] - for key, value in commands: - result = subprocess.run( - ["elasticsearch-keystore", "add", "--force", "--stdin", key], - input=value.encode(), - capture_output=True, - text=True, - ) - if result.returncode != 0: - logger.error(f"Failed to set keystore value for {key}: {result.stderr}") - sys.exit(1) - - def reload_secure_settings(self, es: Elasticsearch) -> None: - response = es.nodes.reload_secure_settings() - if response.get("acknowledged", False): - logger.info("Secure settings reloaded successfully.") - else: - logger.error("Failed to reload secure settings.") - sys.exit(1) - def check_s3_repository(self, es: Elasticsearch) -> bool: response = es.snapshot.get_repository() return self.es_snapshot_repo in response From d4764bbd0301946c89bd4d5505ba30ccecc70d14 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Mon, 10 Jun 2024 15:00:25 +0300 Subject: [PATCH 04/19] update ES snapshot setup, define fs for dev/containers --- docker/deploy.sh | 3 ++ docker/docker-compose.yml.j2 | 5 +-- indexer/scripts/elastic-conf.py | 62 ++++++++++++++++++++++++++------- 3 files changed, 56 insertions(+), 14 deletions(-) diff --git a/docker/deploy.sh b/docker/deploy.sh index ad652f97..13799916 100755 --- a/docker/deploy.sh +++ b/docker/deploy.sh @@ -137,6 +137,7 @@ ELASTICSEARCH_CONFIG_DIR=./conf/elasticsearch/templates ELASTICSEARCH_IMAGE="docker.elastic.co/elasticsearch/elasticsearch:8.12.0" ELASTICSEARCH_PORT_BASE=9200 # native port ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE=false +ELASTICSEARCH_SNAPSHOT_LOCATION="fs" FETCHER_CRONJOB_ENABLE=true # batch fetcher FETCHER_NUM_BATCHES=20 # batch fetcher @@ -328,6 +329,7 @@ prod) ELASTICSEARCH_ILM_MAX_AGE="90d" ELASTICSEARCH_ILM_MAX_SHARD_SIZE="50gb" ELASTICSEARCH_HOSTS=http://ramos.angwin:9200,http://woodward.angwin:9200,http://bradley.angwin:9200 + ELASTICSEARCH_SNAPSHOT_LOCATION="s3" # Disabled until tested in staging. # Questions: @@ -637,6 +639,7 @@ add ELASTICSEARCH_HOSTS add ELASTICSEARCH_S3_ACCESS_KEY # private add ELASTICSEARCH_S3_SECRET_KEY # private add ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE # NOT bool! +add ELASTICSEARCH_SNAPSHOT_LOCATION add ELASTICSEARCH_SNAPSHOT_REPO allow-empty add ELASTICSEARCH_SHARD_COUNT int add ELASTICSEARCH_SHARD_REPLICAS int diff --git a/docker/docker-compose.yml.j2 b/docker/docker-compose.yml.j2 index 5abffb60..740b5037 100644 --- a/docker/docker-compose.yml.j2 +++ b/docker/docker-compose.yml.j2 @@ -434,10 +434,11 @@ services: ELASTICSEARCH_ILM_MAX_SHARD_SIZE: {{elasticsearch_ilm_max_shard_size}} ELASTICSEARCH_SHARD_COUNT : {{elasticsearch_shard_count}} ELASTICSEARCH_SHARD_REPLICAS: {{elasticsearch_shard_replicas}} + ELASTICSEARCH_SNAPSHOT_LOCATION: {{elasticsearch_snapshot_location}} ELASTICSEARCH_SNAPSHOT_REPO: {{elasticsearch_snapshot_repo}} - ELASTICSEARCH_S3_ACCESS_KEY: {{elasticsearch_s3_access_key}} - ELASTICSEARCH_S3_SECRET_KEY: {{elasticsearch_s3_secret_key}} RUN: elastic-conf + volumes: + - *es-backup-volume {% macro define_volume(prefix, suffix='') %} {{prefix}}{{suffix}}: diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index a1998cc9..6e13c9a5 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -54,6 +54,12 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: default=os.environ.get("ELASTICSEARCH_SNAPSHOT_REPO") or "", help="ES snapshot repository name", ) + ap.add_argument( + "--es-snapshot-location", + dest="es_snapshot_location", + default=os.environ.get("ELASTICSEARCH_SNAPSHOT_LOCATION") or "fs", + help="ES snapshots upload location, default fs", + ) def process_args(self) -> None: super().process_args() @@ -64,6 +70,7 @@ def process_args(self) -> None: ("ilm_max_age", "ELASTICSEARCH_ILM_MAX_AGE"), ("ilm_max_shard_size", "ELASTICSEARCH_ILM_MAX_SHARD_SIZE"), ("es_snapshot_repo", "ELASTICSEARCH_SNAPSHOT_REPO"), + ("es_snapshot_location", "ELASTICSEARCH_SNAPSHOT_LOCATION"), ] for arg_name, env_name in required_args: arg_val = getattr(self.args, arg_name) @@ -76,6 +83,7 @@ def process_args(self) -> None: self.ilm_max_age = self.args.ilm_max_age self.ilm_max_shard_size = self.args.ilm_max_shard_size self.es_snapshot_repo = self.args.es_snapshot_repo + self.es_snapshot_location = self.args.es_snapshot_location def main_loop(self) -> None: es = self.elasticsearch_client() @@ -96,21 +104,41 @@ def main_loop(self) -> None: logger.error("One or more configurations failed. Check logs for details.") return - def check_s3_repository(self, es: Elasticsearch) -> bool: - response = es.snapshot.get_repository() - return self.es_snapshot_repo in response + def repository_exists(self, es: Elasticsearch, repo_name: str) -> bool: + try: + response = es.snapshot.get_repository(name=repo_name) + return repo_name in response + except Exception as e: + logger.error("Error checking if repository exists: %s", e) + return False + + def register_fs_repository(self, es: Elasticsearch) -> None: + if not self.repository_exists(es, self.es_snapshot_repo): + response = es.snapshot.create_repository( + name=self.es_snapshot_repo, + type="fs", + settings={"location": "/var/backups/elasticsearch", "compress": True}, + ) + if response and response.get("acknowledged", False): + logger.info("Filesystem repository registered successfully.") + else: + logger.error("Failed to register filesystem repository.") + else: + logger.info("Filesystem repository already exists.") def register_s3_repository(self, es: Elasticsearch) -> None: - response = es.snapshot.create_repository( - name=self.es_snapshot_repo, - type="s3", - settings={"bucket": self.es_snapshot_repo, "client": "default"}, - ) - if response and response.get("acknowledged", False): - logger.info("S3 repository registered successfully.") + if not self.repository_exists(es, self.es_snapshot_repo): + response = es.snapshot.create_repository( + name=self.es_snapshot_repo, + type="s3", + settings={"bucket": self.es_snapshot_repo, "client": "default"}, + ) + if response and response.get("acknowledged", False): + logger.info("S3 repository registered successfully.") + else: + logger.error("Failed to register S3 repository.") else: - logger.error("Failed to register S3 repository.") - sys.exit(1) + logger.info("S3 repository already exists.") def create_index_template(self, es: Elasticsearch) -> Any: json_data = self.load_index_template() @@ -178,6 +206,16 @@ def create_initial_index(self, es: Elasticsearch) -> Any: def create_slm_policy(self, es: Elasticsearch) -> Any: CURRENT_POLICY_ID = "bi_weekly_slm" repository = self.es_snapshot_repo + + if self.es_snapshot_location == "fs": + self.register_fs_repository(es) + elif self.es_snapshot_location == "s3": + self.register_s3_repository(es) + # To Add Backblaze support + else: + logger.error("Unsupported snapshot location: %s", self.es_snapshot_location) + return False + json_data = self.load_slm_policy_template(CURRENT_POLICY_ID) if not json_data: logger.error("Elasticsearch create slm policy: error template not loaded") From 21675a25b3f2bbf1cd67f4e32ce43a9a64cdbf22 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Mon, 10 Jun 2024 15:04:59 +0300 Subject: [PATCH 05/19] rm ES S3 credentials --- docker/deploy.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/docker/deploy.sh b/docker/deploy.sh index 13799916..8398eab3 100755 --- a/docker/deploy.sh +++ b/docker/deploy.sh @@ -636,8 +636,6 @@ add ELASTICSEARCH_CLUSTER add ELASTICSEARCH_CONFIG_DIR add ELASTICSEARCH_CONTAINERS int add ELASTICSEARCH_HOSTS -add ELASTICSEARCH_S3_ACCESS_KEY # private -add ELASTICSEARCH_S3_SECRET_KEY # private add ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE # NOT bool! add ELASTICSEARCH_SNAPSHOT_LOCATION add ELASTICSEARCH_SNAPSHOT_REPO allow-empty From 72d55998e9c46d17b720e7d9a2296af94b309008 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Mon, 10 Jun 2024 15:12:32 +0300 Subject: [PATCH 06/19] set ES snapshot bucket --- docker/deploy.sh | 1 + docker/docker-compose.yml.j2 | 1 + indexer/scripts/elastic-conf.py | 10 +++++++++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/docker/deploy.sh b/docker/deploy.sh index 8398eab3..ebc7ba08 100755 --- a/docker/deploy.sh +++ b/docker/deploy.sh @@ -636,6 +636,7 @@ add ELASTICSEARCH_CLUSTER add ELASTICSEARCH_CONFIG_DIR add ELASTICSEARCH_CONTAINERS int add ELASTICSEARCH_HOSTS +add ELASTICSEARCH_SNAPSHOT_BUCKET #private add ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE # NOT bool! add ELASTICSEARCH_SNAPSHOT_LOCATION add ELASTICSEARCH_SNAPSHOT_REPO allow-empty diff --git a/docker/docker-compose.yml.j2 b/docker/docker-compose.yml.j2 index 740b5037..f70ce6e9 100644 --- a/docker/docker-compose.yml.j2 +++ b/docker/docker-compose.yml.j2 @@ -434,6 +434,7 @@ services: ELASTICSEARCH_ILM_MAX_SHARD_SIZE: {{elasticsearch_ilm_max_shard_size}} ELASTICSEARCH_SHARD_COUNT : {{elasticsearch_shard_count}} ELASTICSEARCH_SHARD_REPLICAS: {{elasticsearch_shard_replicas}} + ELASTICSEARCH_SNAPSHOT_BUCKET: {{elasticsearch_snapshot_bucket}} ELASTICSEARCH_SNAPSHOT_LOCATION: {{elasticsearch_snapshot_location}} ELASTICSEARCH_SNAPSHOT_REPO: {{elasticsearch_snapshot_repo}} RUN: elastic-conf diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index 6e13c9a5..0f071406 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -48,6 +48,12 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help="ES ILM policy max shard size", ) # SLM + ap.add_argument( + "--es-snapshot-s3-bucket", + dest="es_snapshot_s3_bucket", + default=os.environ.get("ELASTICSEARCH_SNAPSHOT_BUCKET") or "", + help="ES snapshot S3 bucket", + ) ap.add_argument( "--es-snapshot-repo", dest="es_snapshot_repo", @@ -69,6 +75,7 @@ def process_args(self) -> None: ("replicas", "ELASTICSEARCH_SHARD_REPLICAS"), ("ilm_max_age", "ELASTICSEARCH_ILM_MAX_AGE"), ("ilm_max_shard_size", "ELASTICSEARCH_ILM_MAX_SHARD_SIZE"), + ("es_snapshot_s3_bucket", "ELASTICSEARCH_SNAPSHOT_BUCKET"), ("es_snapshot_repo", "ELASTICSEARCH_SNAPSHOT_REPO"), ("es_snapshot_location", "ELASTICSEARCH_SNAPSHOT_LOCATION"), ] @@ -84,6 +91,7 @@ def process_args(self) -> None: self.ilm_max_shard_size = self.args.ilm_max_shard_size self.es_snapshot_repo = self.args.es_snapshot_repo self.es_snapshot_location = self.args.es_snapshot_location + self.es_snapshot_s3_bucket = self.args.es_snapshot_s3_bucket def main_loop(self) -> None: es = self.elasticsearch_client() @@ -131,7 +139,7 @@ def register_s3_repository(self, es: Elasticsearch) -> None: response = es.snapshot.create_repository( name=self.es_snapshot_repo, type="s3", - settings={"bucket": self.es_snapshot_repo, "client": "default"}, + settings={"bucket": self.es_snapshot_s3_bucket, "client": "default"}, ) if response and response.get("acknowledged", False): logger.info("S3 repository registered successfully.") From fe433c82f849388c979ea859ed9f60e1963b8299 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Mon, 10 Jun 2024 17:38:38 +0300 Subject: [PATCH 07/19] updates --- docker/deploy.sh | 10 ++++++---- docker/docker-compose.yml.j2 | 9 ++++----- indexer/scripts/elastic-conf.py | 35 ++++++++++++++++++++++----------- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/docker/deploy.sh b/docker/deploy.sh index ebc7ba08..02b102d4 100755 --- a/docker/deploy.sh +++ b/docker/deploy.sh @@ -137,7 +137,8 @@ ELASTICSEARCH_CONFIG_DIR=./conf/elasticsearch/templates ELASTICSEARCH_IMAGE="docker.elastic.co/elasticsearch/elasticsearch:8.12.0" ELASTICSEARCH_PORT_BASE=9200 # native port ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE=false -ELASTICSEARCH_SNAPSHOT_LOCATION="fs" +ELASTICSEARCH_SNAPSHOT_TYPE="fs" +ELASTICSEARCH_SNAPSHOT_SETTINGS_LOCATION="/var/backups/elasticsearch" FETCHER_CRONJOB_ENABLE=true # batch fetcher FETCHER_NUM_BATCHES=20 # batch fetcher @@ -329,7 +330,7 @@ prod) ELASTICSEARCH_ILM_MAX_AGE="90d" ELASTICSEARCH_ILM_MAX_SHARD_SIZE="50gb" ELASTICSEARCH_HOSTS=http://ramos.angwin:9200,http://woodward.angwin:9200,http://bradley.angwin:9200 - ELASTICSEARCH_SNAPSHOT_LOCATION="s3" + ELASTICSEARCH_SNAPSHOT_TYPE="s3" # Disabled until tested in staging. # Questions: @@ -636,10 +637,11 @@ add ELASTICSEARCH_CLUSTER add ELASTICSEARCH_CONFIG_DIR add ELASTICSEARCH_CONTAINERS int add ELASTICSEARCH_HOSTS -add ELASTICSEARCH_SNAPSHOT_BUCKET #private add ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE # NOT bool! -add ELASTICSEARCH_SNAPSHOT_LOCATION +add ELASTICSEARCH_SNAPSHOT_TYPE add ELASTICSEARCH_SNAPSHOT_REPO allow-empty +add ELASTICSEARCH_SNAPSHOT_SETTINGS_LOCATION +add ELASTICSEARCH_SNAPSHOT_SETTINGS_BUCKET #private add ELASTICSEARCH_SHARD_COUNT int add ELASTICSEARCH_SHARD_REPLICAS int add ELASTICSEARCH_ILM_MAX_AGE diff --git a/docker/docker-compose.yml.j2 b/docker/docker-compose.yml.j2 index f70ce6e9..c836e9af 100644 --- a/docker/docker-compose.yml.j2 +++ b/docker/docker-compose.yml.j2 @@ -87,7 +87,7 @@ x-es-service-settings: &es-service-settings ES_JAVA_OPTS: "-Xms1g -Xmx1g" discovery.type: single-node {% endif %} - path.repo: "/var/backups/elasticsearch" + path.repo: {{elasticsearch_snapshot_settings_location}} xpack.security.enabled: "false" image: {{elasticsearch_image}} @@ -434,12 +434,11 @@ services: ELASTICSEARCH_ILM_MAX_SHARD_SIZE: {{elasticsearch_ilm_max_shard_size}} ELASTICSEARCH_SHARD_COUNT : {{elasticsearch_shard_count}} ELASTICSEARCH_SHARD_REPLICAS: {{elasticsearch_shard_replicas}} - ELASTICSEARCH_SNAPSHOT_BUCKET: {{elasticsearch_snapshot_bucket}} - ELASTICSEARCH_SNAPSHOT_LOCATION: {{elasticsearch_snapshot_location}} ELASTICSEARCH_SNAPSHOT_REPO: {{elasticsearch_snapshot_repo}} + ELASTICSEARCH_SNAPSHOT_SETTINGS_BUCKET: {{elasticsearch_snapshot_settings_bucket}} + ELASTICSEARCH_SNAPSHOT_SETTINGS_LOCATION: {{elasticsearch_snapshot_settings_location}} + ELASTICSEARCH_SNAPSHOT_TYPE: {{elasticsearch_snapshot_type}} RUN: elastic-conf - volumes: - - *es-backup-volume {% macro define_volume(prefix, suffix='') %} {{prefix}}{{suffix}}: diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index 0f071406..bf763074 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -51,7 +51,7 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: ap.add_argument( "--es-snapshot-s3-bucket", dest="es_snapshot_s3_bucket", - default=os.environ.get("ELASTICSEARCH_SNAPSHOT_BUCKET") or "", + default=os.environ.get("ELASTICSEARCH_SNAPSHOT_SETTINGS_BUCKET") or "", help="ES snapshot S3 bucket", ) ap.add_argument( @@ -61,10 +61,16 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help="ES snapshot repository name", ) ap.add_argument( - "--es-snapshot-location", - dest="es_snapshot_location", - default=os.environ.get("ELASTICSEARCH_SNAPSHOT_LOCATION") or "fs", - help="ES snapshots upload location, default fs", + "--es-snapshot-type", + dest="es_snapshot_type", + default=os.environ.get("ELASTICSEARCH_SNAPSHOT_TYPE") or "fs", + help="ES snapshots type, default fs", + ) + ap.add_argument( + "--es-snapshot-fs-location", + dest="es_snapshot_fs_location", + default=os.environ.get("ELASTICSEARCH_SNAPSHOT_SETTINGS_LOCATION"), + help="ES path for filesystem backup", ) def process_args(self) -> None: @@ -75,9 +81,8 @@ def process_args(self) -> None: ("replicas", "ELASTICSEARCH_SHARD_REPLICAS"), ("ilm_max_age", "ELASTICSEARCH_ILM_MAX_AGE"), ("ilm_max_shard_size", "ELASTICSEARCH_ILM_MAX_SHARD_SIZE"), - ("es_snapshot_s3_bucket", "ELASTICSEARCH_SNAPSHOT_BUCKET"), ("es_snapshot_repo", "ELASTICSEARCH_SNAPSHOT_REPO"), - ("es_snapshot_location", "ELASTICSEARCH_SNAPSHOT_LOCATION"), + ("es_snapshot_type", "ELASTICSEARCH_SNAPSHOT_TYPE"), ] for arg_name, env_name in required_args: arg_val = getattr(self.args, arg_name) @@ -90,8 +95,14 @@ def process_args(self) -> None: self.ilm_max_age = self.args.ilm_max_age self.ilm_max_shard_size = self.args.ilm_max_shard_size self.es_snapshot_repo = self.args.es_snapshot_repo - self.es_snapshot_location = self.args.es_snapshot_location + self.es_snapshot_type = self.args.es_snapshot_type self.es_snapshot_s3_bucket = self.args.es_snapshot_s3_bucket + self.es_snapshot_fs_location = self.args.es_snapshot_fs_location + + if not self.es_snapshot_s3_bucket: + logger.warning( + "--es-snapshot-s3-bucket or ELASTICSEARCH_SNAPSHOT_SETTINGS_BUCKET not set" + ) def main_loop(self) -> None: es = self.elasticsearch_client() @@ -125,7 +136,7 @@ def register_fs_repository(self, es: Elasticsearch) -> None: response = es.snapshot.create_repository( name=self.es_snapshot_repo, type="fs", - settings={"location": "/var/backups/elasticsearch", "compress": True}, + settings={"location": self.es_snapshot_fs_location, "compress": True}, ) if response and response.get("acknowledged", False): logger.info("Filesystem repository registered successfully.") @@ -215,13 +226,13 @@ def create_slm_policy(self, es: Elasticsearch) -> Any: CURRENT_POLICY_ID = "bi_weekly_slm" repository = self.es_snapshot_repo - if self.es_snapshot_location == "fs": + if self.es_snapshot_type == "fs": self.register_fs_repository(es) - elif self.es_snapshot_location == "s3": + elif self.es_snapshot_type == "s3": self.register_s3_repository(es) # To Add Backblaze support else: - logger.error("Unsupported snapshot location: %s", self.es_snapshot_location) + logger.error("Unsupported snapshot location: %s", self.es_snapshot_type) return False json_data = self.load_slm_policy_template(CURRENT_POLICY_ID) From 81df0ece4084906eb0219ceaf0c346435fa4ab2b Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Thu, 13 Jun 2024 12:19:17 +0300 Subject: [PATCH 08/19] apply updates --- docker/deploy.sh | 13 ++--- docker/docker-compose.yml.j2 | 2 +- indexer/scripts/elastic-conf.py | 86 +++++++++++++++------------------ 3 files changed, 46 insertions(+), 55 deletions(-) diff --git a/docker/deploy.sh b/docker/deploy.sh index 02b102d4..38f4cc12 100755 --- a/docker/deploy.sh +++ b/docker/deploy.sh @@ -137,8 +137,8 @@ ELASTICSEARCH_CONFIG_DIR=./conf/elasticsearch/templates ELASTICSEARCH_IMAGE="docker.elastic.co/elasticsearch/elasticsearch:8.12.0" ELASTICSEARCH_PORT_BASE=9200 # native port ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE=false -ELASTICSEARCH_SNAPSHOT_TYPE="fs" -ELASTICSEARCH_SNAPSHOT_SETTINGS_LOCATION="/var/backups/elasticsearch" +ELASTICSEARCH_SNAPSHOT_REPO_TYPE="fs" +ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION="mc_story_indexer" FETCHER_CRONJOB_ENABLE=true # batch fetcher FETCHER_NUM_BATCHES=20 # batch fetcher @@ -330,7 +330,7 @@ prod) ELASTICSEARCH_ILM_MAX_AGE="90d" ELASTICSEARCH_ILM_MAX_SHARD_SIZE="50gb" ELASTICSEARCH_HOSTS=http://ramos.angwin:9200,http://woodward.angwin:9200,http://bradley.angwin:9200 - ELASTICSEARCH_SNAPSHOT_TYPE="s3" + ELASTICSEARCH_SNAPSHOT_REPO_TYPE="s3" # Disabled until tested in staging. # Questions: @@ -638,10 +638,11 @@ add ELASTICSEARCH_CONFIG_DIR add ELASTICSEARCH_CONTAINERS int add ELASTICSEARCH_HOSTS add ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE # NOT bool! -add ELASTICSEARCH_SNAPSHOT_TYPE add ELASTICSEARCH_SNAPSHOT_REPO allow-empty -add ELASTICSEARCH_SNAPSHOT_SETTINGS_LOCATION -add ELASTICSEARCH_SNAPSHOT_SETTINGS_BUCKET #private +add ELASTICSEARCH_SNAPSHOT_REPO_TYPE +add ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION +add ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_BUCKET #private +add ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_ENDPOINT add ELASTICSEARCH_SHARD_COUNT int add ELASTICSEARCH_SHARD_REPLICAS int add ELASTICSEARCH_ILM_MAX_AGE diff --git a/docker/docker-compose.yml.j2 b/docker/docker-compose.yml.j2 index c836e9af..a7f84c82 100644 --- a/docker/docker-compose.yml.j2 +++ b/docker/docker-compose.yml.j2 @@ -87,7 +87,7 @@ x-es-service-settings: &es-service-settings ES_JAVA_OPTS: "-Xms1g -Xmx1g" discovery.type: single-node {% endif %} - path.repo: {{elasticsearch_snapshot_settings_location}} + path.repo: "/var/backups/elasticsearch" xpack.security.enabled: "false" image: {{elasticsearch_image}} diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index bf763074..87cab2d5 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -19,6 +19,18 @@ class ElasticConf(ElasticConfMixin, App): + def __init__(self, process_name: str, descr: str): + super().__init__(process_name, descr) + self.es_snapshot_s3_bucket = os.environ.get( + "ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_BUCKET" + ) + self.es_snapshot_fs_location = os.environ.get( + "ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION" + ) + self.es_snapshot_s3_endpoint = os.environ.get( + "ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_ENDPOINT" + ) + def define_options(self, ap: argparse.ArgumentParser) -> None: super().define_options(ap) # Index template args @@ -48,12 +60,6 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help="ES ILM policy max shard size", ) # SLM - ap.add_argument( - "--es-snapshot-s3-bucket", - dest="es_snapshot_s3_bucket", - default=os.environ.get("ELASTICSEARCH_SNAPSHOT_SETTINGS_BUCKET") or "", - help="ES snapshot S3 bucket", - ) ap.add_argument( "--es-snapshot-repo", dest="es_snapshot_repo", @@ -63,15 +69,9 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: ap.add_argument( "--es-snapshot-type", dest="es_snapshot_type", - default=os.environ.get("ELASTICSEARCH_SNAPSHOT_TYPE") or "fs", + default=os.environ.get("ELASTICSEARCH_SNAPSHOT_REPO_TYPE") or "fs", help="ES snapshots type, default fs", ) - ap.add_argument( - "--es-snapshot-fs-location", - dest="es_snapshot_fs_location", - default=os.environ.get("ELASTICSEARCH_SNAPSHOT_SETTINGS_LOCATION"), - help="ES path for filesystem backup", - ) def process_args(self) -> None: super().process_args() @@ -82,7 +82,7 @@ def process_args(self) -> None: ("ilm_max_age", "ELASTICSEARCH_ILM_MAX_AGE"), ("ilm_max_shard_size", "ELASTICSEARCH_ILM_MAX_SHARD_SIZE"), ("es_snapshot_repo", "ELASTICSEARCH_SNAPSHOT_REPO"), - ("es_snapshot_type", "ELASTICSEARCH_SNAPSHOT_TYPE"), + ("es_snapshot_type", "ELASTICSEARCH_SNAPSHOT_REPO_TYPE"), ] for arg_name, env_name in required_args: arg_val = getattr(self.args, arg_name) @@ -96,13 +96,6 @@ def process_args(self) -> None: self.ilm_max_shard_size = self.args.ilm_max_shard_size self.es_snapshot_repo = self.args.es_snapshot_repo self.es_snapshot_type = self.args.es_snapshot_type - self.es_snapshot_s3_bucket = self.args.es_snapshot_s3_bucket - self.es_snapshot_fs_location = self.args.es_snapshot_fs_location - - if not self.es_snapshot_s3_bucket: - logger.warning( - "--es-snapshot-s3-bucket or ELASTICSEARCH_SNAPSHOT_SETTINGS_BUCKET not set" - ) def main_loop(self) -> None: es = self.elasticsearch_client() @@ -131,33 +124,32 @@ def repository_exists(self, es: Elasticsearch, repo_name: str) -> bool: logger.error("Error checking if repository exists: %s", e) return False - def register_fs_repository(self, es: Elasticsearch) -> None: + def register_repository(self, es: Elasticsearch, repo_type: str) -> bool: if not self.repository_exists(es, self.es_snapshot_repo): - response = es.snapshot.create_repository( - name=self.es_snapshot_repo, - type="fs", - settings={"location": self.es_snapshot_fs_location, "compress": True}, - ) - if response and response.get("acknowledged", False): - logger.info("Filesystem repository registered successfully.") - else: - logger.error("Failed to register filesystem repository.") - else: - logger.info("Filesystem repository already exists.") + settings = {"location": self.es_snapshot_fs_location, "compress": True} + if repo_type == "s3": + settings = { + "bucket": self.es_snapshot_s3_bucket, + "endpoint": self.es_snapshot_s3_endpoint, + } - def register_s3_repository(self, es: Elasticsearch) -> None: - if not self.repository_exists(es, self.es_snapshot_repo): response = es.snapshot.create_repository( name=self.es_snapshot_repo, - type="s3", - settings={"bucket": self.es_snapshot_s3_bucket, "client": "default"}, + type=repo_type, + settings=settings, ) + if response and response.get("acknowledged", False): - logger.info("S3 repository registered successfully.") + logger.info( + f"{repo_type.capitalize()} repository registered successfully." + ) + return True else: - logger.error("Failed to register S3 repository.") + logger.error(f"Failed to register {repo_type} repository.") + return False else: - logger.info("S3 repository already exists.") + logger.info(f"{repo_type.capitalize()} repository already exists.") + return True def create_index_template(self, es: Elasticsearch) -> Any: json_data = self.load_index_template() @@ -226,14 +218,12 @@ def create_slm_policy(self, es: Elasticsearch) -> Any: CURRENT_POLICY_ID = "bi_weekly_slm" repository = self.es_snapshot_repo - if self.es_snapshot_type == "fs": - self.register_fs_repository(es) - elif self.es_snapshot_type == "s3": - self.register_s3_repository(es) - # To Add Backblaze support - else: - logger.error("Unsupported snapshot location: %s", self.es_snapshot_type) - return False + if not self.register_repository(es, self.es_snapshot_type): + logger.error( + "Elasticsearch snapshot repository does not exist: %s", + self.es_snapshot_type, + ) + sys.exit(1) json_data = self.load_slm_policy_template(CURRENT_POLICY_ID) if not json_data: From 292ff91957c84f0db6d97f0dbc21239297e3b401 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Thu, 13 Jun 2024 14:38:29 +0300 Subject: [PATCH 09/19] apply updates --- indexer/scripts/elastic-conf.py | 41 ++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index 87cab2d5..d30e47ee 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -117,14 +117,17 @@ def main_loop(self) -> None: return def repository_exists(self, es: Elasticsearch, repo_name: str) -> bool: + status = False try: response = es.snapshot.get_repository(name=repo_name) - return repo_name in response + if repo_name in response: + status = True except Exception as e: - logger.error("Error checking if repository exists: %s", e) - return False + logger.error("Error validating repository: %s", e) + return status def register_repository(self, es: Elasticsearch, repo_type: str) -> bool: + status = False if not self.repository_exists(es, self.es_snapshot_repo): settings = {"location": self.es_snapshot_fs_location, "compress": True} if repo_type == "s3": @@ -133,23 +136,29 @@ def register_repository(self, es: Elasticsearch, repo_type: str) -> bool: "endpoint": self.es_snapshot_s3_endpoint, } - response = es.snapshot.create_repository( - name=self.es_snapshot_repo, - type=repo_type, - settings=settings, - ) + try: + response = es.snapshot.create_repository( + name=self.es_snapshot_repo, + type=repo_type, + settings=settings, + ) - if response and response.get("acknowledged", False): - logger.info( - f"{repo_type.capitalize()} repository registered successfully." + if response and response.get("acknowledged", False): + logger.info( + f"{repo_type.capitalize()} repository registered successfully." + ) + status = True + else: + logger.error(f"Failed to register {repo_type} repository.") + except Exception as e: + logger.error( + f"Exception occurred while registering {repo_type} repository: %s", + e, ) - return True - else: - logger.error(f"Failed to register {repo_type} repository.") - return False else: logger.info(f"{repo_type.capitalize()} repository already exists.") - return True + status = True + return status def create_index_template(self, es: Elasticsearch) -> Any: json_data = self.load_index_template() From 298f83a8d7bce4fc532822e6502675cebcbe22b8 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Odhiambo <49587182+thepsalmist@users.noreply.github.com> Date: Thu, 13 Jun 2024 16:16:39 +0300 Subject: [PATCH 10/19] Update docker/docker-compose.yml.j2 Co-authored-by: Clemence Kyara --- docker/docker-compose.yml.j2 | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker/docker-compose.yml.j2 b/docker/docker-compose.yml.j2 index a7f84c82..84e254f2 100644 --- a/docker/docker-compose.yml.j2 +++ b/docker/docker-compose.yml.j2 @@ -435,9 +435,9 @@ services: ELASTICSEARCH_SHARD_COUNT : {{elasticsearch_shard_count}} ELASTICSEARCH_SHARD_REPLICAS: {{elasticsearch_shard_replicas}} ELASTICSEARCH_SNAPSHOT_REPO: {{elasticsearch_snapshot_repo}} - ELASTICSEARCH_SNAPSHOT_SETTINGS_BUCKET: {{elasticsearch_snapshot_settings_bucket}} - ELASTICSEARCH_SNAPSHOT_SETTINGS_LOCATION: {{elasticsearch_snapshot_settings_location}} - ELASTICSEARCH_SNAPSHOT_TYPE: {{elasticsearch_snapshot_type}} + ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_BUCKET: {{elasticsearch_snapshot_repo_settings_bucket}} + ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION: {{elasticsearch_snapshot_repo_settings_location}} + ELASTICSEARCH_SNAPSHOT_REPO_TYPE: {{elasticsearch_snapshot_repo_type}} RUN: elastic-conf {% macro define_volume(prefix, suffix='') %} From e1a12cc9fd0011ae6914169f4f2004768cc7027c Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Fri, 14 Jun 2024 11:24:16 +0300 Subject: [PATCH 11/19] apply updates --- indexer/scripts/elastic-conf.py | 88 +++++++++++++++++---------------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index d30e47ee..7d633ee6 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -67,9 +67,9 @@ def define_options(self, ap: argparse.ArgumentParser) -> None: help="ES snapshot repository name", ) ap.add_argument( - "--es-snapshot-type", - dest="es_snapshot_type", - default=os.environ.get("ELASTICSEARCH_SNAPSHOT_REPO_TYPE") or "fs", + "--es-snapshot-repo-type", + dest="es_snapshot_repo_type", + default=os.environ.get("ELASTICSEARCH_SNAPSHOT_REPO_TYPE") or "", help="ES snapshots type, default fs", ) @@ -82,7 +82,7 @@ def process_args(self) -> None: ("ilm_max_age", "ELASTICSEARCH_ILM_MAX_AGE"), ("ilm_max_shard_size", "ELASTICSEARCH_ILM_MAX_SHARD_SIZE"), ("es_snapshot_repo", "ELASTICSEARCH_SNAPSHOT_REPO"), - ("es_snapshot_type", "ELASTICSEARCH_SNAPSHOT_REPO_TYPE"), + ("es_snapshot_repo_type", "ELASTICSEARCH_SNAPSHOT_REPO_TYPE"), ] for arg_name, env_name in required_args: arg_val = getattr(self.args, arg_name) @@ -95,7 +95,7 @@ def process_args(self) -> None: self.ilm_max_age = self.args.ilm_max_age self.ilm_max_shard_size = self.args.ilm_max_shard_size self.es_snapshot_repo = self.args.es_snapshot_repo - self.es_snapshot_type = self.args.es_snapshot_type + self.es_snapshot_repo_type = self.args.es_snapshot_repo_type def main_loop(self) -> None: es = self.elasticsearch_client() @@ -116,48 +116,50 @@ def main_loop(self) -> None: logger.error("One or more configurations failed. Check logs for details.") return - def repository_exists(self, es: Elasticsearch, repo_name: str) -> bool: - status = False + def repository_exists(self, es: Elasticsearch) -> bool: + repo_name = self.es_snapshot_repo try: response = es.snapshot.get_repository(name=repo_name) - if repo_name in response: - status = True - except Exception as e: - logger.error("Error validating repository: %s", e) - return status + return repo_name in response + except Exception: + logger.exception("Error validating repository: %s", repo_name) + return False - def register_repository(self, es: Elasticsearch, repo_type: str) -> bool: + def register_repository(self, es: Elasticsearch) -> bool: status = False - if not self.repository_exists(es, self.es_snapshot_repo): - settings = {"location": self.es_snapshot_fs_location, "compress": True} - if repo_type == "s3": - settings = { - "bucket": self.es_snapshot_s3_bucket, - "endpoint": self.es_snapshot_s3_endpoint, - } + repo_type = self.es_snapshot_repo_type + if not repo_type: + logger.error("Repository type must be either 'fs' or 's3'") + return status - try: - response = es.snapshot.create_repository( - name=self.es_snapshot_repo, - type=repo_type, - settings=settings, - ) + if self.repository_exists(es): + logger.info("%s repository already exists." % repo_type) + return True - if response and response.get("acknowledged", False): - logger.info( - f"{repo_type.capitalize()} repository registered successfully." - ) - status = True - else: - logger.error(f"Failed to register {repo_type} repository.") - except Exception as e: - logger.error( - f"Exception occurred while registering {repo_type} repository: %s", - e, - ) + if repo_type == "s3": + settings = { + "bucket": self.es_snapshot_s3_bucket, + "endpoint": self.es_snapshot_s3_endpoint, + } else: - logger.info(f"{repo_type.capitalize()} repository already exists.") - status = True + settings = {"location": self.es_snapshot_fs_location} + + try: + response = es.snapshot.create_repository( + name=self.es_snapshot_repo, + type=repo_type, + settings=settings, + ) + + if response and response.get("acknowledged", False): + logger.info("%s repository registered successfully." % repo_type) + status = True + else: + logger.error("Failed to register %s repository." % repo_type) + except Exception: + logger.exception( + "Exception occurred while registering %s repository" % repo_type + ) return status def create_index_template(self, es: Elasticsearch) -> Any: @@ -227,12 +229,12 @@ def create_slm_policy(self, es: Elasticsearch) -> Any: CURRENT_POLICY_ID = "bi_weekly_slm" repository = self.es_snapshot_repo - if not self.register_repository(es, self.es_snapshot_type): + if not self.register_repository(es): logger.error( "Elasticsearch snapshot repository does not exist: %s", - self.es_snapshot_type, + self.es_snapshot_repo_type, ) - sys.exit(1) + return False json_data = self.load_slm_policy_template(CURRENT_POLICY_ID) if not json_data: From 78c136265f7abe73a33a9406817426f17a91bece Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Fri, 14 Jun 2024 12:02:18 +0300 Subject: [PATCH 12/19] apply deploy updates --- docker/deploy.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docker/deploy.sh b/docker/deploy.sh index 38f4cc12..7fd722c4 100755 --- a/docker/deploy.sh +++ b/docker/deploy.sh @@ -138,7 +138,6 @@ ELASTICSEARCH_IMAGE="docker.elastic.co/elasticsearch/elasticsearch:8.12.0" ELASTICSEARCH_PORT_BASE=9200 # native port ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE=false ELASTICSEARCH_SNAPSHOT_REPO_TYPE="fs" -ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION="mc_story_indexer" FETCHER_CRONJOB_ENABLE=true # batch fetcher FETCHER_NUM_BATCHES=20 # batch fetcher @@ -353,6 +352,7 @@ staging) ELASTICSEARCH_SHARD_REPLICAS=1 ELASTICSEARCH_ILM_MAX_AGE="6h" ELASTICSEARCH_ILM_MAX_SHARD_SIZE="5gb" + ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION="mc_story_indexer" STORY_LIMIT=50000 @@ -380,6 +380,7 @@ dev) ELASTICSEARCH_SHARD_REPLICAS=1 ELASTICSEARCH_ILM_MAX_AGE="15m" ELASTICSEARCH_ILM_MAX_SHARD_SIZE="100mb" + ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION="mc_story_indexer" STORY_LIMIT=5000 From c966fb1cca238044ef224449d9311ca10fef70bf Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Fri, 14 Jun 2024 12:22:48 +0300 Subject: [PATCH 13/19] apply deploy updates --- indexer/scripts/elastic-conf.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index 7d633ee6..0c4b9860 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -133,7 +133,7 @@ def register_repository(self, es: Elasticsearch) -> bool: return status if self.repository_exists(es): - logger.info("%s repository already exists." % repo_type) + logger.info("%s repository already exists.", repo_type) return True if repo_type == "s3": @@ -141,8 +141,11 @@ def register_repository(self, es: Elasticsearch) -> bool: "bucket": self.es_snapshot_s3_bucket, "endpoint": self.es_snapshot_s3_endpoint, } - else: + elif repo_type == "fs": settings = {"location": self.es_snapshot_fs_location} + else: + logger.error("Unsupported repository type: %s", repo_type) + return status try: response = es.snapshot.create_repository( @@ -152,13 +155,13 @@ def register_repository(self, es: Elasticsearch) -> bool: ) if response and response.get("acknowledged", False): - logger.info("%s repository registered successfully." % repo_type) + logger.info("%s repository registered successfully.", repo_type) status = True else: - logger.error("Failed to register %s repository." % repo_type) + logger.error("Failed to register %s repository.", repo_type) except Exception: logger.exception( - "Exception occurred while registering %s repository" % repo_type + "Exception occurred while registering %s repository", repo_type ) return status From 2b899e27092a215a55d6f1425fb28273042248c6 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Odhiambo <49587182+thepsalmist@users.noreply.github.com> Date: Fri, 14 Jun 2024 15:34:26 +0300 Subject: [PATCH 14/19] Update indexer/scripts/elastic-conf.py Co-authored-by: Clemence Kyara --- indexer/scripts/elastic-conf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index 0c4b9860..b398eeb9 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -161,9 +161,9 @@ def register_repository(self, es: Elasticsearch) -> bool: logger.error("Failed to register %s repository.", repo_type) except Exception: logger.exception( - "Exception occurred while registering %s repository", repo_type + "Failed to register repository: %s", self.es_snapshot_repo ) - return status + return False def create_index_template(self, es: Elasticsearch) -> Any: json_data = self.load_index_template() From 01fcfab5ac38dbdcd3984fa9c6da1ba20646f2b1 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Odhiambo <49587182+thepsalmist@users.noreply.github.com> Date: Fri, 14 Jun 2024 15:34:44 +0300 Subject: [PATCH 15/19] Update indexer/scripts/elastic-conf.py Co-authored-by: Clemence Kyara --- indexer/scripts/elastic-conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index b398eeb9..ef0173c5 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -235,7 +235,7 @@ def create_slm_policy(self, es: Elasticsearch) -> Any: if not self.register_repository(es): logger.error( "Elasticsearch snapshot repository does not exist: %s", - self.es_snapshot_repo_type, + self.es_snapshot_repo, ) return False From 5ebc9ec1e4b5dc1e08418cc0d21a0acb1454e655 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Odhiambo <49587182+thepsalmist@users.noreply.github.com> Date: Fri, 14 Jun 2024 15:35:04 +0300 Subject: [PATCH 16/19] Update indexer/scripts/elastic-conf.py Co-authored-by: Clemence Kyara --- indexer/scripts/elastic-conf.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index ef0173c5..91365bdd 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -154,11 +154,14 @@ def register_repository(self, es: Elasticsearch) -> bool: settings=settings, ) - if response and response.get("acknowledged", False): - logger.info("%s repository registered successfully.", repo_type) - status = True + acknowledged = False + if response: + acknowledged = response.get("acknowledged", False) + if acknowledged: + logger.info("Successfully registered repository: %s", self.es_snapshot_repo) else: - logger.error("Failed to register %s repository.", repo_type) + logger.error("Failed to register repository: %s", self.es_snapshot_repo) + return acknowledged except Exception: logger.exception( "Failed to register repository: %s", self.es_snapshot_repo From a263a0fb15318f64aef9bd87853fb100b0c17f6c Mon Sep 17 00:00:00 2001 From: Xavier Frankline Odhiambo <49587182+thepsalmist@users.noreply.github.com> Date: Fri, 14 Jun 2024 15:35:20 +0300 Subject: [PATCH 17/19] Update indexer/scripts/elastic-conf.py Co-authored-by: Clemence Kyara --- indexer/scripts/elastic-conf.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index 91365bdd..3307d117 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -126,11 +126,10 @@ def repository_exists(self, es: Elasticsearch) -> bool: return False def register_repository(self, es: Elasticsearch) -> bool: - status = False repo_type = self.es_snapshot_repo_type - if not repo_type: - logger.error("Repository type must be either 'fs' or 's3'") - return status + if repo_type not in ["fs", "s3"]: + logger.error("Unsupported repository type: '%s'. Must be either 'fs' or 's3'", repo_type) + return False if self.repository_exists(es): logger.info("%s repository already exists.", repo_type) From 68b1500f73a987facb1c30c78fda2788a693a71c Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Fri, 14 Jun 2024 17:31:25 +0300 Subject: [PATCH 18/19] apply updates --- indexer/scripts/elastic-conf.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index 3307d117..e1c148f6 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -128,7 +128,10 @@ def repository_exists(self, es: Elasticsearch) -> bool: def register_repository(self, es: Elasticsearch) -> bool: repo_type = self.es_snapshot_repo_type if repo_type not in ["fs", "s3"]: - logger.error("Unsupported repository type: '%s'. Must be either 'fs' or 's3'", repo_type) + logger.error( + "Unsupported repository type: '%s'. Must be either 'fs' or 's3'", + repo_type, + ) return False if self.repository_exists(es): @@ -144,7 +147,6 @@ def register_repository(self, es: Elasticsearch) -> bool: settings = {"location": self.es_snapshot_fs_location} else: logger.error("Unsupported repository type: %s", repo_type) - return status try: response = es.snapshot.create_repository( @@ -157,14 +159,14 @@ def register_repository(self, es: Elasticsearch) -> bool: if response: acknowledged = response.get("acknowledged", False) if acknowledged: - logger.info("Successfully registered repository: %s", self.es_snapshot_repo) + logger.info( + "Successfully registered repository: %s", self.es_snapshot_repo + ) else: logger.error("Failed to register repository: %s", self.es_snapshot_repo) return acknowledged except Exception: - logger.exception( - "Failed to register repository: %s", self.es_snapshot_repo - ) + logger.exception("Failed to register repository: %s", self.es_snapshot_repo) return False def create_index_template(self, es: Elasticsearch) -> Any: From 21fce04a6b2c5f98b65937d5de970227f04ea5c4 Mon Sep 17 00:00:00 2001 From: Xavier Frankline Date: Thu, 20 Jun 2024 15:45:13 +0300 Subject: [PATCH 19/19] cleanup --- indexer/scripts/elastic-conf.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/indexer/scripts/elastic-conf.py b/indexer/scripts/elastic-conf.py index e1c148f6..77ea617f 100644 --- a/indexer/scripts/elastic-conf.py +++ b/indexer/scripts/elastic-conf.py @@ -139,14 +139,20 @@ def register_repository(self, es: Elasticsearch) -> bool: return True if repo_type == "s3": - settings = { - "bucket": self.es_snapshot_s3_bucket, - "endpoint": self.es_snapshot_s3_endpoint, - } - elif repo_type == "fs": + if not any([self.es_snapshot_s3_bucket, self.es_snapshot_s3_endpoint]): + logger.error( + "Failed to register s3 repository %s: bucket or endpoint required, none provided", + self.es_snapshot_repo, + ) + return False + + settings = {} + if self.es_snapshot_s3_bucket: + settings["bucket"] = self.es_snapshot_s3_bucket + if self.es_snapshot_s3_endpoint: + settings["endpoint"] = self.es_snapshot_s3_endpoint + else: # repo-type=fs settings = {"location": self.es_snapshot_fs_location} - else: - logger.error("Unsupported repository type: %s", repo_type) try: response = es.snapshot.create_repository(