more fixes
cirospaciari committed Jan 1, 2025
1 parent 80fc4fd commit 2a52a0e
Showing 3 changed files with 229 additions and 121 deletions.
72 changes: 38 additions & 34 deletions src/bun.js/webcore/S3File.zig
@@ -212,24 +212,26 @@ pub fn constructS3FileWithAWSCredentialsAndOptions(
 
     var blob = Blob.initWithStore(store, globalObject);
     if (options) |opts| {
-        if (try opts.getTruthy(globalObject, "type")) |file_type| {
-            inner: {
-                if (file_type.isString()) {
-                    var allocator = bun.default_allocator;
-                    var str = file_type.toSlice(globalObject, bun.default_allocator);
-                    defer str.deinit();
-                    const slice = str.slice();
-                    if (!strings.isAllASCII(slice)) {
-                        break :inner;
-                    }
-                    blob.content_type_was_set = true;
-                    if (globalObject.bunVM().mimeType(str.slice())) |entry| {
-                        blob.content_type = entry.value;
-                        break :inner;
-                    }
-                    const content_type_buf = allocator.alloc(u8, slice.len) catch bun.outOfMemory();
-                    blob.content_type = strings.copyLowercase(slice, content_type_buf);
-                    blob.content_type_allocated = true;
-                }
-            }
-        }
+        if (opts.isObject()) {
+            if (try opts.getTruthyComptime(globalObject, "type")) |file_type| {
+                inner: {
+                    if (file_type.isString()) {
+                        var allocator = bun.default_allocator;
+                        var str = file_type.toSlice(globalObject, bun.default_allocator);
+                        defer str.deinit();
+                        const slice = str.slice();
+                        if (!strings.isAllASCII(slice)) {
+                            break :inner;
+                        }
+                        blob.content_type_was_set = true;
+                        if (globalObject.bunVM().mimeType(str.slice())) |entry| {
+                            blob.content_type = entry.value;
+                            break :inner;
+                        }
+                        const content_type_buf = allocator.alloc(u8, slice.len) catch bun.outOfMemory();
+                        blob.content_type = strings.copyLowercase(slice, content_type_buf);
+                        blob.content_type_allocated = true;
+                    }
+                }
+            }
+        }
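For readers less familiar with Zig: the `inner: { ... break :inner; }` construct above is a labeled block, and `break :inner` exits it early, optionally carrying a value. A minimal, self-contained sketch of the idiom (hypothetical values, not Bun code):

const std = @import("std");

fn classify(mime: []const u8) []const u8 {
    const label: []const u8 = inner: {
        // Leave the block early when the input is unusable.
        if (mime.len == 0) break :inner "empty";
        for (mime) |c| {
            if (c > 127) break :inner "non-ascii";
        }
        break :inner "ok";
    };
    return label;
}

pub fn main() void {
    std.debug.print("{s}\n", .{classify("text/plain")}); // prints "ok"
}
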
@@ -251,24 +253,26 @@ pub fn constructS3FileWithAWSCredentials(
 
     var blob = Blob.initWithStore(store, globalObject);
     if (options) |opts| {
-        if (try opts.getTruthy(globalObject, "type")) |file_type| {
-            inner: {
-                if (file_type.isString()) {
-                    var allocator = bun.default_allocator;
-                    var str = file_type.toSlice(globalObject, bun.default_allocator);
-                    defer str.deinit();
-                    const slice = str.slice();
-                    if (!strings.isAllASCII(slice)) {
-                        break :inner;
-                    }
-                    blob.content_type_was_set = true;
-                    if (globalObject.bunVM().mimeType(str.slice())) |entry| {
-                        blob.content_type = entry.value;
-                        break :inner;
-                    }
-                    const content_type_buf = allocator.alloc(u8, slice.len) catch bun.outOfMemory();
-                    blob.content_type = strings.copyLowercase(slice, content_type_buf);
-                    blob.content_type_allocated = true;
-                }
-            }
-        }
+        if (opts.isObject()) {
+            if (try opts.getTruthyComptime(globalObject, "type")) |file_type| {
+                inner: {
+                    if (file_type.isString()) {
+                        var allocator = bun.default_allocator;
+                        var str = file_type.toSlice(globalObject, bun.default_allocator);
+                        defer str.deinit();
+                        const slice = str.slice();
+                        if (!strings.isAllASCII(slice)) {
+                            break :inner;
+                        }
+                        blob.content_type_was_set = true;
+                        if (globalObject.bunVM().mimeType(str.slice())) |entry| {
+                            blob.content_type = entry.value;
+                            break :inner;
+                        }
+                        const content_type_buf = allocator.alloc(u8, slice.len) catch bun.outOfMemory();
+                        blob.content_type = strings.copyLowercase(slice, content_type_buf);
+                        blob.content_type_allocated = true;
+                    }
+                }
+            }
+        }
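`strings.copyLowercase` above writes an ASCII-lowercased copy of the MIME type into a freshly allocated buffer. A rough std-only equivalent, as a sketch (hypothetical helper, not Bun's implementation):

const std = @import("std");

// Copy `src` into `dst`, lowercasing ASCII letters; returns the written slice.
fn copyLowercase(src: []const u8, dst: []u8) []const u8 {
    for (src, 0..) |c, i| {
        dst[i] = std.ascii.toLower(c);
    }
    return dst[0..src.len];
}

pub fn main() !void {
    const gpa = std.heap.page_allocator;
    const src = "Text/HTML; Charset=UTF-8";
    const buf = try gpa.alloc(u8, src.len);
    defer gpa.free(buf);
    std.debug.print("{s}\n", .{copyLowercase(src, buf)}); // text/html; charset=utf-8
}
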
67 changes: 54 additions & 13 deletions src/s3.zig
@@ -690,12 +690,15 @@ pub const AWSCredentials = struct {
             .path = if (path) |p| bun.String.init(p) else bun.String.empty,
         };
     }
+
+    pub fn deinit(this: *const @This()) void {
+        this.path.deref();
+        this.code.deref();
+        this.message.deref();
+    }
+
     pub fn toErrorInstance(this: *const @This(), global: *JSC.JSGlobalObject) JSC.JSValue {
-        defer {
-            this.path.deref();
-            this.code.deref();
-            this.message.deref();
-        }
+        defer this.deinit();
 
         return S3Error__toErrorInstance(this, global);
     }
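The hunk above factors the three `deref` calls out of `toErrorInstance`'s inline `defer` block into a reusable `deinit`, so other cleanup paths can release the same handles. A generic sketch of that consolidation pattern, with std-only stand-ins for the string handles (hypothetical type, not Bun code):

const std = @import("std");

const Report = struct {
    allocator: std.mem.Allocator,
    code: []u8,
    message: []u8,

    // One place that releases everything the struct owns.
    pub fn deinit(self: *const Report) void {
        self.allocator.free(self.code);
        self.allocator.free(self.message);
    }

    pub fn render(self: *const Report) void {
        // Instead of an inline `defer { free...; free...; }`,
        // this exit path now funnels through deinit().
        defer self.deinit();
        std.debug.print("{s}: {s}\n", .{ self.code, self.message });
    }
};

pub fn main() !void {
    const gpa = std.heap.page_allocator;
    const r = Report{
        .allocator = gpa,
        .code = try gpa.dupe(u8, "UnknownError"),
        .message = try gpa.dupe(u8, "something failed"),
    };
    r.render(); // frees code/message on the way out
}
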
@@ -1557,6 +1560,7 @@ pub const AWSCredentials = struct {
     const S3UploadStreamWrapper = struct {
         readable_stream_ref: JSC.WebCore.ReadableStream.Strong,
         sink: *JSC.WebCore.FetchTaskletChunkedRequestSink,
+        task: *MultiPartUpload,
         callback: ?*const fn (S3UploadResult, *anyopaque) void,
         callback_context: *anyopaque,
         ref_count: u32 = 1,
@@ -1588,20 +1592,23 @@ pub const AWSCredentials = struct {
             self.readable_stream_ref.deinit();
             self.sink.finalize();
             self.sink.destroy();
+            self.task.deref();
             self.destroy();
         }
     };
     pub fn onUploadStreamResolveRequestStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
         var args = callframe.arguments_old(2);
         var this = args.ptr[args.len - 1].asPromisePtr(S3UploadStreamWrapper);
         defer this.deref();
+
         if (this.sink.endPromise.hasValue()) {
             this.sink.endPromise.resolve(globalThis, JSC.jsNumber(0));
         }
         if (this.readable_stream_ref.get()) |stream| {
             stream.done(globalThis);
         }
         this.readable_stream_ref.deinit();
+        this.task.continueStream();
 
         return .undefined;
     }
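The wrapper's `ref_count: u32 = 1` together with the `ref`/`deref` pairs being added here (`task.deref()` in the cleanup path, `task.ref()` at each new owner below) is Zig's usual manual reference-counting idiom. A minimal single-threaded sketch of it (hypothetical type, not Bun code):

const std = @import("std");

const Shared = struct {
    ref_count: u32 = 1,

    pub fn ref(self: *Shared) void {
        self.ref_count += 1;
    }

    pub fn deref(self: *Shared) void {
        self.ref_count -= 1;
        // Last owner out releases the allocation.
        if (self.ref_count == 0) {
            std.heap.page_allocator.destroy(self);
        }
    }
};

pub fn main() !void {
    const s = try std.heap.page_allocator.create(Shared);
    s.* = .{}; // starts at 1: the creator owns a reference
    s.ref(); // + 1 for a second owner (e.g. a pending callback)
    s.deref(); // callback finished
    s.deref(); // creator done; count hits 0 and s is destroyed
}
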
@@ -1610,6 +1617,7 @@ pub const AWSCredentials = struct {
         const args = callframe.arguments_old(2);
         var this = args.ptr[args.len - 1].asPromisePtr(S3UploadStreamWrapper);
         defer this.deref();
+
         const err = args.ptr[0];
         if (this.sink.endPromise.hasValue()) {
             this.sink.endPromise.rejectOnNextTick(globalThis, err);
@@ -1627,6 +1635,8 @@ pub const AWSCredentials = struct {
                 });
             }
         }
+        this.task.continueStream();
+
         return .undefined;
     }
     pub const shim = JSC.Shimmer("Bun", "S3UploadStream", @This());
@@ -1647,6 +1657,30 @@ pub const AWSCredentials = struct {
         this.ref(); // ref the credentials
         const proxy_url = (proxy orelse "");
 
+        if (readable_stream.isDisturbed(globalThis)) {
+            return JSC.JSPromise.rejectedPromiseValue(globalThis, bun.String.static("ReadableStream is already disturbed").toErrorInstance(globalThis));
+        }
+
+        switch (readable_stream.ptr) {
+            .Invalid => {
+                return JSC.JSPromise.rejectedPromiseValue(globalThis, bun.String.static("ReadableStream is invalid").toErrorInstance(globalThis));
+            },
+            inline .File, .Bytes => |stream| {
+                if (stream.pending.result == .err) {
+                    // we got an error, fail early
+                    const err = stream.pending.result.err;
+                    stream.pending = .{ .result = .{ .done = {} } };
+                    const js_err, const was_strong = err.toJSWeak(globalThis);
+                    if (was_strong == .Strong) {
+                        js_err.unprotect();
+                    }
+                    js_err.ensureStillAlive();
+                    return JSC.JSPromise.rejectedPromise(globalThis, js_err).asValue(globalThis);
+                }
+            },
+            else => {},
+        }
+
         const task = MultiPartUpload.new(.{
             .credentials = this,
             .path = bun.default_allocator.dupe(u8, path) catch bun.outOfMemory(),
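The `inline .File, .Bytes => |stream|` prong in the new guard uses Zig's inline switch cases: the compiler instantiates one body per listed tag, so the capture is concretely typed in each instantiation. A small standalone sketch of the feature (hypothetical union, not Bun's stream types):

const std = @import("std");

const Source = union(enum) {
    file: struct { fd: i32 },
    bytes: struct { len: usize },
    invalid: void,
};

fn describe(src: Source) void {
    switch (src) {
        .invalid => std.debug.print("invalid\n", .{}),
        // One specialized body per tag; `payload` is concretely typed in each.
        inline .file, .bytes => |payload, tag| {
            std.debug.print("{s}: {any}\n", .{ @tagName(tag), payload });
        },
    }
}

pub fn main() void {
    describe(.{ .file = .{ .fd = 3 } });
    describe(.{ .bytes = .{ .len = 128 } });
}
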
@@ -1662,7 +1696,7 @@ pub const AWSCredentials = struct {
 
         task.poll_ref.ref(task.vm);
 
-        task.ref(); // + 1 for the stream
+        task.ref(); // + 1 for the stream sink
 
         var response_stream = JSC.WebCore.FetchTaskletChunkedRequestSink.new(.{
             .task = .{ .s3_upload = task },
@@ -1671,15 +1705,22 @@ pub const AWSCredentials = struct {
             .encoded = false,
             .endPromise = JSC.JSPromise.Strong.init(globalThis),
         }).toSink();
+        task.ref(); // + 1 for the stream wrapper
+
         const endPromise = response_stream.sink.endPromise.value();
         const ctx = S3UploadStreamWrapper.new(.{
             .readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable_stream, globalThis),
             .sink = &response_stream.sink,
             .callback = callback,
             .callback_context = callback_context,
             .path = task.path,
+            .task = task,
         });
         task.callback_context = @ptrCast(ctx);
+        // keep the task alive until we are done configuring the signal
+        task.ref();
+        defer task.deref();
+
         var signal = &response_stream.sink.signal;
 
         signal.* = JSC.WebCore.FetchTaskletChunkedRequestSink.JSSink.SinkSignal.init(.zero);
@@ -1706,6 +1747,7 @@ pub const AWSCredentials = struct {
                 if (response_stream.sink.endPromise.hasValue()) {
                     response_stream.sink.endPromise.rejectOnNextTick(globalThis, err);
                 }
+
                 task.fail(.{
                     .code = "UnknownError",
                     .message = "ReadableStream ended with an error",
@@ -1722,26 +1764,29 @@ pub const AWSCredentials = struct {
         if (assignment_result.asAnyPromise()) |promise| {
             switch (promise.status(globalThis.vm())) {
                 .pending => {
-                    task.continueStream();
                     ctx.ref();
                     assignment_result.then(
                         globalThis,
                         task.callback_context,
                         onUploadStreamResolveRequestStream,
                         onUploadStreamRejectRequestStream,
                     );
+                    if (!task.ended)
+                        task.continueStream();
                 },
                 .fulfilled => {
                     task.continueStream();
                     if (response_stream.sink.endPromise.hasValue()) {
                         response_stream.sink.endPromise.resolve(globalThis, JSC.jsNumber(0));
                     }
+
                     readable_stream.done(globalThis);
                 },
                 .rejected => {
                     if (response_stream.sink.endPromise.hasValue()) {
                         response_stream.sink.endPromise.rejectOnNextTick(globalThis, promise.result(globalThis.vm()));
                     }
+
                     task.fail(.{
                         .code = "UnknownError",
                         .message = "ReadableStream ended with an error",
@@ -1753,6 +1798,7 @@ pub const AWSCredentials = struct {
             if (response_stream.sink.endPromise.hasValue()) {
                 response_stream.sink.endPromise.rejectOnNextTick(globalThis, assignment_result);
             }
+
             task.fail(.{
                 .code = "UnknownError",
                 .message = "ReadableStream ended with an error",
@@ -2191,10 +2237,7 @@ pub const MultiPartUpload = struct {
 
     pub fn onCommitMultiPartRequest(result: AWS.S3CommitResult, this: *@This()) void {
         log("onCommitMultiPartRequest {s}", .{this.upload_id});
-        if (this.state == .finished) {
-            this.deinit();
-            return;
-        }
+
         switch (result) {
             .failure => |err| {
                 if (this.options.retry > 0) {
@@ -2238,7 +2281,6 @@ pub const MultiPartUpload = struct {
         const searchParams = std.fmt.bufPrint(&params_buffer, "?uploadId={s}", .{
             this.upload_id,
         }) catch unreachable;
-        this.ref();
 
         this.credentials.executeSimpleS3Request(.{
             .path = this.path,
@@ -2254,7 +2296,6 @@ pub const MultiPartUpload = struct {
         const search_params = std.fmt.bufPrint(&params_buffer, "?uploadId={s}", .{
             this.upload_id,
         }) catch unreachable;
-        this.ref();
 
         this.credentials.executeSimpleS3Request(.{
             .path = this.path,
