From f598e05a67bd00b6d003e8a5691de13db5d3d15e Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 29 Apr 2024 14:11:38 +0200 Subject: [PATCH 1/8] wip --- firecrest/AsyncClient.py | 161 +++++++++++++++++++++++---------------- 1 file changed, 96 insertions(+), 65 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index 2f1db9b..21b06fe 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -233,77 +233,108 @@ def is_session_closed(self) -> bool: """Check if the httpx session is closed""" return self._session.is_closed + # @_retry_requests # type: ignore + # async def _get_request( + # self, endpoint, additional_headers=None, params=None + # ) -> httpx.Response: + # microservice = endpoint.split("/")[1] + # url = f"{self._firecrest_url}{endpoint}" + + # async def _merged_get(event): + # # await self._stall_request(microservice) + # # context: ContextManager[BytesIO] = ( + # # open(target_path, "wb") # type: ignore + # # if isinstance(target_path, str) or isinstance(target_path, pathlib.Path) + # # else nullcontext(target_path) + # # ) + # # with context as f: + # # f.write(resp.content) + # async with self._locks[microservice]: + # results = self._polling_results[microservice] + # ids = self._polling_ids[microservice].copy() + # self._polling_events[microservice] = None + # self._polling_ids[microservice] = set() + # comma_sep_par = "tasks" if microservice == "tasks" else "jobs" + # if ids == {"*"}: + # if comma_sep_par in params: + # del params[comma_sep_par] + # else: + # params[comma_sep_par] = ",".join(ids) + + # headers = { + # "Authorization": f"Bearer {self._authorization.get_access_token()}" + # } + # if additional_headers: + # headers.update(additional_headers) + + # logger.info(f"Making GET request to {endpoint}") + # resp = await self._session.get( + # url=url, headers=headers, params=params, timeout=self.timeout + # ) + + # self._next_request_ts[microservice] = ( + # time.time() + self.time_between_calls[microservice] + # ) + + # results.append(resp) + # event.set() + + # return + + # if microservice == "tasks" or endpoint in ("/compute/jobs", "/compute/acct"): + # async with self._locks[microservice]: + # if self._polling_ids[microservice] != {"*"}: + # comma_sep_par = "tasks" if microservice == "tasks" else "jobs" + # if comma_sep_par not in params: + # self._polling_ids[microservice] = {"*"} + # else: + # task_ids = params[comma_sep_par].split(",") + # self._polling_ids[microservice].update(task_ids) + + # if self._polling_events[microservice] is None: + # self._polling_events[microservice] = asyncio.Event() + # my_event = self._polling_events[microservice] + # self._polling_results[microservice] = [] + # my_result = self._polling_results[microservice] + # waiter = True + # task = asyncio.create_task(_merged_get(my_event)) + # else: + # waiter = False + # my_event = self._polling_events[microservice] + # my_result = self._polling_results[microservice] + + # if waiter: + # await task + + # await my_event.wait() # type: ignore + # resp = my_result[0] + # return resp + + # # Otherwise just do what you usually do + # async with self._locks[microservice]: + # await self._stall_request(microservice) + # headers = { + # "Authorization": f"Bearer {self._authorization.get_access_token()}" + # } + # if additional_headers: + # headers.update(additional_headers) + + # logger.info(f"Making GET request to {endpoint}") + # resp = await self._session.get( + # url=url, headers=headers, params=params, timeout=self.timeout + # ) + # self._next_request_ts[microservice] = ( + # time.time() + self.time_between_calls[microservice] + # ) + + # return resp + @_retry_requests # type: ignore async def _get_request( self, endpoint, additional_headers=None, params=None ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - - async def _merged_get(event): - await self._stall_request(microservice) - async with self._locks[microservice]: - results = self._polling_results[microservice] - ids = self._polling_ids[microservice].copy() - self._polling_events[microservice] = None - self._polling_ids[microservice] = set() - comma_sep_par = "tasks" if microservice == "tasks" else "jobs" - if ids == {"*"}: - if comma_sep_par in params: - del params[comma_sep_par] - else: - params[comma_sep_par] = ",".join(ids) - - headers = { - "Authorization": f"Bearer {self._authorization.get_access_token()}" - } - if additional_headers: - headers.update(additional_headers) - - logger.info(f"Making GET request to {endpoint}") - resp = await self._session.get( - url=url, headers=headers, params=params, timeout=self.timeout - ) - - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) - - results.append(resp) - event.set() - - return - - if microservice == "tasks" or endpoint in ("/compute/jobs", "/compute/acct"): - async with self._locks[microservice]: - if self._polling_ids[microservice] != {"*"}: - comma_sep_par = "tasks" if microservice == "tasks" else "jobs" - if comma_sep_par not in params: - self._polling_ids[microservice] = {"*"} - else: - task_ids = params[comma_sep_par].split(",") - self._polling_ids[microservice].update(task_ids) - - if self._polling_events[microservice] is None: - self._polling_events[microservice] = asyncio.Event() - my_event = self._polling_events[microservice] - self._polling_results[microservice] = [] - my_result = self._polling_results[microservice] - waiter = True - task = asyncio.create_task(_merged_get(my_event)) - else: - waiter = False - my_event = self._polling_events[microservice] - my_result = self._polling_results[microservice] - - if waiter: - await task - - await my_event.wait() # type: ignore - resp = my_result[0] - return resp - - # Otherwise just do what you usually do async with self._locks[microservice]: await self._stall_request(microservice) headers = { From 34b1e384787284f311a713efbbb1572ccc1537d6 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 2 May 2024 14:17:46 +0200 Subject: [PATCH 2/8] wip --- firecrest/AsyncClient.py | 128 +++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 64 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index 21b06fe..51475fe 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -335,21 +335,21 @@ async def _get_request( ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - async with self._locks[microservice]: - await self._stall_request(microservice) - headers = { - "Authorization": f"Bearer {self._authorization.get_access_token()}" - } - if additional_headers: - headers.update(additional_headers) + # async with self._locks[microservice]: + # await self._stall_request(microservice) + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) - logger.info(f"Making GET request to {endpoint}") - resp = await self._session.get( - url=url, headers=headers, params=params, timeout=self.timeout - ) - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) + logger.info(f"Making GET request to {endpoint}") + resp = await self._session.get( + url=url, headers=headers, params=params, timeout=self.timeout + ) + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) return resp @@ -359,21 +359,21 @@ async def _post_request( ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - async with self._locks[microservice]: - await self._stall_request(microservice) - headers = { - "Authorization": f"Bearer {self._authorization.get_access_token()}" - } - if additional_headers: - headers.update(additional_headers) + # async with self._locks[microservice]: + # await self._stall_request(microservice) + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) - logger.info(f"Making POST request to {endpoint}") - resp = await self._session.post( - url=url, headers=headers, data=data, files=files, timeout=self.timeout - ) - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) + logger.info(f"Making POST request to {endpoint}") + resp = await self._session.post( + url=url, headers=headers, data=data, files=files, timeout=self.timeout + ) + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) return resp @@ -383,21 +383,21 @@ async def _put_request( ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - async with self._locks[microservice]: - await self._stall_request(microservice) - headers = { - "Authorization": f"Bearer {self._authorization.get_access_token()}" - } - if additional_headers: - headers.update(additional_headers) + # async with self._locks[microservice]: + # await self._stall_request(microservice) + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) - logger.info(f"Making PUT request to {endpoint}") - resp = await self._session.put( - url=url, headers=headers, data=data, timeout=self.timeout - ) - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) + logger.info(f"Making PUT request to {endpoint}") + resp = await self._session.put( + url=url, headers=headers, data=data, timeout=self.timeout + ) + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) return resp @@ -407,28 +407,28 @@ async def _delete_request( ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - async with self._locks[microservice]: - await self._stall_request(microservice) - headers = { - "Authorization": f"Bearer {self._authorization.get_access_token()}" - } - if additional_headers: - headers.update(additional_headers) - - logger.info(f"Making DELETE request to {endpoint}") - # httpx doesn't support data in the `delete` method so we will have to - # use the generic `request` method - # https://www.python-httpx.org/compatibility/#request-body-on-http-methods - resp = await self._session.request( - method="DELETE", - url=url, - headers=headers, - data=data, - timeout=self.timeout, - ) - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) + # async with self._locks[microservice]: + # await self._stall_request(microservice) + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + logger.info(f"Making DELETE request to {endpoint}") + # httpx doesn't support data in the `delete` method so we will have to + # use the generic `request` method + # https://www.python-httpx.org/compatibility/#request-body-on-http-methods + resp = await self._session.request( + method="DELETE", + url=url, + headers=headers, + data=data, + timeout=self.timeout, + ) + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) return resp From 76d210b57fb5958d56ae52c717e5d1c02f91ef27 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 20 May 2024 16:36:24 +0300 Subject: [PATCH 3/8] Add timers in async client --- firecrest/AsyncClient.py | 46 +++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index 377ba0e..cc0d09b 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -26,6 +26,7 @@ import firecrest.FirecrestException as fe import firecrest.types as t from firecrest.AsyncExternalStorage import AsyncExternalUpload, AsyncExternalDownload +from firecrest.utilities import time_block if sys.version_info >= (3, 8): @@ -343,9 +344,11 @@ async def _get_request( headers.update(additional_headers) logger.info(f"Making GET request to {endpoint}") - resp = await self._session.get( - url=url, headers=headers, params=params, timeout=self.timeout - ) + with time_block(f"GET request to {endpoint}", logger): + resp = await self._session.get( + url=url, headers=headers, params=params, timeout=self.timeout + ) + self._next_request_ts[microservice] = ( time.time() + self.time_between_calls[microservice] ) @@ -367,9 +370,11 @@ async def _post_request( headers.update(additional_headers) logger.info(f"Making POST request to {endpoint}") - resp = await self._session.post( - url=url, headers=headers, data=data, files=files, timeout=self.timeout - ) + with time_block(f"POST request to {endpoint}", logger): + resp = await self._session.post( + url=url, headers=headers, data=data, files=files, timeout=self.timeout + ) + self._next_request_ts[microservice] = ( time.time() + self.time_between_calls[microservice] ) @@ -391,9 +396,11 @@ async def _put_request( headers.update(additional_headers) logger.info(f"Making PUT request to {endpoint}") - resp = await self._session.put( - url=url, headers=headers, data=data, timeout=self.timeout - ) + with time_block(f"PUT request to {endpoint}", logger): + resp = await self._session.put( + url=url, headers=headers, data=data, timeout=self.timeout + ) + self._next_request_ts[microservice] = ( time.time() + self.time_between_calls[microservice] ) @@ -415,16 +422,17 @@ async def _delete_request( headers.update(additional_headers) logger.info(f"Making DELETE request to {endpoint}") - # httpx doesn't support data in the `delete` method so we will have to - # use the generic `request` method - # https://www.python-httpx.org/compatibility/#request-body-on-http-methods - resp = await self._session.request( - method="DELETE", - url=url, - headers=headers, - data=data, - timeout=self.timeout, - ) + with time_block(f"DELETE request to {endpoint}", logger): + # httpx doesn't support data in the `delete` method so we will + # have to use the generic `request` method + # https://www.python-httpx.org/compatibility/#request-body-on-http-methods + resp = await self._session.request( + method="DELETE", + url=url, + headers=headers, + data=data, + timeout=self.timeout, + ) self._next_request_ts[microservice] = ( time.time() + self.time_between_calls[microservice] ) From 8314c84bc6424d79ccd0ca9ca6b256904fce62a6 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 24 May 2024 12:25:57 +0300 Subject: [PATCH 4/8] Stash --- firecrest/AsyncClient.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index cc0d09b..dc28be6 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -361,8 +361,10 @@ async def _post_request( ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - # async with self._locks[microservice]: - # await self._stall_request(microservice) + await self._stall_request(microservice) + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) headers = { "Authorization": f"Bearer {self._authorization.get_access_token()}" } @@ -375,10 +377,6 @@ async def _post_request( url=url, headers=headers, data=data, files=files, timeout=self.timeout ) - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) - return resp @_retry_requests # type: ignore @@ -387,8 +385,10 @@ async def _put_request( ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - # async with self._locks[microservice]: - # await self._stall_request(microservice) + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) + await self._stall_request(microservice) headers = { "Authorization": f"Bearer {self._authorization.get_access_token()}" } @@ -401,10 +401,6 @@ async def _put_request( url=url, headers=headers, data=data, timeout=self.timeout ) - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) - return resp @_retry_requests # type: ignore @@ -413,8 +409,10 @@ async def _delete_request( ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - # async with self._locks[microservice]: - # await self._stall_request(microservice) + await self._stall_request(microservice) + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) headers = { "Authorization": f"Bearer {self._authorization.get_access_token()}" } @@ -433,9 +431,6 @@ async def _delete_request( data=data, timeout=self.timeout, ) - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) return resp From 01c50480741507c31b3d7e164521b115d701a968 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 28 May 2024 09:52:42 +0300 Subject: [PATCH 5/8] Don't merge reqs by default --- firecrest/AsyncClient.py | 190 ++++++++++++++++----------------------- 1 file changed, 78 insertions(+), 112 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index dc28be6..e7f8ed3 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -185,6 +185,10 @@ def __init__( "tasks": 0.1, "utilities": 0.1, } + #: Merge GET requests to the same endpoint, when possible. This will + #: take effect only when the time_between_calls of the microservice + #: is greater than 0. + self.merge_get_requests: bool = False self._next_request_ts: dict[str, float] = { "compute": 0, "reservations": 0, @@ -194,20 +198,26 @@ def __init__( "utilities": 0, } self._locks = { - "compute": asyncio.Lock(), - "reservations": asyncio.Lock(), - "status": asyncio.Lock(), - "storage": asyncio.Lock(), + "jobs": asyncio.Lock(), + "accounting": asyncio.Lock(), "tasks": asyncio.Lock(), - "utilities": asyncio.Lock(), } - # The following objects are used to "merge" requests in the same endpoints, - # for example requests to tasks or polling for jobs - self._polling_ids: dict[str, set] = {"compute": set(), "tasks": set()} - self._polling_results: dict[str, List] = {"compute": [], "tasks": []} + # The following objects are used to "merge" requests in the same + # endpoints, for example requests to tasks or polling for jobs + self._polling_ids: dict[str, set] = { + "jobs": set(), + "accounting": set(), + "tasks": set() + } + self._polling_results: dict[str, List] = { + "jobs": [], + "accounting": [], + "tasks": [] + } self._polling_events: dict[str, Optional[asyncio.Event]] = { - "compute": None, + "jobs": None, + "accounting": None, "tasks": None, } @@ -233,110 +243,22 @@ def is_session_closed(self) -> bool: """Check if the httpx session is closed""" return self._session.is_closed - # @_retry_requests # type: ignore - # async def _get_request( - # self, endpoint, additional_headers=None, params=None - # ) -> httpx.Response: - # microservice = endpoint.split("/")[1] - # url = f"{self._firecrest_url}{endpoint}" - - # async def _merged_get(event): - # # await self._stall_request(microservice) - # # context: ContextManager[BytesIO] = ( - # # open(target_path, "wb") # type: ignore - # # if isinstance(target_path, str) or isinstance(target_path, pathlib.Path) - # # else nullcontext(target_path) - # # ) - # # with context as f: - # # f.write(resp.content) - # async with self._locks[microservice]: - # results = self._polling_results[microservice] - # ids = self._polling_ids[microservice].copy() - # self._polling_events[microservice] = None - # self._polling_ids[microservice] = set() - # comma_sep_par = "tasks" if microservice == "tasks" else "jobs" - # if ids == {"*"}: - # if comma_sep_par in params: - # del params[comma_sep_par] - # else: - # params[comma_sep_par] = ",".join(ids) - - # headers = { - # "Authorization": f"Bearer {self._authorization.get_access_token()}" - # } - # if additional_headers: - # headers.update(additional_headers) - - # logger.info(f"Making GET request to {endpoint}") - # resp = await self._session.get( - # url=url, headers=headers, params=params, timeout=self.timeout - # ) - - # self._next_request_ts[microservice] = ( - # time.time() + self.time_between_calls[microservice] - # ) - - # results.append(resp) - # event.set() - - # return - - # if microservice == "tasks" or endpoint in ("/compute/jobs", "/compute/acct"): - # async with self._locks[microservice]: - # if self._polling_ids[microservice] != {"*"}: - # comma_sep_par = "tasks" if microservice == "tasks" else "jobs" - # if comma_sep_par not in params: - # self._polling_ids[microservice] = {"*"} - # else: - # task_ids = params[comma_sep_par].split(",") - # self._polling_ids[microservice].update(task_ids) - - # if self._polling_events[microservice] is None: - # self._polling_events[microservice] = asyncio.Event() - # my_event = self._polling_events[microservice] - # self._polling_results[microservice] = [] - # my_result = self._polling_results[microservice] - # waiter = True - # task = asyncio.create_task(_merged_get(my_event)) - # else: - # waiter = False - # my_event = self._polling_events[microservice] - # my_result = self._polling_results[microservice] - - # if waiter: - # await task - - # await my_event.wait() # type: ignore - # resp = my_result[0] - # return resp - - # # Otherwise just do what you usually do - # async with self._locks[microservice]: - # await self._stall_request(microservice) - # headers = { - # "Authorization": f"Bearer {self._authorization.get_access_token()}" - # } - # if additional_headers: - # headers.update(additional_headers) - - # logger.info(f"Making GET request to {endpoint}") - # resp = await self._session.get( - # url=url, headers=headers, params=params, timeout=self.timeout - # ) - # self._next_request_ts[microservice] = ( - # time.time() + self.time_between_calls[microservice] - # ) - - # return resp + @_retry_requests # type: ignore + async def _get_merge_request( + self, endpoint, additional_headers=None, params=None + ) -> httpx.Response: + pass @_retry_requests # type: ignore - async def _get_request( + async def _get_simple_request( self, endpoint, additional_headers=None, params=None ) -> httpx.Response: microservice = endpoint.split("/")[1] url = f"{self._firecrest_url}{endpoint}" - # async with self._locks[microservice]: - # await self._stall_request(microservice) + await self._stall_request(microservice) + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) headers = { "Authorization": f"Bearer {self._authorization.get_access_token()}" } @@ -349,12 +271,56 @@ async def _get_request( url=url, headers=headers, params=params, timeout=self.timeout ) - self._next_request_ts[microservice] = ( - time.time() + self.time_between_calls[microservice] - ) - return resp + async def _get_request( + self, endpoint, additional_headers=None, params=None + ) -> httpx.Response: + microservice = endpoint.split("/")[1] + if ( + self.merge_get_requests and + self.time_between_calls[microservice] > 0 and + endpoint in ("/compute/jobs", "/compute/acct", "/tasks") + ): + # We can only merge requests with the additional restrictions: + # - For `/compute/acct` we can merge only if the start_time, + # end_time, and pagination parameters are not set. + # Moreover we cannot merge if the `*` is used as a task id, + # because the default `sacct` command will only return the + # jobs of the last day. + # - For `/compute/jobs` we can merge only if the pagination + # parameters are not set. + if ( + endpoint == "/compute/acct" + and ( + "starttime" not in params + or "endtime" not in params + or "pageSize" not in params + or "pageNumber" not in params + or params.get("jobs") + ) + ) or ( + endpoint == "/compute/jobs" + and ( + "pageSize" not in params + or "pageNumber" not in params + or params.get("jobs") + ) + ) or ( + endpoint == "/tasks" + ): + return await self._get_merge_request( + endpoint=endpoint, + additional_headers=additional_headers, + params=params + ) + + return await self._get_simple_request( + endpoint=endpoint, + additional_headers=additional_headers, + params=params + ) + @_retry_requests # type: ignore async def _post_request( self, endpoint, additional_headers=None, data=None, files=None From 28018d106a9040b34593845a1866d58bd21a4f0a Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 28 May 2024 12:30:17 +0300 Subject: [PATCH 6/8] Bring back the old merge_request --- firecrest/AsyncClient.py | 88 +++++++++++++++++++++++++++++++++++----- 1 file changed, 78 insertions(+), 10 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index e7f8ed3..ff5bbf4 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -206,18 +206,18 @@ def __init__( # The following objects are used to "merge" requests in the same # endpoints, for example requests to tasks or polling for jobs self._polling_ids: dict[str, set] = { - "jobs": set(), - "accounting": set(), + "/compute/jobs": set(), + "/compute/acct": set(), "tasks": set() } self._polling_results: dict[str, List] = { - "jobs": [], - "accounting": [], + "/compute/jobs": [], + "/compute/acct": [], "tasks": [] } self._polling_events: dict[str, Optional[asyncio.Event]] = { - "jobs": None, - "accounting": None, + "/compute/jobs": None, + "/compute/acct": None, "tasks": None, } @@ -247,7 +247,73 @@ def is_session_closed(self) -> bool: async def _get_merge_request( self, endpoint, additional_headers=None, params=None ) -> httpx.Response: - pass + microservice = endpoint.split("/")[1] + url = f"{self._firecrest_url}{endpoint}" + + async def _merged_get(event): + await self._stall_request(microservice) + async with self._locks[endpoint]: + results = self._polling_results[endpoint] + ids = self._polling_ids[endpoint].copy() + self._polling_events[endpoint] = None + self._polling_ids[endpoint] = set() + comma_sep_par = "tasks" if microservice == "tasks" else "jobs" + if ids == {"*"}: + if comma_sep_par in params: + del params[comma_sep_par] + else: + params[comma_sep_par] = ",".join(ids) + + headers = { + "Authorization": f"Bearer {self._authorization.get_access_token()}" + } + if additional_headers: + headers.update(additional_headers) + + logger.info(f"Making GET request to {endpoint}") + with time_block(f"GET request to {endpoint}", logger): + resp = await self._session.get( + url=url, headers=headers, + params=params, + timeout=self.timeout + ) + + self._next_request_ts[microservice] = ( + time.time() + self.time_between_calls[microservice] + ) + + results.append(resp) + event.set() + + return + + async with self._locks[endpoint]: + if self._polling_ids[endpoint] != {"*"}: + comma_sep_par = "tasks" if endpoint == "/tasks" else "jobs" + if comma_sep_par not in params: + self._polling_ids[endpoint] = {"*"} + else: + new_ids = params[comma_sep_par].split(",") + self._polling_ids[endpoint].update(new_ids) + + if self._polling_events[endpoint] is None: + self._polling_events[endpoint] = asyncio.Event() + my_event = self._polling_events[endpoint] + self._polling_results[endpoint] = [] + my_result = self._polling_results[endpoint] + waiter = True + task = asyncio.create_task(_merged_get(my_event)) + else: + waiter = False + my_event = self._polling_events[endpoint] + my_result = self._polling_results[endpoint] + + if waiter: + await task + + await my_event.wait() # type: ignore + resp = my_result[0] + return resp @_retry_requests # type: ignore async def _get_simple_request( @@ -404,11 +470,13 @@ async def _stall_request(self, microservice: str) -> None: if self._next_request_ts[microservice] is not None: while time.time() <= self._next_request_ts[microservice]: logger.debug( - f"`{microservice}` microservice has received too many requests. " - f"Going to sleep for " + f"`{microservice}` microservice has received too many " + f"requests. Going to sleep for " f"~{self._next_request_ts[microservice] - time.time()} sec" ) - await asyncio.sleep(self._next_request_ts[microservice] - time.time()) + await asyncio.sleep( + self._next_request_ts[microservice] - time.time() + ) @overload def _json_response( From a4019e21d3d1d6c920821980e40698449b0e79e1 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 28 May 2024 14:26:24 +0300 Subject: [PATCH 7/8] Fix internal dictionaries keys --- firecrest/AsyncClient.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index ff5bbf4..74e3142 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -198,27 +198,26 @@ def __init__( "utilities": 0, } self._locks = { - "jobs": asyncio.Lock(), - "accounting": asyncio.Lock(), - "tasks": asyncio.Lock(), + "/compute/jobs": asyncio.Lock(), + "/compute/acct": asyncio.Lock(), + "/tasks": asyncio.Lock(), } - # The following objects are used to "merge" requests in the same # endpoints, for example requests to tasks or polling for jobs self._polling_ids: dict[str, set] = { "/compute/jobs": set(), "/compute/acct": set(), - "tasks": set() + "/tasks": set() } self._polling_results: dict[str, List] = { "/compute/jobs": [], "/compute/acct": [], - "tasks": [] + "/tasks": [] } self._polling_events: dict[str, Optional[asyncio.Event]] = { "/compute/jobs": None, "/compute/acct": None, - "tasks": None, + "/tasks": None, } def set_api_version(self, api_version: str) -> None: From 375bd8173e1777a81c77c8f9140ed83a684524e6 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 28 May 2024 15:07:25 +0300 Subject: [PATCH 8/8] Move decorator to wrapper GET function --- firecrest/AsyncClient.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index 74e3142..fe513ce 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -242,7 +242,6 @@ def is_session_closed(self) -> bool: """Check if the httpx session is closed""" return self._session.is_closed - @_retry_requests # type: ignore async def _get_merge_request( self, endpoint, additional_headers=None, params=None ) -> httpx.Response: @@ -314,7 +313,6 @@ async def _merged_get(event): resp = my_result[0] return resp - @_retry_requests # type: ignore async def _get_simple_request( self, endpoint, additional_headers=None, params=None ) -> httpx.Response: @@ -338,6 +336,7 @@ async def _get_simple_request( return resp + @_retry_requests # type: ignore async def _get_request( self, endpoint, additional_headers=None, params=None ) -> httpx.Response: