Skip to content

Commit

Permalink
Replace PackagePromiseMap with immediately returning the package from…
Browse files Browse the repository at this point in the history
… the PackageManager. Flyby small refactor removing SmallPackagesTarReader
  • Loading branch information
garrettgu10 committed Sep 9, 2024
1 parent 33e8288 commit a8c297f
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 101 deletions.
10 changes: 6 additions & 4 deletions src/pyodide/internal/loadPackage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ async function loadBundleFromArtifactBundler(
const packagesVersion = PACKAGES_VERSION;
const filename = LOCKFILE['packages'][requirement]['file_name'];
const fullPath = 'python-package-bucket/' + packagesVersion + '/' + filename;
return new Promise((resolve) => {
ArtifactBundler.onPackageReceived(fullPath, (r: Reader) =>
resolve([requirement, r])
const reader = ArtifactBundler.getPackage(fullPath);
if (!reader)
throw new Error(
'Failed to get package ' + fullPath + ' from ArtifactBundler'
);
});
return Promise.resolve([requirement, reader]);
// ^ this is okay to do during startup since it resolves immediately
}

/**
Expand Down
18 changes: 16 additions & 2 deletions src/pyodide/python-entrypoint-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,24 @@ const handlers: {
} = {};

try {
// We have to do setupPackages in the IoContext, so don't start it yet.
await getPyodide();
// Do not setup anything to do with Python in the global scope when tracing. The Jaeger tracing
// needs to be called inside an IO context.
if (!IS_TRACING) {
if (IS_WORKERD) {
// If we're in workerd, we have to do setupPackages in the IoContext, so don't start it yet.
// TODO: fix this.
await getPyodide();
} else {
// If we're not in workerd, setupPackages doesn't require IO so we can do it all here.
await getMainModule();
}
}

if (IS_WORKERD || IS_TRACING) {
handlers.fetch = makeHandler('on_fetch');
handlers.test = makeHandler('test');
} else {
const mainModule = await getMainModule();
for (const handlerName of [
'fetch',
'alarm',
Expand All @@ -174,6 +185,9 @@ try {
'pubsub',
]) {
const pyHandlerName = 'on_' + handlerName;
if (typeof mainModule[pyHandlerName] === 'function') {
handlers[handlerName] = makeHandler(pyHandlerName);
}
handlers[handlerName] = makeHandler(pyHandlerName);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/pyodide/types/artifacts.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ declare namespace ArtifactBundler {
const getMemorySnapshotSize: () => number;
const disposeMemorySnapshot: () => void;
const storeMemorySnapshot: (snap: MemorySnapshotResult) => void;
const onPackageReceived: (path: string, resolve: (r: Reader) => void) => void;
const getPackage: (path: string) => Reader | null;
}

export default ArtifactBundler;
46 changes: 0 additions & 46 deletions src/workerd/api/pyodide/pyodide.c++
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ static int readToTarget(
}

int PackagesTarReader::read(jsg::Lock& js, int offset, kj::Array<kj::byte> buf) {
return readToTarget(PYODIDE_PACKAGES_TAR.get(), offset, buf);
}

int SmallPackagesTarReader::read(jsg::Lock& js, int offset, kj::Array<kj::byte> buf) {
return readToTarget(source, offset, buf);
}

Expand Down Expand Up @@ -214,46 +210,4 @@ bool hasPythonModules(capnp::List<server::config::Worker::Module>::Reader module
return false;
}

void PackagePromiseMap::insert(
kj::String path, kj::Promise<kj::ArrayPtr<const unsigned char>> promise) {
KJ_ASSERT(waitlists.find(kj::str(path)) == kj::none);
waitlists.insert(kj::str(path), CrossThreadWaitList());

promise
.then([this, p = kj::mv(path)](auto a) {
auto path = kj::str(p);
auto maybeWaitlist = waitlists.find(p);
KJ_IF_SOME(waitlist, maybeWaitlist) {
fetchedPackages.insert(kj::mv(path), kj::mv(a));
waitlist.fulfill();
} else {
JSG_FAIL_REQUIRE(Error, "Failed to get waitlist for package: ", path);
}
}).detach([](kj::Exception&& exception) {
JSG_FAIL_REQUIRE(Error, "Failed to get package: ", exception);
});
}

void PackagePromiseMap::onPackageReceived(jsg::Lock& lock,
kj::String path,
jsg::Function<void(jsg::Ref<SmallPackagesTarReader>)> resolve) {
auto maybeWaitlist = waitlists.find(path);

KJ_IF_SOME(waitlist, maybeWaitlist) {
IoContext::current().awaitIo(lock, waitlist.addWaiter(),
[this, path = kj::mv(path), resolve = kj::mv(resolve)](jsg::Lock& js) mutable {
auto maybeEntry = fetchedPackages.findEntry(path);
KJ_IF_SOME(entry, maybeEntry) {
resolve(js, jsg::alloc<SmallPackagesTarReader>(entry.value));
} else {
JSG_FAIL_REQUIRE(Error, "Failed to get package after waitlist fulfilled: ", path);
}
});

} else {
JSG_FAIL_REQUIRE(
Error, "Failed to get waitlist for package when trying to get promise: ", path);
}
}

} // namespace workerd::api::pyodide
71 changes: 23 additions & 48 deletions src/workerd/api/pyodide/pyodide.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ struct PythonConfig {
// A function to read a segment of the tar file into a buffer
// Set up this way to avoid copying files that aren't accessed.
class PackagesTarReader: public jsg::Object {
kj::ArrayPtr<const kj::byte> source;

public:
PackagesTarReader() = default;
PackagesTarReader(kj::ArrayPtr<const kj::byte> src = PYODIDE_PACKAGES_TAR.get()): source(src) {};

int read(jsg::Lock& js, int offset, kj::Array<kj::byte> buf);

Expand Down Expand Up @@ -191,61 +193,30 @@ class PyodideMetadataReader: public jsg::Object {
}
};

// Similar to PackagesTarReader, but reads from a dynamic buffer rather than a buffer linked into
// the binary
class SmallPackagesTarReader: public jsg::Object {
kj::ArrayPtr<const kj::byte> source;

public:
SmallPackagesTarReader(kj::ArrayPtr<const kj::byte> src): source(src) {};

int read(jsg::Lock& js, int offset, kj::Array<kj::byte> buf);

JSG_RESOURCE_TYPE(SmallPackagesTarReader) {
JSG_METHOD(read);
}
};

struct MemorySnapshotResult {
kj::Array<kj::byte> snapshot;
kj::Array<kj::String> importedModulesList;
JSG_STRUCT(snapshot, importedModulesList);
};

// Before a Pyodide isolate starts up, all its packages begin loading. This struct provides an
// interface for the Pyodide startup code to request promises for each package that was loaded.
class PackagePromiseMap {
// This implementation is complex because we're not allowed to store kj::Promises on an object
// stored on the V8 heap. Ideally, we'd like to simply store a map from package paths to Promises.
// Instead, we store a CrossThreadWaitList and get a new Promise from the waitlist as needed.
kj::HashMap<kj::String, kj::ArrayPtr<const unsigned char>> fetchedPackages;
kj::HashMap<kj::String, CrossThreadWaitList> waitlists;

public:
void insert(kj::String path, kj::Promise<kj::ArrayPtr<const unsigned char>> promise);

void onPackageReceived(jsg::Lock& lock,
kj::String path,
jsg::Function<void(jsg::Ref<SmallPackagesTarReader>)> resolve);
};

// A loaded bundle of artifacts for a particular script id. It can also contain V8 version and
// CPU architecture-specific artifacts. The logic for loading these is in getArtifacts.
class ArtifactBundler: public jsg::Object {
public:
kj::Maybe<const PyodidePackageManager&> packageManager;
// ^ lifetime should be static since there is normally one worker set for the whole process. see worker-set.h
kj::Maybe<MemorySnapshotResult> storedSnapshot;
kj::Own<PackagePromiseMap> loadedPackages;

ArtifactBundler(kj::Own<PackagePromiseMap> loadedPackages,
kj::Maybe<kj::Array<kj::byte>> existingSnapshot = kj::none)
: storedSnapshot(kj::none),
loadedPackages(kj::mv(loadedPackages)),
ArtifactBundler(kj::Maybe<const PyodidePackageManager&> packageManager,
kj::Maybe<kj::Array<kj::byte>> existingSnapshot)
: packageManager(packageManager),
storedSnapshot(kj::none),
existingSnapshot(kj::mv(existingSnapshot)),
isValidating(false) {};

ArtifactBundler(bool isValidating = false)
: storedSnapshot(kj::none),
loadedPackages(kj::heap<PackagePromiseMap>()),
: packageManager(kj::none),
storedSnapshot(kj::none),
existingSnapshot(kj::none),
isValidating(isValidating) {};

Expand Down Expand Up @@ -276,8 +247,8 @@ class ArtifactBundler: public jsg::Object {
}

static jsg::Ref<ArtifactBundler> makeDisabledBundler(
kj::Own<PackagePromiseMap> loadedPackages = kj::heap<PackagePromiseMap>()) {
return jsg::alloc<ArtifactBundler>(kj::mv(loadedPackages));
kj::Maybe<const PyodidePackageManager&> packageManager = kj::none) {
return jsg::alloc<ArtifactBundler>(packageManager, kj::none);
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand All @@ -291,10 +262,14 @@ class ArtifactBundler: public jsg::Object {
return false; // TODO(later): Remove this function once we regenerate the bundle.
}

void onPackageReceived(jsg::Lock& lock,
kj::String path,
jsg::Function<void(jsg::Ref<SmallPackagesTarReader>)> resolve) {
loadedPackages->onPackageReceived(lock, kj::mv(path), kj::mv(resolve));
kj::Maybe<jsg::Ref<PackagesTarReader>> getPackage(kj::String path) {
KJ_IF_SOME(pacman, packageManager) {
KJ_IF_SOME(ptr, pacman.getPyodidePackage(path)) {
return jsg::alloc<PackagesTarReader>(ptr);
}
}

return kj::none;
}

JSG_RESOURCE_TYPE(ArtifactBundler) {
Expand All @@ -305,7 +280,7 @@ class ArtifactBundler: public jsg::Object {
JSG_METHOD(isEwValidating);
JSG_METHOD(storeMemorySnapshot);
JSG_METHOD(isEnabled);
JSG_METHOD(onPackageReceived);
JSG_METHOD(getPackage);
}

private:
Expand Down Expand Up @@ -405,7 +380,7 @@ bool hasPythonModules(capnp::List<server::config::Worker::Module>::Reader module
api::pyodide::PackagesTarReader, api::pyodide::PyodideMetadataReader, \
api::pyodide::ArtifactBundler, api::pyodide::DiskCache, \
api::pyodide::DisabledInternalJaeger, api::pyodide::SimplePythonLimiter, \
api::pyodide::MemorySnapshotResult, api::pyodide::SmallPackagesTarReader
api::pyodide::MemorySnapshotResult

template <class Registry>
void registerPyodideModules(Registry& registry, auto featureFlags) {
Expand Down

0 comments on commit a8c297f

Please sign in to comment.