Skip to content

Commit

Permalink
registry: add concurrency safety in the garbage collector
Browse files Browse the repository at this point in the history
  • Loading branch information
gabivlj committed Sep 23, 2024
1 parent 47e8e8f commit 4362f90
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 40 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ jobs:
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
version: 9
version: 9.9
- name: Use Node
uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node-version }}
cache: 'pnpm'
cache: "pnpm"

- run: pnpm install --frozen-lockfile --child-concurrency=10
- run: cp wrangler.toml.example wrangler.toml
Expand Down
31 changes: 29 additions & 2 deletions index.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { afterAll, beforeEach, describe, expect, test } from "vitest";
import { afterAll, describe, expect, test } from "vitest";
import { SHA256_PREFIX_LEN, getSHA256 } from "./src/user";
import { TagsList } from "./src/router";
import { Env } from ".";
import * as fetchAuth from "./index";
import { RegistryTokens } from "./src/token";
import { RegistryAuthProtocolTokenPayload } from "./src/auth";
import { registries } from "./src/registry/registry";
Expand Down Expand Up @@ -187,11 +186,39 @@ describe("v2 manifests", () => {
test("PUT then DELETE /v2/:name/manifests/:reference works", async () => {
const { sha256 } = await createManifest("hello-world", await generateManifest("hello-world"), "hello");
const bindings = env as Env;

{
const listObjects = await bindings.REGISTRY.list({ prefix: "hello-world/blobs/" });
expect(listObjects.objects.length).toEqual(1);

const gcRes = await fetch(new Request("http://registry.com/v2/hello-world/gc", { method: "POST" }));
if (!gcRes.ok) {
throw new Error(`${gcRes.status}: ${await gcRes.text()}`);
}

const listObjectsAfterGC = await bindings.REGISTRY.list({ prefix: "hello-world/blobs/" });
expect(listObjectsAfterGC.objects.length).toEqual(1);
}

expect(await bindings.REGISTRY.head(`hello-world/manifests/hello`)).toBeTruthy();
const res = await fetch(createRequest("DELETE", `/v2/hello-world/manifests/${sha256}`, null));
expect(res.status).toEqual(202);
expect(await bindings.REGISTRY.head(`hello-world/manifests/${sha256}`)).toBeNull();
expect(await bindings.REGISTRY.head(`hello-world/manifests/hello`)).toBeNull();

const listObjects = await bindings.REGISTRY.list({ prefix: "hello-world/blobs/" });
expect(listObjects.objects.length).toEqual(1);

const listObjectsManifests = await bindings.REGISTRY.list({ prefix: "hello-world/manifests/" });
expect(listObjectsManifests.objects.length).toEqual(0);

const gcRes = await fetch(new Request("http://registry.com/v2/hello-world/gc", { method: "POST" }));
if (!gcRes.ok) {
throw new Error(`${gcRes.status}: ${await gcRes.text()}`);
}

const listObjectsAfterGC = await bindings.REGISTRY.list({ prefix: "hello-world/blobs/" });
expect(listObjectsAfterGC.objects.length).toEqual(0);
});

test("PUT multiple parts then DELETE /v2/:name/manifests/:reference works", async () => {
Expand Down
24 changes: 24 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ export class InternalError extends Response {
}
}

export class ManifestError extends Response {
constructor(
code: "MANIFEST_INVALID" | "BLOB_UNKNOWN" | "MANIFEST_UNVERIFIED" | "TAG_INVALID" | "NAME_INVALID",
message: string,
detail: Record<string, string> = {},
) {
const jsonBody = JSON.stringify({
errors: [
{
code,
message,
detail,
},
],
});
super(jsonBody, {
status: 400,
headers: {
"content-type": "application/json;charset=UTF-8",
},
});
}
}

export class ServerError extends Response {
constructor(message: string, errorCode = 500) {
super(JSON.stringify({ errors: [{ code: "SERVER_ERROR", message, detail: null }] }), {
Expand Down
31 changes: 31 additions & 0 deletions src/manifest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { z } from "zod";

// https://github.com/opencontainers/image-spec/blob/main/manifest.md
export const manifestSchema = z.object({
schemaVersion: z.literal(2),
artifactType: z.string().optional(),
// to maintain retrocompatibility of the registry, let's not assume mediaTypes
mediaType: z.string(),
config: z.object({
mediaType: z.string(),
digest: z.string(),
size: z.number().int(),
}),
layers: z.array(
z.object({
size: z.number().int(),
mediaType: z.string(),
digest: z.string(),
}),
),
annotations: z.record(z.string()).optional(),
subject: z
.object({
mediaType: z.string(),
digest: z.string(),
size: z.number().int(),
})
.optional(),
});

export type ManifestSchema = z.infer<typeof manifestSchema>;
107 changes: 97 additions & 10 deletions src/registry/garbage-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,99 @@
// Unreferenced will delete all blobs that are not referenced by any manifest.
// Untagged will delete all blobs that are not referenced by any manifest and are not tagged.

export type GARBAGE_COLLECTOR_MODE = "unreferenced" | "untagged";
import { ManifestSchema } from "../manifest";

export type GarbageCollectionMode = "unreferenced" | "untagged";
export type GCOptions = {
name: string;
mode: GARBAGE_COLLECTOR_MODE;
mode: GarbageCollectionMode;
};

// The garbage collector checks for dangling layers in the namespace. It's a lock free
// GC, but on-conflict (when there is an ongoing manifest insertion, or an ongoing garbage collection),
// the methods can throw errors.
//
// Summary:
// insertParent() {
// mark = updateInsertMark(); // mark insertion
// defer cleanInsertMark(mark);
// checkEveryChildIsOK();
// getDeletionMarkIsFalse(); // make sure not ongoing deletion mark after checking child is in db
// insertParent(); // insert parent in db
// }
//
// gc() {
// setDeletionMark() // marks deletion as gc
// defer { cleanDeletionMark(); } // clean up mark
// checkNotOngoingInsertMark() // makes sure not ongoing updateInsertMark
// deleteChildrenWithoutParent(); // go ahead and clean children
// }
//
// This makes it so: after every layer is OK we can proceed and insert the manifest, as there is no ongoing GC
// In the GC code, if there is an insertion on-going, there is an error.
export class GarbageCollector {
private registry: R2Bucket;

constructor(registry: R2Bucket) {
this.registry = registry;
}

async markForGarbageCollection(namespace: string): Promise<string> {
const etag = crypto.randomUUID();
const deletion = await this.registry.put(`${namespace}/deletion`, etag);
if (deletion === null) throw new Error("unreachable");
return etag;
}

async cleanupGarbageCollectionMark(namespace: string) {
await this.registry.delete(`${namespace}/deletion`);
}

async checkCanInsertData(namespace: string): Promise<boolean> {
const deletion = await this.registry.head(`${namespace}/deletion`);
if (deletion === null) {
return true;
}

return false;
}

// If successful, it inserted in R2 that its going
// to start inserting data that might conflight with GC.
async markForInsertion(namespace: string): Promise<string> {
const uid = crypto.randomUUID();
const deletion = await this.registry.put(`${namespace}/insertion/${uid}`, uid);
if (deletion === null) throw new Error("unreachable");
return uid;
}

async cleanInsertion(namespace: string, tag: string) {
await this.registry.delete(`${namespace}/insertion/${tag}`);
}

async checkIfGCCanContinue(namespace: string): Promise<boolean> {
const objects = await this.registry.list({ prefix: `${namespace}/insertion` });
for (const object of objects.objects) {
if (object.uploaded.getTime() + 1000 * 60 <= Date.now()) {
await this.registry.delete(object.key);
} else {
return false;
}
}

// call again to clean more
if (objects.truncated) return false;
return true;
}

private async list(prefix: string, callback: (object: R2Object) => Promise<boolean>): Promise<boolean> {
const listed = await this.registry.list({ prefix });
for (const object of listed.objects) {
if ((await callback(object)) === false) {
return false;
}
}

let truncated = listed.truncated;
let cursor = listed.truncated ? listed.cursor : undefined;

Expand All @@ -39,7 +112,18 @@ export class GarbageCollector {
}

async collect(options: GCOptions): Promise<boolean> {
let referencedBlobs = new Set<string>(); // We can run out of memory, this should be a bloom filter
await this.markForGarbageCollection(options.name);
try {
return await this.collectInner(options);
} finally {
// if this fails, user can always call a custom endpoint to clean it up
await this.cleanupGarbageCollectionMark(options.name);
}
}

private async collectInner(options: GCOptions): Promise<boolean> {
// We can run out of memory, this should be a bloom filter
let referencedBlobs = new Set<string>();

await this.list(`${options.name}/manifests/`, async (manifestObject) => {
const tag = manifestObject.key.split("/").pop();
Expand All @@ -51,22 +135,25 @@ export class GarbageCollector {
return true;
}

const manifestData = await manifest.text();
const manifestData = (await manifest.json()) as ManifestSchema;
manifestData.layers.forEach((layer) => {
referencedBlobs.add(layer.digest);
});

const layerRegex = /sha256:[a-f0-9]{64}/g;
let match;
while ((match = layerRegex.exec(manifestData)) !== null) {
referencedBlobs.add(match[0]);
}
return true;
});

let unreferencedKeys: string[] = [];
const deleteThreshold = 15;
await this.list(`${options.name}/blobs/`, async (object) => {
if (!(await this.checkIfGCCanContinue(options.name))) {
throw new Error("there is a manifest insertion going, the garbage collection shall stop");
}

const hash = object.key.split("/").pop();
if (hash && !referencedBlobs.has(hash)) {
unreferencedKeys.push(object.key);
if (unreferencedKeys.length > 100) {
if (unreferencedKeys.length > deleteThreshold) {
await this.registry.delete(unreferencedKeys);
unreferencedKeys = [];
}
Expand Down
4 changes: 2 additions & 2 deletions src/registry/http.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Env } from "../..";
import { InternalError } from "../errors";
import { errorString } from "../utils";
import { GARBAGE_COLLECTOR_MODE } from "./garbage-collector";
import { GarbageCollectionMode } from "./garbage-collector";
import {
CheckLayerResponse,
CheckManifestResponse,
Expand Down Expand Up @@ -475,7 +475,7 @@ export class RegistryHTTPClient implements Registry {
throw new Error("unimplemented");
}

collectGarbage(_context: ExecutionContext, _namespace: string, _mode: GARBAGE_COLLECTOR_MODE): Promise<boolean> {
garbageCollection(_namespace: string, _mode: GarbageCollectionMode): Promise<boolean> {
throw new Error("unimplemented");
}
}
Expand Down
Loading

0 comments on commit 4362f90

Please sign in to comment.