Skip to content

Commit

Permalink
Add functionality to load packages from ArtifactBundler
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettgu10 committed Sep 5, 2024
1 parent 76fac69 commit d48817c
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 69 deletions.
55 changes: 44 additions & 11 deletions src/pyodide/internal/loadPackage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@
* that contains all the packages ready to go.
*/

import { default as LOCKFILE } from 'pyodide-internal:generated/pyodide-lock.json';
import { WORKERD_INDEX_URL } from 'pyodide-internal:metadata';
import {
SITE_PACKAGES,
WORKERD_INDEX_URL,
LOCKFILE,
LOAD_WHEELS_FROM_R2,
LOAD_WHEELS_FROM_ARTIFACT_BUNDLER,
PACKAGES_VERSION,
} from 'pyodide-internal:metadata';
import {
SITE_PACKAGES,
getSitePackagesPath,
} from 'pyodide-internal:setupPackages';
import { parseTarInfo } from 'pyodide-internal:tar';
import { default as DiskCache } from 'pyodide-internal:disk_cache';
import { createTarFS } from 'pyodide-internal:tarfs';
import { default as ArtifactBundler } from 'pyodide-internal:artifacts';

async function decompressArrayBuffer(
arrBuf: ArrayBuffer
Expand All @@ -33,13 +38,18 @@ async function decompressArrayBuffer(
}
}

