diff --git a/push/index.ts b/push/index.ts index 9b4cc66..6d73228 100644 --- a/push/index.ts +++ b/push/index.ts @@ -1,4 +1,4 @@ -import { $, CryptoHasher, file, write } from "bun"; +import { $, CryptoHasher, file, sleep, write } from "bun"; import { extract } from "tar"; import stream from "node:stream"; @@ -71,7 +71,7 @@ if (!(await file(tarFile).exists())) { await mkdir(imagePath); - const result = await extract({ + await extract({ file: tarFile, cwd: imagePath, }); @@ -251,7 +251,6 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to throw new Error(`oci-chunk-max-length header is malformed (not a number)`); } - const reader = readableStream.getReader(); const uploadId = createUploadResponse.headers.get("docker-upload-uuid"); if (uploadId === null) { throw new Error("Docker-Upload-UUID not defined in headers"); @@ -271,9 +270,13 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to let written = 0; let previousReadable: ReadableLimiter | undefined; let totalLayerSizeLeft = totalLayerSize; + const reader = readableStream.getReader(); + let fail = "true"; + let failures = 0; while (totalLayerSizeLeft > 0) { const range = `0-${Math.min(end, totalLayerSize) - 1}`; const current = new ReadableLimiter(reader as ReadableStreamDefaultReader, maxToWrite, previousReadable); + await current.init(); const patchChunkUploadURL = parseLocation(location); // we have to do fetchNode because Bun doesn't allow setting custom Content-Length. // https://github.com/oven-sh/bun/issues/10507 @@ -284,14 +287,19 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to "range": range, "authorization": cred, "content-length": `${Math.min(totalLayerSizeLeft, maxToWrite)}`, + "x-fail": fail, }), }); if (!patchChunkResult.ok) { - throw new Error( - `uploading chunk ${patchChunkUploadURL} returned ${patchChunkResult.status}: ${await patchChunkResult.text()}`, - ); + previousReadable = current; + console.error(`${layerDigest}: Pushing ${range} failed with ${patchChunkResult.status}, retrying`); + await sleep(500); + if (failures++ >= 2) fail = "false"; + continue; } + fail = "true"; + current.ok(); const rangeResponse = patchChunkResult.headers.get("range"); if (rangeResponse !== range) { throw new Error(`unexpected Range header ${rangeResponse}, expected ${range}`); @@ -308,18 +316,24 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to const range = `0-${written - 1}`; const uploadURL = new URL(parseLocation(location)); uploadURL.searchParams.append("digest", layerDigest); - const response = await fetch(uploadURL.toString(), { - method: "PUT", - headers: new Headers({ - Range: range, - Authorization: cred, - }), - }); - if (!response.ok) { - throw new Error(`${uploadURL.toString()} failed with ${response.status}: ${await response.text()}`); + for (let tries = 0; tries < 3; tries++) { + const response = await fetch(uploadURL.toString(), { + method: "PUT", + headers: new Headers({ + Range: range, + Authorization: cred, + }), + }); + if (!response.ok) { + console.error(`${layerDigest}: Finishing ${range} failed with ${response.status}, retrying`); + continue; + } + + console.log("Pushed", layerDigest); + return; } - console.log("Pushed", layerDigest); + throw new Error(`Could not push after multiple tries`); } const layersManifest = [] as { diff --git a/push/limiter.ts b/push/limiter.ts index 422e068..8d08f88 100644 --- a/push/limiter.ts +++ b/push/limiter.ts @@ -6,6 +6,8 @@ import stream from "node:stream"; export class ReadableLimiter extends stream.Readable { public written: number = 0; private leftover: Uint8Array | undefined; + private promise: Promise | undefined; + private accumulator: Uint8Array[]; constructor( // reader will be used to read bytes until limit. @@ -17,7 +19,27 @@ export class ReadableLimiter extends stream.Readable { ) { super(); - if (previousReader) this.leftover = previousReader.leftover; + if (previousReader) { + this.leftover = previousReader.leftover; + if (previousReader.accumulator.length > 0) { + this.promise = new Blob(previousReader.accumulator).bytes(); + previousReader.accumulator = []; + } + } + + this.accumulator = []; + } + + async init() { + if (this.promise !== undefined) { + if (this.leftover !== undefined && this.leftover.length > 0) + this.leftover = await new Blob([await this.promise, this.leftover ?? []]).bytes(); + else this.leftover = await this.promise; + } + } + + ok() { + this.accumulator = []; } _read(): void { @@ -27,6 +49,7 @@ export class ReadableLimiter extends stream.Readable { if (this.leftover !== undefined) { const toPushNow = this.leftover.slice(0, this.limit); + this.accumulator.push(toPushNow); this.leftover = this.leftover.slice(this.limit); this.push(toPushNow); this.limit -= toPushNow.length; @@ -50,7 +73,7 @@ export class ReadableLimiter extends stream.Readable { } if (arr.length === 0) return this.push(null); - + this.accumulator.push(arr); this.push(arr); this.limit -= arr.length; this.written += arr.length;