Skip to content

Commit

Permalink
Add node info endpoint (#101)
Browse files Browse the repository at this point in the history
* add node info endpoint

* remove newline

* fix typo

* fix typo

* fix type

* fix comments

* fix comments
  • Loading branch information
rsarm authored Apr 24, 2024
1 parent b5b747e commit d2a1e0f
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 2 deletions.
30 changes: 30 additions & 0 deletions firecrest/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,36 @@ async def poll_active(

return ret

async def nodes(
self,
machine: str,
nodes: Optional[Sequence[str]] = None,
) -> List[t.JobQueue]:
"""Retrieves information about the compute nodes.
This call uses the `scontrol show nodes` command.
:param machine: the machine name where the scheduler belongs to
:param nodes: specific compute nodes to query
:calls: GET `/compute/nodes`
GET `/tasks/{taskid}`
.. warning:: This is available only for FirecREST>=1.16.0
"""
params = {}
if nodes:
params["nodes"] = ",".join(nodes)

resp = await self._get_request(
endpoint="/compute/nodes",
additional_headers={"X-Machine-Name": machine},
params=params,
)
json_response = self._json_response([resp], 200)
t = ComputeTask(self, json_response["task_id"], [resp])
result = await t.poll_task("200")
return result

async def cancel(self, machine: str, job_id: str | int) -> str:
"""Cancels running job.
This call uses the `scancel` command.
Expand Down
32 changes: 32 additions & 0 deletions firecrest/BasicClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,38 @@ def poll_active(
)
return list(dict_result.values())

def nodes(
self,
machine: str,
nodes: Optional[Sequence[str]] = None,
) -> List[t.JobQueue]:
"""Retrieves information about the compute nodes.
This call uses the `scontrol show nodes` command.
:param machine: the machine name where the scheduler belongs to
:param nodes: specific compute nodes to query
:calls: GET `/compute/nodes`
GET `/tasks/{taskid}`
.. warning:: This is available only for FirecREST>=1.16.0
"""
params = {}
if nodes:
params["nodes"] = ",".join(nodes)

resp = self._get_request(
endpoint="/compute/nodes",
additional_headers={"X-Machine-Name": machine},
params=params,
)
self._current_method_requests.append(resp)
json_response = self._json_response(self._current_method_requests, 200)
result = self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
)
return result

def cancel(self, machine: str, job_id: str | int) -> str:
"""Cancels running job.
This call uses the `scancel` command.
Expand Down
52 changes: 50 additions & 2 deletions firecrest/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,54 @@ def poll_active(
raise typer.Exit(code=1)


@app.command(rich_help_panel="Compute commands")
def get_nodes(
config_from_parent: str = typer.Option(None,
callback=config_parent_load_callback,
is_eager=True,
hidden=True
),
system: str = typer.Option(
..., "-s", "--system", help="The name of the system.", envvar="FIRECREST_SYSTEM"
),
nodes: Optional[List[str]] = typer.Argument(
None, help="List of specific compute nodes to query."
),
raw: bool = typer.Option(False, "--raw", help="Print unformatted."),
):
"""Retrieves information about the compute nodes.
This call uses the `scontrol show nodes` command
"""
try:
results = client.nodes(system, nodes)
if raw:
console.print(results)
else:
parsed_results = []
for item in results:
parsed_item = {}
for key, value in item.items():
if isinstance(value, list):
parsed_item[key] = ", ".join(value)
else:
parsed_item[key] = str(value)

parsed_results.append(parsed_item)

table = create_table(
"Information about jobs in the queue",
parsed_results,
("Name", "NodeName"),
("Partitions", "Partitions"),
("State", "State"),
("Active Features", "ActiveFeatures"),
)
console.print(table)
except Exception as e:
examine_exeption(e)
raise typer.Exit(code=1)


@app.command(rich_help_panel="Compute commands")
def cancel(
config_from_parent: str = typer.Option(None,
Expand All @@ -1312,8 +1360,8 @@ def cancel(
raise typer.Exit(code=1)


@reservation_app.command(rich_help_panel="Reservation commands")
def list(
@reservation_app.command(name='list', rich_help_panel="Reservation commands")
def list_command(
config_from_parent: str = typer.Option(None,
callback=config_parent_load_callback,
is_eager=True,
Expand Down
128 changes: 128 additions & 0 deletions tests/test_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,28 @@ def submit_path_handler(request: Request):
)


def nodes_request_handler(request: Request):
if not request.query_string or request.query_string == b'nodes=nid001':
ret = {
"success": "Task created",
"task_id": "nodes_info",
"task_url": "/tasks/nodes_info"
}
status_code = 200

if request.query_string == b'nodes=nidunknown':
ret = {
"success": "Task created",
"task_id": "info_unknown_node",
"task_url": "/tasks/info_unknown_node"
}
status_code = 200

return Response(
json.dumps(ret), status=status_code, content_type="application/json"
)


def submit_upload_handler(request: Request):
if request.headers["Authorization"] != "Bearer VALID_TOKEN":
return Response(
Expand Down Expand Up @@ -845,6 +867,62 @@ def tasks_handler(request: Request):
}
status_code = 200

if taskid == "nodes_info":
ret = {
"tasks": {
taskid: {
"created_at": "2024-04-16T09:47:06",
"data": [
{
"ActiveFeatures": [
"f7t"
],
"NodeName": "nid001",
"Partitions": [
"part01",
"part02",
],
"State": [
"IDLE"
]
}
],
"description": "Finished successfully",
"hash_id": "nodes_info",
"last_modify": "2024-04-16T09:47:06",
"service": "compute",
"status": "200",
"system": "cluster",
"task_id": "nodes_info",
"task_url": "/tasks/nodes_info",
"updated_at": "2024-04-16T09:47:06",
"user": "service-account-firecrest-sample"
}
}
}
status_code = 200

if taskid == "info_unknown_node":
ret = {
"tasks": {
taskid: {
"created_at": "2024-04-16T09:56:14",
"data": "Node nidunknown not found",
"description": "Finished with errors",
"hash_id": "info_unknown_node",
"last_modify": "2024-04-16T09:56:14",
"service": "compute",
"status": "400",
"system": "cluster",
"task_id": "info_unknown_node",
"task_url": "/tasks/info_unknown_node",
"updated_at": "2024-04-16T09:56:14",
"user": "service-account-firecrest-sample"
}
}
}
status_code = 400

return Response(
json.dumps(ret), status=status_code, content_type="application/json"
)
Expand Down Expand Up @@ -876,6 +954,10 @@ def fc_server(httpserver):
"/tasks", method="GET"
).respond_with_handler(tasks_handler)

httpserver.expect_request(
"/compute/nodes", method="GET"
).respond_with_handler(nodes_request_handler)

return httpserver


Expand Down Expand Up @@ -1216,6 +1298,17 @@ def test_poll_active(valid_client):
]


def test_cli_get_nodes(valid_credentials):
args = valid_credentials + ["get-nodes", "--system", "cluster1", "nid001"]
result = runner.invoke(cli.app, args=args)
stdout = common.clean_stdout(result.stdout)
assert result.exit_code == 0
assert "Information about jobs in the queue" in stdout
assert "nid001" in stdout
assert "part01, part02" in stdout
assert "IDLE" in stdout
assert "f7t" in stdout

def test_cli_poll_active(valid_credentials):
global queue_retry
queue_retry = 0
Expand Down Expand Up @@ -1294,3 +1387,38 @@ def test_cancel_invalid_machine(valid_client):
def test_cancel_invalid_client(invalid_client):
with pytest.raises(firecrest.UnauthorizedException):
invalid_client.cancel(machine="cluster1", job_id=35360071)


def test_get_nodes(valid_client):
response = [{
"ActiveFeatures": ["f7t"],
"NodeName": "nid001",
"Partitions": [
"part01",
"part02"
],
"State": [
"IDLE"
]
}]
assert valid_client.nodes(machine="cluster1") == response


def test_get_nodes_from_list(valid_client):
response = [{
"ActiveFeatures": ["f7t"],
"NodeName": "nid001",
"Partitions": [
"part01",
"part02"
],
"State": [
"IDLE"
]
}]
assert valid_client.nodes(machine="cluster1", nodes=["nid001"]) == response


def test_get_nodes_unknown(valid_client):
with pytest.raises(firecrest.FirecrestException):
valid_client.nodes(machine="cluster1", nodes=["nidunknown"])
44 changes: 44 additions & 0 deletions tests/test_compute_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def fc_server(httpserver):
"/tasks", method="GET"
).respond_with_handler(basic_compute.tasks_handler)

httpserver.expect_request(
"/compute/nodes", method="GET"
).respond_with_handler(basic_compute.nodes_request_handler)

return httpserver


Expand Down Expand Up @@ -407,3 +411,43 @@ async def test_cancel_invalid_machine(valid_client):
async def test_cancel_invalid_client(invalid_client):
with pytest.raises(firecrest.UnauthorizedException):
await invalid_client.cancel(machine="cluster1", job_id=35360071)


@pytest.mark.asyncio
async def test_get_nodes(valid_client):
response = [{
"ActiveFeatures": ["f7t"],
"NodeName": "nid001",
"Partitions": [
"part01",
"part02"
],
"State": [
"IDLE"
]
}]
assert await valid_client.nodes(machine="cluster1") == response


@pytest.mark.asyncio
async def test_get_nodes_from_list(valid_client):
response = [{
"ActiveFeatures": ["f7t"],
"NodeName": "nid001",
"Partitions": [
"part01",
"part02"
],
"State": [
"IDLE"
]
}]
assert await valid_client.nodes(machine="cluster1",
nodes=["nid001"]) == response


@pytest.mark.asyncio
async def test_get_nodes_unknown(valid_client):
with pytest.raises(firecrest.FirecrestException):
await valid_client.nodes(machine="cluster1",
nodes=["nidunknown"])

0 comments on commit d2a1e0f

Please sign in to comment.