Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add suport for layer mounting + garbage collector refactor #92

Merged
merged 13 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 110 additions & 30 deletions src/registry/garbage-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
// Unreferenced will delete all blobs that are not referenced by any manifest.
// Untagged will delete all blobs that are not referenced by any manifest and are not tagged.

import { ServerError } from "../errors";
import { ManifestSchema } from "../manifest";
import { hexToDigest } from "../user";
import {symlinkHeader} from "./r2";

export type GarbageCollectionMode = "unreferenced" | "untagged";
export type GCOptions = {
Expand Down Expand Up @@ -147,7 +148,7 @@ export class GarbageCollector {
}

private async list(prefix: string, callback: (object: R2Object) => Promise<boolean>): Promise<boolean> {
const listed = await this.registry.list({ prefix });
const listed = await this.registry.list({ prefix: prefix, include: ["customMetadata"] });
for (const object of listed.objects) {
if ((await callback(object)) === false) {
return false;
Expand Down Expand Up @@ -182,61 +183,140 @@ export class GarbageCollector {

private async collectInner(options: GCOptions): Promise<boolean> {
// We can run out of memory, this should be a bloom filter
let referencedBlobs = new Set<string>();
const manifestList: { [key: string]: Set<string> } = {};
const mark = await this.getInsertionMark(options.name);

// List manifest from repo to be scanned
await this.list(`${options.name}/manifests/`, async (manifestObject) => {
const tag = manifestObject.key.split("/").pop();
if (!tag || (options.mode === "untagged" && tag.startsWith("sha256:"))) {
return true;
const currentHashFile = hexToDigest(manifestObject.checksums.sha256!);
if (manifestList[currentHashFile] === undefined) {
manifestList[currentHashFile] = new Set<string>();
}
const manifest = await this.registry.get(manifestObject.key);
if (!manifest) {
return true;
manifestList[currentHashFile].add(manifestObject.key);
return true;
});

// In untagged mode, search for manifest to delete
if (options.mode === "untagged") {
const manifestToRemove = new Set<string>();
const referencedManifests = new Set<string>();
// List tagged manifest to find manifest-list
for (const [_, manifests] of Object.entries(manifestList)) {
const taggedManifest = [...manifests].filter((item) => !item.split("/").pop()?.startsWith("sha256:"));
for (const manifestPath of taggedManifest) {
// Tagged manifest some, load manifest content
const manifest = await this.registry.get(manifestPath);
if (!manifest) {
continue;
}

const manifestData = (await manifest.json()) as ManifestSchema;
// Search for manifest list
if (manifestData.schemaVersion == 2 && "manifests" in manifestData) {
// Extract referenced manifests from manifest list
manifestData.manifests.forEach((manifest) => {
referencedManifests.add(manifest.digest);
});
}
}
}

const manifestData = (await manifest.json()) as ManifestSchema;
// TODO: garbage collect manifests.
if ("manifests" in manifestData) {
return true;
for (const [key, manifests] of Object.entries(manifestList)) {
if (referencedManifests.has(key)) {
continue;
}
if (![...manifests].some((item) => !item.split("/").pop()?.startsWith("sha256:"))) {
// Add untagged manifest that should be removed
manifests.forEach((manifest) => {
manifestToRemove.add(manifest);
});
// Manifest to be removed shouldn't be parsed to search for referenced layers
delete manifestList[key];
}
}

// Deleting untagged manifest
if (manifestToRemove.size > 0) {
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
throw new Error("there is a manifest insertion going, the garbage collection shall stop");
}

// GC will deleted untagged manifest
await this.registry.delete(manifestToRemove.values().toArray());
}
}

const referencedBlobs = new Set<string>();
// From manifest, extract referenced layers
for (const [_, manifests] of Object.entries(manifestList)) {
// Select only one manifest per unique manifest
const manifestPath = manifests.values().next().value;
if (manifestPath === undefined) {
continue;
}
const manifest = await this.registry.get(manifestPath);
// Skip if manifest not found
if (!manifest) continue;

const manifestData = (await manifest.json()) as ManifestSchema;

if (manifestData.schemaVersion === 1) {
manifestData.fsLayers.forEach((layer) => {
referencedBlobs.add(layer.blobSum);
});
} else {
// Skip manifest-list, they don't contain any layers references
if ("manifests" in manifestData) continue;
// Add referenced layers from current manifest
manifestData.layers.forEach((layer) => {
referencedBlobs.add(layer.digest);
});
// Add referenced config blob from current manifest
referencedBlobs.add(manifestData.config.digest);
}
}

const unreferencedBlobs = new Set<string>();
// List blobs to be removed
await this.list(`${options.name}/blobs/`, async (object) => {
const blobHash = object.key.split("/").pop();
if (blobHash && !referencedBlobs.has(blobHash)) {
unreferencedBlobs.add(object.key);
}
return true;
});

let unreferencedKeys: string[] = [];
const deleteThreshold = 15;
await this.list(`${options.name}/blobs/`, async (object) => {
const hash = object.key.split("/").pop();
if (hash && !referencedBlobs.has(hash)) {
unreferencedKeys.push(object.key);
if (unreferencedKeys.length > deleteThreshold) {
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
throw new ServerError("there is a manifest insertion going, the garbage collection shall stop");
// Check for symlink before removal
if (unreferencedBlobs.size >= 0) {
await this.list("", async (object) => {
const objectPath = object.key;
// Skip non-blobs object and from any other repository (symlink only target cross repository blobs)
if (objectPath.startsWith(`${options.name}/`) || !objectPath.includes("/blobs/sha256:")) {
return true;
}
if (object.customMetadata && object.customMetadata[symlinkHeader] !== undefined) {
// Find symlink target
const manifest = await this.registry.get(object.key);
// Skip if manifest not found
if (!manifest) return true;
// Get symlink target
const symlinkTarget = await manifest.text();
if (unreferencedBlobs.has(symlinkTarget)) {
// This symlink target a layer that should be removed
unreferencedBlobs.delete(symlinkTarget);
}

await this.registry.delete(unreferencedKeys);
unreferencedKeys = [];
}
}
return true;
});
if (unreferencedKeys.length > 0) {
return unreferencedBlobs.size > 0;
});
}

if (unreferencedBlobs.size > 0) {
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
throw new Error("there is a manifest insertion going, the garbage collection shall stop");
}

await this.registry.delete(unreferencedKeys);
// GC will delete unreferenced blobs
await this.registry.delete(unreferencedBlobs.values().toArray());
}

return true;
Expand Down
8 changes: 8 additions & 0 deletions src/registry/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,14 @@ export class RegistryHTTPClient implements Registry {
}
}

mountExistingLayer(
_sourceName: string,
_digest: string,
_destinationName: string,
): Promise<RegistryError | FinishedUploadObject> {
throw new Error("unimplemented");
}

putManifest(
_namespace: string,
_reference: string,
Expand Down
68 changes: 63 additions & 5 deletions src/registry/r2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ export async function encodeState(state: State, env: Env): Promise<{ jwt: string
return { jwt: jwtSignature, hash: await getSHA256(jwtSignature, "") };
}

export const symlinkHeader = "X-Serverless-Registry-Symlink";

export async function getUploadState(
name: string,
uploadId: string,
Expand Down Expand Up @@ -150,14 +152,12 @@ export class R2Registry implements Registry {
return { response: new ServerError("invalid checksum from R2 backend") };
}

const checkManifestResponse = {
return {
exists: true,
digest: hexToDigest(res.checksums.sha256!),
contentType: res.httpMetadata!.contentType!,
size: res.size,
};

return checkManifestResponse;
}

async listRepositories(limit?: number, last?: string): Promise<RegistryError | ListRepositoriesResponse> {
Expand Down Expand Up @@ -377,6 +377,52 @@ export class R2Registry implements Registry {
};
}

async mountExistingLayer(
sourceName: string,
digest: string,
destinationName: string,
): Promise<RegistryError | FinishedUploadObject> {
const sourceLayerPath = `${sourceName}/blobs/${digest}`;
const [res, err] = await wrap(this.env.REGISTRY.head(sourceLayerPath));
if (err) {
return wrapError("mountExistingLayer", err);
}
if (!res) {
return wrapError("mountExistingLayer", "Layer not found");
} else {
const destinationLayerPath = `${destinationName}/blobs/${digest}`;
if (sourceLayerPath === destinationLayerPath) {
// Bad request
throw new InternalError();
}
// Prevent recursive symlink
if (res.customMetadata && symlinkHeader in res.customMetadata) {
return await this.mountExistingLayer(res.customMetadata[symlinkHeader], digest, destinationName);
}
// Trying to mount a layer from sourceLayerPath to destinationLayerPath

// Create linked file with custom metadata
const [newFile, error] = await wrap(
this.env.REGISTRY.put(destinationLayerPath, sourceLayerPath, {
sha256: await getSHA256(sourceLayerPath, ""),
httpMetadata: res.httpMetadata,
customMetadata: { [symlinkHeader]: sourceName }, // Storing target repository name in metadata (to easily resolve recursive layer mounting)
}),
);
if (error) {
return wrapError("mountExistingLayer", error);
}
if (newFile && "response" in newFile) {
return wrapError("mountExistingLayer", newFile.response);
}

return {
digest: hexToDigest(res.checksums.sha256!),
location: `/v2/${destinationLayerPath}`,
};
}
}

async layerExists(name: string, tag: string): Promise<RegistryError | CheckLayerResponse> {
const [res, err] = await wrap(this.env.REGISTRY.head(`${name}/blobs/${tag}`));
if (err) {
Expand Down Expand Up @@ -408,6 +454,19 @@ export class R2Registry implements Registry {
};
}

// Handle R2 symlink
if (res.customMetadata && symlinkHeader in res.customMetadata) {
const layerPath = await res.text();
// Symlink detected! Will download layer from "layerPath"
const [linkName, linkDigest] = layerPath.split("/blobs/");
if (linkName == name && linkDigest == digest) {
return {
response: new Response(JSON.stringify(BlobUnknownError), { status: 404 }),
};
}
return await this.env.REGISTRY_CLIENT.getLayer(linkName, linkDigest);
}

return {
stream: res.body!,
digest: hexToDigest(res.checksums.sha256!),
Expand Down Expand Up @@ -751,7 +810,6 @@ export class R2Registry implements Registry {
}

async garbageCollection(namespace: string, mode: GarbageCollectionMode): Promise<boolean> {
const result = await this.gc.collect({ name: namespace, mode: mode });
return result;
return await this.gc.collect({ name: namespace, mode: mode });
}
}
7 changes: 7 additions & 0 deletions src/registry/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ export interface Registry {
// gets the manifest by namespace + digest
getManifest(namespace: string, digest: string): Promise<GetManifestResponse | RegistryError>;

// mount an existing layer from a repository to another
mountExistingLayer(
sourceName: string,
digest: string,
destinationName: string,
): Promise<RegistryError | FinishedUploadObject>;

// checks that a layer exists
layerExists(namespace: string, digest: string): Promise<CheckLayerResponse | RegistryError>;

Expand Down
21 changes: 20 additions & 1 deletion src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ v2Router.get("/_catalog", async (req, env: Env) => {
}),
{
headers: {
Link: `${url.protocol}//${url.hostname}${url.pathname}?n=${n ?? 1000}&last=${response.cursor ?? ""}; rel=next`,
"Link": `${url.protocol}//${url.hostname}${url.pathname}?n=${n ?? 1000}&last=${response.cursor ?? ""}; rel=next`,
"Content-Type": "application/json",
},
},
Expand Down Expand Up @@ -331,6 +331,25 @@ v2Router.delete("/:name+/blobs/uploads/:id", async (req, env: Env) => {
// this is the first thing that the client asks for in an upload
v2Router.post("/:name+/blobs/uploads/", async (req, env: Env) => {
const { name } = req.params;
const { from, mount } = req.query;
if (mount !== undefined && from !== undefined) {
// Try to create a new upload from an existing layer on another repository
const [finishedUploadObject, err] = await wrap<FinishedUploadObject | RegistryError, Error>(
env.REGISTRY_CLIENT.mountExistingLayer(from.toString(), mount.toString(), name),
);
// If there is an error, fallback to the default layer upload system
if (!(err || (finishedUploadObject && "response" in finishedUploadObject))) {
return new Response(null, {
status: 201,
headers: {
"Content-Length": "0",
"Location": finishedUploadObject.location,
"Docker-Content-Digest": finishedUploadObject.digest,
},
});
}
}
// Upload a new layer
const [uploadObject, err] = await wrap<UploadObject | RegistryError, Error>(env.REGISTRY_CLIENT.startUpload(name));

if (err) {
Expand Down
Loading