Skip to content

Commit

Permalink
feat(opensearch): fetch logs from opensearch (#602)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlemesh committed Oct 9, 2024
1 parent e36c6a2 commit 4415b61
Show file tree
Hide file tree
Showing 9 changed files with 584 additions and 39 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The list of contributors in alphabetical order:
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
- [Lukas Heinrich](https://orcid.org/0000-0002-4048-7584)
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
Expand Down
30 changes: 30 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,36 @@ def _env_vars_dict_to_k8s_list(env_vars):
)
"""Common to all workflow engines environment variables for debug mode."""

REANA_OPENSEARCH_ENABLED = (
os.getenv("REANA_OPENSEARCH_ENABLED", "false").lower() == "true"
)
"""OpenSearch enabled flag."""

REANA_OPENSEARCH_HOST = os.getenv(
"REANA_OPENSEARCH_HOST", "opensearch-cluster-master.default.svc.cluster.local"
)
"""OpenSearch host."""

REANA_OPENSEARCH_PORT = os.getenv("REANA_OPENSEARCH_PORT", "9200")
"""OpenSearch port."""

REANA_OPENSEARCH_URL_PREFIX = os.getenv("REANA_OPENSEARCH_URL_PREFIX", "")
"""OpenSearch URL prefix."""

REANA_OPENSEARCH_USER = os.getenv("REANA_OPENSEARCH_USER", "admin")
"""OpenSearch user."""

REANA_OPENSEARCH_PASSWORD = os.getenv("REANA_OPENSEARCH_PASSWORD", "admin")
"""OpenSearch password."""

REANA_OPENSEARCH_USE_SSL = (
os.getenv("REANA_OPENSEARCH_USE_SSL", "false").lower() == "true"
)
"""OpenSearch SSL flag."""

REANA_OPENSEARCH_CA_CERTS = os.getenv("REANA_OPENSEARCH_CA_CERTS")
"""OpenSearch CA certificates."""


def _parse_interactive_sessions_environments(env_var):
config = {}
Expand Down
176 changes: 176 additions & 0 deletions reana_workflow_controller/opensearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""OpenSearch client and log fetcher."""

import logging
from opensearchpy import OpenSearch

from reana_workflow_controller.config import (
REANA_OPENSEARCH_CA_CERTS,
REANA_OPENSEARCH_HOST,
REANA_OPENSEARCH_PASSWORD,
REANA_OPENSEARCH_PORT,
REANA_OPENSEARCH_URL_PREFIX,
REANA_OPENSEARCH_USE_SSL,
REANA_OPENSEARCH_USER,
REANA_OPENSEARCH_ENABLED,
)


def build_opensearch_client(
host: str = REANA_OPENSEARCH_HOST,
port: str = REANA_OPENSEARCH_PORT,
url_prefix: str = REANA_OPENSEARCH_URL_PREFIX,
http_auth: tuple | None = (REANA_OPENSEARCH_USER, REANA_OPENSEARCH_PASSWORD),
use_ssl: bool = REANA_OPENSEARCH_USE_SSL,
ca_certs: str | None = REANA_OPENSEARCH_CA_CERTS,
) -> OpenSearch:
"""
Build an OpenSearch client object.
:param host: OpenSearch host.
:param port: OpenSearch port.
:param url_prefix: URL prefix.
:param http_auth: HTTP authentication credentials.
:param use_ssl: Use SSL/TLS for connection.
:param ca_certs: Path to CA certificates.
:return: OpenSearch client object.
"""
opensearch_client = OpenSearch(
hosts=f"{host}:{port}",
http_compress=True, # enables gzip compression for request bodies
http_auth=http_auth,
use_ssl=use_ssl,
ca_certs=ca_certs,
url_prefix=url_prefix,
verify_certs=True,
)
return opensearch_client


class OpenSearchLogFetcher(object):
"""Retrieves job and workflow logs from OpenSearch API."""

def __init__(
self,
os_client: OpenSearch | None = None,
job_index: str = "fluentbit-job_log",
workflow_index: str = "fluentbit-workflow_log",
max_rows: int = 5000,
log_key: str = "log",
order: str = "asc",
job_log_matcher: str = "kubernetes.labels.job-name.keyword",
workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
timeout: int = 5,
) -> None:
"""
Initialize the OpenSearchLogFetcher object.
:param os_client: OpenSearch client object.
:param job_index: Index name for job logs.
:param workflow_index: Index name for workflow logs.
:param max_rows: Maximum number of rows to fetch.
:param log_key: Key for log message in the response.
:param order: Order of logs (asc/desc).
:param job_log_matcher: Job log matcher.
:param workflow_log_matcher: Workflow log matcher.
:param timeout: Timeout for OpenSearch queries.
:return: None
"""
if os_client is None:
os_client = build_opensearch_client()

self.os_client = os_client
self.job_index = job_index
self.workflow_index = workflow_index
self.max_rows = max_rows
self.log_key = log_key
self.order = order
self.job_log_matcher = job_log_matcher
self.workflow_log_matcher = workflow_log_matcher
self.timeout = timeout

def fetch_logs(self, id: str, index: str, match: str) -> str | None:
"""
Fetch logs of a specific job or workflow.
:param id: Job or workflow ID.
:param index: Index name for logs.
:param match: Matcher for logs.
:return: Job or workflow logs.
"""
query = {
"query": {"match": {match: id}},
"sort": [{"@timestamp": {"order": self.order}}],
}

try:
response = self.os_client.search(
index=index, body=query, size=self.max_rows, timeout=self.timeout
)
except Exception as e:
logging.error("Failed to fetch logs for {0}: {1}".format(id, e))
return None

return self._concat_rows(response["hits"]["hits"])

def fetch_job_logs(self, backend_job_id: str) -> str:
"""
Fetch logs of a specific job.
:param backend_job_id: Job ID.
:return: Job logs.
"""
return self.fetch_logs(
backend_job_id,
self.job_index,
self.job_log_matcher,
)

def fetch_workflow_logs(self, workflow_id: str) -> str | None:
"""
Fetch logs of a specific workflow.
:param workflow_id: Workflow ID.
:return: Workflow logs.
"""
return self.fetch_logs(
workflow_id,
self.workflow_index,
self.workflow_log_matcher,
)

def _concat_rows(self, rows: list) -> str | None:
"""
Concatenate log messages from rows.
:param rows: List of rows.
:return: Concatenated log messages.
"""
logs = ""

for hit in rows:
logs += hit["_source"][self.log_key] + "\n"

return logs


def build_opensearch_log_fetcher() -> OpenSearchLogFetcher | None:
"""
Build OpenSearchLogFetcher object.
:return: OpenSearchLogFetcher object.
"""
return OpenSearchLogFetcher() if REANA_OPENSEARCH_ENABLED else None
15 changes: 13 additions & 2 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -166,6 +166,8 @@ def is_uuid_v4(uuid_or_name: str) -> bool:

def build_workflow_logs(workflow, steps=None, paginate=None):
"""Return the logs for all jobs of a workflow."""
from reana_workflow_controller.opensearch import build_opensearch_log_fetcher

query = Session.query(Job).filter_by(workflow_uuid=workflow.id_)
if steps:
query = query.filter(Job.job_name.in_(steps))
Expand All @@ -179,6 +181,15 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
finished_at = (
job.finished_at.strftime(WORKFLOW_TIME_FORMAT) if job.finished_at else None
)

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)

item = {
"workflow_uuid": str(job.workflow_uuid) or "",
"job_name": job.job_name or "",
Expand All @@ -187,7 +198,7 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
"docker_img": job.docker_img or "",
"cmd": job.prettified_cmd or "",
"status": job.status.name or "",
"logs": job.logs or "",
"logs": logs or job.logs or "",
"started_at": started_at,
"finished_at": finished_at,
}
Expand Down
16 changes: 14 additions & 2 deletions reana_workflow_controller/rest/workflows_status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020, 2021, 2022 CERN.
# Copyright (C) 2020, 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -150,8 +150,20 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
"engine_specific": None,
}
else:
from reana_workflow_controller.opensearch import (
build_opensearch_log_fetcher,
)

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_workflow_logs(workflow.id_)
if open_search_log_fetcher
else None
)

workflow_logs = {
"workflow_logs": workflow.logs,
"workflow_logs": logs or workflow.logs,
"job_logs": build_workflow_logs(workflow, paginate=paginate),
"engine_specific": workflow.engine_specific,
}
Expand Down
11 changes: 6 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ bracex==2.4 # via wcmatch
bravado==10.3.2 # via reana-commons
bravado-core==6.1.0 # via bravado, reana-commons
cachetools==5.4.0 # via google-auth
certifi==2024.7.4 # via kubernetes, requests
certifi==2024.7.4 # via kubernetes, opensearch-py, requests
cffi==1.16.0 # via cryptography
charset-normalizer==3.3.2 # via requests
checksumdir==1.1.9 # via reana-commons
click==8.1.7 # via flask, reana-commons
cryptography==43.0.0 # via sqlalchemy-utils
events==0.5 # via opensearch-py
flask==2.2.5 # via reana-workflow-controller (setup.py)
fqdn==1.5.1 # via jsonschema
fs==2.4.16 # via reana-commons
gitdb==4.0.11 # via gitpython
gitpython==3.1.43 # via reana-workflow-controller (setup.py)
google-auth==2.32.0 # via kubernetes
greenlet==3.0.3 # via sqlalchemy
idna==3.7 # via jsonschema, requests
importlib-resources==6.4.0 # via swagger-spec-validator
isoduration==20.11.0 # via jsonschema
Expand All @@ -46,18 +46,19 @@ monotonic==1.6 # via bravado
msgpack==1.0.8 # via bravado-core
msgpack-python==0.5.6 # via bravado
oauthlib==3.2.2 # via requests-oauthlib
opensearch-py==2.7.1 # via reana-workflow-controller (setup.py)
packaging==24.1 # via reana-workflow-controller (setup.py)
psycopg2-binary==2.9.9 # via reana-db
pyasn1==0.6.0 # via pyasn1-modules, rsa
pyasn1-modules==0.4.0 # via google-auth
pycparser==2.22 # via cffi
python-dateutil==2.9.0.post0 # via arrow, bravado, bravado-core, kubernetes
python-dateutil==2.9.0.post0 # via arrow, bravado, bravado-core, kubernetes, opensearch-py
pytz==2024.1 # via bravado-core
pyyaml==6.0.1 # via bravado, bravado-core, kubernetes, reana-commons, swagger-spec-validator
reana-commons[kubernetes]==0.95.0a3 # via reana-db, reana-workflow-controller (setup.py)
reana-db==0.95.0a4 # via reana-workflow-controller (setup.py)
referencing==0.35.1 # via jsonschema, jsonschema-specifications
requests==2.32.3 # via bravado, bravado-core, kubernetes, reana-workflow-controller (setup.py), requests-oauthlib
requests==2.32.3 # via bravado, bravado-core, kubernetes, opensearch-py, reana-workflow-controller (setup.py), requests-oauthlib
requests-oauthlib==2.0.0 # via kubernetes
rfc3339-validator==0.1.4 # via jsonschema
rfc3987==1.3.8 # via jsonschema
Expand All @@ -72,7 +73,7 @@ swagger-spec-validator==3.0.4 # via bravado-core
types-python-dateutil==2.9.0.20240316 # via arrow
typing-extensions==4.12.2 # via alembic, bravado, swagger-spec-validator
uri-template==1.3.0 # via jsonschema
urllib3==2.2.2 # via kubernetes, requests
urllib3==2.2.2 # via kubernetes, opensearch-py, requests
uwsgi==2.0.26 # via reana-workflow-controller (setup.py)
uwsgi-tools==1.1.1 # via reana-workflow-controller (setup.py)
uwsgitop==0.12 # via reana-workflow-controller (setup.py)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"gitpython>=2.1",
"jsonpickle>=0.9.6",
"marshmallow>2.13.0,<3.0.0", # same upper pin as reana-server
"opensearch-py>=2.7.0,<2.8.0",
"packaging>=18.0",
"reana-commons[kubernetes] @ git+https://github.com/reanahub/[email protected]",
"reana-db>=0.95.0a4,<0.96.0",
Expand Down
Loading

0 comments on commit 4415b61

Please sign in to comment.