Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deployment fixes from mk-retriever-debug #157

Merged
merged 15 commits into from
Sep 27, 2024
Merged
4 changes: 3 additions & 1 deletion .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ name: Build and Publish Docker Image
on:
push:
branches:
- main
- dev
pull_request:
branches:
- main
- dev

jobs:
Expand All @@ -20,7 +22,7 @@ jobs:
- name: Set Docker image tag and other variables
run: |
echo "WORKER_NAME=emfos-model-retriever" >> $GITHUB_ENV
echo "BRANCH_NAME=${GITHUB_REF#refs/heads/}" >> $GITHUB_ENV
echo "BRANCH_NAME=$GITHUB_REF_NAME" >> $GITHUB_ENV
echo "IMAGE_TAG=$(date +%Y%m%d%H%M%S)" >> $GITHUB_ENV
echo "LATEST_TAG=latest" >> $GITHUB_ENV

Expand Down
81 changes: 50 additions & 31 deletions emf/common/integrations/minio_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import re
import logging
import config
import functools
from typing import List
from io import BytesIO
from zipfile import ZipFile
from datetime import datetime
from aniso8601 import parse_duration, parse_datetime
from aniso8601 import parse_datetime
from emf.common.config_parser import parse_app_properties
from emf.loadflow_tool.helper import metadata_from_filename
urllib3.disable_warnings()
Expand All @@ -21,9 +22,20 @@
parse_app_properties(globals(), config.paths.integrations.minio)


def renew_authentication_token(func):
    """Decorator for ObjectStorage methods that require a valid session.

    Before delegating to the wrapped method it compares the current UTC time
    (naive, matching ``token_expiration``) against ``self.token_expiration``;
    when the token has expired it rebuilds the Minio client via
    ``self._create_client()`` and then proceeds with the original call.
    """
    @functools.wraps(func)
    def refreshed(self, *args, **kwargs):
        token_valid = datetime.utcnow() < self.token_expiration
        if not token_valid:
            logger.warning("Authentication token expired, renewing token")
            self._create_client()
        return func(self, *args, **kwargs)

    return refreshed


class ObjectStorage:

def __init__(self, server=MINIO_SERVER, username=MINIO_USERNAME, password=MINIO_PASSWORD):
def __init__(self, server: str = MINIO_SERVER, username: str = MINIO_USERNAME, password: str = MINIO_PASSWORD):
self.server = server
self.username = username
self.password = password
Expand All @@ -35,29 +47,27 @@ def __init__(self, server=MINIO_SERVER, username=MINIO_USERNAME, password=MINIO_
#ca_certs='/usr/local/share/ca-certificates/CA-Bundle.crt'
)

# Init client
self.__create_client()

def __create_client(self):

if self.token_expiration < (datetime.utcnow() + parse_duration("PT1M")):
credentials = self.__get_credentials()

self.token_expiration = parse_datetime(credentials['Expiration']).replace(tzinfo=None)
self.client = minio.Minio(endpoint=self.server,
access_key=credentials['AccessKeyId'],
secret_key=credentials['SecretAccessKey'],
session_token=credentials['SessionToken'],
secure=True,
http_client=self.http_client,
)

def __get_credentials(self, action="AssumeRoleWithLDAPIdentity", version="2011-06-15"):
# Initialize client
self._create_client()

def _create_client(self):
    """Authenticate against Minio and (re)build the S3 client.

    Fetches temporary STS credentials for the LDAP user, stores their
    expiration timestamp (converted to naive UTC so it can be compared with
    ``datetime.utcnow()``) and replaces ``self.client`` with a fresh
    ``minio.Minio`` instance bound to those credentials.
    """
    credentials = self._get_credentials()
    expiration = parse_datetime(credentials['Expiration'])
    self.token_expiration = expiration.replace(tzinfo=None)
    self.client = minio.Minio(
        endpoint=self.server,
        access_key=credentials['AccessKeyId'],
        secret_key=credentials['SecretAccessKey'],
        session_token=credentials['SessionToken'],
        secure=True,
        http_client=self.http_client,
    )

def _get_credentials(self, action: str = "AssumeRoleWithLDAPIdentity", version: str = "2011-06-15"):
"""
Method to get temporary credentials for LDAP user
:param action: string of action
:param version: version
:return:
:return: dictionary with authentication details
"""
# Define LDAP service user parameters
params = {
Expand All @@ -79,7 +89,8 @@ def __get_credentials(self, action="AssumeRoleWithLDAPIdentity", version="2011-0

return credentials

def upload_object(self, file_path_or_file_object, bucket_name, metadata=None):
@renew_authentication_token
def upload_object(self, file_path_or_file_object: str | BytesIO, bucket_name: str, metadata: dict | None = None):
"""
Method to upload file to Minio storage
:param file_path_or_file_object: file path or BytesIO object
Expand Down Expand Up @@ -111,7 +122,8 @@ def upload_object(self, file_path_or_file_object, bucket_name, metadata=None):

return response

def download_object(self, bucket_name, object_name):
@renew_authentication_token
def download_object(self, bucket_name: str, object_name: str):
try:
object_name = object_name.replace("//", "/")
file_data = self.client.get_object(bucket_name, object_name)
Expand All @@ -120,11 +132,9 @@ def download_object(self, bucket_name, object_name):
except minio.error.S3Error as err:
logger.error(err)

def object_exists(self, object_name, bucket_name):

# TODO - add description
# TODO - add logging

@renew_authentication_token
def object_exists(self, object_name: str, bucket_name: str) -> bool:
"""Check whether object exists in specified bucket by its object name"""
exists = False
try:
self.client.stat_object(bucket_name, object_name)
Expand All @@ -134,16 +144,23 @@ def object_exists(self, object_name, bucket_name):

return exists

def list_objects(self, bucket_name, prefix=None, recursive=False, start_after=None, include_user_meta=True, include_version=False):

@renew_authentication_token
def list_objects(self,
                 bucket_name: str,
                 prefix: str | None = None,
                 recursive: bool = False,
                 start_after: str | None = None,
                 include_user_meta: bool = True,
                 include_version: bool = False):
    """List all objects of the specified bucket.

    Thin wrapper around ``minio.Minio.list_objects``. On an S3 error the
    exception is logged with its traceback and ``None`` is returned.
    """
    try:
        return self.client.list_objects(bucket_name,
                                        prefix,
                                        recursive,
                                        start_after,
                                        include_user_meta,
                                        include_version)
    except minio.error.S3Error as err:
        # Deliberate best-effort: log and fall through, caller gets None.
        logger.error(err, exc_info=True)

@renew_authentication_token
def query_objects(self, bucket_name: str, metadata: dict = None, prefix: str = None, use_regex: bool = False):

"""Example: service.query_objects(prefix="IGM", metadata={'bamessageid': '20230215T1630Z-1D-LITGRID-001'})"""

objects = self.client.list_objects(bucket_name, prefix, recursive=True, include_user_meta=True)
Expand All @@ -169,6 +186,7 @@ def query_objects(self, bucket_name: str, metadata: dict = None, prefix: str = N

return result_list

@renew_authentication_token
def get_latest_models_and_download(self,
time_horizon: str,
scenario_datetime: str,
Expand Down Expand Up @@ -272,6 +290,7 @@ def get_latest_models_and_download(self,
## Start service
service = ObjectStorage()
buckets = service.client.list_buckets()
objects = service.list_objects(bucket_name='iop')
print(buckets)

models = service.get_latest_models_and_download(time_horizon='ID',
Expand Down
135 changes: 68 additions & 67 deletions emf/common/integrations/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,80 +401,81 @@ def on_message(self, _unused_channel, basic_deliver, properties, body):

# Convert if needed
if self.message_converter:
with ThreadPoolExecutor() as converter_executor:
converter_task = converter_executor.submit(self.message_converter.convert, body)
# with ThreadPoolExecutor() as converter_executor:
# converter_task = converter_executor.submit(self.message_converter.convert, body)
#
# while not converter_task.done():
# logger.info("Waiting for converter")
# #self._connection.process_data_events(time_limit=1)
# self._connection._heartbeat_checker._send_heartbeat()
# time.sleep(10)
#
# try:
# body, content_type = converter_task.result()
#
# except Exception as error:
# logger.error(f"Message conversion failed: {error}", exc_info=True)
# ack = False

while not converter_task.done():
logger.info("Waiting for converter")
#self._connection.process_data_events(time_limit=1)
self._connection._heartbeat_checker._send_heartbeat()
time.sleep(10)
try:
body, content_type = self.message_converter.convert(body)
properties.content_type = content_type
logger.info(f"Message converted")
except Exception as error:
logger.error(f"Message conversion failed: {error}", exc_info=True)
ack = False
self.stop()

if self.message_handlers:
# with ThreadPoolExecutor() as handler_executor:
#
# for message_handler in self.message_handlers:
# logger.info(f"Handling message with handler: {message_handler.__class__.__name__}")
# handler_task = handler_executor.submit(message_handler.handle, body, properties=properties)
#
# while not handler_task.done():
#
# try:
# # TODO - set to debug when rabbit issue solved
# logger.info("Waiting for handler")
# #logger.info(self._connection.ioloop.is_running())
# #logger.info(asyncio.current_task(self._connection.ioloop))
# #logger.info(asyncio.all_tasks(self._connection.ioloop))
# #logger.info(self._connection.ioloop._scheduled)
# #loop_time = self._connection.ioloop.time()
# #logger.info([task.when() - loop_time for task in self._connection.ioloop._scheduled])
# #logger.info(self._connection.ioloop.time())
# self._connection._heartbeat_checker._send_heartbeat()
# #self._connection.ioloop.poll()
# #self._connection.process_data_events(time_limit=1)
# #self._connection._heartbeat_checker.send_heartbeat()
# time.sleep(10)
#
# except Exception as error:
# logger.info(error)
#
#
# try:
# body = handler_task.result()
#
# except Exception as error:
# logger.error(f"Message handling failed: {error}", exc_info=True)
# ack = False
# # In case of failure, stop message processing and close the thread
# handler_executor.shutdown(wait=False)
# break

for message_handler in self.message_handlers:
try:
body, content_type = converter_task.result()

logger.info(f"Handling message with handler: {message_handler.__class__.__name__}")
body = message_handler.handle(body, properties=properties)
except Exception as error:
logger.error(f"Message conversion failed: {error}", exc_info=True)
logger.error(f"Message handling failed: {error}", exc_info=True)
ack = False
converter_executor.shutdown(wait=False)
self.stop()

# try:
# body, content_type = self.message_converter.convert(body)
# properties.content_type = content_type
# logger.info(f"Message converted")
# except Exception as error:
# logger.error(f"Message conversion failed: {error}", exc_info=True)
# # ack = False

if self.message_handlers:
with ThreadPoolExecutor() as handler_executor:

for message_handler in self.message_handlers:
logger.info(f"Handling message with handler: {message_handler.__class__.__name__}")
handler_task = handler_executor.submit(message_handler.handle, body, properties=properties)

while not handler_task.done():

try:
# TODO - set to debug when rabbit issue solved
logger.info("Waiting for handler")
#logger.info(self._connection.ioloop.is_running())
#logger.info(asyncio.current_task(self._connection.ioloop))
#logger.info(asyncio.all_tasks(self._connection.ioloop))
#logger.info(self._connection.ioloop._scheduled)
#loop_time = self._connection.ioloop.time()
#logger.info([task.when() - loop_time for task in self._connection.ioloop._scheduled])
#logger.info(self._connection.ioloop.time())
self._connection._heartbeat_checker._send_heartbeat()
#self._connection.ioloop.poll()
#self._connection.process_data_events(time_limit=1)
#self._connection._heartbeat_checker.send_heartbeat()
time.sleep(10)

except Exception as error:
logger.info(error)


try:
body = handler_task.result()

except Exception as error:
logger.error(f"Message handling failed: {error}", exc_info=True)
ack = False
# In case of failure, stop message processing and close the thread
handler_executor.shutdown(wait=False)
self.stop()
break

# for message_handler in self.message_handlers:
# try:
# logger.info(f"Handling message with handler: {message_handler.__class__.__name__}")
# body = message_handler.handle(body, properties=properties)
# except Exception as error:
# logger.error(f"Message handling failed: {error}", exc_info=True)
# # ack = False

break

if ack:
self.acknowledge_message(basic_deliver.delivery_tag)

Expand Down
11 changes: 5 additions & 6 deletions emf/loadflow_tool/model_validator/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ def validate_model(opdm_objects, loadflow_parameters=getattr(loadflow_settings,
model_data = load_model(opdm_objects=opdm_objects)
network = model_data["network"]

# Run all validations except SHUNTS, that does not work on pypowsybl 0.24.0
# Run all validations
if run_element_validations:
validations = list(
set(attr_to_dict(pypowsybl._pypowsybl.ValidationType).keys()) - set(["ALL", "name", "value", "SHUNTS"]))
validations = list(set(attr_to_dict(pypowsybl._pypowsybl.ValidationType).keys()) - set(["ALL", "name", "value"]))

model_data["validations"] = {}

Expand All @@ -38,9 +37,10 @@ def validate_model(opdm_objects, loadflow_parameters=getattr(loadflow_settings,
logger.info(f"Running validation: {validation_type}")
try:
# TODO figure out how to store full validation results if needed. Currently only status is taken
model_data["validations"][validation] = pypowsybl.loadflow.run_validation(network=network, validation_types=[validation_type])._valid.__bool__()
model_data["validations"][validation] = pypowsybl.loadflow.run_validation(network=network,
validation_types=[validation_type])._valid.__bool__()
except Exception as error:
logger.error(f"Failed {validation_type} validation with error: {error}")
logger.warning(f"Failed {validation_type} validation with error: {error}")
continue

# Validate if loadflow can be run
Expand All @@ -50,7 +50,6 @@ def validate_model(opdm_objects, loadflow_parameters=getattr(loadflow_settings,
parameters=loadflow_parameters,
reporter=loadflow_report)


# Parsing loadflow results
# TODO move sanitization to Elastic integration
loadflow_result_dict = {}
Expand Down
8 changes: 6 additions & 2 deletions emf/model_retriever/model_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,12 @@ def handle(self, opdm_objects: List[dict], **kwargs):

# Run network model validation
for opdm_object in opdm_objects:
response = validate_model(opdm_objects=[opdm_object, latest_boundary])
opdm_object["valid"] = response["valid"] # taking only relevant data from validation step
try:
response = validate_model(opdm_objects=[opdm_object, latest_boundary])
opdm_object["valid"] = response["valid"] # taking only relevant data from validation step
except Exception as error:
logger.error(f"Models validator failed with exception: {error}", exc_info=True)
opdm_object["valid"] = False

return opdm_objects

Expand Down
Loading
Loading