Some nice way of async iterating? #37

Open
wmertens opened this issue Sep 16, 2019 · 6 comments
@wmertens

I'm trying to limit uploads to 10 at a time, and I'm doing it like this:

	const uploadSema = new Sema(10)
	for (const upload of uploads) {
		await uploadSema.acquire()
		// fire-and-forget; note that a rejection from uploadOne goes unhandled here
		uploadOne(upload).finally(() => uploadSema.release())
	}
	await uploadSema.drain()

It's reasonably nice, but I was wondering whether there's a way to make it nicer.

I moved the logic into this helper function:

import { Sema } from 'async-sema'

export const queuedWork = async (items, fn, workers = 10) => {
	const sema = new Sema(workers)
	let threw = null
	for (const item of items) {
		// stop starting new work once something has failed
		if (threw) break
		await sema.acquire()
		// eslint-disable-next-line promise/catch-or-return
		Promise.resolve(fn(item))
			.catch(err => {
				threw = err
			})
			.finally(() => sema.release())
	}
	// wait for all in-flight work to finish before (re)throwing
	await sema.drain()
	if (threw) throw threw
}

Is this a good way of going about it? Is there maybe a more elegant way?

(I wrote these tests too.)

test('queuedWork async', async () => {
	const out = []
	await queuedWork([1, 5, 7, 89, 2], async n => out.push(n), 2)
	expect(out).toEqual([1, 5, 7, 89, 2])
})

test('queuedWork sync', async () => {
	const out = []
	await queuedWork([1, 5, 7, 89, 2], n => out.push(n), 2)
	expect(out).toEqual([1, 5, 7, 89, 2])
})

test('queuedWork throws', async () => {
	const out = []
	await expect(
		queuedWork(
			[1, 2, 3, 4, 5],
			async n => {
				if (n === 2) throw new Error('meep')
				else out.push(n)
			},
			2
		)
	).rejects.toThrow('meep')
	expect(out).toEqual([1, 3])
})


test('queuedWork throws sync', async () => {
	const out = []
	await expect(
		queuedWork(
			[1, 2, 3, 4, 5],
			n => {
				if (n === 2) throw new Error('meep')
				else out.push(n)
			},
			2
		)
	).rejects.toThrow('meep')
	expect(out).toEqual([1])
})
@OlliV (Collaborator) commented Sep 18, 2019

What I have been doing is something like this:

const sema = new Sema(10); // at most 10 concurrent uploads

await Promise.all(files.map(async (file) => {
  await sema.acquire();
  try {
    await upload(file);
  } finally {
    sema.release();
  }
}));

@cardin (Contributor) commented Sep 19, 2019

@wmertens Actually, for your particular problem, you should look at this module instead: https://github.com/sindresorhus/p-queue

p-queue has queue-based operations, event emitters, and start/pause functionality.

async-sema has its uses of course, e.g. if we are concerned about rate limiting.
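
For example, a quick sketch with async-sema's RateLimit helper (reusing the uploads/uploadOne names from above):

import { RateLimit } from 'async-sema'

// limit to roughly 5 calls per second
const lim = RateLimit(5)
for (const upload of uploads) {
  await lim()
  uploadOne(upload)
}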

@wmertens (Author)

@OlliV That's nice too, with the difference that you create all the promises at once. So for <1000 items that's easier to reason about, but beyond that I'm worried about memory consumption.

@cardin hmmm p-queue seems to only work by adding functions to a queue, not by passing an array and having it be processed. So it doesn't help in this case, or am I mistaken?

@cardin (Contributor) commented Sep 19, 2019

@wmertens The example code that OlliV gave shouldn't run into memory issues, since even 10,000 pending promises shouldn't be much of a problem if they're not allowed to start.

p-queue has an .addAll() that accepts an array of functions, e.g. queue.add(() => uploadOne(upload)). As for why functions instead of promises, the author's rationale is to have more control over when the queue starts: functions only execute the async operation when called, whereas a promise is already running by the time you have it.
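
For illustration, a sketch of that shape (again reusing the uploads/uploadOne names from earlier):

import PQueue from 'p-queue'

const queue = new PQueue({ concurrency: 10 })
// addAll takes an array of functions; each one only runs
// once the queue has a free slot
await queue.addAll(uploads.map(upload => () => uploadOne(upload)))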

@wmertens (Author)

@cardin but in both cases it does use O(n) memory, and at some point that becomes too much.

Processing an array with "workers" uses only O(nWorkers) memory.

V8 is pretty efficient, but if small changes (the amount of code is similar) can use less memory, that's a good thing, right? Less memory thrashing => less GC => faster, even apart from host constraints.
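
To make that concrete, a sketch of the worker approach over a shared iterator (the forEachLimited name is illustrative, not part of async-sema). Error handling is simplified: Promise.all rejects on the first error, but the other workers keep going.

const forEachLimited = async (items, fn, workers = 10) => {
	// all workers share one iterator, so at most `workers` items are in flight
	const it = items[Symbol.iterator]()
	const worker = async () => {
		for (let next = it.next(); !next.done; next = it.next()) {
			await fn(next.value)
		}
	}
	await Promise.all(Array.from({ length: workers }, () => worker()))
}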

@OlliV (Collaborator) commented Sep 19, 2019

@wmertens oh right. I have seen that happening (OOM). I have used array-split for splitting a long array and processing it in chunks.

Maybe I could add it to the examples.
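
Something like this sketch (the chunk helper is hypothetical here, not array-split's actual API):

// process a long array one fixed-size chunk at a time,
// so only one chunk's worth of promises exists at once
const chunk = (arr, size) =>
  Array.from({ length: Math.ceil(arr.length / size) }, (_, i) =>
    arr.slice(i * size, (i + 1) * size)
  );

for (const part of chunk(files, 1000)) {
  await Promise.all(part.map((file) => upload(file)));
}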
