Skip to content

Commit 136562c

Browse files
authored
Add support for layer mounting + garbage collector refactor (#92)
* Add support for layer mounting cross repository * Refacto garbage collector selector: - Fix config blobs were delete when referenced - Add manifest-list support for multi-arch images - Add layer mounting support cross-repository * Upgrade tests with random blobs data + config blob different from layer blob * Add test for manifest list and garbage collector * Reduce number of tags for more vitest stability * Prevent recursive symlink * Fix typescript error for registry.list include option in garbage-collector.ts * Change typo to camelCase * Change typo to camelCase * Add mountExistingLayer to RegistryHTTPClient to satisfy Registry interface * Update symlinkHeader name * Update variable naming + add symlink filter optimisation
1 parent e92b4c9 commit 136562c

File tree

7 files changed

+761
-56
lines changed

7 files changed

+761
-56
lines changed

src/registry/garbage-collector.ts

+112-30
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
// Unreferenced will delete all blobs that are not referenced by any manifest.
33
// Untagged will delete all blobs that are not referenced by any manifest and are not tagged.
44

5-
import { ServerError } from "../errors";
65
import { ManifestSchema } from "../manifest";
6+
import { hexToDigest } from "../user";
7+
import {symlinkHeader} from "./r2";
78

89
export type GarbageCollectionMode = "unreferenced" | "untagged";
910
export type GCOptions = {
@@ -147,7 +148,7 @@ export class GarbageCollector {
147148
}
148149

149150
private async list(prefix: string, callback: (object: R2Object) => Promise<boolean>): Promise<boolean> {
150-
const listed = await this.registry.list({ prefix });
151+
const listed = await this.registry.list({ prefix: prefix, include: ["customMetadata"] });
151152
for (const object of listed.objects) {
152153
if ((await callback(object)) === false) {
153154
return false;
@@ -182,61 +183,142 @@ export class GarbageCollector {
182183

183184
private async collectInner(options: GCOptions): Promise<boolean> {
184185
// We can run out of memory, this should be a bloom filter
185-
let referencedBlobs = new Set<string>();
186+
const manifestList: { [key: string]: Set<string> } = {};
186187
const mark = await this.getInsertionMark(options.name);
187188

189+
// List manifest from repo to be scanned
188190
await this.list(`${options.name}/manifests/`, async (manifestObject) => {
189-
const tag = manifestObject.key.split("/").pop();
190-
if (!tag || (options.mode === "untagged" && tag.startsWith("sha256:"))) {
191-
return true;
191+
const currentHashFile = hexToDigest(manifestObject.checksums.sha256!);
192+
if (manifestList[currentHashFile] === undefined) {
193+
manifestList[currentHashFile] = new Set<string>();
192194
}
193-
const manifest = await this.registry.get(manifestObject.key);
194-
if (!manifest) {
195-
return true;
195+
manifestList[currentHashFile].add(manifestObject.key);
196+
return true;
197+
});
198+
199+
// In untagged mode, search for manifest to delete
200+
if (options.mode === "untagged") {
201+
const manifestToRemove = new Set<string>();
202+
const referencedManifests = new Set<string>();
203+
// List tagged manifest to find manifest-list
204+
for (const [_, manifests] of Object.entries(manifestList)) {
205+
const taggedManifest = [...manifests].filter((item) => !item.split("/").pop()?.startsWith("sha256:"));
206+
for (const manifestPath of taggedManifest) {
207+
// Tagged manifest some, load manifest content
208+
const manifest = await this.registry.get(manifestPath);
209+
if (!manifest) {
210+
continue;
211+
}
212+
213+
const manifestData = (await manifest.json()) as ManifestSchema;
214+
// Search for manifest list
215+
if (manifestData.schemaVersion == 2 && "manifests" in manifestData) {
216+
// Extract referenced manifests from manifest list
217+
manifestData.manifests.forEach((manifest) => {
218+
referencedManifests.add(manifest.digest);
219+
});
220+
}
221+
}
196222
}
197223

198-
const manifestData = (await manifest.json()) as ManifestSchema;
199-
// TODO: garbage collect manifests.
200-
if ("manifests" in manifestData) {
201-
return true;
224+
for (const [key, manifests] of Object.entries(manifestList)) {
225+
if (referencedManifests.has(key)) {
226+
continue;
227+
}
228+
if (![...manifests].some((item) => !item.split("/").pop()?.startsWith("sha256:"))) {
229+
// Add untagged manifest that should be removed
230+
manifests.forEach((manifest) => {
231+
manifestToRemove.add(manifest);
232+
});
233+
// Manifest to be removed shouldn't be parsed to search for referenced layers
234+
delete manifestList[key];
235+
}
236+
}
237+
238+
// Deleting untagged manifest
239+
if (manifestToRemove.size > 0) {
240+
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
241+
throw new Error("there is a manifest insertion going, the garbage collection shall stop");
242+
}
243+
244+
// GC will deleted untagged manifest
245+
await this.registry.delete(manifestToRemove.values().toArray());
246+
}
247+
}
248+
249+
const referencedBlobs = new Set<string>();
250+
// From manifest, extract referenced layers
251+
for (const [_, manifests] of Object.entries(manifestList)) {
252+
// Select only one manifest per unique manifest
253+
const manifestPath = manifests.values().next().value;
254+
if (manifestPath === undefined) {
255+
continue;
202256
}
257+
const manifest = await this.registry.get(manifestPath);
258+
// Skip if manifest not found
259+
if (!manifest) continue;
260+
261+
const manifestData = (await manifest.json()) as ManifestSchema;
203262

204263
if (manifestData.schemaVersion === 1) {
205264
manifestData.fsLayers.forEach((layer) => {
206265
referencedBlobs.add(layer.blobSum);
207266
});
208267
} else {
268+
// Skip manifest-list, they don't contain any layers references
269+
if ("manifests" in manifestData) continue;
270+
// Add referenced layers from current manifest
209271
manifestData.layers.forEach((layer) => {
210272
referencedBlobs.add(layer.digest);
211273
});
274+
// Add referenced config blob from current manifest
275+
referencedBlobs.add(manifestData.config.digest);
212276
}
277+
}
213278

279+
const unreferencedBlobs = new Set<string>();
280+
// List blobs to be removed
281+
await this.list(`${options.name}/blobs/`, async (object) => {
282+
const blobHash = object.key.split("/").pop();
283+
if (blobHash && !referencedBlobs.has(blobHash)) {
284+
unreferencedBlobs.add(object.key);
285+
}
214286
return true;
215287
});
216288

217-
let unreferencedKeys: string[] = [];
218-
const deleteThreshold = 15;
219-
await this.list(`${options.name}/blobs/`, async (object) => {
220-
const hash = object.key.split("/").pop();
221-
if (hash && !referencedBlobs.has(hash)) {
222-
unreferencedKeys.push(object.key);
223-
if (unreferencedKeys.length > deleteThreshold) {
224-
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
225-
throw new ServerError("there is a manifest insertion going, the garbage collection shall stop");
289+
// Check for symlink before removal
290+
if (unreferencedBlobs.size >= 0) {
291+
await this.list("", async (object) => {
292+
const objectPath = object.key;
293+
// Skip non-blobs object and from any other repository (symlink only target cross repository blobs)
294+
if (objectPath.startsWith(`${options.name}/`) || !objectPath.includes("/blobs/sha256:")) {
295+
return true;
296+
}
297+
if (object.customMetadata && object.customMetadata[symlinkHeader] !== undefined) {
298+
// Check if the symlink target the current GC repository
299+
if (object.customMetadata[symlinkHeader] !== options.name) return true;
300+
// Get symlink blob to retrieve its target
301+
const symlinkBlob = await this.registry.get(object.key);
302+
// Skip if symlinkBlob not found
303+
if (!symlinkBlob) return true;
304+
// Get the path of the target blob from the symlink blob
305+
const targetBlobPath = await symlinkBlob.text();
306+
if (unreferencedBlobs.has(targetBlobPath)) {
307+
// This symlink target a layer that should be removed
308+
unreferencedBlobs.delete(targetBlobPath);
226309
}
227-
228-
await this.registry.delete(unreferencedKeys);
229-
unreferencedKeys = [];
230310
}
231-
}
232-
return true;
233-
});
234-
if (unreferencedKeys.length > 0) {
311+
return unreferencedBlobs.size > 0;
312+
});
313+
}
314+
315+
if (unreferencedBlobs.size > 0) {
235316
if (!(await this.checkIfGCCanContinue(options.name, mark))) {
236317
throw new Error("there is a manifest insertion going, the garbage collection shall stop");
237318
}
238319

239-
await this.registry.delete(unreferencedKeys);
320+
// GC will delete unreferenced blobs
321+
await this.registry.delete(unreferencedBlobs.values().toArray());
240322
}
241323

242324
return true;

src/registry/http.ts

+8
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,14 @@ export class RegistryHTTPClient implements Registry {
456456
}
457457
}
458458

459+
mountExistingLayer(
460+
_sourceName: string,
461+
_digest: string,
462+
_destinationName: string,
463+
): Promise<RegistryError | FinishedUploadObject> {
464+
throw new Error("unimplemented");
465+
}
466+
459467
putManifest(
460468
_namespace: string,
461469
_reference: string,

src/registry/r2.ts

+63-5
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ export async function encodeState(state: State, env: Env): Promise<{ jwt: string
101101
return { jwt: jwtSignature, hash: await getSHA256(jwtSignature, "") };
102102
}
103103

104+
export const symlinkHeader = "X-Serverless-Registry-Symlink";
105+
104106
export async function getUploadState(
105107
name: string,
106108
uploadId: string,
@@ -150,14 +152,12 @@ export class R2Registry implements Registry {
150152
return { response: new ServerError("invalid checksum from R2 backend") };
151153
}
152154

153-
const checkManifestResponse = {
155+
return {
154156
exists: true,
155157
digest: hexToDigest(res.checksums.sha256!),
156158
contentType: res.httpMetadata!.contentType!,
157159
size: res.size,
158160
};
159-
160-
return checkManifestResponse;
161161
}
162162

163163
async listRepositories(limit?: number, last?: string): Promise<RegistryError | ListRepositoriesResponse> {
@@ -377,6 +377,52 @@ export class R2Registry implements Registry {
377377
};
378378
}
379379

380+
async mountExistingLayer(
381+
sourceName: string,
382+
digest: string,
383+
destinationName: string,
384+
): Promise<RegistryError | FinishedUploadObject> {
385+
const sourceLayerPath = `${sourceName}/blobs/${digest}`;
386+
const [res, err] = await wrap(this.env.REGISTRY.head(sourceLayerPath));
387+
if (err) {
388+
return wrapError("mountExistingLayer", err);
389+
}
390+
if (!res) {
391+
return wrapError("mountExistingLayer", "Layer not found");
392+
} else {
393+
const destinationLayerPath = `${destinationName}/blobs/${digest}`;
394+
if (sourceLayerPath === destinationLayerPath) {
395+
// Bad request
396+
throw new InternalError();
397+
}
398+
// Prevent recursive symlink
399+
if (res.customMetadata && symlinkHeader in res.customMetadata) {
400+
return await this.mountExistingLayer(res.customMetadata[symlinkHeader], digest, destinationName);
401+
}
402+
// Trying to mount a layer from sourceLayerPath to destinationLayerPath
403+
404+
// Create linked file with custom metadata
405+
const [newFile, error] = await wrap(
406+
this.env.REGISTRY.put(destinationLayerPath, sourceLayerPath, {
407+
sha256: await getSHA256(sourceLayerPath, ""),
408+
httpMetadata: res.httpMetadata,
409+
customMetadata: { [symlinkHeader]: sourceName }, // Storing target repository name in metadata (to easily resolve recursive layer mounting)
410+
}),
411+
);
412+
if (error) {
413+
return wrapError("mountExistingLayer", error);
414+
}
415+
if (newFile && "response" in newFile) {
416+
return wrapError("mountExistingLayer", newFile.response);
417+
}
418+
419+
return {
420+
digest: hexToDigest(res.checksums.sha256!),
421+
location: `/v2/${destinationLayerPath}`,
422+
};
423+
}
424+
}
425+
380426
async layerExists(name: string, tag: string): Promise<RegistryError | CheckLayerResponse> {
381427
const [res, err] = await wrap(this.env.REGISTRY.head(`${name}/blobs/${tag}`));
382428
if (err) {
@@ -408,6 +454,19 @@ export class R2Registry implements Registry {
408454
};
409455
}
410456

457+
// Handle R2 symlink
458+
if (res.customMetadata && symlinkHeader in res.customMetadata) {
459+
const layerPath = await res.text();
460+
// Symlink detected! Will download layer from "layerPath"
461+
const [linkName, linkDigest] = layerPath.split("/blobs/");
462+
if (linkName == name && linkDigest == digest) {
463+
return {
464+
response: new Response(JSON.stringify(BlobUnknownError), { status: 404 }),
465+
};
466+
}
467+
return await this.env.REGISTRY_CLIENT.getLayer(linkName, linkDigest);
468+
}
469+
411470
return {
412471
stream: res.body!,
413472
digest: hexToDigest(res.checksums.sha256!),
@@ -751,7 +810,6 @@ export class R2Registry implements Registry {
751810
}
752811

753812
async garbageCollection(namespace: string, mode: GarbageCollectionMode): Promise<boolean> {
754-
const result = await this.gc.collect({ name: namespace, mode: mode });
755-
return result;
813+
return await this.gc.collect({ name: namespace, mode: mode });
756814
}
757815
}

src/registry/registry.ts

+7
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ export interface Registry {
114114
// gets the manifest by namespace + digest
115115
getManifest(namespace: string, digest: string): Promise<GetManifestResponse | RegistryError>;
116116

117+
// mount an existing layer from a repository to another
118+
mountExistingLayer(
119+
sourceName: string,
120+
digest: string,
121+
destinationName: string,
122+
): Promise<RegistryError | FinishedUploadObject>;
123+
117124
// checks that a layer exists
118125
layerExists(namespace: string, digest: string): Promise<CheckLayerResponse | RegistryError>;
119126

src/router.ts

+20-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ v2Router.get("/_catalog", async (req, env: Env) => {
4242
}),
4343
{
4444
headers: {
45-
Link: `${url.protocol}//${url.hostname}${url.pathname}?n=${n ?? 1000}&last=${response.cursor ?? ""}; rel=next`,
45+
"Link": `${url.protocol}//${url.hostname}${url.pathname}?n=${n ?? 1000}&last=${response.cursor ?? ""}; rel=next`,
4646
"Content-Type": "application/json",
4747
},
4848
},
@@ -331,6 +331,25 @@ v2Router.delete("/:name+/blobs/uploads/:id", async (req, env: Env) => {
331331
// this is the first thing that the client asks for in an upload
332332
v2Router.post("/:name+/blobs/uploads/", async (req, env: Env) => {
333333
const { name } = req.params;
334+
const { from, mount } = req.query;
335+
if (mount !== undefined && from !== undefined) {
336+
// Try to create a new upload from an existing layer on another repository
337+
const [finishedUploadObject, err] = await wrap<FinishedUploadObject | RegistryError, Error>(
338+
env.REGISTRY_CLIENT.mountExistingLayer(from.toString(), mount.toString(), name),
339+
);
340+
// If there is an error, fallback to the default layer upload system
341+
if (!(err || (finishedUploadObject && "response" in finishedUploadObject))) {
342+
return new Response(null, {
343+
status: 201,
344+
headers: {
345+
"Content-Length": "0",
346+
"Location": finishedUploadObject.location,
347+
"Docker-Content-Digest": finishedUploadObject.digest,
348+
},
349+
});
350+
}
351+
}
352+
// Upload a new layer
334353
const [uploadObject, err] = await wrap<UploadObject | RegistryError, Error>(env.REGISTRY_CLIENT.startUpload(name));
335354

336355
if (err) {

0 commit comments

Comments
 (0)