
feat(fetcher): first step at optimization #30

Merged · 12 commits · Jan 9, 2025
7 changes: 6 additions & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@web3-storage/blob-fetcher",
"version": "2.4.3",
"version": "2.4.4-rc.0",
"description": "A blob fetcher that batches requests and reads multipart byterange responses.",
"main": "src/index.js",
"type": "module",
@@ -50,11 +50,16 @@
"./locator/indexing-service": {
"import": "./src/locator/indexing-service/index.js",
"types": "./dist/src/locator/indexing-service/index.d.ts"
},
"./tracing/tracing": {
"import": "./src/tracing/tracing.js",
"types": "./dist/src/tracing/tracing.d.ts"
}
},
"dependencies": {
"@cloudflare/workers-types": "^4.20241022.0",
"@ipld/dag-ucan": "^3.4.0",
"@opentelemetry/api": "^1.9.0",
"@storacha/indexing-service-client": "^2.0.0",
"@ucanto/interface": "^10.0.1",
"@web3-storage/blob-index": "^1.0.2",
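
The new `./tracing/tracing` export exposes the span helpers used later in this diff to package consumers. A minimal sketch of usage, assuming (from `withResultSpan('fetchBlobs', _fetchBlobs)` below) that `withResultSpan` takes a span name plus an async function returning a `Result`, and that it preserves the wrapped function's call signature:

import { withResultSpan } from '@web3-storage/blob-fetcher/tracing/tracing'

// Each call to `loadThing` is recorded as an OpenTelemetry span named
// 'loadThing'. The function name and body here are illustrative only.
const loadThing = withResultSpan('loadThing', async (id) => {
  if (!id) return { error: new Error('missing id') }
  return { ok: id }
})

await loadThing('bafy...')
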
9 changes: 9 additions & 0 deletions pnpm-lock.yaml


2 changes: 1 addition & 1 deletion src/api.ts
@@ -1,7 +1,7 @@
import { ByteView, MultihashDigest } from 'multiformats'
import { Failure, Result, URI, DID } from '@ucanto/interface'
import { Range } from 'multipart-byte-range'
import { QueryError } from '@storacha/indexing-service-client/api'
import { Range } from 'multipart-byte-range'

export { ByteView, MultihashDigest } from 'multiformats'
export { Failure, Result, URI, DID, Principal } from '@ucanto/interface'
Expand Down
167 changes: 125 additions & 42 deletions src/fetcher/batching.js
@@ -2,10 +2,11 @@
import * as API from '../api.js'
import { DigestMap } from '@web3-storage/blob-index'
import defer from 'p-defer'
import { MultipartByteRangeDecoder, getBoundary } from 'multipart-byte-range/decoder'
import { NetworkError, NotFoundError } from '../lib.js'
import { fetchBlob } from './simple.js'
import { resolveRange } from './lib.js'
import { withAsyncGeneratorSpan, withResultSpan } from '../tracing/tracing.js'
import { MultipartByteRangeDecoder, getBoundary } from 'multipart-byte-range'

/**
* @typedef {'*'|`${number},${number}`|`${number}`} RangeKey
@@ -18,6 +19,7 @@ const MAX_BATCH_SIZE = 16
/** @implements {API.Fetcher} */
class BatchingFetcher {
#locator
#fetch

/** @type {DigestMap<API.MultihashDigest, RangedRequests>} */
#pendingReqs = new DigestMap()
@@ -30,9 +32,13 @@ class BatchingFetcher {
/** @type {Promise<void>|null} */
#processing = null

/** @param {API.Locator} locator */
constructor (locator) {
/**
* @param {API.Locator} locator
* @param {typeof globalThis.fetch} [fetch]
*/
constructor (locator, fetch = globalThis.fetch.bind(globalThis)) {
this.#locator = locator
this.#fetch = fetch
}

#scheduleBatchProcessing () {
@@ -66,6 +72,14 @@
const pendingReqs = this.#pendingReqs
this.#pendingReqs = new DigestMap()

// Basic algorithm
// 1. assemble each HTTP request
// 2. fire off the request
// 3. once the first byte is received, begin processing the response asynchronously in the background
// 4. immediately move on to the next HTTP request, but from the second iteration onward, wait so that
//    we're never processing the body of more than one response at a time
/** @type {Promise<API.Result<true, API.NotFound|API.Aborted|API.NetworkError>> | undefined } */
let lastResolveBlobs
while (true) {
const first = queue.shift()
if (!first) break
@@ -84,16 +98,25 @@
if (locs.length >= MAX_BATCH_SIZE) break
}

const res = await fetchBlobs(siteURL, locs)
if (res.error) break
for (const [i, blob] of res.ok.entries()) {
const rangeReqs = pendingReqs.get(blob.digest)
const key = rangeKey(locs[i].range)
const reqs = rangeReqs?.get(key)
reqs?.[0].resolve({ ok: blob })
reqs?.slice(1).forEach(r => r.resolve({ ok: blob.clone() }))
rangeReqs?.delete(key)
const fetchRes = await fetchBlobs(siteURL, locs, this.#fetch)
// if we have an error, stop
if (fetchRes.error) {
break
}
// if we are still processing the previous response, wait before processing this one
if (lastResolveBlobs !== undefined) {
const resolveRes = await lastResolveBlobs
lastResolveBlobs = undefined
if (resolveRes.error) {
break
}
}
lastResolveBlobs = resolveRequests(fetchRes.ok, pendingReqs)
}

// await the last call to resolve blobs
if (lastResolveBlobs !== undefined) {
await lastResolveBlobs
}

// resolve `undefined` for any remaining requests
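
A standalone sketch of the pipelining implemented by the loop above, with hypothetical `fetchBatch` and `consumeBody` helpers standing in for `fetchBlobs` and `resolveRequests`: the next HTTP request is dispatched while the previous response body is still being consumed, but at most one body is consumed at a time.

async function processAll (batches, fetchBatch, consumeBody) {
  /** @type {Promise<void>|undefined} */
  let lastConsume
  for (const batch of batches) {
    const res = await fetchBatch(batch) // resolves once headers arrive
    if (lastConsume) await lastConsume  // finish draining the previous body
    lastConsume = consumeBody(res)      // drain this body in the background
  }
  if (lastConsume) await lastConsume    // drain the final body
}
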
@@ -135,83 +158,143 @@ class BatchingFetcher {
/**
* Create a new batching blob fetcher.
* @param {API.Locator} locator
* @param {typeof globalThis.fetch} [fetch]
* @returns {API.Fetcher}
*/
export const create = (locator) => new BatchingFetcher(locator)
export const create = (locator, fetch = globalThis.fetch.bind(globalThis)) => new BatchingFetcher(locator, fetch)

/** @typedef {{range: API.AbsoluteRange, digest: API.MultihashDigest, orig: API.Range | undefined}} ResolvedBlobs */

/**
* Fetch blobs from the passed locations. The locations MUST share a common
* site to fetch from.
*
*/
export const fetchBlobs = withResultSpan('fetchBlobs', _fetchBlobs)

/**
* @param {URL} url Desired URL to fetch blobs from.
* @param {Array<{ location: API.Location, range?: API.Range }>} locations
* @returns {Promise<API.Result<API.Blob[], API.NotFound|API.Aborted|API.NetworkError>>}
* @param {typeof globalThis.fetch} [fetch]
* @returns {Promise<API.Result<AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>, API.NotFound|API.Aborted|API.NetworkError>>}
*/
export const fetchBlobs = async (url, locations) => {
async function _fetchBlobs (url, locations, fetch = globalThis.fetch.bind(globalThis)) {
if (locations.length === 1) {
const res = await fetchBlob(locations[0].location, locations[0].range)
const res = await fetchBlob(locations[0].location, locations[0].range, fetch)
if (res.error) return res
return { ok: [res.ok] }
return {
ok: (async function * () {
yield { blob: res.ok, range: locations[0].range }
return { ok: true }
}())
}
}

const ranges = []
// resolve ranges for blobs

/** @type {ResolvedBlobs[]} */
const resolvedBlobs = []
for (const { location, range } of locations) {
for (const s of location.site) {
let found = false
for (const l of s.location) {
if (l.toString() === url.toString()) {
/** @type {import('multipart-byte-range').AbsoluteRange} */
/** @type {API.AbsoluteRange} */
let resolvedRange = [s.range.offset, s.range.offset + s.range.length - 1]
if (range) {
const relRange = resolveRange(range, s.range.length)
resolvedRange = [s.range.offset + relRange[0], s.range.offset + relRange[1]]
}
ranges.push(resolvedRange)
resolvedBlobs.push({
digest: location.digest,
range: resolvedRange,
orig: range
})
found = true
break
}
}
if (found) break
}
}
if (ranges.length !== locations.length) {
if (resolvedBlobs.length !== locations.length) {
throw new Error('no common site')
}

const headers = { Range: `bytes=${ranges.map(r => `${r[0]}-${r[1]}`).join(',')}` }
const headers = { Range: `bytes=${resolvedBlobs.map(r => `${r.range[0]}-${r.range[1]}`).join(',')}` }
try {
const res = await fetch(url, { headers })
if (!res.ok) {
return { error: new NetworkError(url, { cause: new Error(`unexpected HTTP status: ${res.status}`) }) }
}
return { ok: consumeBatchResponse(url, resolvedBlobs, res) }
} catch (err) {
return { error: new NetworkError(url, { cause: err }) }
}
}

if (!res.body) {
return { error: new NetworkError(url, { cause: new Error('missing response body') }) }
}
/** @typedef {{blob: API.Blob, range: API.Range | undefined}} BlobResult */

const boundary = getBoundary(res.headers)
if (!boundary) {
return { error: new NetworkError(url, { cause: new Error('missing multipart boundary') }) }
}
/**
* Consumes a batch response to create multiple blobs. Breaks up a byte range
* spanning from the first byte of the first blob to the last byte of the last
* blob into the appropriate range for each blob.
*/
const consumeBatchResponse = withAsyncGeneratorSpan('consumeBatchResponse', _consumeBatchResponse)

/** @type {API.Blob[]} */
const blobs = []
let i = 0
await res.body
.pipeThrough(new MultipartByteRangeDecoder(boundary))
.pipeTo(new WritableStream({
write (part) {
blobs.push(new Blob(locations[i].location.digest, part.content))
i++
}
}))
/**
* @param {URL} url
* @param {ResolvedBlobs[]} resolvedBlobs
* @param {Response} res
* @returns {AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>}
*/
async function * _consumeBatchResponse (url, resolvedBlobs, res) {
if (!res.body) {
return { error: new NetworkError(url, { cause: new Error('missing response body') }) }
}

const boundary = getBoundary(res.headers)
if (!boundary) {
return { error: new NetworkError(url, { cause: new Error('missing multipart boundary') }) }
}

let i = 0

return { ok: blobs }
try {
for await (const chunk of res.body.pipeThrough(new MultipartByteRangeDecoder(boundary))) {
// generate blob out of the current buffer
const blob = new Blob(resolvedBlobs[i].digest, chunk.content)
yield ({ blob, range: resolvedBlobs[i].orig })
i++
}
return { ok: true }
} catch (err) {
return { error: new NetworkError(url, { cause: err }) }
}
}

/**
* Resolve pending requests from blobs generated by the last fetch
*
* @param {AsyncGenerator<BlobResult, API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>} blobResults
* @param {DigestMap<API.MultihashDigest, RangedRequests>} pendingReqs
* @returns {Promise<API.Result<true, API.NotFound|API.Aborted|API.NetworkError>>}
*/
const resolveRequests = async (blobResults, pendingReqs) => {
for (;;) {
const { value: result, done } = await blobResults.next()
if (done) {
return result
}
const { blob, range } = result
const rangeReqs = pendingReqs.get(blob.digest)
const key = rangeKey(range)
const reqs = rangeReqs?.get(key)
reqs?.[0].resolve({ ok: blob })
reqs?.slice(1).forEach(r => r.resolve({ ok: blob.clone() }))
rangeReqs?.delete(key)
}
}
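
`resolveRequests` drives the generator manually with `next()` rather than `for await...of` because a `for await` loop discards an async generator's final `return` value, which here carries the terminal success/failure result. A small illustration of that language behavior:

async function * example () {
  yield 1
  return { ok: true } // terminal status, not a yielded item
}

// for await...of only sees yielded values; the return value is dropped
for await (const v of example()) { /* v === 1 */ }

// next() surfaces the return value when done === true
const it = example()
await it.next() // { value: 1, done: false }
await it.next() // { value: { ok: true }, done: true }
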

/** @implements {API.Blob} */
class Blob {
#digest
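`create` now accepts an optional `fetch` implementation (defaulting to `globalThis.fetch`), which is threaded through `fetchBlobs` down to the actual HTTP call. A sketch of injecting an instrumented fetch; the `./fetcher/batching` import path and the `locator` value are assumptions not shown in this diff:

import { create } from '@web3-storage/blob-fetcher/fetcher/batching'

// Hypothetical instrumented fetch: log each outgoing request, then
// delegate to the platform fetch.
/** @type {typeof globalThis.fetch} */
const loggingFetch = (input, init) => {
  console.log('fetching', input)
  return globalThis.fetch(input, init)
}

// `locator` is any API.Locator implementation, e.g. one created by the
// indexing-service locator this package exports.
const fetcher = create(locator, loggingFetch)
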
4 changes: 2 additions & 2 deletions src/fetcher/lib.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* @param {import('multipart-byte-range').Range} range
* @param {import('../api.js').Range} range
* @param {number} totalSize
* @returns {import('multipart-byte-range').AbsoluteRange}
* @returns {import('../api.js').AbsoluteRange}
*/
export const resolveRange = (range, totalSize) => {
let last = range[1]
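
For illustration, this is how `fetchBlobs` (above) combines `resolveRange` with a site's absolute position to build the byte range it requests. The concrete numbers are made up, and we assume a fully specified non-negative range resolves to itself:

// A blob stored at a site at offset 1000 with length 500.
const site = { offset: 1000, length: 500 }

// No caller range: request the whole blob.
let absolute = [site.offset, site.offset + site.length - 1] // [1000, 1499]

// Caller asks for bytes 10-19 within the blob.
const rel = resolveRange([10, 19], site.length) // [10, 19] (assumed identity)
absolute = [site.offset + rel[0], site.offset + rel[1]] // [1010, 1019]
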