Commit

triplets optimization

gpoderys authored and VeikoAunapuu committed Oct 14, 2024
1 parent 2f22daf commit dc54b05
Showing 7 changed files with 142 additions and 52 deletions.
2 changes: 2 additions & 0 deletions config/task_generator/manual_task_generator.properties
@@ -24,3 +24,5 @@ RUN_SCALING = False
UPLOAD_TO_OPDM = False
UPLOAD_TO_MINIO = True
SEND_MERGE_REPORT = True
PRE_TEMP_FIXES = True
POST_TEMP_FIXES = True
4 changes: 3 additions & 1 deletion config/task_generator/task_generator.properties
@@ -30,4 +30,6 @@ UPLOAD_TO_OPDM_CGM = True
UPLOAD_TO_MINIO_RMM = True
UPLOAD_TO_MINIO_CGM = True
SEND_MERGE_REPORT_RMM = True
SEND_MERGE_REPORT_CGM = True
SEND_MERGE_REPORT_CGM = True
PRE_TEMP_FIXES = True
POST_TEMP_FIXES = True
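
Worth noting: .properties values (and the task payload fields shown further down, e.g. "pre_temp_fixes": "True") arrive as strings, and bool("False") is True in Python, so the flags need coercion before truth tests. A minimal sketch, assuming the repo does not already normalize these elsewhere (the helper name to_bool is hypothetical):

```python
def to_bool(value) -> bool:
    """Coerce 'True'/'False'-style strings (or real booleans) to bool."""
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in ("true", "1", "yes")

# Usage sketch against the task properties used below:
# pre_temp_fixes = to_bool(task_properties["pre_temp_fixes"])
```
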
48 changes: 41 additions & 7 deletions emf/loadflow_tool/model_merger/merge_functions.py
@@ -97,7 +97,7 @@ def update_FullModel_from_OpdmObject(data, opdm_object):
})


def create_sv_and_updated_ssh(merged_model, original_models, scenario_date, time_horizon, version, merging_area, merging_entity, mas):
def create_sv_and_updated_ssh(merged_model, original_models, models_as_triplets, scenario_date, time_horizon, version, merging_area, merging_entity, mas):

### SV ###
# Set Metadata
@@ -139,8 +139,8 @@ def create_sv_and_updated_ssh(merged_model, original_models, scenario_date, time
ssh_data.set_VALUE_at_KEY('Model.scenarioTime', opdm_object_meta['pmd:scenarioDate'])

# Load full original data to fix issues
data = load_opdm_data(original_models)
terminals = data.type_tableview("Terminal")
# data = load_opdm_data(original_models)
# terminals = data.type_tableview("Terminal")

# Update SSH data from SV
ssh_update_map = [
@@ -182,11 +182,11 @@ def create_sv_and_updated_ssh(merged_model, original_models, scenario_date, time
}
]
# Load terminal from original data
terminals = load_opdm_data(original_models).type_tableview("Terminal")
terminals = models_as_triplets.type_tableview("Terminal")

# Update
for update in ssh_update_map:
logger.info(f"Updating: {update['from_attribute']} -> {update['to_attribute']}")
# logger.info(f"Updating: {update['from_attribute']} -> {update['to_attribute']}")
source_data = sv_data.type_tableview(update['from_class']).reset_index(drop=True)

# Merge with terminal, if needed
@@ -227,10 +227,10 @@ def create_sv_and_updated_ssh(merged_model, original_models, scenario_date, time
return sv_data, ssh_data


def fix_sv_shunts(sv_data, original_data):
def fix_sv_shunts(sv_data, models_as_triplets):
"""Remove Shunt Sections for EQV Shunts"""

equiv_shunt = load_opdm_data(original_data, "EQ").query("KEY == 'Type' and VALUE == 'EquivalentShunt'")
equiv_shunt = models_as_triplets.query("KEY == 'Type' and VALUE == 'EquivalentShunt'")
if len(equiv_shunt) > 0:
shunts_to_remove = sv_data.merge(
sv_data.query("KEY == 'SvShuntCompensatorSections.ShuntCompensator'").merge(equiv_shunt.ID, left_on='VALUE',
@@ -786,6 +786,40 @@ def set_brell_lines_to_zero_in_models(opdm_models, magic_brell_lines: dict = Non
return opdm_models


def set_brell_lines_to_zero_in_models_new(assembled_data, magic_brell_lines: dict = None, profile_to_change: str = "SSH"):
"""
Sets p and q of given (BRELL) lines to zero
Copied from emf_python as is
Workflow:
1) Take models (in cgmes format)
2) parse profile ("SSH") to triplets
3) Check and set the BRELL lines
4) if lines were set, repackage from triplets to CGMES and replace it in given profile
5) return models (losses: ""->'' in header, tab -> double space, closing tags -> self-closing tags if empty)
Note that in test run only one of them: L309 was present in AST
:param opdm_models: list of opdm models
:param magic_brell_lines: dictionary of brell lines
:param profile_to_change: profile to change
"""
if not magic_brell_lines:
magic_brell_lines = {'L373': 'cf3af93a-ad15-4db9-adc2-4e4454bb843f',
'L374': 'd98ec0d4-4e25-4667-b21f-5b816a6e8871',
'L358': 'e0786c57-57ff-454e-b9e2-7a912d81c674',
'L309': '7bd0deae-f000-4b15-a24d-5cf30765219f'}

for line, line_id in magic_brell_lines.items():
if assembled_data.query(f"ID == '{line_id}'").empty:
logger.info(f"Skipping brell line {line} as it was not found in data")
else:
logger.info(f"Setting brell line {line} EquivalentInjection.p and EquivalentInjection.q to 0")
assembled_data.loc[
assembled_data.query(f"ID == '{line_id}' and KEY == 'EquivalentInjection.p'").index, "VALUE"] = 0
assembled_data.loc[
assembled_data.query(f"ID == '{line_id}' and KEY == 'EquivalentInjection.q'").index, "VALUE"] = 0

return assembled_data


if __name__ == "__main__":

from emf.common.integrations.object_storage.models import get_latest_boundary, get_latest_models_and_download
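
The common thread in this file: create_sv_and_updated_ssh and fix_sv_shunts now receive the already-parsed models_as_triplets instead of re-running load_opdm_data on the OPDM objects. Triplets data behaves, in essence, like a long-format pandas DataFrame of (ID, KEY, VALUE) statements, which is why the shunt fix reduces to a plain query call. A toy illustration (the sample rows are made up):

```python
import pandas as pd

# Triplets are (roughly) long-format RDF statements: one row per attribute.
triplets_demo = pd.DataFrame([
    {"ID": "abc-1", "KEY": "Type", "VALUE": "EquivalentShunt"},
    {"ID": "abc-1", "KEY": "IdentifiedObject.name", "VALUE": "SHUNT_1"},
    {"ID": "def-2", "KEY": "Type", "VALUE": "LinearShuntCompensator"},
])

# Same pattern as the new fix_sv_shunts above:
equiv_shunt = triplets_demo.query("KEY == 'Type' and VALUE == 'EquivalentShunt'")
print(equiv_shunt.ID.tolist())  # ['abc-1']
```
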
72 changes: 29 additions & 43 deletions emf/loadflow_tool/model_merger/model_merger.py
@@ -10,15 +10,15 @@
from emf.common.integrations import opdm, minio_api, elastic
from emf.common.integrations.object_storage.models import get_latest_boundary, get_latest_models_and_download
from emf.loadflow_tool import loadflow_settings
from emf.loadflow_tool.helper import opdmprofile_to_bytes, create_opdm_objects
from emf.loadflow_tool.helper import opdmprofile_to_bytes
from emf.loadflow_tool.model_merger import merge_functions
from emf.task_generator.task_generator import update_task_status
from emf.common.logging.custom_logger import get_elk_logging_handler
from emf.loadflow_tool.replacement import run_replacement, get_available_tsos
import triplets
# TODO - move this async solution to some common module
from concurrent.futures import ThreadPoolExecutor
from lxml import etree
from emf.loadflow_tool.model_merger.temporary_fixes import run_post_merge_processing, run_pre_merge_processing

logger = logging.getLogger(__name__)
parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.merger)
@@ -59,7 +59,7 @@ def __init__(self):

def handle(self, task_object: dict, **kwargs):

start_time = datetime.datetime.utcnow()
start_time = datetime.datetime.now(datetime.UTC)
merge_log = {"uploaded_to_opde": False,
"uploaded_to_minio": False,
"scaled": False,
@@ -102,6 +102,8 @@ def handle(self, task_object: dict, **kwargs):
model_upload_to_opdm = task_properties["upload_to_opdm"]
model_upload_to_minio = task_properties["upload_to_minio"]
model_merge_report_send_to_elk = task_properties["send_merge_report"]
pre_temp_fixes = task_properties['pre_temp_fixes']
post_temp_fixes = task_properties['post_temp_fixes']

# Collect valid models from ObjectStorage
downloaded_models = get_latest_models_and_download(time_horizon, scenario_datetime, valid=False)
@@ -167,32 +169,24 @@ def handle(self, task_object: dict, **kwargs):
else:
# Load all selected models
input_models = valid_models + [latest_boundary]
# SET BRELL LINE VALUES
if merging_area == 'BA':
input_models = merge_functions.set_brell_lines_to_zero_in_models(input_models)

if len(input_models) < 2:
logger.warning("Found no models to merge, returning None")
return None
assembled_data = merge_functions.load_opdm_data(input_models)
assembled_data = triplets.cgmes_tools.update_FullModel_from_filename(assembled_data)
assembled_data = merge_functions.configure_paired_boundarypoint_injections_by_nodes(assembled_data)
escape_upper_xml = assembled_data[assembled_data['VALUE'].astype(str).str.contains('.XML')]
if not escape_upper_xml.empty:
escape_upper_xml['VALUE'] = escape_upper_xml['VALUE'].str.replace('.XML', '.xml')
assembled_data = triplets.rdf_parser.update_triplet_from_triplet(assembled_data, escape_upper_xml,
update=True,
add=False)
input_models = create_opdm_objects([merge_functions.export_to_cgmes_zip([assembled_data])])
del assembled_data

# Run pre-processing
pre_p_start = datetime.datetime.now(datetime.UTC)
if pre_temp_fixes:
input_models = run_pre_merge_processing(input_models, merging_area)
pre_p_end = datetime.datetime.now(datetime.UTC)
logger.error(f"Pre-processing took: {(pre_p_end - pre_p_start).total_seconds()} seconds")

# Load network model and merge
merge_start = datetime.datetime.utcnow()
merge_start = datetime.datetime.now(datetime.UTC)
merged_model = merge_functions.load_model(input_models)

# TODO - run other LF if default fails
solved_model = merge_functions.run_lf(merged_model, loadflow_settings=getattr(loadflow_settings,
MERGE_LOAD_FLOW_SETTINGS))
merge_end = datetime.datetime.utcnow()
solved_model = merge_functions.run_lf(merged_model, loadflow_settings=getattr(loadflow_settings, MERGE_LOAD_FLOW_SETTINGS))
merge_end = datetime.datetime.now(datetime.UTC)
logger.info(f"Loadflow status of main island: {solved_model['LOADFLOW_RESULTS'][0]['status_text']}")

# Update time_horizon in case of generic ID process type
@@ -203,26 +203,16 @@ def handle(self, task_object: dict, **kwargs):
time_horizon = f"{int((_scenario_datetime - _task_creation_time).seconds / 3600):02d}"
logger.info(f"Setting intraday time horizon to: {time_horizon}")

# TODO - get version dynamically form ELK
sv_data, ssh_data = merge_functions.create_sv_and_updated_ssh(solved_model, input_models, scenario_datetime,
time_horizon, version, merging_area,
merging_entity, mas)

# Fix SV
sv_data = merge_functions.fix_sv_shunts(sv_data, input_models)
sv_data = merge_functions.fix_sv_tapsteps(sv_data, ssh_data)
sv_data = merge_functions.remove_small_islands(sv_data, int(SMALL_ISLAND_SIZE))
models_as_triplets = merge_functions.load_opdm_data(input_models)
sv_data = merge_functions.remove_duplicate_sv_voltages(cgm_sv_data=sv_data,
original_data=models_as_triplets)
sv_data = merge_functions.check_and_fix_dependencies(cgm_sv_data=sv_data,
cgm_ssh_data=ssh_data,
original_data=models_as_triplets)
sv_data, ssh_data = merge_functions.disconnect_equipment_if_flow_sum_not_zero(cgm_sv_data=sv_data,
cgm_ssh_data=ssh_data,
original_data=models_as_triplets)
# Run post-processing
post_p_start = datetime.datetime.now(datetime.UTC)
sv_data, ssh_data = run_post_merge_processing(input_models, solved_model, task_properties, SMALL_ISLAND_SIZE,
apply_temp_fixes=post_temp_fixes)

# Package both input models and exported CGM profiles to in memory zip files
serialized_data = merge_functions.export_to_cgmes_zip([ssh_data, sv_data])
post_p_end = datetime.datetime.now(datetime.UTC)
logger.error(f"Post proocessing took: {(post_p_end - post_p_start).total_seconds()} seconds")
logger.error(f"Merging took: {(merge_end - merge_start).total_seconds()} seconds")

# Upload to OPDM
if model_upload_to_opdm:
@@ -270,7 +254,7 @@ def handle(self, task_object: dict, **kwargs):

logger.info(f"CGM creation done for {cgm_name}")

end_time = datetime.datetime.utcnow()
end_time = datetime.datetime.now(datetime.UTC)
task_duration = end_time - start_time
logger.info(f"Task ended at {end_time}, total run time {task_duration}",
extra={"task_duration": task_duration.total_seconds(),
@@ -350,12 +334,14 @@ def handle(self, task_object: dict, **kwargs):
"timestamp_utc": "2024-10-11T06:30:00+00:00",
"merge_type": "EU",
"merging_entity": "BALTICRSC",
"included": [],
"included": ['PSE', 'AST'],
"excluded": [],
"local_import": [],
"time_horizon": "ID",
"time_horizon": "1D",
"version": "99",
"mas": "http://www.baltic-rsc.eu/OperationalPlanning",
"pre_temp_fixes": "True",
"post_temp_fixes": "True",
"replacement": "True",
"scaling": "False",
"upload_to_opdm": "False",
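
Taken together, the handler's merge path now reads: optional pre-merge fixes, load and solve, optional post-merge fixes, export. A condensed sketch of the flow above (not verbatim; upload and reporting steps omitted):

```python
# Pre-merge temporary fixes (BRELL lines, boundary-point injections, ...)
if pre_temp_fixes:
    input_models = run_pre_merge_processing(input_models, merging_area)

# Merge and solve
merged_model = merge_functions.load_model(input_models)
solved_model = merge_functions.run_lf(
    merged_model,
    loadflow_settings=getattr(loadflow_settings, MERGE_LOAD_FLOW_SETTINGS))

# Post-merge SV/SSH creation plus optional fixes
sv_data, ssh_data = run_post_merge_processing(
    input_models, solved_model, task_properties, SMALL_ISLAND_SIZE,
    apply_temp_fixes=post_temp_fixes)

serialized_data = merge_functions.export_to_cgmes_zip([ssh_data, sv_data])
```
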
61 changes: 61 additions & 0 deletions emf/loadflow_tool/model_merger/temporary_fixes.py
@@ -0,0 +1,61 @@
import triplets
from emf.loadflow_tool.helper import create_opdm_objects
from emf.loadflow_tool.model_merger.merge_functions import (load_opdm_data, create_sv_and_updated_ssh, fix_sv_shunts,
fix_sv_tapsteps, remove_duplicate_sv_voltages,
remove_small_islands, check_and_fix_dependencies,
disconnect_equipment_if_flow_sum_not_zero,
export_to_cgmes_zip, set_brell_lines_to_zero_in_models,
configure_paired_boundarypoint_injections_by_nodes,
set_brell_lines_to_zero_in_models_new)


def run_pre_merge_processing(input_models, merging_area):

# TODO warning logs for temp fix functions

# SET BRELL LINE VALUES
if merging_area == 'BA':
input_models = set_brell_lines_to_zero_in_models(input_models)

assembled_data = load_opdm_data(input_models)

# TODO try to optimize it better
# if merging_area == 'BA':
# assembled_data = set_brell_lines_to_zero_in_models_new(assembled_data)

assembled_data = triplets.cgmes_tools.update_FullModel_from_filename(assembled_data)
assembled_data = configure_paired_boundarypoint_injections_by_nodes(assembled_data)
escape_upper_xml = assembled_data[assembled_data['VALUE'].astype(str).str.contains('.XML', regex=False)].copy()
if not escape_upper_xml.empty:
escape_upper_xml['VALUE'] = escape_upper_xml['VALUE'].str.replace('.XML', '.xml')
assembled_data = triplets.rdf_parser.update_triplet_from_triplet(assembled_data, escape_upper_xml, update=True, add=False)

input_models = create_opdm_objects([export_to_cgmes_zip([assembled_data])])

return input_models


def run_post_merge_processing(input_models, solved_model, task_properties, SMALL_ISLAND_SIZE, apply_temp_fixes):

time_horizon = task_properties["time_horizon"]
scenario_datetime = task_properties["timestamp_utc"]
merging_area = task_properties["merge_type"]
merging_entity = task_properties["merging_entity"]
mas = task_properties["mas"]
version = task_properties["version"]

models_as_triplets = load_opdm_data(input_models)
sv_data, ssh_data = create_sv_and_updated_ssh(solved_model, input_models, models_as_triplets,
scenario_datetime, time_horizon,
version, merging_area,
merging_entity, mas)

if apply_temp_fixes:
sv_data = fix_sv_shunts(sv_data, models_as_triplets)
sv_data = fix_sv_tapsteps(sv_data, ssh_data)
sv_data = remove_small_islands(sv_data, int(SMALL_ISLAND_SIZE))
sv_data = remove_duplicate_sv_voltages(cgm_sv_data=sv_data, original_data=models_as_triplets)
sv_data = check_and_fix_dependencies(cgm_sv_data=sv_data, cgm_ssh_data=ssh_data, original_data=models_as_triplets)
sv_data, ssh_data = disconnect_equipment_if_flow_sum_not_zero(cgm_sv_data=sv_data, cgm_ssh_data=ssh_data, original_data=models_as_triplets)

return sv_data, ssh_data
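
The BRELL fix is the one step still applied to the OPDM objects before parsing; the commented-out lines in run_pre_merge_processing point at the intended follow-up, where the single load_opdm_data parse happens first and the triplets variant is applied to its output. A sketch of that ordering (an assumption about the planned refactor, not code in this commit; the .XML-extension normalization is omitted for brevity):

```python
def run_pre_merge_processing_optimized(input_models, merging_area):
    assembled_data = load_opdm_data(input_models)  # parse OPDM objects once
    if merging_area == 'BA':
        # Triplets-based BRELL fix: no per-model repackaging round-trip
        assembled_data = set_brell_lines_to_zero_in_models_new(assembled_data)
    assembled_data = triplets.cgmes_tools.update_FullModel_from_filename(assembled_data)
    assembled_data = configure_paired_boundarypoint_injections_by_nodes(assembled_data)
    return create_opdm_objects([export_to_cgmes_zip([assembled_data])])
```
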
2 changes: 2 additions & 0 deletions emf/task_generator/manual_worker.py
@@ -49,6 +49,8 @@
process_config_json[0]['runs'][0]['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM
process_config_json[0]['runs'][0]['properties']['upload_to_minio'] = UPLOAD_TO_MINIO
process_config_json[0]['runs'][0]['properties']['send_merge_report'] = SEND_MERGE_REPORT
process_config_json[0]['runs'][0]['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES
process_config_json[0]['runs'][0]['properties']['post_temp_fixes'] = POST_TEMP_FIXES

process_config_json[0]['runs'][0]['properties']['merging_entity'] = TASK_MERGING_ENTITY
timeframe_config_json[0]['period_start'] = f'{PROCESS_TIME_SHIFT}'
5 changes: 4 additions & 1 deletion emf/task_generator/worker.py
@@ -25,6 +25,8 @@
runs['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM_CGM
runs['properties']['upload_to_minio'] = UPLOAD_TO_MINIO_CGM
runs['properties']['send_merge_report'] = SEND_MERGE_REPORT_CGM
runs['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES
runs['properties']['post_temp_fixes'] = POST_TEMP_FIXES
for runs in process_config_json[1]['runs']:
runs['properties']['included'] = [tso.strip() for tso in RMM_INCLUDED_TSO.split(',')] if RMM_INCLUDED_TSO else []
runs['properties']['excluded'] = [tso.strip() for tso in RMM_EXCLUDED_TSO.split(',')] if RMM_EXCLUDED_TSO else []
@@ -34,7 +36,8 @@
runs['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM_RMM
runs['properties']['upload_to_minio'] = UPLOAD_TO_MINIO_RMM
runs['properties']['send_merge_report'] = SEND_MERGE_REPORT_RMM

runs['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES
runs['properties']['post_temp_fixes'] = POST_TEMP_FIXES

with open(process_conf, 'w') as file:
json.dump(process_config_json, file, indent=1)
