Skip to content

Commit

Permalink
Merge pull request mediacloud#295 from thepsalmist/fix/es-s3-repo
Browse files Browse the repository at this point in the history
Elasticsearch register s3 and fs repository
  • Loading branch information
thepsalmist authored Jun 20, 2024
2 parents 43be320 + 21fce04 commit 26b394c
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
8 changes: 8 additions & 0 deletions docker/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ ELASTICSEARCH_CONFIG_DIR=./conf/elasticsearch/templates
ELASTICSEARCH_IMAGE="docker.elastic.co/elasticsearch/elasticsearch:8.12.0"
ELASTICSEARCH_PORT_BASE=9200 # native port
ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE=false
ELASTICSEARCH_SNAPSHOT_REPO_TYPE="fs"

FETCHER_CRONJOB_ENABLE=true # batch fetcher
FETCHER_NUM_BATCHES=20 # batch fetcher
Expand Down Expand Up @@ -328,6 +329,7 @@ prod)
ELASTICSEARCH_ILM_MAX_AGE="90d"
ELASTICSEARCH_ILM_MAX_SHARD_SIZE="50gb"
ELASTICSEARCH_HOSTS=http://ramos.angwin:9200,http://woodward.angwin:9200,http://bradley.angwin:9200
ELASTICSEARCH_SNAPSHOT_REPO_TYPE="s3"

# Disabled until tested in staging.
# Questions:
Expand All @@ -350,6 +352,7 @@ staging)
ELASTICSEARCH_SHARD_REPLICAS=1
ELASTICSEARCH_ILM_MAX_AGE="6h"
ELASTICSEARCH_ILM_MAX_SHARD_SIZE="5gb"
ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION="mc_story_indexer"

STORY_LIMIT=50000

Expand Down Expand Up @@ -377,6 +380,7 @@ dev)
ELASTICSEARCH_SHARD_REPLICAS=1
ELASTICSEARCH_ILM_MAX_AGE="15m"
ELASTICSEARCH_ILM_MAX_SHARD_SIZE="100mb"
ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION="mc_story_indexer"

STORY_LIMIT=5000

Expand Down Expand Up @@ -636,6 +640,10 @@ add ELASTICSEARCH_CONTAINERS int
add ELASTICSEARCH_HOSTS
add ELASTICSEARCH_SNAPSHOT_CRONJOB_ENABLE # NOT bool!
add ELASTICSEARCH_SNAPSHOT_REPO allow-empty
add ELASTICSEARCH_SNAPSHOT_REPO_TYPE
add ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION
add ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_BUCKET #private
add ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_ENDPOINT
add ELASTICSEARCH_SHARD_COUNT int
add ELASTICSEARCH_SHARD_REPLICAS int
add ELASTICSEARCH_ILM_MAX_AGE
Expand Down
3 changes: 3 additions & 0 deletions docker/docker-compose.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ services:
ELASTICSEARCH_SHARD_COUNT : {{elasticsearch_shard_count}}
ELASTICSEARCH_SHARD_REPLICAS: {{elasticsearch_shard_replicas}}
ELASTICSEARCH_SNAPSHOT_REPO: {{elasticsearch_snapshot_repo}}
ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_BUCKET: {{elasticsearch_snapshot_repo_settings_bucket}}
ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION: {{elasticsearch_snapshot_repo_settings_location}}
ELASTICSEARCH_SNAPSHOT_REPO_TYPE: {{elasticsearch_snapshot_repo_type}}
RUN: elastic-conf

{% macro define_volume(prefix, suffix='') %}
Expand Down
87 changes: 87 additions & 0 deletions indexer/scripts/elastic-conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@


class ElasticConf(ElasticConfMixin, App):
def __init__(self, process_name: str, descr: str):
super().__init__(process_name, descr)
self.es_snapshot_s3_bucket = os.environ.get(
"ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_BUCKET"
)
self.es_snapshot_fs_location = os.environ.get(
"ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_LOCATION"
)
self.es_snapshot_s3_endpoint = os.environ.get(
"ELASTICSEARCH_SNAPSHOT_REPO_SETTINGS_ENDPOINT"
)

