
Adding concurrent function that works like Promise.allSettled #182

Open
shine1594 opened this issue Dec 7, 2022 · 4 comments
Labels
enhancement New feature or request

Comments

@shine1594
Collaborator

⭐ Suggestion

💻 Use Cases

```typescript
await pipe(
  range(1, 10),
  someAsyncTask, // Assume `ReturnType<typeof someAsyncTask>` is `T`
  concurrent(5, 'allSettled'), // or concurrent.allSettled(5)
  filter((a): a is PromiseFulfilledResult<T> => a.status === 'fulfilled'),
  each(({ value }) => { console.log(value); })
);
```
@ppeeou
Member

ppeeou commented Dec 20, 2022

Good suggestion! How about concurrentAllSettled?

@ppeeou ppeeou added the enhancement New feature or request label Jul 10, 2023
@puppybits

Love the concurrent function. Is there a design reason why it batches/chunks instead of operating like a buffer?

In the case of long-running tasks, the time for a chunk to resolve is gated by the slowest item in that chunk. Is there a use case in mind for concurrent where chunking is preferred? Would it be more dynamic if it parallelized, or would it make sense to create another function that allows parallel operations up to a limit?
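The pooled alternative described above (a sliding window rather than fixed chunks) can be sketched without fxts. `mapPool` is a hypothetical name: it keeps up to `limit` tasks in flight and starts a new one as soon as any slot frees, so one slow item never stalls the rest of its batch.

```typescript
// Sliding-window concurrency: `limit` workers each repeatedly claim the next
// unprocessed index. Because JS is single-threaded, `next++` cannot race.
async function mapPool<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++; // claim the next index
      results[i] = await fn(items[i]);
    }
  }
  const workers = Array.from(
    { length: Math.min(limit, items.length) },
    () => worker()
  );
  await Promise.all(workers);
  return results;
}
```

With chunking, a chunk of 5 finishes only when its slowest member does; with this pool, a finished slot immediately picks up the next item.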

@ppeeou
Member

ppeeou commented Dec 27, 2024

@puppybits Thank you for your interest in fxts.

As you mentioned, specifying a maximum number of in-flight requests and adjusting backpressure might be a better fit for speed-oriented concurrency (like a buffer).

This function has a use case where effects are applied simultaneously: it waits for all file I/O requests to complete, and once all of them are finished, it applies the effect at once.

If you were to visualize this on the screen, it would look like the video below:
(The use case for how to handle this depends on the user 😃)

It would be useful if functions working with buffers were added as well!

default.mov

https://fxts.dev/docs/handle-concurrency

@ppeeou
Member

ppeeou commented Dec 27, 2024

@puppybits It seems that this will be useful when making requests to a service with a rate limit.

- Condition
  - Rate limit: 70 requests per minute
- Implementation
  - Send 10 requests every 10 seconds (60 requests per minute), leaving 10 requests per minute as a buffer
  - Assume error handling is done outside
```typescript
import {
  append, chunk, concurrent, delay, dropRight, flat,
  map, peek, pipe, range, toArray, toAsync,
} from "@fxts/core";

async function executeTask<T>(val: T) {
  return delay(100, val);
}

async function withoutConcurrent() {
  const tasks = range(Infinity);

  return await pipe(
    toAsync(tasks),
    map((task) => () => executeTask(task)),
    chunk(10),
    map((tasks) => append(() => delay(10_000, 0), tasks)), // for rate limiting
    map((tasks) =>
      Promise.all(Array.from(tasks).map((f) => f())).then((tasks) =>
        dropRight(1, tasks) // drop the `delay` result
      )
    ),
    flat,
    peek((task) => console.log(task)), // print task
    toArray
  );
}

async function withConcurrent() {
  const tasks = range(Infinity);

  return await pipe(
    toAsync(tasks),
    map((task) =>
      Promise.all([
        executeTask(task),
        delay(10_000), // for rate limiting
      ])
    ),
    map(([task]) => task),
    peek((task) => console.log(task)), // print task
    concurrent(10),
    toArray
  );
}
```
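The windowing idea in both versions can also be sketched without fxts: pair each batch of `limit` tasks with a minimum window delay, so a batch never completes faster than the window even when the tasks themselves finish early. `rateLimited` is a hypothetical helper name, not part of fxts.

```typescript
const wait = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Run `tasks` in batches of `limit`; each batch is raced against a `windowMs`
// timer, so at most `limit` requests are issued per window.
async function rateLimited<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
  windowMs: number
): Promise<T[]> {
  const out: T[] = [];
  for (let i = 0; i < tasks.length; i += limit) {
    const batch = tasks.slice(i, i + limit);
    const [results] = await Promise.all([
      Promise.all(batch.map((f) => f())),
      wait(windowMs), // batch takes at least windowMs even if tasks finish early
    ]);
    out.push(...results);
  }
  return out;
}
```

With `limit = 10` and `windowMs = 10_000` this matches the 60-requests-per-minute budget above.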