async function loadBundle(requirement: string): Promise<[string, ArrayBuffer]> {
// loadBundleFromR2 loads the package from the internet (through fetch) and uses the DiskCache as
// a backing store. This is only used in local dev.
async function loadBundleFromR2(
requirement: string
): Promise<[string, Reader]> {
// first check if the disk cache has what we want
const filename = LOCKFILE['packages'][requirement]['file_name'];
const cached = DiskCache.get(filename);
if (cached) {
const decompressed = await decompressArrayBuffer(cached);
return [requirement, decompressed];
const reader = new ArrayBufferReader(decompressed);
return [requirement, reader];
}

// we didn't find it in the disk cache, continue with original fetch
Expand All @@ -50,7 +60,21 @@ async function loadBundle(requirement: string): Promise<[string, ArrayBuffer]> {
const decompressed = await decompressArrayBuffer(compressed);

DiskCache.put(filename, compressed);
return [requirement, decompressed];
const reader = new ArrayBufferReader(decompressed);
return [requirement, reader];
}

/**
 * Obtains the bundle for a single requirement from the ArtifactBundler.
 *
 * The requirement is mapped to a file name through the lockfile and requested
 * under `python-package-bucket/<PACKAGES_VERSION>/<file_name>`.
 *
 * @param requirement Package name, used as a key into `LOCKFILE.packages`.
 * @returns A `[requirement, reader]` pair, resolved when ArtifactBundler
 *   invokes the callback passed to `onPackageReceived`.
 * @throws Error (surfaced as a rejected promise, since this function is async)
 *   when the requirement has no entry in the lockfile.
 */
async function loadBundleFromArtifactBundler(
  requirement: string
): Promise<[string, Reader]> {
  const lockEntry = LOCKFILE.packages[requirement];
  if (lockEntry === undefined) {
    // Fail with a message naming the package instead of an opaque
    // "cannot read properties of undefined" TypeError.
    throw new Error(
      `Requirement "${requirement}" is not present in the package lockfile`
    );
  }

  const fullPath = `python-package-bucket/${PACKAGES_VERSION}/${lockEntry.file_name}`;

  // onPackageReceived is callback-based, so adapt it to a promise. A
  // synchronous throw inside the executor rejects the returned promise.
  // NOTE(review): nothing here rejects if the callback is never invoked —
  // confirm the native side always fulfils or throws.
  return new Promise((resolve) => {
    ArtifactBundler.onPackageReceived(fullPath, (reader: Reader) =>
      resolve([requirement, reader])
    );
  });
}

/**
Expand All @@ -73,9 +97,11 @@ class ArrayBufferReader {
}
}

export async function loadPackages(Module: Module, requirements: Set<string>) {
if (!LOAD_WHEELS_FROM_R2) return;

async function loadPackagesImpl(
Module: Module,
requirements: Set<string>,
loadBundle: (req: string) => Promise<[string, Reader]>
) {
let loadPromises = [];
let loading = [];
for (const req of requirements) {
Expand All @@ -87,8 +113,7 @@ export async function loadPackages(Module: Module, requirements: Set<string>) {
console.log('Loading ' + loading.join(', '));

const buffers = await Promise.all(loadPromises);
for (const [requirement, buffer] of buffers) {
const reader = new ArrayBufferReader(buffer);
for (const [requirement, reader] of buffers) {
const [tarInfo, soFiles] = parseTarInfo(reader);
SITE_PACKAGES.addSmallBundle(tarInfo, soFiles, requirement);
}
Expand All @@ -100,3 +125,11 @@ export async function loadPackages(Module: Module, requirements: Set<string>) {
const info = SITE_PACKAGES.rootInfo;
Module.FS.mount(tarFS, { info }, path);
}

/**
 * Loads the requested packages using whichever wheel source is enabled.
 *
 * `LOAD_WHEELS_FROM_R2` is checked first and wins if both flags are set;
 * otherwise `LOAD_WHEELS_FROM_ARTIFACT_BUNDLER` selects the ArtifactBundler
 * loader. When neither flag is set this function does nothing.
 *
 * @param Module The module instance forwarded to `loadPackagesImpl`.
 * @param requirements Names of the packages to load.
 */
export async function loadPackages(Module: Module, requirements: Set<string>) {
  let bundleLoader: ((req: string) => Promise<[string, Reader]>) | undefined;
  if (LOAD_WHEELS_FROM_R2) {
    bundleLoader = loadBundleFromR2;
  } else if (LOAD_WHEELS_FROM_ARTIFACT_BUNDLER) {
    bundleLoader = loadBundleFromArtifactBundler;
  }

  if (bundleLoader === undefined) {
    // No wheel source enabled: nothing to load here.
    return;
  }
  await loadPackagesImpl(Module, requirements, bundleLoader);
}
8 changes: 7 additions & 1 deletion src/pyodide/internal/metadata.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { default as MetadataReader } from 'pyodide-internal:runtime-generated/metadata';
export { default as LOCKFILE } from 'pyodide-internal:generated/pyodide-lock.json';
import { default as PYODIDE_BUCKET } from 'pyodide-internal:generated/pyodide-bucket.json';
import { default as ArtifactBundler } from 'pyodide-internal:artifacts';

Expand All @@ -9,6 +8,13 @@ export const SHOULD_SNAPSHOT_TO_DISK = MetadataReader.shouldSnapshotToDisk();
export const IS_CREATING_BASELINE_SNAPSHOT =
MetadataReader.isCreatingBaselineSnapshot();
export const WORKERD_INDEX_URL = PYODIDE_BUCKET.PYODIDE_PACKAGE_BUCKET_URL;
// Wheels are loaded from R2 exactly when IS_WORKERD is set.
export const LOAD_WHEELS_FROM_R2: boolean = IS_WORKERD;
// Whether packages should instead be taken from the ArtifactBundler, as
// reported by the metadata reader.
export const LOAD_WHEELS_FROM_ARTIFACT_BUNDLER =
MetadataReader.shouldUsePackagesInArtifactBundler();
// Version string of the package set, as reported by the metadata reader.
export const PACKAGES_VERSION = MetadataReader.getPackagesVersion();
// The package lock, parsed from the JSON string supplied by the metadata
// reader. JSON.parse returns `any`, so the PackageLock shape is asserted by
// the annotation only and is not validated at runtime.
export const LOCKFILE: PackageLock = JSON.parse(
MetadataReader.getPackagesLock()
);
export const REQUIREMENTS = MetadataReader.getRequirements();
export const MAIN_MODULE_NAME = MetadataReader.getMainModule();
export const MEMORY_SNAPSHOT_READER = MetadataReader.hasMemorySnapshot()
Expand Down
26 changes: 12 additions & 14 deletions src/pyodide/internal/setupPackages.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { parseTarInfo } from 'pyodide-internal:tar';
import { createTarFS } from 'pyodide-internal:tarfs';
import { createMetadataFS } from 'pyodide-internal:metadatafs';
import { default as LOCKFILE } from 'pyodide-internal:generated/pyodide-lock.json';
import { REQUIREMENTS, WORKERD_INDEX_URL } from 'pyodide-internal:metadata';
import {
REQUIREMENTS,
LOAD_WHEELS_FROM_R2,
LOCKFILE,
LOAD_WHEELS_FROM_ARTIFACT_BUNDLER,
} from 'pyodide-internal:metadata';
import { simpleRunPython } from 'pyodide-internal:util';

const canonicalizeNameRegex = /[-_.]+/g;
Expand Down Expand Up @@ -122,22 +126,18 @@ class SitePackagesDir {
* This also returns the list of soFiles in the resulting site-packages
* directory so we can preload them.
*/
export function buildSitePackages(
requirements: Set<string>
): [SitePackagesDir, boolean] {
export function buildSitePackages(requirements: Set<string>): SitePackagesDir {
const [bigTarInfo, bigTarSoFiles] = parseTarInfo();

let LOAD_WHEELS_FROM_R2 = true;
let requirementsInBigBundle = new Set([...STDLIB_PACKAGES]);
if (bigTarInfo.children!.size > 10) {
LOAD_WHEELS_FROM_R2 = false;
if (!LOAD_WHEELS_FROM_R2 && !LOAD_WHEELS_FROM_ARTIFACT_BUNDLER) {
requirements.forEach((r) => requirementsInBigBundle.add(r));
}

const res = new SitePackagesDir();
res.addBigBundle(bigTarInfo, bigTarSoFiles, requirementsInBigBundle);

return [res, LOAD_WHEELS_FROM_R2];
return res;
}

/**
Expand Down Expand Up @@ -188,8 +188,8 @@ export function mountLib(Module: Module, info: TarFSInfo): void {
const site_packages = getSitePackagesPath(Module);
Module.FS.mkdirTree(site_packages);
Module.FS.mkdirTree('/session/metadata');
if (!LOAD_WHEELS_FROM_R2) {
// if we are not loading additional wheels from R2, then we're done
if (!LOAD_WHEELS_FROM_R2 && !LOAD_WHEELS_FROM_ARTIFACT_BUNDLER) {
// if we are not loading additional wheels, then we're done
// with site-packages and we can mount it here. Otherwise, we must mount it in
// loadPackages().
Module.FS.mount(tarFS, { info }, site_packages);
Expand Down Expand Up @@ -253,6 +253,4 @@ function addPackageToLoad(

export { REQUIREMENTS };
export const TRANSITIVE_REQUIREMENTS = getTransitiveRequirements();
export const [SITE_PACKAGES, LOAD_WHEELS_FROM_R2] = buildSitePackages(
TRANSITIVE_REQUIREMENTS
);
export const SITE_PACKAGES = buildSitePackages(TRANSITIVE_REQUIREMENTS);
19 changes: 3 additions & 16 deletions src/pyodide/python-entrypoint-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,13 @@ const handlers: {
} = {};

try {
// Do not setup anything to do with Python in the global scope when tracing. The Jaeger tracing
// needs to be called inside an IO context.
if (!IS_TRACING) {
if (IS_WORKERD) {
// If we're in workerd, we have to do setupPackages in the IoContext, so don't start it yet.
// TODO: fix this.
await getPyodide();
} else {
// If we're not in workerd, setupPackages doesn't require IO so we can do it all here.
await getMainModule();
}
}
// We have to do setupPackages in the IoContext, so don't start it yet.
await getPyodide();

if (IS_WORKERD || IS_TRACING) {
handlers.fetch = makeHandler('on_fetch');
handlers.test = makeHandler('test');
} else {
const mainModule = await getMainModule();
for (const handlerName of [
'fetch',
'alarm',
Expand All @@ -185,9 +174,7 @@ try {
'pubsub',
]) {
const pyHandlerName = 'on_' + handlerName;
if (typeof mainModule[pyHandlerName] === 'function') {
handlers[handlerName] = makeHandler(pyHandlerName);
}
handlers[handlerName] = makeHandler(pyHandlerName);
}
}
/**
Expand Down
1 change: 1 addition & 0 deletions src/pyodide/types/artifacts.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ declare namespace ArtifactBundler {
const getMemorySnapshotSize: () => number;
const disposeMemorySnapshot: () => void;
const storeMemorySnapshot: (snap: MemorySnapshotResult) => void;
const onPackageReceived: (path: string, resolve: (r: Reader) => void) => void;
}

export default ArtifactBundler;
5 changes: 0 additions & 5 deletions src/pyodide/types/pyodide-lock.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,3 @@ interface PackageLock {
[id: string]: PackageDeclaration;
};
}

declare module 'pyodide-internal:generated/pyodide-lock.json' {
const lock: PackageLock;
export default lock;
}
3 changes: 3 additions & 0 deletions src/pyodide/types/runtime-generated/metadata.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ declare namespace MetadataReader {
) => void;
const getMemorySnapshotSize: () => number;
const disposeMemorySnapshot: () => void;
const shouldUsePackagesInArtifactBundler: () => boolean;
const getPackagesVersion: () => string;
const getPackagesLock: () => string;
const read: (index: number, position: number, buffer: Uint8Array) => number;
}

Expand Down
34 changes: 20 additions & 14 deletions src/workerd/api/pyodide/pyodide.c++
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,13 @@ jsg::Ref<PyodideMetadataReader> makePyodideMetadataReader(
names.finish(),
contents.finish(),
requirements.finish(),
kj::str("20240829.4"), // TODO: hardcoded version & lock
kj::str(PYODIDE_LOCK.toString()),
true /* isWorkerd */,
false /* isTracing */,
snapshotToDisk,
createBaselineSnapshot,
false, /* usePackagesInArtifactBundler */
kj::none /* memorySnapshot */
);
// clang-format on
Expand Down Expand Up @@ -212,7 +215,7 @@ bool hasPythonModules(capnp::List<server::config::Worker::Module>::Reader module
}

void PackagePromiseMap::insert(
kj::String path, kj::Promise<kj::Own<SmallPackagesTarReader>> promise) {
kj::String path, kj::Promise<kj::ArrayPtr<const unsigned char>> promise) {
KJ_ASSERT(waitlists.find(kj::str(path)) == kj::none);
waitlists.insert(kj::str(path), CrossThreadWaitList());

Expand All @@ -224,29 +227,32 @@ void PackagePromiseMap::insert(
fetchedPackages.insert(kj::mv(path), kj::mv(a));
waitlist.fulfill();
} else {
JSG_FAIL_REQUIRE(Error, "Failed to get waitlist for package", path);
JSG_FAIL_REQUIRE(Error, "Failed to get waitlist for package: ", path);
}
}).detach([](kj::Exception&& exception) {
JSG_FAIL_REQUIRE(Error, "Failed to get package", exception);
JSG_FAIL_REQUIRE(Error, "Failed to get package: ", exception);
});
}

kj::Promise<kj::Own<SmallPackagesTarReader>> PackagePromiseMap::getPromise(kj::StringPtr path) {
void PackagePromiseMap::onPackageReceived(jsg::Lock& lock,
kj::String path,
jsg::Function<void(jsg::Ref<SmallPackagesTarReader>)> resolve) {
auto maybeWaitlist = waitlists.find(path);

KJ_IF_SOME(waitlist, maybeWaitlist) {
co_await waitlist.addWaiter();
auto maybeEntry = fetchedPackages.findEntry(path);
KJ_IF_SOME(entryRef, maybeEntry) {
auto entry = fetchedPackages.release(entryRef);

co_return kj::mv(entry.value);
} else {
JSG_FAIL_REQUIRE(Error, "Failed to get package when trying to get promise", path);
}
IoContext::current().awaitIo(lock, waitlist.addWaiter(),
[this, path = kj::mv(path), resolve = kj::mv(resolve)](jsg::Lock& js) mutable {
auto maybeEntry = fetchedPackages.findEntry(path);
KJ_IF_SOME(entry, maybeEntry) {
resolve(js, jsg::alloc<SmallPackagesTarReader>(entry.value));
} else {
JSG_FAIL_REQUIRE(Error, "Failed to get package after waitlist fulfilled: ", path);
}
});

} else {
JSG_FAIL_REQUIRE(Error, "Failed to get waitlist for package when trying to get promise", path);
JSG_FAIL_REQUIRE(
Error, "Failed to get waitlist for package when trying to get promise: ", path);
}
}

Expand Down
Loading

0 comments on commit d48817c

Please sign in to comment.