Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new implementation for multicontainer use case #67

Merged
merged 14 commits into from
Apr 16, 2024
126 changes: 69 additions & 57 deletions broker/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,30 @@
import uvicorn
import logging
import socket
import shutil
import time
import yaml
import json
import sys
import os

import json

from oedisi.componentframework.system_configuration import WiringDiagram, ComponentStruct
from oedisi.types.common import ServerReply, HeathCheck


logger = logging.getLogger('uvicorn.error')

#logger = logging.getLogger(__name__)

app = FastAPI()

is_kubernetes_env = os.environ['KUBERNETES_SERVICE_NAME'] if 'KUBERNETES_SERVICE_NAME' in os.environ else None

WIRING_DIAGRAM_FILENAME = "system.json"
WIRING_DIAGRAM : WiringDiagram | None = None

# def test_function(json_file = r"C:/Users/alatif/Documents/GitHub/sgidal-example/scenarios/system.json"):
# global WIRING_DIAGRAM
# data = json.load(open(json_file, "r"))
# WIRING_DIAGRAM = WiringDiagram(**data)
AadilLatif marked this conversation as resolved.
Show resolved Hide resolved

def build_url(host:str, port:int, enpoint:list):
if is_kubernetes_env:
Expand All @@ -44,23 +49,21 @@ def find_filenames(path_to_dir=os.getcwd(), suffix=".feather"):


def read_settings():
component_map = {}
with open("docker-compose.yml", "r") as stream:
config = yaml.safe_load(stream)
services = config["services"]
print(services)
broker = services.pop("oedisi_broker")
broker_host = broker["hostname"]

broker_ip = socket.gethostbyname(broker_host)
api_port = int(broker["ports"][0].split(":")[0])

for service in services:
host = services[service]["hostname"]
port = int(services[service]["ports"][0].split(":")[0])
component_map[host] = port

broker_host = socket.gethostname()
broker_ip = socket.gethostbyname(broker_host)
api_port = 8766 #int(os.environ['PORT'])

component_map = {
broker_host: api_port
}
if WIRING_DIAGRAM:
for component in WIRING_DIAGRAM.components:
component_map[component.host] = component.container_port
else:
logger.info("Use the '/configure' setpoint to setup up the WiringDiagram before making requests other enpoints")

return services, component_map, broker_ip, api_port
return component_map, broker_ip, api_port


@app.get("/")
Expand All @@ -72,15 +75,14 @@ def read_root():

return JSONResponse(response, 200)


