AsyncFirecrest: Practicality and Efficiency #94
Hi @khsrali, thanks for reporting this. I had a quick look, trying to figure out where this error is coming from, and I don't know yet. 🤔 As far as I understand, ideally you are not meant to call |
Hi @ekouts, thanks for following up! You are right, so instead I tried this, which I think should be a valid
Here I explicitly set a new event loop with new_event_loop, but it raises
It seems this is due to the way AsyncClient.py is locking loops. |
Hi @khsrali, and sorry for the delayed response. I found out where the error was coming from: I am using the same httpx.AsyncClient() for all the requests of a client. Based on the original code, here is a small example with the same error:

import asyncio
import httpx


class Test():
    def __init__(self):
        # A single AsyncClient is created up front and reused for every request.
        self._session = httpx.AsyncClient()

    async def print_facts(self):
        resp = await self._session.get('https://catfact.ninja/fact')
        print(resp.json())


t = Test()


async def facts():
    await t.print_facts()


asyncio.run(facts())
# The second run starts a new event loop, but self._session was first used in
# the previous (now closed) loop, so this fails with the same error:
# RuntimeError: Event loop is closed
asyncio.run(facts())

Sorry for that. This was me being a bit lazy and assuming, without testing, that this might speed some things up. It's probably worth investing some time on our side to make this better and more efficient. But what are you trying to achieve in your code? There are different ways to fix the issue without changing pyfirecrest. For example
|
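One general way to avoid this error without touching pyfirecrest is to make sure a shared client is only ever used inside a single event loop: either run all operations in one asyncio.run(), or create a fresh session for every asyncio.run() call. The sketch below illustrates both patterns, plus the failing one, using only the standard library. LoopBoundSession is a hypothetical stand-in that mimics the behaviour reported in this thread; it is not httpx or pyfirecrest code, and the URLs are placeholders.

```python
import asyncio


class LoopBoundSession:
    """Hypothetical stand-in for a shared async HTTP session.

    It binds to the event loop it is first used in and refuses to work in
    any other loop, mimicking the error reported in this thread.
    """

    def __init__(self):
        self._loop = None

    async def get(self, url):
        loop = asyncio.get_running_loop()
        if self._loop is None:
            self._loop = loop  # bind on first use
        elif self._loop is not loop:
            raise RuntimeError("Event loop is closed")
        await asyncio.sleep(0)  # pretend to do network I/O
        return {"url": url}


async def fetch_all(session, urls):
    # Every request runs inside the same event loop, so sharing is safe.
    return [await session.get(url) for url in urls]


def run_in_one_loop(urls):
    """Pattern 1: a single asyncio.run() for all operations."""
    session = LoopBoundSession()
    return asyncio.run(fetch_all(session, urls))


def run_with_fresh_sessions(urls):
    """Pattern 2: a new session for every asyncio.run() call."""
    return [asyncio.run(LoopBoundSession().get(url)) for url in urls]


def run_shared_across_loops(urls):
    """The failing pattern: one session reused by several asyncio.run() calls."""
    session = LoopBoundSession()
    return [asyncio.run(session.get(url)) for url in urls]


if __name__ == "__main__":
    urls = ["https://example.org/a", "https://example.org/b"]
    print(len(run_in_one_loop(urls)), len(run_with_fresh_sessions(urls)))
    try:
        run_shared_across_loops(urls)
    except RuntimeError as err:
        print("shared session failed:", err)
```

Pattern 2 is the closer fit for a function that is called repeatedly to download a different batch of files each time, since every call gets its own loop and its own session.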
Thank you @ekouts for following up. It's good that you found the issue; I'll look into it.
I think it certainly does. I have done some testing regarding the efficiency of
Basically, I'm trying to make a function that can be called multiple times, downloading a different batch of files on each call. One can use the workaround you provided, but still, if one needs to do two different operations within, let's say, Unfortunately Eiger is down now, and I have to find an alternative machine to do a final check of my tests; then I'll share them with you. |
Thank you, I would be very interested in this! Regarding the
And finally on this:
How I imagined the I could also add some function in the |
Hi @ekouts, @jpdorsch (P.S. unfortunately I don't have Elia's GitHub handle to tag here; please do if you have it). In today's meeting, I mentioned a simple test I conducted, and I'd like to share the results with you. The one with
and the one with
This means that submitting external_download requests for 15 files takes about 79 seconds, a bit longer than I expected, especially considering

First script, which is not using async:

import os
from queue import Queue
from time import monotonic, process_time

from firecrest import ClientCredentialsAuth, Firecrest

client_id = '.'
secret = '.'
token_uri = "https://auth.cscs.ch/auth/realms/firecrest-clients/protocol/openid-connect/token"
fire_url = "https://firecrest.cscs.ch"
_machine = "daint"  # "eiger"

_client = Firecrest(
    firecrest_url=fire_url,
    authorization=ClientCredentialsAuth(client_id, secret, token_uri),
)

remote_tree = '/scratch/snx3000/akhosrav/test-1mb/'  # '/capstor/scratch/cscs/akhosrav/test-1mb/'
local_path = '.'
files = _client.list_files(_machine, remote_tree)
NN = 15  # number of files to request


def getfiles(list_: list) -> None:
    q = Queue(maxsize=0)
    # Blocking calls: each external_download request returns before the next one starts.
    for item in list_[:NN]:
        down_obj = _client.external_download(_machine, remote_tree + str(item['name']))
        # Keep the download object together with its local destination path.
        q.put([down_obj, os.path.join(local_path, str(item['name']))])


user_i = process_time()
real_i = monotonic()
getfiles(files)
user = process_time() - user_i
real = monotonic() - real_i
print(f'waiting time (real minus user) {real-user:.2f}s')
print('request rate [#/min]', f'{60*NN/real:.2f}')

And the second one, with AsyncFirecrest:

import asyncio
from time import monotonic, process_time

from firecrest import ClientCredentialsAuth, AsyncFirecrest

client_id = '.'
secret = '.'
token_uri = "https://auth.cscs.ch/auth/realms/firecrest-clients/protocol/openid-connect/token"
fire_url = "https://firecrest.cscs.ch"
_machine = "daint"  # "eiger"
remote_tree = '/scratch/snx3000/akhosrav/test-1mb/'  # '/capstor/scratch/cscs/akhosrav/test-1mb/'
local_path = '.'
NN = 15  # number of files to request


async def submit_download(name, queue, As_client):
    # Worker: keep taking file entries from the queue and submit a download request for each.
    while True:
        item_ = await queue.get()
        try:
            down_obj = await As_client.external_download(_machine, remote_tree + str(item_['name']))
        finally:
            # Always mark the item as done, otherwise queue.join() would hang on an error.
            queue.task_done()


async def main():
    As_client = AsyncFirecrest(
        firecrest_url=fire_url,
        authorization=ClientCredentialsAuth(client_id, secret, token_uri),
    )
    list_ = await As_client.list_files(_machine, remote_tree)
    queue = asyncio.Queue()
    for entry in list_[:NN]:
        queue.put_nowait(entry)
    # Four concurrent workers share the same queue.
    tasks = []
    for i in range(4):
        task = asyncio.create_task(submit_download(f'worker-{i}', queue, As_client))
        tasks.append(task)
    await queue.join()  # wait until every queued file has been processed
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


user_i = process_time()
real_i = monotonic()
asyncio.run(main())
user = process_time() - user_i
real = monotonic() - real_i
print(f'waiting time (real minus user) {real-user:.2f}s')
print('request rate [#/min]', f'{60*NN/real:.2f}') |
Hi @khsrali, indeed there is something you can easily tweak. Try defining your client like this:

As_client = AsyncFirecrest(
    firecrest_url=fire_url,
    authorization=ClientCredentialsAuth(client_id, secret, token_uri),
)
# Rate limit per microservice: seconds to wait between consecutive calls
As_client.time_between_calls = {
    "compute": 0,
    "reservations": 0,
    "status": 0,
    "storage": 0,
    "tasks": 0,
    "utilities": 0,
}

The asynchronous version of pyfirecrest has a mechanism with which you can set your preferred rate limit per microservice. By default I have set this to 5 seconds, and indeed this makes everything unnecessarily slow. 😅 😬 Please let me know if the timing of the async client improves. P.S. Here is Elia's GitHub account: @theely |
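To illustrate what a per-microservice rate limit such as time_between_calls does, here is a minimal sketch under stated assumptions: ServiceRateLimiter, fake_request and the 0.05 s simulated latency are hypothetical, and this is not pyfirecrest's implementation. Each service keeps its own schedule of start times, so with a gap of g seconds, N calls to the same service need at least g·(N−1) seconds before the last one can start (about 70 s for 15 calls with the 5 s default), while calls to different services, or with a gap of 0, start together.

```python
import asyncio
import time


class ServiceRateLimiter:
    """Hypothetical per-microservice rate limiter (not pyfirecrest code).

    Consecutive calls to the same service start at least
    time_between_calls[service] seconds apart.
    """

    def __init__(self, time_between_calls):
        self.time_between_calls = dict(time_between_calls)
        self._next_slot = {}  # service -> earliest allowed start (monotonic)

    async def acquire(self, service):
        now = time.monotonic()
        start = max(now, self._next_slot.get(service, now))
        # Reserve this slot before sleeping so later callers queue behind it.
        self._next_slot[service] = start + self.time_between_calls[service]
        await asyncio.sleep(start - now)


async def fake_request(limiter, service, latency=0.05):
    await limiter.acquire(service)
    await asyncio.sleep(latency)  # simulated server round trip
    return service


async def elapsed(time_between_calls, services):
    """Run one simulated request per entry in `services`, concurrently."""
    limiter = ServiceRateLimiter(time_between_calls)
    t0 = time.monotonic()
    await asyncio.gather(*(fake_request(limiter, s) for s in services))
    return time.monotonic() - t0


if __name__ == "__main__":
    free = asyncio.run(elapsed({"storage": 0.0}, ["storage"] * 5))
    gapped = asyncio.run(elapsed({"storage": 0.1}, ["storage"] * 5))
    print(f"gap 0.0 s: {free:.2f}s, gap 0.1 s: {gapped:.2f}s")
```

Reserving the next slot before sleeping means this sketch needs no lock: the read and the update of _next_slot happen without an await in between, so no other coroutine can interleave, and the request itself is never serialised.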
Hi @ekouts, haha, this was certainly an improvement :), thanks!
Actually, I was hoping that the async approach would really speed things up, theoretically aiming for around 0.5 seconds in total: with 15 files, the ideal time would be roughly 7.75 seconds divided by 15. Do you think the server might be a bit slow, or maybe there are some limitations in place? |
Yes, I was also looking at the code this morning, trying to figure out why it's not faster. It comes from this imposed rate limit: I am using some locks that end up serialising the requests per microservice. I could remove the locks when the rate limit is 0, but I don't want to remove that logic completely. I think I can fix it, but I don't think I will have time for it this week. I will ping you as soon as I have something to try. But thanks again for the feedback; it has already been very helpful in improving the async client! |
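The serialisation described above can be reproduced in isolation: if a per-service lock is held for the whole request rather than only for the rate-limit bookkeeping, concurrent calls to that service run one after another even when the required gap is 0. A standard-library sketch with a simulated 0.05 s round trip (an assumed value, not a measurement); the function names are hypothetical:

```python
import asyncio
import time

LATENCY = 0.05  # simulated round-trip time of one request, in seconds


async def fake_request():
    await asyncio.sleep(LATENCY)


async def call_holding_lock(lock):
    # The per-service lock is held for the whole request, so concurrent
    # calls to the same service are serialised even with no required gap.
    async with lock:
        await fake_request()


async def call_without_lock():
    await fake_request()


async def elapsed(n, hold_lock):
    lock = asyncio.Lock()  # created inside the running event loop
    t0 = time.monotonic()
    if hold_lock:
        await asyncio.gather(*(call_holding_lock(lock) for _ in range(n)))
    else:
        await asyncio.gather(*(call_without_lock() for _ in range(n)))
    return time.monotonic() - t0


if __name__ == "__main__":
    print(f"10 calls, lock held: {asyncio.run(elapsed(10, True)):.2f}s")
    print(f"10 calls, no lock:   {asyncio.run(elapsed(10, False)):.2f}s")
```

With the lock held, the total time grows roughly linearly with the number of calls; without it, the calls overlap and finish in about one round trip.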
Thank you very much, @ekouts! I appreciate you following up. Below, I've summarized the key points we've touched upon, both here and in our recent meeting, for future reference:
Efficiency: We may consider reviewing the locking policy and discussing a possible increase of the rate limit. The goal is to reach a waiting time of 1/N of that of the non-asynchronous operations, so ~0.5 seconds in the above script.
Practicality: It might be beneficial to offer users the option to interact with the lower level of the asynchronous operations, for example:
To enhance usability and avoid the need for users to instantiate multiple clients, we could allow users to decide whether to keep a session active or to terminate it following a
Additionally, the idea of making simple_delete asynchronous or, alternatively, providing an option to control this behavior, e.g.
I think these adjustments could help our use of the async client in AiiDA. I look forward to hearing your thoughts and any further suggestions you might have. Thank you again! |
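The session-lifetime idea in the practicality point could look roughly like the following sketch. Client, FakeSession and the keep-alive-via-async-with behaviour are hypothetical and only illustrate the design choice: by default each call opens and closes its own session (safe across separate asyncio.run() calls), while inside an async with block all calls share one session that is closed on exit.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for an HTTP session (no network access)."""

    def __init__(self):
        self.closed = False

    async def request(self, what):
        if self.closed:
            raise RuntimeError("session is closed")
        await asyncio.sleep(0)  # pretend to do I/O
        return what

    async def aclose(self):
        self.closed = True


class Client:
    """Hypothetical client: one-shot session per call by default, or one
    shared session kept alive for the duration of an `async with` block."""

    def __init__(self):
        self._session = None
        self.sessions_opened = 0

    def _new_session(self):
        self.sessions_opened += 1
        return FakeSession()

    async def __aenter__(self):
        self._session = self._new_session()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._session.aclose()
        self._session = None

    async def call(self, what):
        if self._session is not None:  # keep-alive mode
            return await self._session.request(what)
        session = self._new_session()  # one-shot mode
        try:
            return await session.request(what)
        finally:
            await session.aclose()


async def demo():
    client = Client()
    await client.call("ls")
    await client.call("stat")
    one_shot = client.sessions_opened  # two calls, two sessions
    async with client:
        await asyncio.gather(client.call("a"), client.call("b"), client.call("c"))
    return one_shot, client.sessions_opened


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the keep-alive mode opt-in means the default never reuses a session across event loops, which is the failure mode discussed earlier in this thread.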
Hi @khsrali! We made a release yesterday with some of the changes we discussed in this issue, in case you would like to try it. You can find more details in #96 and #97. We also reduced the default sleep times in both clients. The remaining performance issue in the async client is not addressed in this release; I will let you know when we have news on this. |
Thanks a lot @ekouts for the good news! Looks good. |
Hi @khsrali, I just wanted to give you an update on this issue. We have made some improvements in #107. We still need to test it a bit more, so I will let you know when it's ready to be merged. We have removed the locks from most of the requests and kept them only for cases where some requests can be merged (for example when there are many
There are some limitations from FirecREST's side that don't allow to reach this at the moment.
|
Thanks a lot @ekouts for the heads up! |
Thanks a lot @ekouts. I understand and agree that the bottleneck is the rate limit of the FirecREST server itself.

import asyncio
from time import monotonic, process_time

import httpx

# fire_url, machine, TOKEN and files (a list of remote source paths) are
# assumed to be defined beforehand, as in the earlier scripts.

NN = 200
# This is the limit for the number of requests that can be sent in a short period of time
CLOGGING_LIMIT = 200
TOO_MANY_REQUESTS_CODE = 429


class AsyncClient:
    def __init__(self):
        self._session = httpx.AsyncClient(timeout=200.0)
        self.counter = 0  # total number of requests sent, including retries

    async def _post_request(self, sourcePath):
        # Schedule the POST immediately; the caller awaits the task later.
        task = asyncio.create_task(
            self._session.post(
                url=f'{fire_url}/storage/xfer-external/download',
                headers={'Authorization': f'Bearer {TOKEN}', 'X-Machine-Name': machine},
                data={'sourcePath': sourcePath}))
        return sourcePath, task

    async def _run_tasks(self, tasks):
        results = await asyncio.gather(*[task for _, task in tasks])
        return {sourcePath: result for (sourcePath, _), result in zip(tasks, results)}

    async def send_download_req_careless(self, list_):
        # Fire all requests at once, without checking for rate limiting.
        self.counter += len(list_)
        tasks = [await self._post_request(sourcePath) for sourcePath in list_]
        return await self._run_tasks(tasks)

    async def send_download_req_assure(self, results=None, retry_list=None):
        # Resend every request rejected with 429 until none are left.
        # (None defaults avoid sharing one mutable default between calls.)
        results = results or {}
        retry_list = list(retry_list or [])
        for sourcePath, response in results.items():
            if response.status_code == TOO_MANY_REQUESTS_CODE:
                retry_list.append(sourcePath)
            else:
                print(f"Success for {sourcePath} with status {response.status_code}")
        if retry_list:
            chunks = [retry_list[i:i + CLOGGING_LIMIT] for i in range(0, len(retry_list), CLOGGING_LIMIT)]
            results = {}
            for chunk in chunks:
                results.update(await self.send_download_req_careless(chunk))
            # Maybe one can implement an exponential backoff here
            # await asyncio.sleep(2 ** call_count)
            await self.send_download_req_assure(results, retry_list=[])


async def main():
    client = AsyncClient()
    try:
        await client.send_download_req_assure(retry_list=files[:NN])
    finally:
        await client._session.aclose()
    print(f"Total number of requests: {client.counter}")


user_i = process_time()
real_i = monotonic()
asyncio.run(main())
user = process_time() - user_i
real = monotonic() - real_i
print(f'waiting time (real minus user) {real-user:.2f}s')
print('request rate [#/min]', f'{60*NN/real:.2f}')

For N=200 files, this gave me
Note that the code above sent requests 1305 times, meaning that the server answered 1105 of them with 429 Too Many Requests. This indicates that, if the rate limit were higher, the code above might be able to process the 200 requests with the FirecREST implementation as it stands right now in only about 20 seconds, instead of 140 to 150 seconds. While with #107
And requests are processed one by one (it remains to be understood why; it should not be this way). Even if the rate limit were hypothetically higher, the difference from the result of the code above might get worse. Either way, unfortunately, as of right now, using
To keep it short: If you would like to think about using that strategy in |
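The script above notes that an exponential backoff could be added to the retry loop. A minimal iterative sketch of that idea, under assumptions: send is any coroutine returning an HTTP status code (a hypothetical stand-in for the POST in the script, not a pyfirecrest or FirecREST API), only requests rejected with 429 are resent, and the delay doubles each round with random jitter. The delay values are illustrative, not tuned for FirecREST.

```python
import asyncio
import random

TOO_MANY_REQUESTS = 429


async def send_with_backoff(send, items, max_rounds=8, base_delay=0.01):
    """Send all items concurrently, then resend only those rejected with 429.

    `send(item)` is any coroutine returning an HTTP status code. Between
    rounds we sleep base_delay * 2**round plus random jitter.
    Returns (statuses, still_pending, total_attempts).
    """
    pending = list(items)
    statuses = {}
    attempts = 0
    for round_ in range(max_rounds):
        if not pending:
            break
        codes = await asyncio.gather(*(send(item) for item in pending))
        attempts += len(pending)
        retry = []
        for item, code in zip(pending, codes):
            if code == TOO_MANY_REQUESTS:
                retry.append(item)
            else:
                statuses[item] = code
        pending = retry
        if pending and round_ < max_rounds - 1:
            delay = base_delay * 2 ** round_
            await asyncio.sleep(delay + random.uniform(0, delay))
    return statuses, pending, attempts


def make_fake_send(rejections):
    """Hypothetical server: rejects item i with 429 rejections[i] times, then returns 200."""
    remaining = dict(rejections)

    async def send(item):
        await asyncio.sleep(0)
        if remaining[item] > 0:
            remaining[item] -= 1
            return TOO_MANY_REQUESTS
        return 200

    return send


if __name__ == "__main__":
    send = make_fake_send({i: i % 3 for i in range(9)})
    statuses, pending, attempts = asyncio.run(send_with_backoff(send, range(9)))
    print(len(statuses), pending, attempts)
```

An iterative loop also avoids the open-ended recursion of send_download_req_assure and stops after max_rounds, returning whatever is still pending instead of retrying forever.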
Thanks a lot @khsrali! I will go through your suggestion and take you up on your offer of a Zoom call (I will schedule it by email). Just a very quick comment: my original idea of why the async client would be faster was based on the sleep times between polling, plus the fact that normally you would be making concurrent requests to different microservices, so you wouldn't be affected as much by the rate limit and the serialisation of the requests on the server side. |
Many thanks to @ekouts for our Friday meeting. As it turned out, the code I provided in my previous comment is not much better than Given this limitation from the server side, the only gain is to save on additional sleep times during the URL retrieval stage, which is the time the client is idle while waiting for the server to stage files. Therefore, it seems nothing can be done to further asynchronise the initial request unless the server allows that. With this clarification, I think we can now close this issue. |
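The remaining gain mentioned above, overlapping the idle time while the server stages files, can be illustrated with a sketch in which each transfer is polled with a fixed sleep between status checks. POLL_INTERVAL, the number of polls per transfer and wait_until_staged are hypothetical, and nothing here talks to FirecREST; the initial requests themselves are still subject to the server-side limits discussed above. Awaiting the transfers one by one costs the sum of their waits, while gathering them costs roughly the longest single wait.

```python
import asyncio
import time

POLL_INTERVAL = 0.05  # hypothetical sleep between two status checks, in seconds


async def wait_until_staged(polls_needed):
    """Simulate polling a transfer until the server reports it as staged."""
    for _ in range(polls_needed):
        await asyncio.sleep(POLL_INTERVAL)


async def one_by_one(jobs):
    # Like a blocking client: each transfer is polled to completion in turn.
    t0 = time.monotonic()
    for polls in jobs:
        await wait_until_staged(polls)
    return time.monotonic() - t0


async def overlapped(jobs):
    # Async client: the polling sleeps of all transfers overlap.
    t0 = time.monotonic()
    await asyncio.gather(*(wait_until_staged(polls) for polls in jobs))
    return time.monotonic() - t0


if __name__ == "__main__":
    jobs = [2, 2, 2, 2]  # hypothetical number of polls per transfer
    print(f"one by one: {asyncio.run(one_by_one(jobs)):.2f}s")
    print(f"overlapped: {asyncio.run(overlapped(jobs)):.2f}s")
```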
If one uses two consecutive asyncio.run() calls, everything should be fine, because as far as I understand, asyncio.run() both creates and closes an event loop. However, AsyncFirecrest, for some reason, prevents a second run from creating a new event loop, which I think is unexpected behavior. I'm not sure where this problem originates. It can be reproduced with something like this (a dummy example, just to demonstrate):
The first run passes successfully, and the second one, which is supposed to create a new event loop, fails to do so.
RuntimeError: Event loop is closed