Merge branch 'dev' into mk-retriever-debug
Haigutus authored Sep 27, 2024
2 parents 5b8f9f7 + 7b94e79 commit 7cb8d61
Showing 8 changed files with 170 additions and 75 deletions.
102 changes: 72 additions & 30 deletions config/cgm_worker/replacement_config.json
@@ -273,12 +273,18 @@
"priority": [
{"1": "P0D"},
{"2": "P1D"},
{"3": "-P3D"},
{"4": "P2D"},
{"5": "-P4D"},
{"6": "-P5D"},
{"7": "-P1W"},
{"8": "-P2W"}
{"3": "-P7D"},
{"4": "-P6D"},
{"5": "-P5D"},
{"6": "-P4D"},
{"7": "-P3D"},
{"8": "-P14D"},
{"9": "-P13D"},
{"10": "-P12D"},
{"11": "-P11D"},
{"12": "-P10D"},
{"13": "-P2D"},
{"14": "-P1D"}
]
},
{
@@ -287,11 +293,17 @@
{"1": "P0D"},
{"2": "P1D"},
{"3": "-P1D"},
{"4": "-P4D"},
{"5": "-P5D"},
{"4": "-P7D"},
{"5": "-P8D"},
{"6": "-P6D"},
{"7": "-P1W"},
{"8": "-P2W"}
{"7": "-P5D"},
{"8": "-P4D"},
{"9": "-P14D"},
{"10": "-P13D"},
{"11": "-P12D"},
{"12": "-P11D"},
{"13": "-P3D"},
{"14": "-P2D"}
]
},
{
@@ -301,10 +313,16 @@
{"2": "P1D"},
{"3": "-P1D"},
{"4": "-P2D"},
{"5": "-P5D"},
{"5": "-P7D"},
{"6": "-P6D"},
{"7": "-P1W"},
{"8": "-P2W"}
{"7": "-P5D"},
{"8": "-P8D"},
{"9": "-P9D"},
{"10": "-P14D"},
{"11": "-P13D"},
{"12": "-P12D"},
{"13": "-P4D"},
{"14": "-P3D"}
]
},
{
@@ -315,9 +333,15 @@
{"3": "P1D"},
{"4": "-P2D"},
{"5": "-P3D"},
{"6": "-P6D"},
{"7": "-P1W"},
{"8": "-P2W"}
{"6": "-P7D"},
{"7": "-P6D"},
{"8": "-P8D"},
{"9": "-P9D"},
{"10": "-P10D"},
{"11": "-P14D"},
{"12": "-P5D"},
{"13": "-P6D"},
{"14": "-P13D"}
]
},
{
@@ -328,35 +352,53 @@
{"3": "-P2D"},
{"4": "-P3D"},
{"5": "-P4D"},
{"5": "-P5D"},
{"7": "-P1W"},
{"8": "-P2W"}
{"5": "-P7D"},
{"7": "-P8D"},
{"8": "-P9D"},
{"9": "-P10D"},
{"10": "-P11D"},
{"11": "-P14D"},
{"12": "-P6D"},
{"13": "-P13D"},
{"14": "-P5D"}
]
},
{
"day": 5,
"priority": [
{"1": "P0D"},
{"2": "-P1W"},
{"3": "-P2W"},
{"4": "P1D"},
{"5": "-P1D"},
{"6": "-P2D"},
{"7": "-P3D"},
{"8": "-P4D"}
{"2": "-P7D"},
{"3": "-P14D"},
{"4": "-P6D"},
{"5": "-P13D"},
{"6": "-P1D"},
{"7": "-P2D"},
{"8": "-P3D"},
{"9": "-P4D"},
{"10": "-P5D"},
{"11": "-P8D"},
{"12": "-P9D"},
{"13": "-P10D"},
{"14": "-P11D"}
]
},
{
"day": 6,
"priority": [
{"1": "P0D"},
{"2": "-P1W"},
{"3": "-P2W"},
{"2": "-P7D"},
{"3": "-P14D"},
{"4": "-P1D"},
{"5": "-P2D"},
{"5": "-P8D"},
{"6": "-P3D"},
{"7": "-P4D"},
{"8": "-P5D"}
{"8": "-P5D"},
{"9": "-P6D"},
{"10": "-P9D"},
{"11": "-P10D"},
{"12": "-P11D"},
{"13": "-P12D"},
{"14": "-P13D"}
]
}
]
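Each "day" entry in replacement_config.json maps a weekday to an ordered list of ISO 8601 duration offsets relative to the target scenario date; this commit replaces the week-based offsets (-P1W, -P2W) with explicit day counts and extends each list from 8 to 14 candidates. A minimal sketch of resolving one entry into concrete candidate dates — the helper below is illustrative only, not the parser the repo actually uses:

import re
from datetime import datetime, timedelta

def parse_offset(offset: str) -> timedelta:
    # Handles the duration forms that appear in this config: "P0D", "-P7D", "-P2W"
    match = re.fullmatch(r"(-?)P(\d+)([DW])", offset)
    sign = -1 if match.group(1) else 1
    days = int(match.group(2)) * (7 if match.group(3) == "W" else 1)
    return timedelta(days=sign * days)

day_entry = {"day": 5, "priority": [{"1": "P0D"}, {"2": "-P7D"}, {"3": "-P14D"}, {"4": "-P6D"}]}
target = datetime(2024, 9, 27)
for item in day_entry["priority"]:
    (rank, offset), = item.items()  # each entry holds a single rank -> offset pair
    print(rank, (target + parse_offset(offset)).date())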
3 changes: 2 additions & 1 deletion config/task_generator/task_generator.properties
@@ -10,7 +10,8 @@ TIMETRAVEL =
#TIMETRAVEL = 2024-05-24T23:05+0200
TASK_HEADER_KEYS = @id,job_id,run_id,process_id,@type,task_properties.merge_type,task_properties.time_horizon,

- RUN_REPLACEMENT = True
+ RUN_REPLACEMENT_RMM = True
+ RUN_REPLACEMENT_CGM = False

# CGM EU MERGE
CGM_INCLUDED_TSO =
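The single RUN_REPLACEMENT switch is split into per-merge-type flags (RMM vs CGM). A rough sketch of reading such Java-style .properties keys from Python, assuming simple KEY = VALUE lines; the repo's actual loader is not shown in this diff:

def load_properties(path: str) -> dict[str, str]:
    # Minimal .properties reader: KEY = VALUE lines, '#' comments ignored
    props = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

props = load_properties("config/task_generator/task_generator.properties")
run_replacement_rmm = props.get("RUN_REPLACEMENT_RMM", "False") == "True"
run_replacement_cgm = props.get("RUN_REPLACEMENT_CGM", "False") == "True"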
13 changes: 10 additions & 3 deletions emf/common/integrations/object_storage/models.py
@@ -7,7 +7,7 @@
logger = logging.getLogger(__name__)


- def query_data(metadata_query: dict, query_filter: str = None, index: str = ELASTIC_QUERY_INDEX, return_payload: bool = False, size: str = '10000', sort: dict=None):
+ def query_data(metadata_query: dict, query_filter: str = None, index: str = ELASTIC_QUERY_INDEX, return_payload: bool = False, size: str = '10000', sort: dict=None, scroll: str='1m'):
"""
Queries Elasticsearch based on provided metadata queries.
@@ -54,8 +54,15 @@ def query_data(metadata_query: dict, query_filter: str = None, index: str = ELAS
query = {"bool": {"must": match_and_term_list}}

# Return query results
-     response = elastic_service.client.search(index=index, query=query, size=size, sort=sort)
-     content_list = [content["_source"] for content in response["hits"]["hits"]]
+     response = elastic_service.client.search(index=index, query=query, size=size, sort=sort, scroll=scroll)
+     scroll_id = response['_scroll_id']
+     hits = response["hits"]["hits"]
+     content_list = [content["_source"] for content in hits]
+     while len(hits) > 0:
+         response = elastic_service.client.scroll(scroll_id=scroll_id, scroll=scroll)
+         hits = response["hits"]["hits"]
+         if hits:
+             content_list.extend([content["_source"] for content in hits])

if return_payload:
for num, item in enumerate(content_list):
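query_data previously returned at most one page of size hits; the new scroll parameter pages through the full result set. A self-contained sketch of the same scroll pattern with the elasticsearch-py client — the endpoint and index are placeholders, and unlike the patched function this version also refreshes the scroll id on each page and clears the scroll context when finished:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

def scroll_all(index: str, query: dict, size: int = 10000, scroll: str = "1m") -> list[dict]:
    # The first search opens the scroll context; subsequent pages reuse it
    response = es.search(index=index, query=query, size=size, scroll=scroll)
    scroll_id = response["_scroll_id"]
    hits = response["hits"]["hits"]
    docs = [hit["_source"] for hit in hits]
    while hits:
        response = es.scroll(scroll_id=scroll_id, scroll=scroll)
        scroll_id = response["_scroll_id"]  # the id may change between calls
        hits = response["hits"]["hits"]
        docs.extend(hit["_source"] for hit in hits)
    es.clear_scroll(scroll_id=scroll_id)  # release the server-side context
    return docs

docs = scroll_all("models", {"bool": {"must": [{"term": {"valid": True}}]}})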
6 changes: 5 additions & 1 deletion emf/common/integrations/rabbit.py
@@ -424,6 +424,7 @@ def on_message(self, _unused_channel, basic_deliver, properties, body):
except Exception as error:
logger.error(f"Message conversion failed: {error}", exc_info=True)
ack = False
+ self.stop()

if self.message_handlers:
# with ThreadPoolExecutor() as handler_executor:
@@ -471,7 +472,10 @@ def on_message(self, _unused_channel, basic_deliver, properties, body):
except Exception as error:
logger.error(f"Message handling failed: {error}", exc_info=True)
ack = False

+ converter_executor.shutdown(wait=False)
+ self.stop()
+ break

if ack:
self.acknowledge_message(basic_deliver.delivery_tag)

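Both failure paths in on_message now stop the consumer instead of moving on to the next delivery. The class in this file is an asynchronous pika consumer; the sketch below only illustrates the same stop-on-failure behaviour with a BlockingConnection consumer, and the queue name and handler are made up:

import logging
import pika

logger = logging.getLogger(__name__)

def handle(body: bytes) -> None:
    print(body)  # stand-in for real message processing

def consume(queue: str = "example-queue") -> None:
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()

    def on_message(ch, method, properties, body):
        try:
            handle(body)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as error:
            logger.error(f"Message handling failed: {error}", exc_info=True)
            ch.stop_consuming()  # shut down rather than keep consuming after a failure

    channel.basic_consume(queue=queue, on_message_callback=on_message)
    try:
        channel.start_consuming()
    finally:
        connection.close()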
38 changes: 24 additions & 14 deletions emf/loadflow_tool/model_merger/handlers/cgm_handler.py
@@ -14,7 +14,7 @@
from emf.loadflow_tool.model_merger import merge_functions
from emf.task_generator.task_generator import update_task_status
from emf.common.logging.custom_logger import get_elk_logging_handler
- from emf.loadflow_tool.replacement import run_replacement
+ from emf.loadflow_tool.replacement import run_replacement, get_available_tsos
import triplets

logger = logging.getLogger(__name__)
@@ -92,20 +92,24 @@ def handle(self, task_object: dict, **kwargs):
valid_models = [model for model in filtered_models if model['valid'] == 'True' or model['valid'] == True]
invalid_models = [model['pmd:TSO'] for model in filtered_models if model not in valid_models]
if invalid_models:
- merge_log.get('exclusion_reason').extend([{'tso': tso, 'reason': 'Model is no valid'} for tso in invalid_models])
+ merge_log.get('exclusion_reason').extend([{'tso': tso, 'reason': 'Model is not valid'} for tso in invalid_models])

if included_models:
- missing_models = [model for model in included_models if model not in [model['pmd:TSO'] for model in downloaded_models]]
+ missing_models = [model for model in included_models if model not in [model['pmd:TSO'] for model in valid_models]]
if missing_models:
merge_log.get('exclusion_reason').extend([{'tso': tso, 'reason': 'Model missing from OPDM'} for tso in missing_models])
else:
- missing_models = []
+ if model_replacement == 'True':
+     available_tsos = get_available_tsos()
+     missing_models = [model for model in available_tsos if model not in [model['pmd:TSO'] for model in valid_models] + excluded_models]
+ else:
+     missing_models = []

# Run replacement on missing/invalid models
- if (missing_models or invalid_models) and model_replacement == 'True':
+ if model_replacement == 'True' and missing_models:
try:
logger.info(f"Running replacement for missing models: {missing_models}")
replacement_models = run_replacement(missing_models + invalid_models, time_horizon, scenario_datetime)
logger.info(f"Running replacement for missing models")
replacement_models = run_replacement(missing_models, time_horizon, scenario_datetime)
if replacement_models:
logger.info(f"Replacement model(s) found: {[model['pmd:fileName'] for model in replacement_models]}")
merge_log.get('replaced_entity').extend([{'tso': model['pmd:TSO'],
@@ -118,8 +122,8 @@ def handle(self, task_object: dict, **kwargs):
logger.error(f"Failed to run replacement: {error}")

#Run Process only if you find some models to merge, otherwise return None
- if not filtered_models:
-     logger.warning("Found no Models To Merge, Returning NONE")
+ if not valid_models:
+     logger.warning("Found no valid models to merge, Returning NONE")
return None
else:
# Load all selected models
@@ -132,6 +136,12 @@
assembeled_data = triplets.cgmes_tools.update_FullModel_from_filename(assembeled_data)
# assembeled_data = merge_functions.configure_paired_boundarypoint_injections(assembeled_data)
assembeled_data = merge_functions.configure_paired_boundarypoint_injections_by_nodes(assembeled_data)
+ escape_upper_xml = assembeled_data[assembeled_data['VALUE'].astype(str).str.contains('.XML')]
+ if not escape_upper_xml.empty:
+     escape_upper_xml['VALUE'] = escape_upper_xml['VALUE'].str.replace('.XML', '.xml')
+     assembeled_data = triplets.rdf_parser.update_triplet_from_triplet(assembeled_data, escape_upper_xml,
+                                                                       update=True,
+                                                                       add=False)

input_models = create_opdm_objects([merge_functions.export_to_cgmes_zip([assembeled_data])])
del assembeled_data
@@ -287,16 +297,16 @@
"job_period_start": "2024-05-24T22:00:00+00:00",
"job_period_end": "2024-05-25T06:00:00+00:00",
"task_properties": {
"timestamp_utc": "2024-09-09T11:30:00+00:00",
"timestamp_utc": "2024-09-13T13:30:00+00:00",
"merge_type": "EU",
"merging_entity": "BALTICRSC",
"included": ["AST"],
"excluded": [],
"time_horizon": "1D",
"included": [],
"excluded": ["APG", "FI"],
"time_horizon": "ID",
"version": "123",

"mas": "http://www.baltic-rsc.eu/OperationalPlanning",
"replacement": "False",
"replacement": "True",
}
}

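The new block in the merge step lowercases stray .XML filename suffixes in the merged triplets before export. A small pandas sketch of the same normalisation on a toy frame; note that Series.str.contains and str.replace treat the pattern as a regular expression by default, so passing regex=False (as below) makes the dot literal, whereas the committed code relies on the default:

import pandas as pd

data = pd.DataFrame({"VALUE": ["20240927T0930Z_1D_TSO_EQ_001.XML", "already_lower.xml", 42]})

mask = data["VALUE"].astype(str).str.contains(".XML", regex=False)
data.loc[mask, "VALUE"] = data.loc[mask, "VALUE"].astype(str).str.replace(".XML", ".xml", regex=False)
print(data["VALUE"].tolist())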
62 changes: 40 additions & 22 deletions emf/loadflow_tool/replacement.py
@@ -25,32 +25,43 @@ def run_replacement(tso_list: list, time_horizon: str, scenario_date: str, conf=
Returns: from configuration a list of replaced models
"""
replacement_config = json.loads(Path(__file__).parent.parent.parent.joinpath(conf).read_text())
- model_list = []
replacement_models = []
replacements = pd.DataFrame()
- for tso in tso_list:
-     query = {"pmd:TSO": tso, "valid": True}
-     response = query_data(query, QUERY_FILTER)
-     model_list.extend(response)
- model_df = pd.DataFrame(model_list)
+ # TODO time horizon exclusion logic + exclude available models from query
+ query = {"opde:Object-Type": "IGM", "pmd:TSO.keyword": tso_list, "valid": True}
+ body = query_data(query, QUERY_FILTER)
+ model_df = pd.DataFrame(body)

- # Set scenario dat to UTC
- scenario_date = parser.parse(scenario_date).strftime("%Y-%m-%dT%H:%M:%SZ")
- replacement_df = create_replacement_table(scenario_date, time_horizon, model_df, replacement_config)
-
- if not replacement_df.empty:
-     unique_tsos_list = replacement_df["pmd:TSO"].unique().tolist()
-     for unique_tso in unique_tsos_list:
-         sample_tso = replacement_df.loc[(replacement_df["pmd:TSO"] == unique_tso)]
-         sample_tso = sample_tso.loc[(sample_tso["priority_day"] == sample_tso["priority_day"].min())]
-         sample_tso = sample_tso.loc[(sample_tso["priority_business"] == sample_tso["priority_business"].min())]
-         sample_tso = sample_tso.loc[(sample_tso["priority_hour"] == sample_tso["priority_hour"].min())]
-         sample_tso_min = sample_tso.loc[(sample_tso["pmd:versionNumber"] == sample_tso["pmd:versionNumber"].max())]
-         replacements = pd.concat([replacements, sample_tso_min])
-
- replacement_models = replacements.to_dict(orient='records') if not replacements.empty else None
- for num, model in enumerate(replacement_models):
-     replacement_models[num] = get_content(model)
+ if not model_df.empty:
+     scenario_date = parser.parse(scenario_date).strftime("%Y-%m-%dT%H:%M:%SZ")
+     replacement_df = create_replacement_table(scenario_date, time_horizon, model_df, replacement_config)
+     if not replacement_df.empty:
+         unique_tsos_list = replacement_df["pmd:TSO"].unique().tolist()
+         for unique_tso in unique_tsos_list:
+             sample_tso = replacement_df.loc[(replacement_df["pmd:TSO"] == unique_tso)]
+             sample_tso = sample_tso.loc[(sample_tso["priority_day"] == sample_tso["priority_day"].min())]
+             sample_tso = sample_tso.loc[(sample_tso["priority_business"] == sample_tso["priority_business"].min())]
+             sample_tso = sample_tso.loc[(sample_tso["priority_hour"] == sample_tso["priority_hour"].min())]
+             sample_tso_min = sample_tso.loc[(sample_tso["pmd:versionNumber"] == sample_tso["pmd:versionNumber"].max())]
+             replacements = pd.concat([replacements, sample_tso_min])
+
+         replacement_models = replacements.to_dict(orient='records') if not replacements.empty else None
+         for num, model in enumerate(replacement_models):
+             replacement_models[num] = get_content(model)
+
+         replaced_tso = replacements['pmd:TSO'].unique().tolist()
+         not_replaced = [model for model in unique_tsos_list if model not in replaced_tso]
+         if not_replaced:
+             logger.error(f"Unable to find replacements within given replacement logic for TSO's: {not_replaced}")
+
+         tso_missing = [model for model in tso_list if model not in unique_tsos_list]
+         if tso_missing:
+             logger.info(f"No replacement models found for TSO(s): {tso_missing}")
+     else:
+         logger.error(f"No replacement models found, replacement list is empty")
+ else:
+     logger.info(f"No replacement models found in Elastic for TSO(s): {tso_list}")

return replacement_models

@@ -114,6 +125,13 @@ def create_replacement_table(target_timestamp, target_timehorizon, valid_models_
return valid_models_df


+ def get_available_tsos():
+     query = {"opde:Object-Type": "IGM", "valid": True}
+     body = query_data(query, QUERY_FILTER)
+     key = 'pmd:TSO'
+     return list({item[key] for item in body if key in item})


if __name__ == "__main__":

missing_tso = ['PSE']
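run_replacement now issues a single Elasticsearch query for all requested TSOs, and the per-TSO loop keeps the candidate with the best (lowest) day/business/hour priorities and, among those, the highest version number. A toy reproduction of that selection step, with column names taken from the diff:

import pandas as pd

candidates = pd.DataFrame([
    {"pmd:TSO": "PSE", "priority_day": 2, "priority_business": 1, "priority_hour": 1, "pmd:versionNumber": 1},
    {"pmd:TSO": "PSE", "priority_day": 1, "priority_business": 1, "priority_hour": 2, "pmd:versionNumber": 1},
    {"pmd:TSO": "PSE", "priority_day": 1, "priority_business": 1, "priority_hour": 1, "pmd:versionNumber": 3},
])

best = candidates
for column in ["priority_day", "priority_business", "priority_hour"]:
    best = best.loc[best[column] == best[column].min()]  # most preferred value wins
best = best.loc[best["pmd:versionNumber"] == best["pmd:versionNumber"].max()]  # then newest version
print(best)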