Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new implementation for multicontainer use case #67

Merged
merged 14 commits into from
Apr 16, 2024
2 changes: 1 addition & 1 deletion LocalFeeder/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ boto3
xarray
fastapi
uvicorn
oedisi==1.1.1
git+https://github.com/openEDI/oedisi@al/config
python-multipart
50 changes: 33 additions & 17 deletions LocalFeeder/server.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
from fastapi import FastAPI, BackgroundTasks, UploadFile, Request
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse
from sender_cosim import run_simulator
import traceback
import asyncio
import json
import logging
import os
import zipfile
import uvicorn
import socket
import sys
import json
import time
import traceback
import zipfile
import sys
import os

import uvicorn
from fastapi import BackgroundTasks, FastAPI, Request, UploadFile
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse
from oedisi.types.common import BrokerConfig, HeathCheck, ServerReply
from sender_cosim import run_simulator
from oedisi.componentframework.system_configuration import ComponentStruct
from oedisi.types.common import ServerReply, HeathCheck, DefaultFileNames
from oedisi.types.common import BrokerConfig

REQUEST_TIMEOUT_SEC = 1200

Expand Down Expand Up @@ -49,7 +51,7 @@ def read_root():
return JSONResponse(response, 200)


@app.get("/sensor/")
@app.get("/sensor")
async def sensor():
logging.info(os.getcwd())
sensor_path = os.path.join(base_path, "sensors", "sensors.json")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unable to add a comment on the right section, but I noticed a couple problems caused by this function when trying to run with SMART-DS as well as deleting the existing sensors.json file

  • We don't even use sensors.json
  • This should almost certainly use asyncio.sleep(1) so that it doesn't block

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will have to resolve this issue once we have sensors published on the helics interface

Expand All @@ -61,7 +63,7 @@ async def sensor():
return data


@app.post("/profiles/")
@app.post("/profiles")
async def upload_profiles(file: UploadFile):
try:
data = file.file.read()
Expand Down Expand Up @@ -94,7 +96,7 @@ async def upload_profiles(file: UploadFile):
)


@app.post("/model/")
@app.post("/model")
async def upload_model(file: UploadFile):
try:
data = file.file.read()
Expand Down Expand Up @@ -123,7 +125,7 @@ async def upload_model(file: UploadFile):
HTTPException(500, "Unknown error while uploading userdefined opendss model.")