def define_options(self, ap: argparse.ArgumentParser) -> None:
super().define_options(ap)
# Index template args
Expand Down Expand Up @@ -54,6 +66,12 @@ def define_options(self, ap: argparse.ArgumentParser) -> None:
default=os.environ.get("ELASTICSEARCH_SNAPSHOT_REPO") or "",
help="ES snapshot repository name",
)
ap.add_argument(
"--es-snapshot-repo-type",
dest="es_snapshot_repo_type",
default=os.environ.get("ELASTICSEARCH_SNAPSHOT_REPO_TYPE") or "",
help="ES snapshots type, default fs",
)

def process_args(self) -> None:
super().process_args()
Expand All @@ -64,6 +82,7 @@ def process_args(self) -> None:
("ilm_max_age", "ELASTICSEARCH_ILM_MAX_AGE"),
("ilm_max_shard_size", "ELASTICSEARCH_ILM_MAX_SHARD_SIZE"),
("es_snapshot_repo", "ELASTICSEARCH_SNAPSHOT_REPO"),
("es_snapshot_repo_type", "ELASTICSEARCH_SNAPSHOT_REPO_TYPE"),
]
for arg_name, env_name in required_args:
arg_val = getattr(self.args, arg_name)
Expand All @@ -76,6 +95,7 @@ def process_args(self) -> None:
self.ilm_max_age = self.args.ilm_max_age
self.ilm_max_shard_size = self.args.ilm_max_shard_size
self.es_snapshot_repo = self.args.es_snapshot_repo
self.es_snapshot_repo_type = self.args.es_snapshot_repo_type

def main_loop(self) -> None:
es = self.elasticsearch_client()
Expand All @@ -96,6 +116,65 @@ def main_loop(self) -> None:
logger.error("One or more configurations failed. Check logs for details.")
return

def repository_exists(self, es: Elasticsearch) -> bool:
repo_name = self.es_snapshot_repo
try:
response = es.snapshot.get_repository(name=repo_name)
return repo_name in response
except Exception:
logger.exception("Error validating repository: %s", repo_name)
return False

def register_repository(self, es: Elasticsearch) -> bool:
repo_type = self.es_snapshot_repo_type
if repo_type not in ["fs", "s3"]:
logger.error(
"Unsupported repository type: '%s'. Must be either 'fs' or 's3'",
repo_type,
)
return False

if self.repository_exists(es):
logger.info("%s repository already exists.", repo_type)
return True

if repo_type == "s3":
if not any([self.es_snapshot_s3_bucket, self.es_snapshot_s3_endpoint]):
logger.error(
"Failed to register s3 repository %s: bucket or endpoint required, none provided",
self.es_snapshot_repo,
)
return False

settings = {}
if self.es_snapshot_s3_bucket:
settings["bucket"] = self.es_snapshot_s3_bucket
if self.es_snapshot_s3_endpoint:
settings["endpoint"] = self.es_snapshot_s3_endpoint
else: # repo-type=fs
settings = {"location": self.es_snapshot_fs_location}

try:
response = es.snapshot.create_repository(
name=self.es_snapshot_repo,
type=repo_type,
settings=settings,
)

acknowledged = False
if response:
acknowledged = response.get("acknowledged", False)
if acknowledged:
logger.info(
"Successfully registered repository: %s", self.es_snapshot_repo
)
else:
logger.error("Failed to register repository: %s", self.es_snapshot_repo)
return acknowledged
except Exception:
logger.exception("Failed to register repository: %s", self.es_snapshot_repo)
return False

def create_index_template(self, es: Elasticsearch) -> Any:
json_data = self.load_index_template()
if not json_data:
Expand Down Expand Up @@ -162,6 +241,14 @@ def create_initial_index(self, es: Elasticsearch) -> Any:
def create_slm_policy(self, es: Elasticsearch) -> Any:
CURRENT_POLICY_ID = "bi_weekly_slm"
repository = self.es_snapshot_repo

if not self.register_repository(es):
logger.error(
"Elasticsearch snapshot repository does not exist: %s",
self.es_snapshot_repo,
)
return False

json_data = self.load_slm_policy_template(CURRENT_POLICY_ID)
if not json_data:
logger.error("Elasticsearch create slm policy: error template not loaded")
Expand Down

0 comments on commit 26b394c

Please sign in to comment.