Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
socket support
Browse files Browse the repository at this point in the history
  • Loading branch information
thdxr committed Feb 21, 2024
1 parent 3fc800a commit ce9fc3e
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 16 deletions.
7 changes: 7 additions & 0 deletions cmd/sst/ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,13 @@ func (u *UI) Event(evt *server.Event) {
u.printEvent(color.FgGreen, "Build", evt.FunctionBuildEvent.FunctionID)
}

if evt.FunctionErrorEvent != nil {
u.printEvent(color.FgRed, "Error", evt.FunctionErrorEvent.ErrorMessage)
for _, item := range evt.FunctionErrorEvent.Trace {
u.printEvent(color.FgRed, "", strings.TrimSpace(item))
}
}

}

func (u *UI) printEvent(barColor color.Attribute, label string, message string) {
Expand Down
1 change: 1 addition & 0 deletions examples/test/src/index.mjs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export async function handler() {
console.log("hello");
throw new Error("test");
return {
statusCode: 200,
headers: {
Expand Down
12 changes: 6 additions & 6 deletions pkg/platform/functions/nodejs-runtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ let response: any;
let context: LambdaContext;

async function error(ex: any) {
console.log(ex);
const body = JSON.stringify({
errorType: "Error",
errorMessage: ex.message,
trace: ex.stack?.split("\n"),
});
await fetch(
AWS_LAMBDA_RUNTIME_API +
(!context
Expand All @@ -32,11 +36,7 @@ async function error(ex: any) {
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
errorType: "Error",
errorMessage: ex.message,
trace: ex.stack?.split("\n"),
}),
body,
},
);
}
Expand Down
42 changes: 32 additions & 10 deletions pkg/server/dev/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ type FunctionResponseEvent struct {
Output []byte
}

type FunctionErrorEvent struct {
FunctionID string
WorkerID string
RequestID string
ErrorType string `json:"errorType"`
ErrorMessage string `json:"errorMessage"`
Trace []string `json:"trace"`
}

type FunctionBuildEvent struct {
FunctionID string
Errors []string
Expand Down Expand Up @@ -178,9 +187,10 @@ func Start(
fileChan := make(chan *watcher.FileChangedEvent, 1000)

type workerResponse struct {
response *http.Response
workerID string
path []string
response *http.Response
requestBody *bytes.Buffer
workerID string
path []string
}
workerResponseChan := make(chan workerResponse, 1000)
workerShutdownChan := make(chan *WorkerInfo, 1000)
Expand Down Expand Up @@ -287,7 +297,7 @@ func Start(
continue
}

body, err := io.ReadAll(evt.response.Body)
responseBody, err := io.ReadAll(evt.response.Body)
if err != nil {
continue
}
Expand All @@ -297,17 +307,26 @@ func Start(
FunctionID: info.FunctionID,
WorkerID: info.WorkerID,
RequestID: info.CurrentRequestID,
Input: body,
Input: responseBody,
})
}
if evt.path[len(evt.path)-1] == "response" {
bus.Publish(&FunctionResponseEvent{
FunctionID: info.FunctionID,
WorkerID: info.WorkerID,
RequestID: evt.path[len(evt.path)-2],
Output: body,
Output: responseBody,
})
}
if evt.path[len(evt.path)-1] == "error" {
fee := &FunctionErrorEvent{
FunctionID: info.FunctionID,
WorkerID: info.WorkerID,
RequestID: evt.path[len(evt.path)-2],
}
json.Unmarshal(evt.requestBody.Bytes(), &fee)
bus.Publish(fee)
}
case info := <-workerShutdownChan:
slog.Info("worker died", "workerID", info.WorkerID)
existing, ok := workers[info.WorkerID]
Expand Down Expand Up @@ -404,8 +423,10 @@ func Start(
fmt.Fprint(writer, "Host: 127.0.0.1\r\n")
_, err := fmt.Fprint(writer, "\r\n")

requestBody := &bytes.Buffer{}
if r.ContentLength > 0 {
io.Copy(writer, r.Body)
write := io.MultiWriter(writer, requestBody)
io.Copy(write, r.Body)
}
writer.Flush()

Expand Down Expand Up @@ -435,9 +456,10 @@ func Start(
resp, err := http.ReadResponse(bufio.NewReader(buf), nil)
if err == nil {
workerResponseChan <- workerResponse{
workerID: workerID,
response: resp,
path: path,
workerID: workerID,
response: resp,
requestBody: requestBody,
path: path,
}
}
done <- struct{}{}
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Event struct {
StateEvent *StateEvent
FunctionInvokedEvent *aws.FunctionInvokedEvent
FunctionResponseEvent *aws.FunctionResponseEvent
FunctionErrorEvent *aws.FunctionErrorEvent
FunctionLogEvent *aws.FunctionLogEvent
FunctionBuildEvent *aws.FunctionBuildEvent
}
Expand Down Expand Up @@ -122,6 +123,11 @@ func (s *Server) Start(parentContext context.Context) error {
FunctionResponseEvent: event,
})
})
bus.Subscribe(ctx, func(event *aws.FunctionErrorEvent) {
publish(&Event{
FunctionErrorEvent: event,
})
})

bus.Subscribe(ctx, func(event *aws.FunctionLogEvent) {
publish(&Event{
Expand Down
11 changes: 11 additions & 0 deletions pkg/server/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func Start(ctx context.Context, p *project.Project, mux *http.ServeMux) {

invoke := bus.Listen(ctx, &aws.FunctionInvokedEvent{})
response := bus.Listen(ctx, &aws.FunctionResponseEvent{})
error := bus.Listen(ctx, &aws.FunctionErrorEvent{})
log := bus.Listen(ctx, &aws.FunctionLogEvent{})
stack := bus.Listen(ctx, &project.StackEvent{})

Expand Down Expand Up @@ -145,6 +146,16 @@ func Start(ctx context.Context, p *project.Project, mux *http.ServeMux) {
publishInvocation(invocation)
}
break
case evt := <-error:
invocation, ok := invocations[evt.RequestID]
if ok {
invocation.End = time.Now().UnixMilli()
invocation.Report = &InvocationReport{
Duration: invocation.End - invocation.Start,
}
publishInvocation(invocation)
}
break
case evt := <-log:
invocation, ok := invocations[evt.RequestID]
if ok {
Expand Down

0 comments on commit ce9fc3e

Please sign in to comment.