@app.post("/profiles")
async def upload_profiles(file: UploadFile):
try:
services, _, _, _ = read_settings()
for service in services:
if "feeder" in service.lower():
ip = services[service]["hostname"]
port = int(services[service]["ports"][0].split(":")[0])
component_map, _, _ = read_settings()
for hostname in component_map:
if "feeder" in hostname():
AadilLatif marked this conversation as resolved.
Show resolved Hide resolved
ip = hostname
port = component_map[hostname]
data = file.file.read()
if not file.filename.endswith(".zip"):
HTTPException(
Expand All @@ -101,15 +103,14 @@ async def upload_profiles(file: UploadFile):
err = traceback.format_exc()
raise HTTPException(status_code=500, detail=str(err))


@app.post("/model")
async def upload_model(file: UploadFile):
try:
services, _, _, _ = read_settings()
for service in services:
if "feeder" in service.lower():
ip = services[service]["hostname"]
port = int(services[service]["ports"][0].split(":")[0])
component_map, _, _ = read_settings()
for hostname in component_map:
if "feeder" in hostname():
AadilLatif marked this conversation as resolved.
Show resolved Hide resolved
ip = hostname
port = component_map[hostname]
data = file.file.read()
if not file.filename.endswith(".zip"):
HTTPException(
Expand All @@ -132,18 +133,19 @@ async def upload_model(file: UploadFile):

@app.get("/results")
def download_results():
services, _, _, _ = read_settings()
for service in services:
if "recorder" in service.lower():
host = services[service]["hostname"]
port = int(services[service]["ports"][0].split(":")[0])
component_map, _, _ = read_settings()

for hostname in component_map:
if "recorder" in hostname():
AadilLatif marked this conversation as resolved.
Show resolved Hide resolved
host = hostname
port = component_map[hostname]

url = build_url(host, port, ["download"])
logger.info(f"making a request to url - {url}")

response = requests.get(url)
logger.info(f"Response from {service} has {len(response.content)} bytes")
with open(f"{service}.feather", "wb") as out_file:
logger.info(f"Response from {hostname} has {len(response.content)} bytes")
with open(f"{hostname}.feather", "wb") as out_file:
out_file.write(response.content)

file_path = "results.zip"
Expand All @@ -156,7 +158,6 @@ def download_results():
except Exception as e:
raise HTTPException(status_code=404, detail="Failed download")


@app.get("/terminate")
def terminate_simulation():
try:
Expand All @@ -165,39 +166,46 @@ def terminate_simulation():
except Exception as e:
raise HTTPException(status_code=404, detail="Failed download ")

def _get_feeder_info(component_map:dict):
for host in component_map:
if host == "feeder":
return host, component_map[host]

def run_simulation():
services, component_map, broker_ip, api_port = read_settings()
print(services)
component_map, broker_ip, api_port = read_settings()
feeder_host, feeder_port = _get_feeder_info(component_map)
logger.info(f"{broker_ip}, {api_port}")
initstring = f"-f {len(component_map)} --name=mainbroker --loglevel=trace --local_interface={broker_ip} --localport=23404"
initstring = f"-f {len(component_map)-1} --name=mainbroker --loglevel=trace --local_interface={broker_ip} --localport=23404"
logger.info(f"Broker initaialization string: {initstring}")
broker = h.helicsCreateBroker("zmq", "", initstring)
logger.info(broker)
isconnected = h.helicsBrokerIsConnected(broker)
logger.info(f"Broker connected: {isconnected}")
logger.info(str(component_map))
replies = []

broker_host = socket.gethostname()

for service_ip, service_port in component_map.items():

url = build_url(service_ip, service_port, ["run"])
logger.info(f"making a request to url - {url}")

myobj = {
"broker_port": 23404,
"broker_ip": broker_ip,
"api_port": api_port,
"services": services,
}
replies.append(grequests.post(url, json=myobj))
if service_ip != broker_host:
url = build_url(service_ip, service_port, ["run"])
logger.info(f"making a request to url - {url}")

myobj = {
"broker_port": 23404,
"broker_ip": broker_ip,
"api_port": api_port,
"feeder_host": feeder_host,
"feeder_port": feeder_port
}
replies.append(grequests.post(url, json=myobj))
grequests.map(replies)
while h.helicsBrokerIsConnected(broker):
time.sleep(1)
h.helicsCloseLibrary()

return


@app.post("/run")
async def run_feeder(background_tasks: BackgroundTasks):
try:
Expand All @@ -207,10 +215,12 @@ async def run_feeder(background_tasks: BackgroundTasks):
except Exception as e:
err = traceback.format_exc()
raise HTTPException(status_code=404, detail=str(err))



@app.post("/configure")
async def configure(wiring_diagram:WiringDiagram):
global WIRING_DIAGRAM
WIRING_DIAGRAM = wiring_diagram

json.dump(wiring_diagram.dict(), open(WIRING_DIAGRAM_FILENAME, "w"))
for component in wiring_diagram.components:
component_model = ComponentStruct(
Expand All @@ -230,3 +240,5 @@ async def configure(wiring_diagram:WiringDiagram):

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(os.environ['PORT']))
# test_function()
# read_settings()
4 changes: 2 additions & 2 deletions measuring_federate/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ async def read_root():
@app.post("/run")
async def run_model(broker_config:BrokerConfig, background_tasks: BackgroundTasks):
logging.info(broker_config)
feeder_host = broker_config.services['oedisi_feeder']['hostname']
feeder_port = int(broker_config.services['oedisi_feeder']['ports'][0].split(":")[0])
feeder_host = broker_config.feeder_host
feeder_port = broker_config.feeder_port
url = build_url(feeder_host, feeder_port, ['sensor'])
logging.info(url)
try:
Expand Down
Loading