Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dynamic dag traversal #163

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions packages/verified-fetch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@
"interface-datastore": "^8.3.1",
"ipfs-unixfs-exporter": "^13.6.1",
"ipns": "^10.0.0",
"it-first": "^3.0.6",
"it-map": "^3.1.1",
"it-peekable": "^3.0.5",
"it-pipe": "^3.0.1",
"it-tar": "^6.0.5",
"it-to-browser-readablestream": "^2.0.9",
Expand Down
104 changes: 104 additions & 0 deletions packages/verified-fetch/src/utils/enhanced-dag-traversal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { type ComponentLogger } from '@libp2p/interface'
import { type ReadableStorage, exporter, type ExporterOptions } from 'ipfs-unixfs-exporter'
import first from 'it-first'
import peekable from 'it-peekable'
import toBrowserReadableStream from 'it-to-browser-readablestream'
import { type CID } from 'multiformats/cid'
import { type ContentTypeParser } from '../types.js'
import { getContentType } from './get-content-type.js'

export interface EnhancedDagTraversalOptions extends ExporterOptions {
blockstore: ReadableStorage
cidOrPath: string | CID
logger: ComponentLogger
path: string
contentTypeParser?: ContentTypeParser
}

export interface EnhancedDagTraversalResponse {
stream: ReadableStream<Uint8Array>
firstChunk: Uint8Array
}

