From b24812eb554f243a4424fc07e966ed774ed64b7f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 13 Dec 2024 18:11:47 -0800 Subject: [PATCH 01/11] feat(tracing): add tracing --- package.json | 1 + pnpm-lock.yaml | 9 +++ src/fetcher/batching.js | 107 ++++++++++++++++++++-------------- src/fetcher/simple.js | 48 ++++++++------- src/locator/content-claims.js | 11 +++- src/tracing/tracing.js | 69 ++++++++++++++++++++++ 6 files changed, 177 insertions(+), 68 deletions(-) create mode 100644 src/tracing/tracing.js diff --git a/package.json b/package.json index 17b526b..eb97ca1 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "dependencies": { "@cloudflare/workers-types": "^4.20241022.0", "@ipld/dag-ucan": "^3.4.0", + "@opentelemetry/api": "^1.9.0", "@storacha/indexing-service-client": "^2.0.0", "@ucanto/interface": "^10.0.1", "@web3-storage/blob-index": "^1.0.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4e03078..5c3b9e4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -14,6 +14,9 @@ importers: '@ipld/dag-ucan': specifier: ^3.4.0 version: 3.4.0 + '@opentelemetry/api': + specifier: ^1.9.0 + version: 1.9.0 '@storacha/indexing-service-client': specifier: ^2.0.0 version: 2.0.0 @@ -185,6 +188,10 @@ packages: resolution: {integrity: sha512-oGB+UxlgWcgQkgwo8GcEGwemoTFt3FIO9ababBmaGwXIoBKZ+GTy0pP185beGg7Llih/NSHSV2XAs1lnznocSg==} engines: {node: '>= 8'} + '@opentelemetry/api@1.9.0': + resolution: {integrity: sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==} + engines: {node: '>=8.0.0'} + '@perma/map@1.0.3': resolution: {integrity: sha512-Bf5njk0fnJGTFE2ETntq0N1oJ6YdCPIpTDn3R3KYZJQdeYSOCNL7mBrFlGnbqav8YQhJA/p81pvHINX9vAtHkQ==} @@ -1644,6 +1651,8 @@ snapshots: '@nodelib/fs.scandir': 2.1.5 fastq: 1.17.1 + '@opentelemetry/api@1.9.0': {} + '@perma/map@1.0.3': dependencies: '@multiformats/murmur3': 2.1.8 diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index 1457c98..2733b78 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -6,6 +6,7 @@ import { MultipartByteRangeDecoder, getBoundary } from 'multipart-byte-range/dec import { NetworkError, NotFoundError } from '../lib.js' import { fetchBlob } from './simple.js' import { resolveRange } from './lib.js' +import { withResultSpan } from '../tracing/tracing.js' /** * @typedef {'*'|`${number},${number}`|`${number}`} RangeKey @@ -142,49 +143,67 @@ export const create = (locator) => new BatchingFetcher(locator) /** * Fetch blobs from the passed locations. The locations MUST share a common * site to fetch from. - * + */ +export const fetchBlobs = withResultSpan('fetchBlobs', +/** * @param {URL} url Desired URL to fetch blobs from. 
* @param {Array<{ location: API.Location, range?: API.Range }>} locations * @returns {Promise>} */ -export const fetchBlobs = async (url, locations) => { - if (locations.length === 1) { - const res = await fetchBlob(locations[0].location, locations[0].range) - if (res.error) return res - return { ok: [res.ok] } - } + async (url, locations) => { + if (locations.length === 1) { + const res = await fetchBlob(locations[0].location, locations[0].range) + if (res.error) return res + return { ok: [res.ok] } + } - const ranges = [] - for (const { location, range } of locations) { - for (const s of location.site) { - let found = false - for (const l of s.location) { - if (l.toString() === url.toString()) { + const ranges = [] + for (const { location, range } of locations) { + for (const s of location.site) { + let found = false + for (const l of s.location) { + if (l.toString() === url.toString()) { /** @type {import('multipart-byte-range').AbsoluteRange} */ - let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1] - if (range) { - const relRange = resolveRange(range, s.range.length) - resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]] + let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1] + if (range) { + const relRange = resolveRange(range, s.range.length) + resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]] + } + ranges.push(resolvedRange) + found = true + break } - ranges.push(resolvedRange) - found = true - break } + if (found) break } - if (found) break } - } - if (ranges.length !== locations.length) { - throw new Error('no common site') - } + if (ranges.length !== locations.length) { + throw new Error('no common site') + } - const headers = { Range: `bytes=${ranges.map(r => `${r[0]}-${r[1]}`).join(',')}` } - try { - const res = await fetch(url, { headers }) - if (!res.ok) { - return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) } + const headers = { Range: `bytes=${ranges.map(r => `${r[0]}-${r[1]}`).join(',')}` } + try { + const res = await fetch(url, { headers }) + if (!res.ok) { + return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) } + } + return await consumeMultipartResponse(url, locations, res) + } catch (err) { + return { error: new NetworkError(url, { cause: err }) } } + }) +/** + * Consumes a multipart range request to create multiple blobs + */ +const consumeMultipartResponse = withResultSpan('consumeMultipartResponse', +/** + * @param {URL} url + * @param {Array<{ location: API.Location, range?: API.Range }>} locations + * @param {Response} res + * @returns {Promise>} + */ + async (url, locations, res) => { if (!res.body) { return { error: new NetworkError(url, { cause: new Error('missing repsonse body') }) } } @@ -197,20 +216,20 @@ export const fetchBlobs = async (url, locations) => { /** @type {API.Blob[]} */ const blobs = [] let i = 0 - await res.body - .pipeThrough(new MultipartByteRangeDecoder(boundary)) - .pipeTo(new WritableStream({ - write (part) { - blobs.push(new Blob(locations[i].location.digest, part.content)) - i++ - } - })) - - return { ok: blobs } - } catch (err) { - return { error: new NetworkError(url, { cause: err }) } - } -} + try { + await res.body + .pipeThrough(new MultipartByteRangeDecoder(boundary)) + .pipeTo(new WritableStream({ + write (part) { + blobs.push(new Blob(locations[i].location.digest, part.content)) + i++ + } + })) + return { ok: blobs } + } catch (err) { + 
return { error: new NetworkError(url, { cause: err }) } + } + }) /** @implements {API.Blob} */ class Blob { diff --git a/src/fetcher/simple.js b/src/fetcher/simple.js index d45ce9d..3c3cb59 100644 --- a/src/fetcher/simple.js +++ b/src/fetcher/simple.js @@ -2,6 +2,7 @@ import * as API from '../api.js' import { resolveRange } from './lib.js' import { NetworkError, NotFoundError } from '../lib.js' +import { withResultSpan } from '../tracing/tracing.js' /** @implements {API.Fetcher} */ class SimpleFetcher { @@ -30,36 +31,39 @@ class SimpleFetcher { */ export const create = (locator) => new SimpleFetcher(locator) -/** +export const fetchBlob = withResultSpan('fetchBlob', + /** * Fetch a blob from the passed location. * @param {API.Location} location * @param {API.Range} [range] */ -export const fetchBlob = async (location, range) => { - let networkError - for (const site of location.site) { - for (const url of site.location) { - let resolvedRange = [site.range.offset, site.range.offset + site.range.length - 1] - if (range) { - const relRange = resolveRange(range, site.range.length) - resolvedRange = [site.range.offset + relRange[0], site.range.offset + relRange[1]] - } - const headers = { Range: `bytes=${resolvedRange[0]}-${resolvedRange[1]}` } - try { - const res = await fetch(url, { headers }) - if (!res.ok || !res.body) { - console.warn(`failed to fetch ${url}: ${res.status} ${await res.text()}`) - continue + async (location, range) => { + let networkError + + for (const site of location.site) { + for (const url of site.location) { + let resolvedRange = [site.range.offset, site.range.offset + site.range.length - 1] + if (range) { + const relRange = resolveRange(range, site.range.length) + resolvedRange = [site.range.offset + relRange[0], site.range.offset + relRange[1]] + } + const headers = { Range: `bytes=${resolvedRange[0]}-${resolvedRange[1]}` } + try { + const res = await fetch(url, { headers }) + if (!res.ok || !res.body) { + console.warn(`failed to fetch ${url}: ${res.status} ${await res.text()}`) + continue + } + return { ok: new Blob(location.digest, res) } + } catch (err) { + networkError = new NetworkError(url, { cause: err }) } - return { ok: new Blob(location.digest, res) } - } catch (err) { - networkError = new NetworkError(url, { cause: err }) } } - } - return { error: networkError || new NotFoundError(location.digest) } -} + return { error: networkError || new NotFoundError(location.digest) } + } +) /** @implements {API.Blob} */ class Blob { diff --git a/src/locator/content-claims.js b/src/locator/content-claims.js index 7e7e239..4c70a53 100644 --- a/src/locator/content-claims.js +++ b/src/locator/content-claims.js @@ -5,6 +5,7 @@ import { DigestMap, ShardedDAGIndex } from '@web3-storage/blob-index' import { fetchBlob } from '../fetcher/simple.js' import { NotFoundError } from '../lib.js' import { base58btc } from 'multiformats/bases/base58' +import { withSimpleSpan } from 'src/tracing/tracing.js' /** * @import { DID } from '@ucanto/interface' @@ -78,7 +79,7 @@ export class ContentClaimsLocator { * Read claims for the passed CID and populate the cache. 
* @param {API.MultihashDigest} digest */ - async #readClaims (digest) { + async #internalReadClaims (digest) { if (this.#claimFetched.has(digest)) return const claims = await Claims.read(digest, { serviceURL: this.#serviceURL }) @@ -110,7 +111,7 @@ export class ContentClaimsLocator { if (this.#carpark === undefined) { continue } - const obj = await this.#carpark.get(toBlobKey(claim.index.multihash)) + const obj = await withSimpleSpan('carPark.get', this.#carpark.get, this.#carpark)(toBlobKey(claim.index.multihash)) if (!obj) { continue } @@ -170,6 +171,12 @@ export class ContentClaimsLocator { this.#claimFetched.set(digest, true) } + /** + * Read claims for the passed CID and populate the cache. + * @param {API.MultihashDigest} digest + */ + #readClaims = withSimpleSpan('readClaims', this.#internalReadClaims, this) + /** @type {API.Locator['scopeToSpaces']} */ scopeToSpaces (spaces) { return spaceFilteredLocator(this, spaces) diff --git a/src/tracing/tracing.js b/src/tracing/tracing.js new file mode 100644 index 0000000..c16a1c5 --- /dev/null +++ b/src/tracing/tracing.js @@ -0,0 +1,69 @@ +import { trace, context, SpanStatusCode } from '@opentelemetry/api' + +/** + * @template {unknown[]} A + * @template {unknown} T + * @template {Error} X + * @template {import('../api.js').Result} Result + * @param {string} spanName + * @param {(...args: A) => Promise} fn + */ +export const withResultSpan = (spanName, fn) => + /** + * @param {A} args + */ + async (...args) => { + const tracer = trace.getTracer('blob-fetcher') + const span = tracer.startSpan(spanName) + const ctx = trace.setSpan(context.active(), span) + + const result = await context.with(ctx, fn, null, ...args) + if (result.ok) { + span.setStatus({ code: SpanStatusCode.OK }) + } else { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: result.error?.message + }) + } + span.end() + return result + } + +/** + * @template {unknown[]} A + * @template {*} T + * @template {*} This + * @param {string} spanName + * @param {(this: This, ...args: A) => Promise} fn + * @param {This} [thisParam] + */ +export const withSimpleSpan = (spanName, fn, thisParam) => + /** + * @param {A} args + */ + async (...args) => { + const tracer = trace.getTracer('blob-fetcher') + const span = tracer.startSpan(spanName) + const ctx = trace.setSpan(context.active(), span) + + try { + const result = await context.with(ctx, fn, thisParam, ...args) + span.setStatus({ code: SpanStatusCode.OK }) + span.end() + return result + } catch (err) { + if (err instanceof Error) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err.message + }) + } else { + span.setStatus({ + code: SpanStatusCode.ERROR + }) + } + span.end() + throw err + } + } From 24ff66ce27e7868ffe032f07098230083ec33425 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 13 Dec 2024 18:14:43 -0800 Subject: [PATCH 02/11] feat(batching): fetch from single range --- package.json | 1 + pnpm-lock.yaml | 3 +++ src/fetcher/batching.js | 59 +++++++++++++++++++++++++++-------------- 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/package.json b/package.json index eb97ca1..4fa2ba2 100644 --- a/package.json +++ b/package.json @@ -64,6 +64,7 @@ "multipart-byte-range": "^3.0.1", "p-defer": "^4.0.1", "p-queue": "^8.0.1", + "uint8arraylist": "^2.4.8", "zod": "^3.23.8" }, "devDependencies": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5c3b9e4..24092fe 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -41,6 +41,9 @@ importers: p-queue: specifier: ^8.0.1 version: 8.0.1 + 
uint8arraylist: + specifier: ^2.4.8 + version: 2.4.8 zod: specifier: ^3.23.8 version: 3.23.8 diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index 2733b78..bdd13dd 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -7,6 +7,7 @@ import { NetworkError, NotFoundError } from '../lib.js' import { fetchBlob } from './simple.js' import { resolveRange } from './lib.js' import { withResultSpan } from '../tracing/tracing.js' +import { Uint8ArrayList } from 'uint8arraylist' /** * @typedef {'*'|`${number},${number}`|`${number}`} RangeKey @@ -140,6 +141,8 @@ class BatchingFetcher { */ export const create = (locator) => new BatchingFetcher(locator) +/** @typedef {{range: import('multipart-byte-range').AbsoluteRange, digest: API.MultihashDigest}} ResolvedRange */ + /** * Fetch blobs from the passed locations. The locations MUST share a common * site to fetch from. @@ -157,6 +160,7 @@ export const fetchBlobs = withResultSpan('fetchBlobs', return { ok: [res.ok] } } + /** @type {ResolvedRange[]>} */ const ranges = [] for (const { location, range } of locations) { for (const s of location.site) { @@ -169,7 +173,10 @@ export const fetchBlobs = withResultSpan('fetchBlobs', const relRange = resolveRange(range, s.range.length) resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]] } - ranges.push(resolvedRange) + ranges.push({ + digest: location.digest, + range: resolvedRange + }) found = true break } @@ -181,13 +188,15 @@ export const fetchBlobs = withResultSpan('fetchBlobs', throw new Error('no common site') } - const headers = { Range: `bytes=${ranges.map(r => `${r[0]}-${r[1]}`).join(',')}` } + ranges.sort((a, b) => a.range[0] - b.range[0]) + const aggregateRangeEnd = ranges.reduce((aggregateEnd, r) => r.range[1] > aggregateEnd ? 
r.range[1] : aggregateEnd, 0) + const headers = { Range: `bytes=${ranges[0].range[0]}-${aggregateRangeEnd}` } try { const res = await fetch(url, { headers }) if (!res.ok) { return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) } } - return await consumeMultipartResponse(url, locations, res) + return await consumeMultipartResponse(url, ranges, res) } catch (err) { return { error: new NetworkError(url, { cause: err }) } } @@ -199,34 +208,44 @@ export const fetchBlobs = withResultSpan('fetchBlobs', const consumeMultipartResponse = withResultSpan('consumeMultipartResponse', /** * @param {URL} url - * @param {Array<{ location: API.Location, range?: API.Range }>} locations + * @param {ResolvedRange[]} sortedRanges * @param {Response} res * @returns {Promise>} */ - async (url, locations, res) => { + async (url, sortedRanges, res) => { if (!res.body) { return { error: new NetworkError(url, { cause: new Error('missing repsonse body') }) } } - - const boundary = getBoundary(res.headers) - if (!boundary) { - return { error: new NetworkError(url, { cause: new Error('missing multipart boundary') }) } - } - - /** @type {API.Blob[]} */ const blobs = [] - let i = 0 + const parts = new Uint8ArrayList() + let farthestRead = sortedRanges[0].range[0] + let farthestConsumed = sortedRanges[0].range[0] + let currentRange = 0 try { - await res.body - .pipeThrough(new MultipartByteRangeDecoder(boundary)) - .pipeTo(new WritableStream({ - write (part) { - blobs.push(new Blob(locations[i].location.digest, part.content)) - i++ + for await (const chunk of res.body) { + // append the chunk to our buffer + parts.append(chunk) + // update the absolute range of what we've read + farthestRead += chunk.byteLength + // read and push any blobs in the current buffer + // note that as long as ranges are sorted ascending by start + // this should be resilient to overlapping ranges + while (farthestRead >= sortedRanges[currentRange].range[1] + 1) { + blobs.push(new Blob(sortedRanges[currentRange].digest, + parts.subarray(sortedRanges[currentRange].range[0] - farthestConsumed, sortedRanges[currentRange].range[1] + 1 - farthestConsumed))) + currentRange++ + if (currentRange >= sortedRanges.length) { + return { ok: blobs } } - })) + let toConsume = sortedRanges[currentRange].range[0] - farthestConsumed + if (toConsume > parts.byteLength) { toConsume = parts.byteLength } + parts.consume(toConsume) + farthestConsumed += toConsume + } + } return { ok: blobs } } catch (err) { + console.log(err) return { error: new NetworkError(url, { cause: err }) } } }) From 33afdb1ec4f8ee15db8f7e1d27ca5e6ac27943d5 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 13 Dec 2024 18:16:28 -0800 Subject: [PATCH 03/11] feat(batching): kick off fetchblobs in parallel --- src/fetcher/batching.js | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index bdd13dd..06b4180 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -68,6 +68,7 @@ class BatchingFetcher { const pendingReqs = this.#pendingReqs this.#pendingReqs = new DigestMap() + const responses = [] while (true) { const first = queue.shift() if (!first) break @@ -86,11 +87,18 @@ class BatchingFetcher { if (locs.length >= MAX_BATCH_SIZE) break } - const res = await fetchBlobs(siteURL, locs) + responses.push({ + promise: fetchBlobs(siteURL, locs), + locs + }) + } + + for (const response of responses) { + const res = await response.promise if (res.error) break for 
(const [i, blob] of res.ok.entries()) { const rangeReqs = pendingReqs.get(blob.digest) - const key = rangeKey(locs[i].range) + const key = rangeKey(response.locs[i].range) const reqs = rangeReqs?.get(key) reqs?.[0].resolve({ ok: blob }) reqs?.slice(1).forEach(r => r.resolve({ ok: blob.clone() })) From 34c5a810d6d69f3ab3a80ddb5cc2cbc2ce3da5cd Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 13 Dec 2024 18:18:59 -0800 Subject: [PATCH 04/11] feat(batching): resolve blocks as soon as we have them --- src/fetcher/batching.js | 63 ++++++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index 06b4180..771bbe6 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -87,24 +87,10 @@ class BatchingFetcher { if (locs.length >= MAX_BATCH_SIZE) break } - responses.push({ - promise: fetchBlobs(siteURL, locs), - locs - }) + responses.push(fetchBlobs(siteURL, locs, pendingReqs)) } - for (const response of responses) { - const res = await response.promise - if (res.error) break - for (const [i, blob] of res.ok.entries()) { - const rangeReqs = pendingReqs.get(blob.digest) - const key = rangeKey(response.locs[i].range) - const reqs = rangeReqs?.get(key) - reqs?.[0].resolve({ ok: blob }) - reqs?.slice(1).forEach(r => r.resolve({ ok: blob.clone() })) - rangeReqs?.delete(key) - } - } + await Promise.all(responses) // resolve `undefined` for any remaining requests for (const [digest, rangeReqs] of pendingReqs) { @@ -149,7 +135,7 @@ class BatchingFetcher { */ export const create = (locator) => new BatchingFetcher(locator) -/** @typedef {{range: import('multipart-byte-range').AbsoluteRange, digest: API.MultihashDigest}} ResolvedRange */ +/** @typedef {{range: import('multipart-byte-range').AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedRange */ /** * Fetch blobs from the passed locations. The locations MUST share a common @@ -159,13 +145,15 @@ export const fetchBlobs = withResultSpan('fetchBlobs', /** * @param {URL} url Desired URL to fetch blobs from. 
* @param {Array<{ location: API.Location, range?: API.Range }>} locations - * @returns {Promise>} + * @param {DigestMap} pendingReqs + * @returns {Promise>} */ - async (url, locations) => { + async (url, locations, pendingReqs) => { if (locations.length === 1) { const res = await fetchBlob(locations[0].location, locations[0].range) if (res.error) return res - return { ok: [res.ok] } + resolveBlob(res.ok, locations[0].range, pendingReqs) + return { ok: true } } /** @type {ResolvedRange[]>} */ @@ -183,7 +171,8 @@ export const fetchBlobs = withResultSpan('fetchBlobs', } ranges.push({ digest: location.digest, - range: resolvedRange + range: resolvedRange, + orig: range }) found = true break @@ -204,7 +193,7 @@ export const fetchBlobs = withResultSpan('fetchBlobs', if (!res.ok) { return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) } } - return await consumeMultipartResponse(url, ranges, res) + return await consumeMultipartResponse(url, ranges, res, pendingReqs) } catch (err) { return { error: new NetworkError(url, { cause: err }) } } @@ -218,13 +207,13 @@ const consumeMultipartResponse = withResultSpan('consumeMultipartResponse', * @param {URL} url * @param {ResolvedRange[]} sortedRanges * @param {Response} res - * @returns {Promise>} + * @param {DigestMap} pendingReqs + * @returns {Promise>} */ - async (url, sortedRanges, res) => { + async (url, sortedRanges, res, pendingReqs) => { if (!res.body) { return { error: new NetworkError(url, { cause: new Error('missing repsonse body') }) } } - const blobs = [] const parts = new Uint8ArrayList() let farthestRead = sortedRanges[0].range[0] let farthestConsumed = sortedRanges[0].range[0] @@ -239,11 +228,12 @@ const consumeMultipartResponse = withResultSpan('consumeMultipartResponse', // note that as long as ranges are sorted ascending by start // this should be resilient to overlapping ranges while (farthestRead >= sortedRanges[currentRange].range[1] + 1) { - blobs.push(new Blob(sortedRanges[currentRange].digest, - parts.subarray(sortedRanges[currentRange].range[0] - farthestConsumed, sortedRanges[currentRange].range[1] + 1 - farthestConsumed))) + const blob = new Blob(sortedRanges[currentRange].digest, + parts.subarray(sortedRanges[currentRange].range[0] - farthestConsumed, sortedRanges[currentRange].range[1] + 1 - farthestConsumed)) + resolveBlob(blob, sortedRanges[currentRange].orig, pendingReqs) currentRange++ if (currentRange >= sortedRanges.length) { - return { ok: blobs } + return { ok: true } } let toConsume = sortedRanges[currentRange].range[0] - farthestConsumed if (toConsume > parts.byteLength) { toConsume = parts.byteLength } @@ -251,13 +241,28 @@ const consumeMultipartResponse = withResultSpan('consumeMultipartResponse', farthestConsumed += toConsume } } - return { ok: blobs } + return { error: new NetworkError(url, { cause: new Error('did not resolve all chunks') }) } } catch (err) { console.log(err) return { error: new NetworkError(url, { cause: err }) } } }) +/** + * + * @param {API.Blob} blob + * @param {API.Range | undefined} range + * @param {DigestMap} pendingReqs + */ +const resolveBlob = (blob, range, pendingReqs) => { + const rangeReqs = pendingReqs.get(blob.digest) + const key = rangeKey(range) + const reqs = rangeReqs?.get(key) + reqs?.[0].resolve({ ok: blob }) + reqs?.slice(1).forEach(r => r.resolve({ ok: blob.clone() })) + rangeReqs?.delete(key) +} + /** @implements {API.Blob} */ class Blob { #digest From ae3e5f1c94bc52693330b9a51aa01aed6ef54fc6 Mon Sep 17 00:00:00 2001 From: 
hannahhoward Date: Tue, 24 Dec 2024 13:55:22 -0800 Subject: [PATCH 05/11] refactor(minimize memory usage with generators): --- src/fetcher/batching.js | 86 ++++++++++++++++++++++++----------- src/locator/content-claims.js | 2 +- src/tracing/tracing.js | 75 ++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 28 deletions(-) diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index 771bbe6..e534311 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -2,11 +2,10 @@ import * as API from '../api.js' import { DigestMap } from '@web3-storage/blob-index' import defer from 'p-defer' -import { MultipartByteRangeDecoder, getBoundary } from 'multipart-byte-range/decoder' import { NetworkError, NotFoundError } from '../lib.js' import { fetchBlob } from './simple.js' import { resolveRange } from './lib.js' -import { withResultSpan } from '../tracing/tracing.js' +import { withAsyncGeneratorSpan, withResultSpan } from '../tracing/tracing.js' import { Uint8ArrayList } from 'uint8arraylist' /** @@ -68,7 +67,14 @@ class BatchingFetcher { const pendingReqs = this.#pendingReqs this.#pendingReqs = new DigestMap() - const responses = [] + // Basic algorithm + // 1. assemble each http request + // 2. fire off request + // 3. once first byte received, begin processing the response async in background + // 4. immediately go to next http request, but after first iteration, wait so that we're never processing the body + // of more than one response at a time + /** @type {Promise> | undefined } */ + let lastResolveBlobs while (true) { const first = queue.shift() if (!first) break @@ -87,10 +93,26 @@ class BatchingFetcher { if (locs.length >= MAX_BATCH_SIZE) break } - responses.push(fetchBlobs(siteURL, locs, pendingReqs)) + const fetchRes = await fetchBlobs(siteURL, locs) + // if we have an error, stop + if (fetchRes.error) { + break + } + // if we are still processing the previous response, we should wait before we process this response + if (lastResolveBlobs !== undefined) { + const resolveRes = await lastResolveBlobs + lastResolveBlobs = undefined + if (resolveRes.error) { + break + } + } + lastResolveBlobs = resolveBlobs(fetchRes.ok, pendingReqs) } - await Promise.all(responses) + // await the last call to resolve blobs + if (lastResolveBlobs !== undefined) { + await lastResolveBlobs + } // resolve `undefined` for any remaining requests for (const [digest, rangeReqs] of pendingReqs) { @@ -145,18 +167,21 @@ export const fetchBlobs = withResultSpan('fetchBlobs', /** * @param {URL} url Desired URL to fetch blobs from. 
* @param {Array<{ location: API.Location, range?: API.Range }>} locations - * @param {DigestMap} pendingReqs - * @returns {Promise>} + * @returns {Promise>, API.NotFound|API.Aborted|API.NetworkError>>} */ - async (url, locations, pendingReqs) => { + async (url, locations) => { if (locations.length === 1) { const res = await fetchBlob(locations[0].location, locations[0].range) if (res.error) return res - resolveBlob(res.ok, locations[0].range, pendingReqs) - return { ok: true } + return { + ok: (async function * () { + yield { blob: res.ok, range: locations[0].range } + return { ok: true } + }()) + } } - /** @type {ResolvedRange[]>} */ + /** @type {ResolvedRange[]} */ const ranges = [] for (const { location, range } of locations) { for (const s of location.site) { @@ -193,24 +218,25 @@ export const fetchBlobs = withResultSpan('fetchBlobs', if (!res.ok) { return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) } } - return await consumeMultipartResponse(url, ranges, res, pendingReqs) + return { ok: consumeMultipartResponse(url, ranges, res) } } catch (err) { return { error: new NetworkError(url, { cause: err }) } } }) +/** @typedef {{blob: API.Blob, range: API.Range | undefined}} BlobResult */ + /** * Consumes a multipart range request to create multiple blobs */ -const consumeMultipartResponse = withResultSpan('consumeMultipartResponse', +const consumeMultipartResponse = withAsyncGeneratorSpan('consumeMultipartResponse', /** * @param {URL} url * @param {ResolvedRange[]} sortedRanges * @param {Response} res - * @param {DigestMap} pendingReqs - * @returns {Promise>} + * @returns {AsyncGenerator>} */ - async (url, sortedRanges, res, pendingReqs) => { + async function * (url, sortedRanges, res) { if (!res.body) { return { error: new NetworkError(url, { cause: new Error('missing repsonse body') }) } } @@ -230,7 +256,7 @@ const consumeMultipartResponse = withResultSpan('consumeMultipartResponse', while (farthestRead >= sortedRanges[currentRange].range[1] + 1) { const blob = new Blob(sortedRanges[currentRange].digest, parts.subarray(sortedRanges[currentRange].range[0] - farthestConsumed, sortedRanges[currentRange].range[1] + 1 - farthestConsumed)) - resolveBlob(blob, sortedRanges[currentRange].orig, pendingReqs) + yield ({ blob, range: sortedRanges[currentRange].orig }) currentRange++ if (currentRange >= sortedRanges.length) { return { ok: true } @@ -243,24 +269,30 @@ const consumeMultipartResponse = withResultSpan('consumeMultipartResponse', } return { error: new NetworkError(url, { cause: new Error('did not resolve all chunks') }) } } catch (err) { - console.log(err) return { error: new NetworkError(url, { cause: err }) } } }) /** * - * @param {API.Blob} blob - * @param {API.Range | undefined} range + * @param {AsyncGenerator>} results * @param {DigestMap} pendingReqs + * @returns {Promise>} */ -const resolveBlob = (blob, range, pendingReqs) => { - const rangeReqs = pendingReqs.get(blob.digest) - const key = rangeKey(range) - const reqs = rangeReqs?.get(key) - reqs?.[0].resolve({ ok: blob }) - reqs?.slice(1).forEach(r => r.resolve({ ok: blob.clone() })) - rangeReqs?.delete(key) +const resolveBlobs = async (results, pendingReqs) => { + for (;;) { + const { value: result, done } = await results.next() + if (done) { + return result + } + const { blob, range } = result + const rangeReqs = pendingReqs.get(blob.digest) + const key = rangeKey(range) + const reqs = rangeReqs?.get(key) + reqs?.[0].resolve({ ok: blob }) + reqs?.slice(1).forEach(r => r.resolve({ 
ok: blob.clone() })) + rangeReqs?.delete(key) + } } /** @implements {API.Blob} */ diff --git a/src/locator/content-claims.js b/src/locator/content-claims.js index 4c70a53..203b97d 100644 --- a/src/locator/content-claims.js +++ b/src/locator/content-claims.js @@ -5,7 +5,7 @@ import { DigestMap, ShardedDAGIndex } from '@web3-storage/blob-index' import { fetchBlob } from '../fetcher/simple.js' import { NotFoundError } from '../lib.js' import { base58btc } from 'multiformats/bases/base58' -import { withSimpleSpan } from 'src/tracing/tracing.js' +import { withSimpleSpan } from '../tracing/tracing.js' /** * @import { DID } from '@ucanto/interface' diff --git a/src/tracing/tracing.js b/src/tracing/tracing.js index c16a1c5..ae1ac2d 100644 --- a/src/tracing/tracing.js +++ b/src/tracing/tracing.js @@ -30,6 +30,81 @@ export const withResultSpan = (spanName, fn) => return result } +/** + * @template {unknown} O + * @template {Error} X + * @template {unknown} T + * @template {import('../api.js').Result} Result + * @param {import('@opentelemetry/api').Span} span + * @param {AsyncGenerator} gen + */ +async function * recordAsyncGeneratorSpan (span, gen) { + try { + for (;;) { + const { value: result, done } = await gen.next() + if (done) { + if (result.ok) { + span.setStatus({ code: SpanStatusCode.OK }) + } else { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: result.error?.message + }) + } + return result + } + yield (result) + } + } finally { + span.end() + } +} + +/** + * @template {unknown[]} A + * @template {unknown} O + * @template {Error} X + * @template {unknown} T + * @template {import('../api.js').Result} Result + * @param {string} spanName + * @param {(...args: A) => AsyncGenerator} fn + */ +export function withAsyncGeneratorSpan (spanName, fn) { + /** + * @param {A} args + */ + return function (...args) { + const tracer = trace.getTracer('blob-fetcher') + const span = tracer.startSpan(spanName) + const ctx = trace.setSpan(context.active(), span) + const gen = context.with(ctx, fn, null, ...args) + return recordAsyncGeneratorSpan(span, bindAsyncGenerator(ctx, gen)) + } +} + +/** + * bindAsyncGenerator binds an async generator to a context + * see https://github.com/open-telemetry/opentelemetry-js/issues/2951 + * @template {unknown} T + * @template {any} TReturn + * @template {unknown} TNext + * @param {import('@opentelemetry/api').Context} ctx + * @param {AsyncGenerator} generator + * @returns {AsyncGenerator} + */ +function bindAsyncGenerator (ctx, generator) { + return { + next: context.bind(ctx, generator.next.bind(generator)), + return: context.bind(ctx, generator.return.bind(generator)), + throw: context.bind(ctx, generator.throw.bind(generator)), + + [Symbol.asyncIterator] () { + return bindAsyncGenerator(ctx, generator[Symbol.asyncIterator]()) + }, + [Symbol.asyncDispose]: context.bind(ctx, generator[Symbol.asyncDispose]?.bind(generator)) + } +} + /** * @template {unknown[]} A * @template {*} T From 80320a75cd78bdcb9546f57207d4d4a7f6f56b68 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 24 Dec 2024 17:07:27 -0800 Subject: [PATCH 06/11] refactor(batching): use generator for memory usage --- package.json | 1 - pnpm-lock.yaml | 3 -- src/api.ts | 63 +++++++++++++++++++++++++++++++++++++++-- src/fetcher/batching.js | 4 +-- src/fetcher/lib.js | 4 +-- 5 files changed, 65 insertions(+), 10 deletions(-) diff --git a/package.json b/package.json index 4fa2ba2..c21f6cb 100644 --- a/package.json +++ b/package.json @@ -61,7 +61,6 @@ "@web3-storage/blob-index": "^1.0.2", 
"@web3-storage/content-claims": "^5.1.0", "multiformats": "^13.1.0", - "multipart-byte-range": "^3.0.1", "p-defer": "^4.0.1", "p-queue": "^8.0.1", "uint8arraylist": "^2.4.8", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 24092fe..e2d779e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -32,9 +32,6 @@ importers: multiformats: specifier: ^13.1.0 version: 13.1.0 - multipart-byte-range: - specifier: ^3.0.1 - version: 3.0.1 p-defer: specifier: ^4.0.1 version: 4.0.1 diff --git a/src/api.ts b/src/api.ts index 130a7c9..c05b7e4 100644 --- a/src/api.ts +++ b/src/api.ts @@ -1,11 +1,70 @@ import { ByteView, MultihashDigest } from 'multiformats' import { Failure, Result, URI, DID } from '@ucanto/interface' -import { Range } from 'multipart-byte-range' import { QueryError } from '@storacha/indexing-service-client/api' export { ByteView, MultihashDigest } from 'multiformats' export { Failure, Result, URI, DID, Principal } from '@ucanto/interface' -export { Range, SuffixRange, AbsoluteRange } from 'multipart-byte-range' + +/** + * An absolute byte range to extract - always an array of two values + * corresponding to the first and last bytes (both inclusive). e.g. + * + * ``` + * [100, 200] + * ``` + */ +export type AbsoluteRange = [first: number, last: number] + +/** + * A suffix byte range - always an array of one value corresponding to the + * first byte to start extraction from (inclusive). e.g. + * + * ``` + * [900] + * ``` + * + * If it is unknown how large a resource is, the last `n` bytes + * can be requested by specifying a negative value: + * + * ``` + * [-100] + * ``` + */ +export type SuffixRange = [first: number] + +/** + * Byte range to extract - an array of one or two values corresponding to the + * first and last bytes (both inclusive). e.g. + * + * ``` + * [100, 200] + * ``` + * + * Omitting the second value requests all remaining bytes of the resource. e.g. + * + * ``` + * [900] + * ``` + * + * Alternatively, if it's unknown how large a resource is, the last `n` bytes + * can be requested by specifying a negative value: + * + * ``` + * [-100] + * ``` + */ +export type Range = AbsoluteRange | SuffixRange + +export type ByteGetter = (range: AbsoluteRange) => Promise> + +export interface EncoderOptions { + /** Mime type of each part. */ + contentType?: string + /** Total size of the object in bytes. */ + totalSize?: number + /** Stream queuing strategy. */ + strategy?: QueuingStrategy +} export interface Abortable { signal: AbortSignal diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index e534311..bec7937 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -157,7 +157,7 @@ class BatchingFetcher { */ export const create = (locator) => new BatchingFetcher(locator) -/** @typedef {{range: import('multipart-byte-range').AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedRange */ +/** @typedef {{range: API.AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedRange */ /** * Fetch blobs from the passed locations. 
The locations MUST share a common @@ -188,7 +188,7 @@ export const fetchBlobs = withResultSpan('fetchBlobs', let found = false for (const l of s.location) { if (l.toString() === url.toString()) { - /** @type {import('multipart-byte-range').AbsoluteRange} */ + /** @type {API.AbsoluteRange} */ let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1] if (range) { const relRange = resolveRange(range, s.range.length) diff --git a/src/fetcher/lib.js b/src/fetcher/lib.js index e754270..f5c062f 100644 --- a/src/fetcher/lib.js +++ b/src/fetcher/lib.js @@ -1,7 +1,7 @@ /** - * @param {import('multipart-byte-range').Range} range + * @param {import('../api.js').Range} range * @param {number} totalSize - * @returns {import('multipart-byte-range').AbsoluteRange} + * @returns {import('../api.js').AbsoluteRange} */ export const resolveRange = (range, totalSize) => { let last = range[1] From 321d03c0b0612925e302cff3b7a421328b3c7201 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 24 Dec 2024 17:31:51 -0800 Subject: [PATCH 07/11] refactor(fetcher): document algorithms used, cleanup code --- src/fetcher/batching.js | 188 +++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 87 deletions(-) diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index bec7937..0820a1a 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -106,7 +106,7 @@ class BatchingFetcher { break } } - lastResolveBlobs = resolveBlobs(fetchRes.ok, pendingReqs) + lastResolveBlobs = resolveRequests(fetchRes.ok, pendingReqs) } // await the last call to resolve blobs @@ -157,131 +157,145 @@ class BatchingFetcher { */ export const create = (locator) => new BatchingFetcher(locator) -/** @typedef {{range: API.AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedRange */ +/** @typedef {{range: API.AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedBlobs */ /** * Fetch blobs from the passed locations. The locations MUST share a common * site to fetch from. */ -export const fetchBlobs = withResultSpan('fetchBlobs', +export const fetchBlobs = withResultSpan('fetchBlobs', _fetchBlobs) + /** * @param {URL} url Desired URL to fetch blobs from. 
* @param {Array<{ location: API.Location, range?: API.Range }>} locations * @returns {Promise>, API.NotFound|API.Aborted|API.NetworkError>>} */ - async (url, locations) => { - if (locations.length === 1) { - const res = await fetchBlob(locations[0].location, locations[0].range) - if (res.error) return res - return { - ok: (async function * () { - yield { blob: res.ok, range: locations[0].range } - return { ok: true } - }()) - } +async function _fetchBlobs (url, locations) { + if (locations.length === 1) { + const res = await fetchBlob(locations[0].location, locations[0].range) + if (res.error) return res + return { + ok: (async function * () { + yield { blob: res.ok, range: locations[0].range } + return { ok: true } + }()) } + } + + // resolve ranges for blobs - /** @type {ResolvedRange[]} */ - const ranges = [] - for (const { location, range } of locations) { - for (const s of location.site) { - let found = false - for (const l of s.location) { - if (l.toString() === url.toString()) { + /** @type {ResolvedBlobs[]} */ + const resolvedBlobs = [] + for (const { location, range } of locations) { + for (const s of location.site) { + let found = false + for (const l of s.location) { + if (l.toString() === url.toString()) { /** @type {API.AbsoluteRange} */ - let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1] - if (range) { - const relRange = resolveRange(range, s.range.length) - resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]] - } - ranges.push({ - digest: location.digest, - range: resolvedRange, - orig: range - }) - found = true - break + let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1] + if (range) { + const relRange = resolveRange(range, s.range.length) + resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]] } + resolvedBlobs.push({ + digest: location.digest, + range: resolvedRange, + orig: range + }) + found = true + break } - if (found) break } + if (found) break } - if (ranges.length !== locations.length) { - throw new Error('no common site') - } + } + if (resolvedBlobs.length !== locations.length) { + throw new Error('no common site') + } - ranges.sort((a, b) => a.range[0] - b.range[0]) - const aggregateRangeEnd = ranges.reduce((aggregateEnd, r) => r.range[1] > aggregateEnd ? r.range[1] : aggregateEnd, 0) - const headers = { Range: `bytes=${ranges[0].range[0]}-${aggregateRangeEnd}` } - try { - const res = await fetch(url, { headers }) - if (!res.ok) { - return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) } - } - return { ok: consumeMultipartResponse(url, ranges, res) } - } catch (err) { - return { error: new NetworkError(url, { cause: err }) } + // sort blobs by starting byte + resolvedBlobs.sort((a, b) => a.range[0] - b.range[0]) + // get last byte to fetch + const aggregateRangeEnd = resolvedBlobs.reduce((aggregateEnd, r) => r.range[1] > aggregateEnd ? 
r.range[1] : aggregateEnd, 0) + // fetch bytes from the first starting byte to the last byte to fetch + const headers = { Range: `bytes=${resolvedBlobs[0].range[0]}-${aggregateRangeEnd}` } + try { + const res = await fetch(url, { headers }) + if (!res.ok) { + return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) } } - }) + return { ok: consumeBatchResponse(url, resolvedBlobs, res) } + } catch (err) { + return { error: new NetworkError(url, { cause: err }) } + } +} /** @typedef {{blob: API.Blob, range: API.Range | undefined}} BlobResult */ /** - * Consumes a multipart range request to create multiple blobs + * Consumes a batch request to create multiple blobs. Will break up + * a byte range going from first byte byte of first blob to last byte of last blob + * into appropriate ranges for each blob */ -const consumeMultipartResponse = withAsyncGeneratorSpan('consumeMultipartResponse', +const consumeBatchResponse = withAsyncGeneratorSpan('consumeBatchResponse', _consumeBatchResponse) + /** * @param {URL} url - * @param {ResolvedRange[]} sortedRanges + * @param {ResolvedBlobs[]} sortedBlobs * @param {Response} res * @returns {AsyncGenerator>} */ - async function * (url, sortedRanges, res) { - if (!res.body) { - return { error: new NetworkError(url, { cause: new Error('missing repsonse body') }) } - } - const parts = new Uint8ArrayList() - let farthestRead = sortedRanges[0].range[0] - let farthestConsumed = sortedRanges[0].range[0] - let currentRange = 0 - try { - for await (const chunk of res.body) { - // append the chunk to our buffer - parts.append(chunk) - // update the absolute range of what we've read - farthestRead += chunk.byteLength - // read and push any blobs in the current buffer - // note that as long as ranges are sorted ascending by start - // this should be resilient to overlapping ranges - while (farthestRead >= sortedRanges[currentRange].range[1] + 1) { - const blob = new Blob(sortedRanges[currentRange].digest, - parts.subarray(sortedRanges[currentRange].range[0] - farthestConsumed, sortedRanges[currentRange].range[1] + 1 - farthestConsumed)) - yield ({ blob, range: sortedRanges[currentRange].orig }) - currentRange++ - if (currentRange >= sortedRanges.length) { - return { ok: true } - } - let toConsume = sortedRanges[currentRange].range[0] - farthestConsumed - if (toConsume > parts.byteLength) { toConsume = parts.byteLength } - parts.consume(toConsume) - farthestConsumed += toConsume +async function * _consumeBatchResponse (url, sortedBlobs, res) { + if (!res.body) { + return { error: new NetworkError(url, { cause: new Error('missing repsonse body') }) } + } + const parts = new Uint8ArrayList() + // start at first byte of first blob + let farthestRead = sortedBlobs[0].range[0] + let farthestConsumed = sortedBlobs[0].range[0] + let currentBlob = 0 + try { + for await (const chunk of res.body) { + // append the chunk to our buffer + parts.append(chunk) + // update the absolute position of how far we've read + farthestRead += chunk.byteLength + // resolve any blobs in the current buffer + // note that as long as blobs are sorted ascending by start + // this should be resilient to overlapping ranges + while (farthestRead >= sortedBlobs[currentBlob].range[1] + 1) { + // generate blob out of the current buffer + const blob = new Blob(sortedBlobs[currentBlob].digest, + parts.subarray(sortedBlobs[currentBlob].range[0] - farthestConsumed, sortedBlobs[currentBlob].range[1] + 1 - farthestConsumed)) + yield ({ blob, range: 
sortedBlobs[currentBlob].orig }) + currentBlob++ + if (currentBlob >= sortedBlobs.length) { + return { ok: true } } + // consume any bytes we no longer need + // (they are before the beginning of the current range) + let toConsume = sortedBlobs[currentBlob].range[0] - farthestConsumed + if (toConsume > parts.byteLength) { toConsume = parts.byteLength } + parts.consume(toConsume) + farthestConsumed += toConsume } - return { error: new NetworkError(url, { cause: new Error('did not resolve all chunks') }) } - } catch (err) { - return { error: new NetworkError(url, { cause: err }) } } - }) + return { error: new NetworkError(url, { cause: new Error('did not resolve all chunks') }) } + } catch (err) { + return { error: new NetworkError(url, { cause: err }) } + } +} /** + * Resolve pending requests from blobs generated out of the last fetch * - * @param {AsyncGenerator>} results + * @param {AsyncGenerator>} blobResults * @param {DigestMap} pendingReqs * @returns {Promise>} */ -const resolveBlobs = async (results, pendingReqs) => { +const resolveRequests = async (blobResults, pendingReqs) => { for (;;) { - const { value: result, done } = await results.next() + const { value: result, done } = await blobResults.next() if (done) { return result } From c828a191667e5c1cffa7f1c49a348e7c583a5006 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 3 Jan 2025 20:50:13 -0800 Subject: [PATCH 08/11] feat(fetcher): allow passing a custom fetch implementation allows simple & batching fetchers to use a custom fetch implementation. also exposes tracing library. --- package.json | 4 ++++ src/fetcher/batching.js | 19 +++++++++++++------ src/fetcher/simple.js | 17 ++++++++++++----- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/package.json b/package.json index c21f6cb..8fd3a1b 100644 --- a/package.json +++ b/package.json @@ -50,6 +50,10 @@ "./locator/indexing-service": { "import": "./src/locator/indexing-service/index.js", "types": "./dist/src/locator/indexing-service/index.d.ts" + }, + "./tracing/tracing": { + "import": "./src/tracing/tracing.js", + "types": "./dist/src/tracing/tracing.d.ts" } }, "dependencies": { diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index 0820a1a..00bac3f 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -19,6 +19,7 @@ const MAX_BATCH_SIZE = 16 /** @implements {API.Fetcher} */ class BatchingFetcher { #locator + #fetch /** @type {DigestMap} */ #pendingReqs = new DigestMap() @@ -31,9 +32,13 @@ class BatchingFetcher { /** @type {Promise|null} */ #processing = null - /** @param {API.Locator} locator */ - constructor (locator) { + /** + * @param {API.Locator} locator + * @param {typeof globalThis.fetch} [fetch] + */ + constructor (locator, fetch = globalThis.fetch.bind(globalThis)) { this.#locator = locator + this.#fetch = fetch } #scheduleBatchProcessing () { @@ -93,7 +98,7 @@ class BatchingFetcher { if (locs.length >= MAX_BATCH_SIZE) break } - const fetchRes = await fetchBlobs(siteURL, locs) + const fetchRes = await fetchBlobs(siteURL, locs, this.#fetch) // if we have an error, stop if (fetchRes.error) { break @@ -153,9 +158,10 @@ class BatchingFetcher { /** * Create a new batching blob fetcher. 
* @param {API.Locator} locator + * @param {typeof globalThis.fetch} [fetch] * @returns {API.Fetcher} */ -export const create = (locator) => new BatchingFetcher(locator) +export const create = (locator, fetch = globalThis.fetch.bind(globalThis)) => new BatchingFetcher(locator, fetch) /** @typedef {{range: API.AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedBlobs */ @@ -168,11 +174,12 @@ export const fetchBlobs = withResultSpan('fetchBlobs', _fetchBlobs) /** * @param {URL} url Desired URL to fetch blobs from. * @param {Array<{ location: API.Location, range?: API.Range }>} locations + * @param {typeof globalThis.fetch} [fetch] * @returns {Promise>, API.NotFound|API.Aborted|API.NetworkError>>} */ -async function _fetchBlobs (url, locations) { +async function _fetchBlobs (url, locations, fetch = globalThis.fetch.bind(globalThis)) { if (locations.length === 1) { - const res = await fetchBlob(locations[0].location, locations[0].range) + const res = await fetchBlob(locations[0].location, locations[0].range, fetch) if (res.error) return res return { ok: (async function * () { diff --git a/src/fetcher/simple.js b/src/fetcher/simple.js index 3c3cb59..41a6261 100644 --- a/src/fetcher/simple.js +++ b/src/fetcher/simple.js @@ -7,10 +7,15 @@ import { withResultSpan } from '../tracing/tracing.js' /** @implements {API.Fetcher} */ class SimpleFetcher { #locator + #fetch - /** @param {API.Locator} locator */ - constructor (locator) { + /** + * @param {API.Locator} locator + * @param {typeof globalThis.fetch} [fetch] + */ + constructor (locator, fetch = globalThis.fetch.bind(globalThis)) { this.#locator = locator + this.#fetch = fetch } /** @@ -20,24 +25,26 @@ class SimpleFetcher { async fetch (digest, options) { const locResult = await this.#locator.locate(digest, options) if (locResult.error) return locResult - return fetchBlob(locResult.ok, options?.range) + return fetchBlob(locResult.ok, options?.range, this.#fetch) } } /** * Create a new blob fetcher. * @param {API.Locator} locator + * @param {typeof globalThis.fetch} [fetch] * @returns {API.Fetcher} */ -export const create = (locator) => new SimpleFetcher(locator) +export const create = (locator, fetch = globalThis.fetch.bind(globalThis)) => new SimpleFetcher(locator, fetch) export const fetchBlob = withResultSpan('fetchBlob', /** * Fetch a blob from the passed location. 
* @param {API.Location} location * @param {API.Range} [range] + * @param {typeof globalThis.fetch} [fetch] */ - async (location, range) => { + async (location, range, fetch = globalThis.fetch.bind(globalThis)) => { let networkError for (const site of location.site) { From 3ebc9c5cde851ec067396d146aca14662149647c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 3 Jan 2025 20:59:20 -0800 Subject: [PATCH 09/11] 2.4.4-rc.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 8fd3a1b..9ebadc1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@web3-storage/blob-fetcher", - "version": "2.4.3", + "version": "2.4.4-rc.0", "description": "A blob fetcher that batches requests and reads multipart byterange responses.", "main": "src/index.js", "type": "module", From 46fa9711ea516f622db1d35ec33c721176304b72 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 7 Jan 2025 12:43:54 -0800 Subject: [PATCH 10/11] feat(blob-fetcher): revert no multipart-byte-range fix(blob-fetcher): remove unused package --- package.json | 2 +- pnpm-lock.yaml | 6 ++--- src/fetcher/batching.js | 57 +++++++++++++---------------------------- 3 files changed, 22 insertions(+), 43 deletions(-) diff --git a/package.json b/package.json index 9ebadc1..8610906 100644 --- a/package.json +++ b/package.json @@ -65,9 +65,9 @@ "@web3-storage/blob-index": "^1.0.2", "@web3-storage/content-claims": "^5.1.0", "multiformats": "^13.1.0", + "multipart-byte-range": "^3.0.1", "p-defer": "^4.0.1", "p-queue": "^8.0.1", - "uint8arraylist": "^2.4.8", "zod": "^3.23.8" }, "devDependencies": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e2d779e..5c3b9e4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -32,15 +32,15 @@ importers: multiformats: specifier: ^13.1.0 version: 13.1.0 + multipart-byte-range: + specifier: ^3.0.1 + version: 3.0.1 p-defer: specifier: ^4.0.1 version: 4.0.1 p-queue: specifier: ^8.0.1 version: 8.0.1 - uint8arraylist: - specifier: ^2.4.8 - version: 2.4.8 zod: specifier: ^3.23.8 version: 3.23.8 diff --git a/src/fetcher/batching.js b/src/fetcher/batching.js index 00bac3f..0d84cce 100644 --- a/src/fetcher/batching.js +++ b/src/fetcher/batching.js @@ -6,7 +6,7 @@ import { NetworkError, NotFoundError } from '../lib.js' import { fetchBlob } from './simple.js' import { resolveRange } from './lib.js' import { withAsyncGeneratorSpan, withResultSpan } from '../tracing/tracing.js' -import { Uint8ArrayList } from 'uint8arraylist' +import { MultipartByteRangeDecoder, getBoundary } from 'multipart-byte-range' /** * @typedef {'*'|`${number},${number}`|`${number}`} RangeKey @@ -220,12 +220,7 @@ async function _fetchBlobs (url, locations, fetch = globalThis.fetch.bind(global throw new Error('no common site') } - // sort blobs by starting byte - resolvedBlobs.sort((a, b) => a.range[0] - b.range[0]) - // get last byte to fetch - const aggregateRangeEnd = resolvedBlobs.reduce((aggregateEnd, r) => r.range[1] > aggregateEnd ? 
r.range[1] : aggregateEnd, 0) - // fetch bytes from the first starting byte to the last byte to fetch - const headers = { Range: `bytes=${resolvedBlobs[0].range[0]}-${aggregateRangeEnd}` } + const headers = { Range: `bytes=${resolvedBlobs.map(r => `${r.range[0]}-${r.range[1]}`).join(',')}` } try { const res = await fetch(url, { headers }) if (!res.ok) { @@ -248,46 +243,30 @@ const consumeBatchResponse = withAsyncGeneratorSpan('consumeBatchResponse', _con /** * @param {URL} url - * @param {ResolvedBlobs[]} sortedBlobs + * @param {ResolvedBlobs[]} resolvedBlobs * @param {Response} res * @returns {AsyncGenerator>} */ -async function * _consumeBatchResponse (url, sortedBlobs, res) { +async function * _consumeBatchResponse (url, resolvedBlobs, res) { if (!res.body) { return { error: new NetworkError(url, { cause: new Error('missing repsonse body') }) } } - const parts = new Uint8ArrayList() - // start at first byte of first blob - let farthestRead = sortedBlobs[0].range[0] - let farthestConsumed = sortedBlobs[0].range[0] - let currentBlob = 0 + + const boundary = getBoundary(res.headers) + if (!boundary) { + return { error: new NetworkError(url, { cause: new Error('missing multipart boundary') }) } + } + + let i = 0 + try { - for await (const chunk of res.body) { - // append the chunk to our buffer - parts.append(chunk) - // update the absolute position of how far we've read - farthestRead += chunk.byteLength - // resolve any blobs in the current buffer - // note that as long as blobs are sorted ascending by start - // this should be resilient to overlapping ranges - while (farthestRead >= sortedBlobs[currentBlob].range[1] + 1) { - // generate blob out of the current buffer - const blob = new Blob(sortedBlobs[currentBlob].digest, - parts.subarray(sortedBlobs[currentBlob].range[0] - farthestConsumed, sortedBlobs[currentBlob].range[1] + 1 - farthestConsumed)) - yield ({ blob, range: sortedBlobs[currentBlob].orig }) - currentBlob++ - if (currentBlob >= sortedBlobs.length) { - return { ok: true } - } - // consume any bytes we no longer need - // (they are before the beginning of the current range) - let toConsume = sortedBlobs[currentBlob].range[0] - farthestConsumed - if (toConsume > parts.byteLength) { toConsume = parts.byteLength } - parts.consume(toConsume) - farthestConsumed += toConsume - } + for await (const chunk of res.body.pipeThrough(new MultipartByteRangeDecoder(boundary))) { + // generate blob out of the current buffer + const blob = new Blob(resolvedBlobs[i].digest, chunk.content) + yield ({ blob, range: resolvedBlobs[i].orig }) + i++ } - return { error: new NetworkError(url, { cause: new Error('did not resolve all chunks') }) } + return { ok: true } } catch (err) { return { error: new NetworkError(url, { cause: err }) } } From 6d951052c00d115933865846d7ffe1f85cf672c7 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 8 Jan 2025 17:13:54 -0800 Subject: [PATCH 11/11] refactor(api): remove copied types --- src/api.ts | 63 ++---------------------------------------------------- 1 file changed, 2 insertions(+), 61 deletions(-) diff --git a/src/api.ts b/src/api.ts index c05b7e4..7c14926 100644 --- a/src/api.ts +++ b/src/api.ts @@ -1,70 +1,11 @@ import { ByteView, MultihashDigest } from 'multiformats' import { Failure, Result, URI, DID } from '@ucanto/interface' import { QueryError } from '@storacha/indexing-service-client/api' +import { Range } from 'multipart-byte-range' export { ByteView, MultihashDigest } from 'multiformats' export { Failure, Result, URI, DID, Principal } from 
'@ucanto/interface' - -/** - * An absolute byte range to extract - always an array of two values - * corresponding to the first and last bytes (both inclusive). e.g. - * - * ``` - * [100, 200] - * ``` - */ -export type AbsoluteRange = [first: number, last: number] - -/** - * A suffix byte range - always an array of one value corresponding to the - * first byte to start extraction from (inclusive). e.g. - * - * ``` - * [900] - * ``` - * - * If it is unknown how large a resource is, the last `n` bytes - * can be requested by specifying a negative value: - * - * ``` - * [-100] - * ``` - */ -export type SuffixRange = [first: number] - -/** - * Byte range to extract - an array of one or two values corresponding to the - * first and last bytes (both inclusive). e.g. - * - * ``` - * [100, 200] - * ``` - * - * Omitting the second value requests all remaining bytes of the resource. e.g. - * - * ``` - * [900] - * ``` - * - * Alternatively, if it's unknown how large a resource is, the last `n` bytes - * can be requested by specifying a negative value: - * - * ``` - * [-100] - * ``` - */ -export type Range = AbsoluteRange | SuffixRange - -export type ByteGetter = (range: AbsoluteRange) => Promise> - -export interface EncoderOptions { - /** Mime type of each part. */ - contentType?: string - /** Total size of the object in bytes. */ - totalSize?: number - /** Stream queuing strategy. */ - strategy?: QueuingStrategy -} +export { Range, SuffixRange, AbsoluteRange } from 'multipart-byte-range' export interface Abortable { signal: AbortSignal
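
Usage sketches for the APIs introduced above follow. They are illustrative only and are not part of the patch series.

The tracing helpers from PATCH 01 and PATCH 05 share one shape: wrap an async function, open a span named after it on the 'blob-fetcher' tracer, run the function under that context, and record span status from a Result ('withResultSpan'), a thrown error ('withSimpleSpan'), or the final return value of an async generator ('withAsyncGeneratorSpan'). A minimal sketch of wrapping a caller's own Result-returning function — 'locateBlob' and its URL are hypothetical; only the wrapper and the import subpath (exported in PATCH 08) are real API:

    import { withResultSpan } from '@web3-storage/blob-fetcher/tracing/tracing'

    // Hypothetical Result-returning operation; names below are illustrative.
    const locateBlob = withResultSpan('locateBlob',
      /** @param {string} digest */
      async (digest) => {
        if (!digest) {
          // recorded on the span as SpanStatusCode.ERROR with this message
          return { error: new Error('missing digest') }
        }
        // recorded on the span as SpanStatusCode.OK
        return { ok: { url: `https://example.com/blob/${digest}` } }
      }
    )

    const res = await locateBlob('zQmExample')
    console.log(res.error ? res.error.message : res.ok.url)

Note that '@opentelemetry/api' on its own is a no-op: spans are only exported when the host application registers an SDK tracer provider.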
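
PATCH 06 copies the Range semantics into src/api.ts (PATCH 11 later reverts to the 'multipart-byte-range' types): an AbsoluteRange is [first, last], both inclusive; [first] means "to the end"; [-n] means "the last n bytes". Both fetchers resolve a caller-relative Range against a site's range.length and then shift it by range.offset. A worked re-derivation under those documented semantics — the real 'resolveRange' lives in src/fetcher/lib.js and is only partially visible in this series, so treat this as a sketch, not its source:

    /**
     * Resolve a relative Range against a resource of totalSize bytes.
     * @param {[number, number?]} range
     * @param {number} totalSize
     * @returns {[number, number]}
     */
    const resolveRange = (range, totalSize) => {
      let first = range[0]
      let last = range[1] ?? totalSize - 1 // one-element range: read to the end
      if (first < 0) { // suffix range: the last -first bytes
        first = totalSize + first
        last = totalSize - 1
      }
      return [first, last]
    }

    // A blob stored at offset 1024, length 512 within a shard:
    const site = { range: { offset: 1024, length: 512 } }
    const rel = resolveRange([-100], site.range.length) // [412, 511]
    // Absolute bytes requested, as computed in fetchBlob/fetchBlobs:
    const abs = [site.range.offset + rel[0], site.range.offset + rel[1]] // [1436, 1535]
    console.log(`Range: bytes=${abs[0]}-${abs[1]}`) // Range: bytes=1436-1535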
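
PATCH 08 threads an optional fetch implementation through both fetchers via create(locator, fetch), defaulting to globalThis.fetch.bind(globalThis). A sketch of injecting an instrumented fetch into the simple fetcher — the 'fetcher/simple' subpath is an assumption patterned on the 'tracing/tracing' export added in the same patch, and the locator stub exists only so the sketch stands alone:

    import * as SimpleFetcher from '@web3-storage/blob-fetcher/fetcher/simple' // assumed subpath export

    // Log each outgoing request (including its Range header) before delegating
    // to the platform fetch — useful alongside the new spans when debugging.
    /** @type {typeof globalThis.fetch} */
    const loggingFetch = (input, init) => {
      console.log('fetch', String(input), init?.headers)
      return globalThis.fetch(input, init)
    }

    // Stand-in locator; a real one would come from the content-claims or
    // indexing-service modules.
    const locator = {
      locate: async (digest) => ({ error: Object.assign(new Error('not located'), { digest }) }),
      scopeToSpaces () { return this }
    }

    const fetcher = SimpleFetcher.create(locator, loggingFetch)

The batching fetcher's create accepts the same optional parameter, so a single instrumented fetch can serve both.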