diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 31d3fbc..20cb85f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,12 +17,12 @@ jobs: - name: Install pnpm uses: pnpm/action-setup@v4 with: - version: 9 + version: 9.9 - name: Use Node uses: actions/setup-node@v4 with: node-version: ${{ matrix.node-version }} - cache: 'pnpm' + cache: "pnpm" - run: pnpm install --frozen-lockfile --child-concurrency=10 - run: cp wrangler.toml.example wrangler.toml diff --git a/index.test.ts b/index.test.ts index 42fefd3..633eb88 100644 --- a/index.test.ts +++ b/index.test.ts @@ -1,8 +1,7 @@ -import { afterAll, beforeEach, describe, expect, test } from "vitest"; +import { afterAll, describe, expect, test } from "vitest"; import { SHA256_PREFIX_LEN, getSHA256 } from "./src/user"; import { TagsList } from "./src/router"; import { Env } from "."; -import * as fetchAuth from "./index"; import { RegistryTokens } from "./src/token"; import { RegistryAuthProtocolTokenPayload } from "./src/auth"; import { registries } from "./src/registry/registry"; @@ -187,11 +186,39 @@ describe("v2 manifests", () => { test("PUT then DELETE /v2/:name/manifests/:reference works", async () => { const { sha256 } = await createManifest("hello-world", await generateManifest("hello-world"), "hello"); const bindings = env as Env; + + { + const listObjects = await bindings.REGISTRY.list({ prefix: "hello-world/blobs/" }); + expect(listObjects.objects.length).toEqual(1); + + const gcRes = await fetch(new Request("http://registry.com/v2/hello-world/gc", { method: "POST" })); + if (!gcRes.ok) { + throw new Error(`${gcRes.status}: ${await gcRes.text()}`); + } + + const listObjectsAfterGC = await bindings.REGISTRY.list({ prefix: "hello-world/blobs/" }); + expect(listObjectsAfterGC.objects.length).toEqual(1); + } + expect(await bindings.REGISTRY.head(`hello-world/manifests/hello`)).toBeTruthy(); const res = await fetch(createRequest("DELETE", `/v2/hello-world/manifests/${sha256}`, null)); expect(res.status).toEqual(202); expect(await bindings.REGISTRY.head(`hello-world/manifests/${sha256}`)).toBeNull(); expect(await bindings.REGISTRY.head(`hello-world/manifests/hello`)).toBeNull(); + + const listObjects = await bindings.REGISTRY.list({ prefix: "hello-world/blobs/" }); + expect(listObjects.objects.length).toEqual(1); + + const listObjectsManifests = await bindings.REGISTRY.list({ prefix: "hello-world/manifests/" }); + expect(listObjectsManifests.objects.length).toEqual(0); + + const gcRes = await fetch(new Request("http://registry.com/v2/hello-world/gc", { method: "POST" })); + if (!gcRes.ok) { + throw new Error(`${gcRes.status}: ${await gcRes.text()}`); + } + + const listObjectsAfterGC = await bindings.REGISTRY.list({ prefix: "hello-world/blobs/" }); + expect(listObjectsAfterGC.objects.length).toEqual(0); }); test("PUT multiple parts then DELETE /v2/:name/manifests/:reference works", async () => { diff --git a/src/errors.ts b/src/errors.ts index c1f8056..95e655f 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -70,6 +70,30 @@ export class InternalError extends Response { } } +export class ManifestError extends Response { + constructor( + code: "MANIFEST_INVALID" | "BLOB_UNKNOWN" | "MANIFEST_UNVERIFIED" | "TAG_INVALID" | "NAME_INVALID", + message: string, + detail: Record = {}, + ) { + const jsonBody = JSON.stringify({ + errors: [ + { + code, + message, + detail, + }, + ], + }); + super(jsonBody, { + status: 400, + headers: { + "content-type": "application/json;charset=UTF-8", + }, + }); + } +} + export class ServerError extends Response { constructor(message: string, errorCode = 500) { super(JSON.stringify({ errors: [{ code: "SERVER_ERROR", message, detail: null }] }), { diff --git a/src/manifest.ts b/src/manifest.ts new file mode 100644 index 0000000..11e2bd1 --- /dev/null +++ b/src/manifest.ts @@ -0,0 +1,31 @@ +import { z } from "zod"; + +// https://github.com/opencontainers/image-spec/blob/main/manifest.md +export const manifestSchema = z.object({ + schemaVersion: z.literal(2), + artifactType: z.string().optional(), + // to maintain retrocompatibility of the registry, let's not assume mediaTypes + mediaType: z.string(), + config: z.object({ + mediaType: z.string(), + digest: z.string(), + size: z.number().int(), + }), + layers: z.array( + z.object({ + size: z.number().int(), + mediaType: z.string(), + digest: z.string(), + }), + ), + annotations: z.record(z.string()).optional(), + subject: z + .object({ + mediaType: z.string(), + digest: z.string(), + size: z.number().int(), + }) + .optional(), +}); + +export type ManifestSchema = z.infer; diff --git a/src/registry/garbage-collector.ts b/src/registry/garbage-collector.ts index 7d85835..9ec41d9 100644 --- a/src/registry/garbage-collector.ts +++ b/src/registry/garbage-collector.ts @@ -2,12 +2,36 @@ // Unreferenced will delete all blobs that are not referenced by any manifest. // Untagged will delete all blobs that are not referenced by any manifest and are not tagged. -export type GARBAGE_COLLECTOR_MODE = "unreferenced" | "untagged"; +import { ManifestSchema } from "../manifest"; + +export type GarbageCollectionMode = "unreferenced" | "untagged"; export type GCOptions = { name: string; - mode: GARBAGE_COLLECTOR_MODE; + mode: GarbageCollectionMode; }; +// The garbage collector checks for dangling layers in the namespace. It's a lock free +// GC, but on-conflict (when there is an ongoing manifest insertion, or an ongoing garbage collection), +// the methods can throw errors. +// +// Summary: +// insertParent() { +// mark = updateInsertMark(); // mark insertion +// defer cleanInsertMark(mark); +// checkEveryChildIsOK(); +// getDeletionMarkIsFalse(); // make sure not ongoing deletion mark after checking child is in db +// insertParent(); // insert parent in db +// } +// +// gc() { +// setDeletionMark() // marks deletion as gc +// defer { cleanDeletionMark(); } // clean up mark +// checkNotOngoingInsertMark() // makes sure not ongoing updateInsertMark +// deleteChildrenWithoutParent(); // go ahead and clean children +// } +// +// This makes it so: after every layer is OK we can proceed and insert the manifest, as there is no ongoing GC +// In the GC code, if there is an insertion on-going, there is an error. export class GarbageCollector { private registry: R2Bucket; @@ -15,6 +39,54 @@ export class GarbageCollector { this.registry = registry; } + async markForGarbageCollection(namespace: string): Promise { + const etag = crypto.randomUUID(); + const deletion = await this.registry.put(`${namespace}/deletion`, etag); + if (deletion === null) throw new Error("unreachable"); + return etag; + } + + async cleanupGarbageCollectionMark(namespace: string) { + await this.registry.delete(`${namespace}/deletion`); + } + + async checkCanInsertData(namespace: string): Promise { + const deletion = await this.registry.head(`${namespace}/deletion`); + if (deletion === null) { + return true; + } + + return false; + } + + // If successful, it inserted in R2 that its going + // to start inserting data that might conflight with GC. + async markForInsertion(namespace: string): Promise { + const uid = crypto.randomUUID(); + const deletion = await this.registry.put(`${namespace}/insertion/${uid}`, uid); + if (deletion === null) throw new Error("unreachable"); + return uid; + } + + async cleanInsertion(namespace: string, tag: string) { + await this.registry.delete(`${namespace}/insertion/${tag}`); + } + + async checkIfGCCanContinue(namespace: string): Promise { + const objects = await this.registry.list({ prefix: `${namespace}/insertion` }); + for (const object of objects.objects) { + if (object.uploaded.getTime() + 1000 * 60 <= Date.now()) { + await this.registry.delete(object.key); + } else { + return false; + } + } + + // call again to clean more + if (objects.truncated) return false; + return true; + } + private async list(prefix: string, callback: (object: R2Object) => Promise): Promise { const listed = await this.registry.list({ prefix }); for (const object of listed.objects) { @@ -22,6 +94,7 @@ export class GarbageCollector { return false; } } + let truncated = listed.truncated; let cursor = listed.truncated ? listed.cursor : undefined; @@ -39,7 +112,18 @@ export class GarbageCollector { } async collect(options: GCOptions): Promise { - let referencedBlobs = new Set(); // We can run out of memory, this should be a bloom filter + await this.markForGarbageCollection(options.name); + try { + return await this.collectInner(options); + } finally { + // if this fails, user can always call a custom endpoint to clean it up + await this.cleanupGarbageCollectionMark(options.name); + } + } + + private async collectInner(options: GCOptions): Promise { + // We can run out of memory, this should be a bloom filter + let referencedBlobs = new Set(); await this.list(`${options.name}/manifests/`, async (manifestObject) => { const tag = manifestObject.key.split("/").pop(); @@ -51,22 +135,25 @@ export class GarbageCollector { return true; } - const manifestData = await manifest.text(); + const manifestData = (await manifest.json()) as ManifestSchema; + manifestData.layers.forEach((layer) => { + referencedBlobs.add(layer.digest); + }); - const layerRegex = /sha256:[a-f0-9]{64}/g; - let match; - while ((match = layerRegex.exec(manifestData)) !== null) { - referencedBlobs.add(match[0]); - } return true; }); let unreferencedKeys: string[] = []; + const deleteThreshold = 15; await this.list(`${options.name}/blobs/`, async (object) => { + if (!(await this.checkIfGCCanContinue(options.name))) { + throw new Error("there is a manifest insertion going, the garbage collection shall stop"); + } + const hash = object.key.split("/").pop(); if (hash && !referencedBlobs.has(hash)) { unreferencedKeys.push(object.key); - if (unreferencedKeys.length > 100) { + if (unreferencedKeys.length > deleteThreshold) { await this.registry.delete(unreferencedKeys); unreferencedKeys = []; } diff --git a/src/registry/http.ts b/src/registry/http.ts index 5c09093..ae3734d 100644 --- a/src/registry/http.ts +++ b/src/registry/http.ts @@ -1,7 +1,7 @@ import { Env } from "../.."; import { InternalError } from "../errors"; import { errorString } from "../utils"; -import { GARBAGE_COLLECTOR_MODE } from "./garbage-collector"; +import { GarbageCollectionMode } from "./garbage-collector"; import { CheckLayerResponse, CheckManifestResponse, @@ -475,7 +475,7 @@ export class RegistryHTTPClient implements Registry { throw new Error("unimplemented"); } - collectGarbage(_context: ExecutionContext, _namespace: string, _mode: GARBAGE_COLLECTOR_MODE): Promise { + garbageCollection(_namespace: string, _mode: GarbageCollectionMode): Promise { throw new Error("unimplemented"); } } diff --git a/src/registry/r2.ts b/src/registry/r2.ts index 222391e..3b723fb 100644 --- a/src/registry/r2.ts +++ b/src/registry/r2.ts @@ -9,7 +9,7 @@ import { limit, split, } from "../chunk"; -import { InternalError, RangeError, ServerError } from "../errors"; +import { InternalError, ManifestError, RangeError, ServerError } from "../errors"; import { SHA256_PREFIX_LEN, getSHA256, hexToDigest } from "../user"; import { readableToBlob, readerToBlob, wrap } from "../utils"; import { BlobUnknownError, ManifestUnknownError } from "../v2-errors"; @@ -27,6 +27,8 @@ import { UploadObject, wrapError, } from "./registry"; +import { GarbageCollectionMode, GarbageCollector } from "./garbage-collector"; +import { ManifestSchema, manifestSchema } from "../manifest"; export type Chunk = | { @@ -126,7 +128,11 @@ export async function getUploadState( } export class R2Registry implements Registry { - constructor(private env: Env) {} + private gc: GarbageCollector; + + constructor(private env: Env) { + this.gc = new GarbageCollector(this.env.REGISTRY); + } async manifestExists(name: string, reference: string): Promise { const [res, err] = await wrap(this.env.REGISTRY.head(`${name}/manifests/${reference}`)); @@ -169,13 +175,19 @@ export class R2Registry implements Registry { const repositories: Record = {}; let totalRecords = 0; let lastSeen: string | undefined; - const objectExistsInPath = (entry: string) => { + const objectExistsInPath = (entry?: string) => { + if (entry === undefined) return false; const parts = entry.split("/"); const repository = parts.slice(0, parts.length - 2).join("/"); return repository in repositories; }; + const repositoriesOrder: string[] = []; const addObjectPath = (object: R2Object) => { + if (totalRecords >= options.limit && !objectExistsInPath(object.key)) { + return; + } + // update lastSeen for cursoring purposes lastSeen = object.key; // don't add if seen before @@ -185,22 +197,23 @@ export class R2Registry implements Registry { // /<'blobs' | 'manifests'>/ const parts = object.key.split("/"); const repository = parts.slice(0, parts.length - 2).join("/"); - if (!(repository in repositories)) { - totalRecords++; - } + if (parts[parts.length - 2] === "blobs") return; + if (repository in repositories) return; + totalRecords++; repositories[repository] = {}; + repositoriesOrder.push(repository); }; const r2Objects = await this.env.REGISTRY.list({ - limit: options.limit, + limit: 50, startAfter: options.startAfter, }); r2Objects.objects.forEach((path) => addObjectPath(path)); let cursor = r2Objects.truncated ? r2Objects.cursor : undefined; while (cursor !== undefined && totalRecords < options.limit) { const next = await this.env.REGISTRY.list({ - limit: options.limit, + limit: 50, cursor, }); next.objects.forEach((path) => addObjectPath(path)); @@ -213,18 +226,19 @@ export class R2Registry implements Registry { while (cursor !== undefined && typeof lastSeen === "string" && objectExistsInPath(lastSeen)) { const nextList: R2Objects = await this.env.REGISTRY.list({ - limit: 1000, + limit: 50, cursor, }); let found = false; // Search for the next object in the list for (const object of nextList.objects) { - lastSeen = object.key; if (!objectExistsInPath(lastSeen)) { found = true; break; } + + lastSeen = object.key; } if (found) break; @@ -239,17 +253,41 @@ export class R2Registry implements Registry { } } - if (cursor === undefined) { - lastSeen = undefined; - } - return { - repositories: Object.keys(repositories), + repositories: repositoriesOrder, cursor: lastSeen, }; } + async verifyManifest(name: string, manifest: ManifestSchema) { + const layers = [...manifest.layers, manifest.config]; + for (const key of layers) { + const res = await this.env.REGISTRY.head(`${name}/blobs/${key.digest}`); + if (res === null) { + console.error(`Digest ${key} doesn't exist`); + return new ManifestError("BLOB_UNKNOWN", `unknown blob ${key.digest}`); + } + } + + return null; + } + async putManifest( + namespace: string, + reference: string, + readableStream: ReadableStream, + contentType: string, + ): Promise { + const key = await this.gc.markForInsertion(namespace); + try { + return this.putManifestInner(namespace, reference, readableStream, contentType); + } finally { + // if this fails, at some point it will be expired + await this.gc.cleanInsertion(namespace, key); + } + } + + async putManifestInner( name: string, reference: string, readableStream: ReadableStream, @@ -265,10 +303,19 @@ export class R2Registry implements Registry { const digest = await sha256.digest; const digestStr = hexToDigest(digest); const text = await blob.text(); + const manifestJSON = JSON.parse(text); + const manifest = manifestSchema.parse(manifestJSON); + const verifyManifestErr = await this.verifyManifest(name, manifest); + if (verifyManifestErr !== null) return { response: verifyManifestErr }; + + if (!(await this.gc.checkCanInsertData(name))) { + console.error("Manifest can't be uploaded as there is a garbage collection going"); + return { response: new ServerError("garbage collection is on-going... check with registry administrator", 500) }; + } + const putReference = async () => { // if the reference is the same as a digest, it's not necessary to insert if (reference === digestStr) return; - // TODO: If we're overriding an existing manifest here, should we update the original manifest references? return await env.REGISTRY.put(`${name}/manifests/${reference}`, text, { sha256: digest, httpMetadata: { @@ -276,7 +323,8 @@ export class R2Registry implements Registry { }, }); }; - await Promise.allSettled([ + + await Promise.all([ putReference(), // this is the "main" manifest env.REGISTRY.put(`${name}/manifests/${digestStr}`, text, { @@ -682,10 +730,8 @@ export class R2Registry implements Registry { }; } - async collectGarbage(context: ExecutionContext, namespace: string, mode: GARBAGE_COLLECTOR_MODE): Promise { - const gc = new GarbageCollector(this.env.REGISTRY); - const result = gc.collect({ name: namespace, mode: mode }); - context.waitUntil(result); - return await result; + async garbageCollection(namespace: string, mode: GarbageCollectionMode): Promise { + const result = await this.gc.collect({ name: namespace, mode: mode }); + return result; } } diff --git a/src/registry/registry.ts b/src/registry/registry.ts index cffbf60..8c684b2 100644 --- a/src/registry/registry.ts +++ b/src/registry/registry.ts @@ -2,6 +2,7 @@ import { Env } from "../.."; import { InternalError } from "../errors"; import { errorString } from "../utils"; import z from "zod"; +import { GarbageCollectionMode } from "./garbage-collector"; // Defines a registry and how it's configured const registryConfiguration = z.object({ @@ -166,6 +167,8 @@ export interface Registry { stream?: ReadableStream, length?: number, ): Promise; + + garbageCollection(namespace: string, mode: GarbageCollectionMode): Promise; } export function wrapError(method: string, err: unknown): RegistryError { diff --git a/src/router.ts b/src/router.ts index 00654ab..0a92340 100644 --- a/src/router.ts +++ b/src/router.ts @@ -562,13 +562,13 @@ v2Router.delete("/:name+/blobs/:digest", async (req, env: Env) => { }); }); -v2Router.post("/:name+/collectgarbage", async (req, env: Env, context: ExecutionContext) => { +v2Router.post("/:name+/gc", async (req, env: Env) => { const { name } = req.params; const mode = req.query.mode ?? "unreferenced"; if (mode !== "unreferenced" && mode !== "untagged") { throw new ServerError("Mode must be either 'unreferenced' or 'untagged'", 400); } - const result = await env.REGISTRY_CLIENT.collectGarbage(context, name, mode); + const result = await env.REGISTRY_CLIENT.garbageCollection(name, mode); return new Response(JSON.stringify({ success: result })); });