
Commit

push: Implement retry logic (test)
gabivlj committed Nov 26, 2024
1 parent b28155e commit 9511163
Showing 2 changed files with 55 additions and 18 deletions.
46 changes: 30 additions & 16 deletions push/index.ts
@@ -1,4 +1,4 @@
-import { $, CryptoHasher, file, write } from "bun";
+import { $, CryptoHasher, file, sleep, write } from "bun";
 import { extract } from "tar";
 
 import stream from "node:stream";
@@ -71,7 +71,7 @@ if (!(await file(tarFile).exists())) {
 
 await mkdir(imagePath);
 
-const result = await extract({
+await extract({
   file: tarFile,
   cwd: imagePath,
 });
@@ -251,7 +251,6 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to
     throw new Error(`oci-chunk-max-length header is malformed (not a number)`);
   }
 
-  const reader = readableStream.getReader();
   const uploadId = createUploadResponse.headers.get("docker-upload-uuid");
   if (uploadId === null) {
     throw new Error("Docker-Upload-UUID not defined in headers");
@@ -271,9 +270,13 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to
   let written = 0;
   let previousReadable: ReadableLimiter | undefined;
   let totalLayerSizeLeft = totalLayerSize;
+  const reader = readableStream.getReader();
+  let fail = "true";
+  let failures = 0;
   while (totalLayerSizeLeft > 0) {
     const range = `0-${Math.min(end, totalLayerSize) - 1}`;
     const current = new ReadableLimiter(reader as ReadableStreamDefaultReader, maxToWrite, previousReadable);
+    await current.init();
     const patchChunkUploadURL = parseLocation(location);
     // we have to do fetchNode because Bun doesn't allow setting custom Content-Length.
     // https://github.com/oven-sh/bun/issues/10507
@@ -284,14 +287,19 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to
         "range": range,
         "authorization": cred,
         "content-length": `${Math.min(totalLayerSizeLeft, maxToWrite)}`,
+        "x-fail": fail,
       }),
     });
     if (!patchChunkResult.ok) {
-      throw new Error(
-        `uploading chunk ${patchChunkUploadURL} returned ${patchChunkResult.status}: ${await patchChunkResult.text()}`,
-      );
+      previousReadable = current;
+      console.error(`${layerDigest}: Pushing ${range} failed with ${patchChunkResult.status}, retrying`);
+      await sleep(500);
+      if (failures++ >= 2) fail = "false";
+      continue;
     }
 
+    fail = "true";
+    current.ok();
     const rangeResponse = patchChunkResult.headers.get("range");
     if (rangeResponse !== range) {
       throw new Error(`unexpected Range header ${rangeResponse}, expected ${range}`);
@@ -308,18 +316,24 @@ async function pushLayer(layerDigest: string, readableStream: ReadableStream, to
   const range = `0-${written - 1}`;
   const uploadURL = new URL(parseLocation(location));
   uploadURL.searchParams.append("digest", layerDigest);
-  const response = await fetch(uploadURL.toString(), {
-    method: "PUT",
-    headers: new Headers({
-      Range: range,
-      Authorization: cred,
-    }),
-  });
-  if (!response.ok) {
-    throw new Error(`${uploadURL.toString()} failed with ${response.status}: ${await response.text()}`);
+  for (let tries = 0; tries < 3; tries++) {
+    const response = await fetch(uploadURL.toString(), {
+      method: "PUT",
+      headers: new Headers({
+        Range: range,
+        Authorization: cred,
+      }),
+    });
+    if (!response.ok) {
+      console.error(`${layerDigest}: Finishing ${range} failed with ${response.status}, retrying`);
+      continue;
+    }
+
+    console.log("Pushed", layerDigest);
+    return;
   }
 
-  console.log("Pushed", layerDigest);
+  throw new Error(`Could not push after multiple tries`);
 }
 
 const layersManifest = [] as {
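
The substance of the change in push/index.ts: a failed PATCH no longer throws. The loop keeps the current ReadableLimiter as previousReadable so the next iteration can replay the same bytes, sleeps 500 ms, and tries again; only the finishing PUT gives up, after three attempts. The "x-fail" header, together with the "(test)" in the commit title, looks like a test hook asking the registry to fail on purpose: it stays "true" until two failures have accumulated, then flips to "false" so the upload can complete. Below is a minimal sketch of the same retry shape, assuming plain fetch and a chunk that can simply be re-sent whole; patchChunkWithRetry and its parameters are illustrative, not the repository's API (the real code needs fetchNode and the limiter precisely because those assumptions do not hold there).

// Hedged sketch of the retry shape, not the repository's actual API.
// Assumes the whole chunk is already in memory and can be re-sent as-is.
async function patchChunkWithRetry(
  uploadURL: string,
  chunk: Uint8Array,
  range: string,
  cred: string,
  maxAttempts = 3,
): Promise<Response> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const response = await fetch(uploadURL, {
      method: "PATCH",
      headers: { "range": range, "authorization": cred },
      body: chunk,
    });
    if (response.ok) return response;
    console.error(`chunk ${range} failed with ${response.status} (attempt ${attempt}/${maxAttempts})`);
    // Brief fixed backoff, mirroring the sleep(500) in the diff.
    await new Promise((resolve) => setTimeout(resolve, 500));
  }
  throw new Error(`chunk ${range} did not upload after ${maxAttempts} attempts`);
}

The essential precondition is a replayable request body; the accumulator added to ReadableLimiter below exists to provide exactly that for streamed chunks.
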
27 changes: 25 additions & 2 deletions push/limiter.ts
@@ -6,6 +6,8 @@ import stream from "node:stream";
 export class ReadableLimiter extends stream.Readable {
   public written: number = 0;
   private leftover: Uint8Array | undefined;
+  private promise: Promise<Uint8Array> | undefined;
+  private accumulator: Uint8Array[];
 
   constructor(
     // reader will be used to read bytes until limit.
@@ -17,7 +19,27 @@
   ) {
     super();
 
-    if (previousReader) this.leftover = previousReader.leftover;
+    if (previousReader) {
+      this.leftover = previousReader.leftover;
+      if (previousReader.accumulator.length > 0) {
+        this.promise = new Blob(previousReader.accumulator).bytes();
+        previousReader.accumulator = [];
+      }
+    }
+
+    this.accumulator = [];
   }
+
+  async init() {
+    if (this.promise !== undefined) {
+      if (this.leftover !== undefined && this.leftover.length > 0)
+        this.leftover = await new Blob([await this.promise, this.leftover ?? []]).bytes();
+      else this.leftover = await this.promise;
+    }
+  }
+
+  ok() {
+    this.accumulator = [];
+  }
 
   _read(): void {
@@ -27,6 +49,7 @@
 
     if (this.leftover !== undefined) {
      const toPushNow = this.leftover.slice(0, this.limit);
+      this.accumulator.push(toPushNow);
      this.leftover = this.leftover.slice(this.limit);
      this.push(toPushNow);
      this.limit -= toPushNow.length;
@@ -50,7 +73,7 @@
     }
 
     if (arr.length === 0) return this.push(null);
-
+    this.accumulator.push(arr);
     this.push(arr);
     this.limit -= arr.length;
     this.written += arr.length;
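
The limiter changes are what make the retry safe: a ReadableStreamDefaultReader consumes bytes destructively, so data handed to a failed PATCH would otherwise be lost. Every chunk pushed downstream is now also copied into accumulator; ok() discards the copies once the registry acknowledges the chunk, and a replacement limiter built from previousReadable folds them back into leftover via init(). A stripped-down sketch of that commit/rollback idea follows; ReplayBuffer, take, commit, and rollback are hypothetical names, not the limiter's API.

// Stripped-down replay buffer illustrating the accumulator idea.
// Hypothetical names; ignores ReadableLimiter's byte limit and stream plumbing.
class ReplayBuffer {
  private pending: Uint8Array[] = []; // chunks handed out, awaiting commit()
  private replay: Uint8Array[] = []; // chunks to hand out again after rollback()

  constructor(private reader: ReadableStreamDefaultReader<Uint8Array>) {}

  // Next chunk: prefer replayed bytes, else pull from the underlying reader.
  async take(): Promise<Uint8Array | null> {
    const replayed = this.replay.shift();
    const chunk = replayed ?? (await this.reader.read()).value ?? null;
    if (chunk !== null) this.pending.push(chunk);
    return chunk;
  }

  // The upload succeeded: the copies are no longer needed.
  commit(): void {
    this.pending = [];
  }

  // The upload failed: queue the consumed chunks to be read again.
  rollback(): void {
    this.replay = [...this.pending, ...this.replay];
    this.pending = [];
  }
}

In those terms, current.ok() in pushLayer plays the role of commit(), while constructing the next ReadableLimiter from previousReadable and awaiting init() plays the role of rollback().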