export async function enhancedDagTraversal ({
blockstore,
signal,
onProgress,
cidOrPath,
offset,
length,
path,
logger,
contentTypeParser
}: EnhancedDagTraversalOptions): Promise<EnhancedDagTraversalResponse> {
const log = logger.forComponent('helia:verified-fetch:enhanced-dag-traversal')

const dfsEntry = await exporter(cidOrPath, blockstore, {
signal,
onProgress,
blockReadConcurrency: 1
})
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

const dfsIter = dfsEntry.content({
signal,
onProgress,
offset,
length
})

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set blockReadConcurrency: 1 here.

set offset:0 and length to min bytes to retrieve to determine file type to ensure we only request whats needed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated this to follow the same pattern as https://pkg.go.dev/net/http#DetectContentType for detecting the content type

let firstChunk: Uint8Array = new Uint8Array()
let error: Error
try {
// Fetch the first chunk eagerly
const peekableIter = peekable(dfsIter)
const firstPeek = await peekableIter.peek()
if (firstPeek.done === true) {
throw new Error('No content found')
}
// firstChunk = await first(dfsIter)
firstChunk = firstPeek.value
peekableIter.push(firstChunk)
} catch (err: any) {
if (signal?.aborted === true) {
error = err
log.trace('Request aborted while fetching first chunk')
} else {
throw err
}
}

return {
stream: toBrowserReadableStream({
[Symbol.asyncIterator]: async function * (): AsyncGenerator<Uint8Array, void, undefined> {
if (error != null) {
throw error
}

// Determine content type based on the first chunk
const contentType = await getContentType({ bytes: firstChunk, contentTypeParser, path, log })

const isImageOrVideo = contentType.startsWith('video/') || contentType.startsWith('image/')
log.trace('Content type determined: %s', contentType)

const exporterEntry = isImageOrVideo
? dfsEntry
// If not image/video, switch to a BFS iterator
: await exporter(cidOrPath, blockstore, {
signal,
onProgress
})
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

// continue with the BFS iterator
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
for await (const chunk of exporterEntry.content({
signal,
onProgress,
offset,
length
})) {
yield chunk
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test if we need to set blockReadConcurrency here, based on either offset===0 or isImageOrVideo

}),
firstChunk
}
}
36 changes: 36 additions & 0 deletions packages/verified-fetch/src/utils/get-content-type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { type Logger } from '@libp2p/interface'
import { type ContentTypeParser } from '../types.js'
import { isPromise } from './type-guards.js'

export interface GetContentTypeOptions {
bytes: Uint8Array
path: string
defaultContentType?: string
contentTypeParser?: ContentTypeParser
log: Logger
}

export async function getContentType ({ bytes, contentTypeParser, path, log, defaultContentType = 'application/octet-stream' }: GetContentTypeOptions): Promise<string> {
let contentType: string | undefined

if (contentTypeParser != null) {
try {
let fileName = path.split('/').pop()?.trim()
fileName = fileName === '' ? undefined : fileName
const parsed = contentTypeParser(bytes, fileName)

if (isPromise(parsed)) {
const result = await parsed

if (result != null) {
contentType = result
}
} else if (parsed != null) {
contentType = parsed
}
} catch (err) {
log.error('error parsing content type', err)
}
}
return contentType ?? defaultContentType
}
26 changes: 3 additions & 23 deletions packages/verified-fetch/src/utils/set-content-type.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,18 @@
import { type Logger } from '@libp2p/interface'
import { type ContentTypeParser } from '../types.js'
import { isPromise } from './type-guards.js'
import { getContentType } from './get-content-type.js'

export interface SetContentTypeOptions {
bytes: Uint8Array
path: string
response: Response
defaultContentType?: string
contentTypeParser: ContentTypeParser | undefined
contentTypeParser?: ContentTypeParser
log: Logger
}

export async function setContentType ({ bytes, path, response, contentTypeParser, log, defaultContentType = 'application/octet-stream' }: SetContentTypeOptions): Promise<void> {
let contentType: string | undefined

if (contentTypeParser != null) {
try {
let fileName = path.split('/').pop()?.trim()
fileName = fileName === '' ? undefined : fileName
const parsed = contentTypeParser(bytes, fileName)

if (isPromise(parsed)) {
const result = await parsed

if (result != null) {
contentType = result
}
} else if (parsed != null) {
contentType = parsed
}
} catch (err) {
log.error('error parsing content type', err)
}
}
const contentType = await getContentType({ bytes, contentTypeParser, path, log, defaultContentType })
log.trace('setting content type to "%s"', contentType ?? defaultContentType)
response.headers.set('content-type', contentType ?? defaultContentType)
}
23 changes: 9 additions & 14 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { ByteRangeContext } from './utils/byte-range-context.js'
import { dagCborToSafeJSON } from './utils/dag-cbor-to-safe-json.js'
import { enhancedDagTraversal } from './utils/enhanced-dag-traversal.js'
import { getContentDispositionFilename } from './utils/get-content-disposition-filename.js'
import { getETag } from './utils/get-e-tag.js'
import { getPeerIdFromString } from './utils/get-peer-id-from-string.js'
import { getResolvedAcceptHeader } from './utils/get-resolved-accept-header.js'
import { getStreamFromAsyncIterable } from './utils/get-stream-from-async-iterable.js'
import { tarStream } from './utils/get-tar-stream.js'
import { getRedirectResponse } from './utils/handle-redirects.js'
import { parseResource } from './utils/parse-resource.js'
Expand Down Expand Up @@ -375,22 +375,16 @@ export class VerifiedFetch {
this.log.trace('calling exporter for %c/%s with offset=%o & length=%o', resolvedCID, path, offset, length)

try {
const entry = await exporter(resolvedCID, this.helia.blockstore, {
signal: options?.signal,
onProgress: options?.onProgress
})

const asyncIter = entry.content({
const { firstChunk, stream } = await enhancedDagTraversal({
blockstore: this.helia.blockstore,
signal: options?.signal,
onProgress: options?.onProgress,
cidOrPath: resolvedCID,
offset,
length
})
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
this.log('got async iterator for %c/%s', cid, path)

const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
onProgress: options?.onProgress,
signal: options?.signal
length,
path,
logger: this.helia.logger,
contentTypeParser: this.contentTypeParser
})
byteRangeContext.setBody(stream)
// if not a valid range request, okRangeRequest will call okResponse
Expand All @@ -399,6 +393,7 @@ export class VerifiedFetch {
})

await setContentType({ bytes: firstChunk, path, response, contentTypeParser: this.contentTypeParser, log: this.log })

setIpfsRoots(response, ipfsRoots)

return response
Expand Down
Loading