@app.post("/run/")
@app.post("/run")
async def run_feeder(
broker_config: BrokerConfig, background_tasks: BackgroundTasks
): # :BrokerConfig
Expand All @@ -138,6 +140,20 @@ async def run_feeder(
HTTPException(500, str(err))


@app.post("/configure")
async def configure(component_struct:ComponentStruct):
component = component_struct.component
params = component.parameters
params["name"] = component.name
links = {}
for link in component_struct.links:
links[link.target_port] = f"{link.source}/{link.source_port}"
json.dump(links , open(DefaultFileNames.INPUT_MAPPING.value, "w"))
json.dump(params , open(DefaultFileNames.STATIC_INPUTS.value, "w"))
response = ServerReply(
detail = f"Sucessfully updated configuration files."
).dict()
return JSONResponse(response, 200)

if __name__ == "__main__":
port = int(sys.argv[2])
uvicorn.run(app, host="0.0.0.0", port=port)
uvicorn.run(app, host="0.0.0.0", port=int(os.environ['PORT']))
2 changes: 1 addition & 1 deletion broker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ RUN pip install -r requirements.txt

EXPOSE 8766/tcp

CMD ["python", "server.py", "10.5.0.2", "8766"]
CMD ["python", "server.py"]
19 changes: 19 additions & 0 deletions broker/docker-compose.yml
josephmckinsey marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
networks:
custom-network:
driver: bridge
ipam:
config:
- gateway: 10.5.0.1
subnet: 10.5.0.0/16
services:
oedisi_broker:
build:
context: ./broker/.
environment:
PORT: '8766'
hostname: broker
image: aadillatif/oedisi_broker
networks:
custom-network: {}
ports:
- 8766:8766
2 changes: 1 addition & 1 deletion broker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ helics-apps==3.4.0
pyyaml
fastapi
uvicorn
oedisi==1.1.1
git+https://github.com/openEDI/oedisi@al/config
grequests
python-multipart
132 changes: 91 additions & 41 deletions broker/server.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,43 @@
import logging
import os
import shutil
import socket
import sys
import time
import traceback
import zipfile

import grequests
from fastapi import FastAPI, BackgroundTasks, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from fastapi.exceptions import HTTPException
import helics as h
import grequests
import traceback
import requests
import zipfile
import uvicorn
import logging
import socket
import shutil
import time
import yaml
from fastapi import BackgroundTasks, FastAPI, UploadFile
from fastapi.exceptions import HTTPException
from fastapi.responses import FileResponse, JSONResponse
from oedisi.types.common import HeathCheck, ServerReply
import json
import sys
import os

from oedisi.componentframework.system_configuration import WiringDiagram, ComponentStruct
from oedisi.types.common import ServerReply, HeathCheck


logger = logging.getLogger('uvicorn.error')


app = FastAPI()

is_kubernetes_env = os.environ['KUBERNETES_SERVICE_NAME'] if 'KUBERNETES_SERVICE_NAME' in os.environ else None

WIRING_DIAGRAM_FILENAME = "system.json"

def build_url(host:str, port:int, enpoint:list):
if is_kubernetes_env:
KUBERNETES_SERVICE_NAME = os.environ['KUBERNETES_SERVICE_NAME']
url = f"http://{host}.{KUBERNETES_SERVICE_NAME}:{port}/"
else:
url = f"http://{host}:{port}/"
url = url + "/".join(enpoint) + "/"
return url

def find_filenames(path_to_dir=os.getcwd(), suffix=".feather"):
filenames = os.listdir(path_to_dir)
return [filename for filename in filenames if filename.endswith(suffix)]
Expand All @@ -32,13 +50,15 @@ def read_settings():
services = config["services"]
print(services)
broker = services.pop("oedisi_broker")
broker_ip = broker["networks"]["custom-network"]["ipv4_address"]
broker_host = broker["hostname"]

broker_ip = socket.gethostbyname(broker_host)
api_port = int(broker["ports"][0].split(":")[0])

for service in services:
ip = services[service]["networks"]["custom-network"]["ipv4_address"]
host = services[service]["hostname"]
port = int(services[service]["ports"][0].split(":")[0])
component_map[ip] = port
component_map[host] = port

return services, component_map, broker_ip, api_port

Expand All @@ -53,13 +73,13 @@ def read_root():
return JSONResponse(response, 200)


@app.post("/profiles/")
@app.post("/profiles")
async def upload_profiles(file: UploadFile):
try:
services, _, _, _ = read_settings()
for service in services:
if "feeder" in service.lower():
ip = services[service]["networks"]["custom-network"]["ipv4_address"]
ip = services[service]["hostname"]
port = int(services[service]["ports"][0].split(":")[0])
data = file.file.read()
if not file.filename.endswith(".zip"):
Expand All @@ -68,7 +88,10 @@ async def upload_profiles(file: UploadFile):
)
with open(file.filename, "wb") as f:
f.write(data)
url = f"http://{ip}:{port}/profiles/"

url = build_url(ip, port, ["profiles"])
logger.info(f"making a request to url - {url}")

files = {"file": open(file.filename, "rb")}
r = requests.post(url, files=files)
response = ServerReply(detail=r.text).dict()
Expand All @@ -79,13 +102,13 @@ async def upload_profiles(file: UploadFile):
raise HTTPException(status_code=500, detail=str(err))


@app.post("/model/")
@app.post("/model")
async def upload_model(file: UploadFile):
try:
services, _, _, _ = read_settings()
for service in services:
if "feeder" in service.lower():
ip = services[service]["networks"]["custom-network"]["ipv4_address"]
ip = services[service]["hostname"]
port = int(services[service]["ports"][0].split(":")[0])
data = file.file.read()
if not file.filename.endswith(".zip"):
Expand All @@ -94,7 +117,10 @@ async def upload_model(file: UploadFile):
)
with open(file.filename, "wb") as f:
f.write(data)
url = f"http://{ip}:{port}/model/"

