From ecb981f60d70779981dc741f3946eba5ae2aaca5 Mon Sep 17 00:00:00 2001 From: Rafael Sarmiento Date: Wed, 23 Oct 2024 10:19:35 +0200 Subject: [PATCH] add fs endpoints --- firecrest/v2/AsyncClient.py | 387 +++++++++++++++++++++++++++++++++++- 1 file changed, 386 insertions(+), 1 deletion(-) diff --git a/firecrest/v2/AsyncClient.py b/firecrest/v2/AsyncClient.py index 06d63e3..602fad8 100644 --- a/firecrest/v2/AsyncClient.py +++ b/firecrest/v2/AsyncClient.py @@ -9,9 +9,12 @@ import httpx import json import logging +import pathlib import ssl -from typing import Any, Optional, List +from io import BytesIO +from contextlib import nullcontext +from typing import Any, ContextManager, Optional, List from packaging.version import Version, parse from firecrest.utilities import ( @@ -263,3 +266,385 @@ async def partitions( endpoint=f"/status/{system_name}/partitions" ) return self._json_response(resp, 200)["partitions"] + + async def list_files( + self, + system_name: str, + path: str, + show_hidden: bool = False, + recursive: bool = False, + numeric_uid: bool = False, + follow_links: bool = False + ) -> List[dict]: + """Returns a list of files in a directory. + + :param system_name: the machine name where the filesystem belongs to + :param path: the absolute target path + :param show_hidden: show hidden files + :param recursive: recursively list directories encountered + :param follow_links: Follow symbolik links + :calls: GET `/filesystem/{system_name}/ops/ls` + """ + params: dict[str, Any] = {"path": f"{path}"} + if show_hidden is True: + params["showHidden"] = show_hidden + + if recursive is True: + params["recursive"] = recursive + + if numeric_uid is True: + params["numericUid"] = numeric_uid + + if follow_links is True: + params["followLinks"] = follow_links + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/ls", + params=params + ) + return self._json_response(resp, 200) + + async def head( + self, + system_name: str, + path: str, + bytes: Optional[int] = None, + lines: Optional[int] = None, + skip_ending: bool = False, + ) -> List[dict]: + """Display the beginning of a specified file. + By default 10 lines will be returned. + Bytes and lines cannot be specified simultaneously. + + :param system_name: the machine name where the filesystem belongs to + :param path: the absolute target path + :param bytes: The output will be the first NUM bytes of each file + :param lines: The output will be the first NUM lines of each file + :param skip_ending: The output will be the whole file, without the last + NUM bytes/lines of each file. NUM should be + specified in the respective argument through + ``bytes`` or ``lines``. + :calls: GET `/filesystem/{system_name}/ops/head` + """ + params: dict[str, Any] = {"path": f"{path}"} + if bytes is True: + params["bytes"] = bytes + + if lines is True: + params["lines"] = lines + + if skip_ending is True: + params["skipEnding"] = skip_beginning + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/head", + params=params + ) + return self._json_response(resp, 200) + + async def tail( + self, + system_name: str, + path: str, + bytes: Optional[int] = None, + lines: Optional[int] = None, + skip_beginning: bool = False, + ) -> List[dict]: + """Display the ending of a specified file. + By default 10 lines will be returned. + Bytes and lines cannot be specified simultaneously. + + :param system_name: the machine name where the filesystem belongs to + :param path: the absolute target path + :param bytes: The output will be the last NUM bytes of each file + :param lines: The output will be the last NUM lines of each file + :param skip_beginning: The output will be the whole file, without the last + NUM bytes/lines of each file. NUM should be + specified in the respective argument through + ``bytes`` or ``lines``. + :calls: GET `/filesystem/{system_name}/ops/tail` + """ + params: dict[str, Any] = {"path": f"{path}"} + if bytes is True: + params["bytes"] = bytes + + if lines is True: + params["lines"] = lines + + if skip_beginning is True: + params["skipBeginning"] = skip_beginning + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/tail", + params=params + ) + return self._json_response(resp, 200) + + async def view( + self, + system_name: str, + path: str, + ) -> List[dict]: + """ + View full file content (up to 5MB files) + + :param system_name: the machine name where the filesystem belongs to + :param path: the absolute target path + :calls: GET `/filesystem/{system_name}/ops/view` + """ + params: dict[str, str] = {"path": f"{path}"} + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/view", + params=params + ) + return self._json_response(resp, 200) + + async def checksum( + self, + system_name: str, + path: str, + ) -> List[dict]: + """ + Calculate the SHA256 (256-bit) checksum of a specified file. + + :param system_name: the machine name where the filesystem belongs to + :param path: the absolute target path + :calls: GET `/filesystem/{system_name}/ops/checksum` + """ + params: dict[str, str] = {"path": f"{path}"} + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/checksum", + params=params + ) + return self._json_response(resp, 200) + + async def file_type( + self, + system_name: str, + path: str, + ) -> List[dict]: + """ + Uses the `file` linux application to determine the type of a file. + + :param system_name: the machine name where the filesystem belongs to + :param path: the absolute target path + :calls: GET `/filesystem/{system_name}/ops/checksum` + """ + params: dict[str, str] = {"path": f"{path}"} + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/file", + params=params + ) + return self._json_response(resp, 200) + + async def chmod( + self, + system_name: str, + path: str, + mode: str + ) -> List[dict]: + """Changes the file mod bits of a given file according to the specified mode. + + :param machine: the machine name where the filesystem belongs to + :param path: the absolute target path + :param mode: same as numeric mode of linux chmod tool + :calls: PUT `/filesystem/{system_name}/ops/chmod` + """ + params: dict[str, str] = { + "path": f"{path}", + "mode": f"{mode}" + } + resp = await self._put_request( + endpoint=f"/filesystem/{system_name}/ops/chmod", + params=params + ) + return self._json_response(resp, 200) + + async def chown( + self, + system_name: str, + path: str, + owner: str, + group: str + ) -> List[dict]: + """Changes the user and/or group ownership of a given file. + If only owner or group information is passed, only that information will be updated. + + :param machine: the machine name where the filesystem belongs to + :param path: the absolute target path + :param owner: owner ID for target + :param group: group ID for target + :calls: PUT `/filesystem/{system_name}/ops/chown` + """ + params: dict[str, str] = {"path": f"{path}"} + if owner: + data["owner"] = owner + + if group: + data["group"] = group + + resp = await self._put_request( + endpoint=f"/filesystem/{system_name}/ops/chown", + params=params + ) + return self._json_response(resp, 200) + + + async def stat( + self, + system_name: str, + path: str, + ) -> List[dict]: + """ + Uses the stat linux application to determine the status of a file on the machine's filesystem. + The result follows: https://docs.python.org/3/library/os.html#os.stat_result. + + :param system_name: the machine name where the filesystem belongs to + :param path: the absolute target path + :param dereference: follow symbolic links + :calls: GET `/filesystem/{system_name}/ops/checksum` + """ + params: dict[str, str] = {"path": f"{path}"} + + if dereference is True: + params["dereference"] = dereference + + resp = await self._get_request( + endpoint=f"/filesystem/{system_name}/ops/stat", + params=params + ) + return self._json_response(resp, 200) + + async def upload( + self, + system_name: str, + source_path: str, + target_path: str, + filename: Optional[str] = None, + ) -> List[dict]: + """Blocking call to upload a small file. + The file that will be uploaded will have the same name as the source_path. + + :param machine: the machine name where the filesystem belongs to + :param source_path: the source path of the file or binary stream + :param target_path: the absolute target path of the directory where the file will be uploaded + :param filename: naming target file to filename (default is same as the local one) + :calls: POST `/filesystem/{system_name}/transfer/upload` + """ + context: ContextManager[BytesIO] = ( + open(source_path, "rb") # type: ignore + if isinstance(source_path, str) or isinstance(source_path, pathlib.Path) + else nullcontext(source_path) + ) + with context as f: + # Set filename + if filename is not None: + f = (filename, f) # type: ignore + + params: dict[str, str] = { + "fileName": f, + "targetPath": f"{target_path}" + } + + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/upload", + params=params + ) + return self._json_response(resp, 200) + + async def download( + self, + system_name: str, + source_path: str, + target_path: str, + ) -> List[dict]: + """Blocking call to download a small file. + + :param machine: the machine name where the filesystem belongs to + :param source_path: the absolute source path of the file or binary stream + :param target_path: the target path in the local filesystem or binary stream + :calls: POST `/filesystem/{system_name}/transfer/upload` + """ + params: dict[str, str] = {"source_path": f"{sourcePath}"} + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/upload", + params=params + ) + self._json_response([resp], 200, allow_none_result=True) + context: ContextManager[BytesIO] = ( + open(target_path, "wb") # type: ignore + if isinstance(target_path, str) or isinstance(target_path, pathlib.Path) + else nullcontext(target_path) + ) + with context as f: + f.write(resp.content) + + async def mv( + self, + system_name: str, + source_path: str, + target_path: str + ) -> List[dict]: + """Rename/move a file, directory, or symlink at the `source_path` to the `target_path` on `machine`'s filesystem. + When successful, the method returns a string with the new path of the file. + + :param machine: the machine name where the filesystem belongs to + :param source_path: the absolute source path + :param target_path: the absolute target path + :calls: POST `/filesystem/{system_name}/transfer/mv` + """ + params: dict[str, str] = { + "sourcePath": f"{source_path}", + "targetPath": f"{target_path}" + } + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/mv", + params=params + ) + return self._json_response(resp, 200) + + async def cp( + self, + system_name: str, + source_path: str, + target_path: str + ) -> List[dict]: + """Copies file from `source_path` to `target_path`. + When successful, the method returns a string with the path of the newly created file. + + :param machine: the machine name where the filesystem belongs to + :param source_path: the absolute source path + :param target_path: the absolute target path + :calls: POST `/filesystem/{system_name}/transfer/cp` + """ + params: dict[str, str] = { + "sourcePath": f"{source_path}", + "targetPath": f"{target_path}" + } + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/cp", + params=params + ) + return self._json_response(resp, 200) + + async def rm( + self, + system_name: str, + path: str, + target_path: str + ) -> List[dict]: + """Blocking call to delete a small file. + + :param machine: the machine name where the filesystem belongs to + :param path: the absolute target path + :calls: DELETE `/filesystem/{system_name}/transfer/rm` + """ + params: dict[str, str] = {"path": f"{path}"} + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/rm", + params=params + ) + return self._json_response(resp, 200)