Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
handle worker crashes
Browse files Browse the repository at this point in the history
  • Loading branch information
thdxr committed Feb 8, 2024
1 parent dbcee7e commit 6f56601
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 2 deletions.
2 changes: 1 addition & 1 deletion examples/test/sst.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export default $config({
url: true,
handler: "./src/index.handler",
environment: {
HELLO: "not lame",
HELLO: "why",
},
});

Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type NodeWorker struct {

func (w *NodeWorker) Stop() {
w.cmd.Process.Signal(os.Interrupt)
}

func (w *NodeWorker) Done() {
w.cmd.Wait()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Runtime interface {

type Worker interface {
Stop()
Done()
}

type BuildInput struct {
Expand Down
21 changes: 20 additions & 1 deletion pkg/server/dev/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func Start(

type WorkerInfo struct {
FunctionID string
WorkerID string
Worker runtime.Worker
Env []string
}
Expand All @@ -144,6 +145,7 @@ func Start(
initChan := make(chan MQTT.Message, 1000)
shutdownChan := make(chan MQTT.Message, 1000)
fileChan := make(chan *watcher.FileChangedEvent, 1000)
workerShutdown := make(chan *WorkerInfo, 1000)

bus.Subscribe(ctx, func(event *project.StackEvent) {
if event.CompleteEvent != nil {
Expand Down Expand Up @@ -195,16 +197,33 @@ func Start(
Build: build,
Env: workerEnv[workerID],
})
workers[workerID] = &WorkerInfo{
info := &WorkerInfo{
FunctionID: functionID,
Worker: worker,
WorkerID: workerID,
}
go func() {
worker.Done()
workerShutdown <- info
}()
workers[workerID] = info
}

for {
select {
case <-ctx.Done():
return
case info := <-workerShutdown:
slog.Info("worker died", "workerID", info.WorkerID)
existing, ok := workers[info.WorkerID]
if !ok {
continue
}
// only delete if a new worker hasn't already been started
if existing == info {
delete(workers, info.WorkerID)
}
break
case complete = <-completeChan:
break
case m := <-initChan:
Expand Down

0 comments on commit 6f56601

Please sign in to comment.