url = build_url(ip, port, ["model"])
logger.info(f"making a request to url - {url}")

files = {"file": open(file.filename, "rb")}
r = requests.post(url, files=files)
response = ServerReply(detail=r.text).dict()
Expand All @@ -104,20 +130,22 @@ async def upload_model(file: UploadFile):
err = traceback.format_exc()
raise HTTPException(status_code=500, detail=str(err))


@app.get("/results/")
@app.get("/results")
def download_results():
services, _, _, _ = read_settings()
for service in services:
if "recorder" in service.lower():
ip = services[service]["networks"]["custom-network"]["ipv4_address"]
host = services[service]["hostname"]
port = int(services[service]["ports"][0].split(":")[0])
url = f"http://{ip}:{port}/download/"

url = build_url(host, port, ["download"])
logger.info(f"making a request to url - {url}")

response = requests.get(url)
logger.info(f"Response from {service} has {len(response.content)} bytes")
with open(f"{service}.feather", "wb") as out_file:
shutil.copyfileobj(response.raw, out_file)
time.sleep(2)

out_file.write(response.content)

file_path = "results.zip"
with zipfile.ZipFile(file_path, "w") as zipMe:
for feather_file in find_filenames():
Expand All @@ -129,7 +157,7 @@ def download_results():
raise HTTPException(status_code=404, detail="Failed download")


@app.get("/terminate/")
@app.get("/terminate")
def terminate_simulation():
try:
h.helicsCloseLibrary()
Expand All @@ -140,17 +168,21 @@ def terminate_simulation():

def run_simulation():
services, component_map, broker_ip, api_port = read_settings()
print(services)
logger.info(f"{broker_ip}, {api_port}")
initstring = f"-f {len(component_map)} --name=mainbroker --loglevel=trace --local_interface={broker_ip} --localport=23404"
logging.info(f"Broker initaialization string: {initstring}")
logger.info(f"Broker initaialization string: {initstring}")
broker = h.helicsCreateBroker("zmq", "", initstring)
logging.info(broker)
logger.info(broker)
isconnected = h.helicsBrokerIsConnected(broker)
logging.info(f"Broker connected: {isconnected}")
logging.info(str(component_map))
logger.info(f"Broker connected: {isconnected}")
logger.info(str(component_map))
replies = []
for service_ip, service_port in component_map.items():
url = f"http://{service_ip}:{service_port}/run/"
print(url)

url = build_url(service_ip, service_port, ["run"])
logger.info(f"making a request to url - {url}")

myobj = {
"broker_port": 23404,
"broker_ip": broker_ip,
Expand All @@ -166,7 +198,7 @@ def run_simulation():
return


@app.post("/run/")
@app.post("/run")
async def run_feeder(background_tasks: BackgroundTasks):
try:
background_tasks.add_task(run_simulation)
Expand All @@ -176,7 +208,25 @@ async def run_feeder(background_tasks: BackgroundTasks):
err = traceback.format_exc()
raise HTTPException(status_code=404, detail=str(err))



@app.post("/configure")
async def configure(wiring_diagram:WiringDiagram):
json.dump(wiring_diagram.dict(), open(WIRING_DIAGRAM_FILENAME, "w"))
for component in wiring_diagram.components:
component_model = ComponentStruct(
component = component,
links = []
)
for link in wiring_diagram.links:
if link.target == component.name:
component_model.links.append(link)

url = build_url(component.host, component.container_port, ["configure"])
logger.info(f"making a request to url - {url}")

r = requests.post(url, json=component_model.dict())
assert r.status_code==200, f"POST request to update configuration failed for url - {url}"
return JSONResponse(ServerReply(detail="Sucessfully updated config files for all containers").dict(), 200)

if __name__ == "__main__":
port = int(sys.argv[2])
uvicorn.run(app, host="0.0.0.0", port=port)
uvicorn.run(app, host="0.0.0.0", port=int(os.environ['PORT']))
2 changes: 1 addition & 1 deletion measuring_federate/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ fastapi
uvicorn
requests
grequests
oedisi==1.1.1
git+https://github.com/openEDI/oedisi@al/config
Loading
Loading