Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
dev: improve large input/output flakiness
Browse files Browse the repository at this point in the history
  • Loading branch information
thdxr committed Sep 10, 2024
1 parent 6008dff commit ef260ef
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 29 deletions.
21 changes: 10 additions & 11 deletions cmd/sst/mosaic/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,9 @@ func Start(
prefix := fmt.Sprintf("ion/%s/%s", p.App().Name, p.App().Stage)
reader := iot_writer.NewReader(s3Client)
if token := mqttClient.Subscribe(prefix+"/+/response/#", 1, func(c MQTT.Client, m MQTT.Message) {
splits := strings.Split(m.Topic(), "/")
workerID := splits[3]
go func() {
topic := prefix + "/" + workerID + "/ack"
slog.Info("acking", "topic", topic)
mqttClient.Publish(topic, 1, false, []byte{1}).Wait()
}()
slog.Info("iot", "topic", m.Topic())
slog.Info("iot", "topic", m.Topic(), "payload", len(m.Payload()))
for _, msg := range reader.Read(m) {
slog.Info("read", "requestID", msg.ID, "data", len(msg.Data))
write, ok := pending.Load(msg.ID)
if !ok {
return
Expand All @@ -188,7 +182,7 @@ func Start(
if len(msg.Data) == 0 {
slog.Info("closing", "requestID", msg.ID)
casted.Close()
break
return
}
casted.Write(msg.Data)
}
Expand Down Expand Up @@ -309,7 +303,6 @@ func Start(
return true
}

slog.Info("wheewooo")
for {
select {
case <-ctx.Done():
Expand All @@ -319,7 +312,6 @@ func Start(
if !ok {
continue
}

responseBody, err := io.ReadAll(evt.response.Body)
if err != nil {
continue
Expand All @@ -332,6 +324,9 @@ func Start(
RequestID: info.CurrentRequestID,
Input: responseBody,
})
topic := prefix + "/" + info.WorkerID + "/ack"
slog.Info("acking", "topic", topic)
mqttClient.Publish(topic, 1, false, []byte{1}).Wait()
}
if evt.path[len(evt.path)-1] == "response" {
bus.Publish(&FunctionResponseEvent{
Expand Down Expand Up @@ -420,6 +415,7 @@ func Start(
}
if _, ok := targets[payload.FunctionID]; !ok {
go func() {
slog.Info("dev not ready yet", "functionID", payload.FunctionID)
time.Sleep(time.Second * 1)
initChan <- m
}()
Expand Down Expand Up @@ -532,6 +528,9 @@ func Start(
}()

<-done
read.Close()
conn.Close()
write.Close()
slog.Info("lambda sent response", "workerID", workerID)
})

Expand Down
25 changes: 10 additions & 15 deletions cmd/sst/mosaic/aws/iot_writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,21 @@ func (iw *IoTWriter) Close() error {
Body: bytes.NewReader(iw.buffer),
})
bytes := make([]byte, 4)
binary.BigEndian.PutUint32(bytes, uint32(99))
bytes = append(bytes, []byte(iw.bucket+"|"+key)...)
binary.BigEndian.PutUint32(bytes, uint32(iw.count))
iw.count++
bytes = append(bytes, []byte("blk"+iw.bucket+"|"+key)...)
token := iw.client.Publish(iw.topic, 1, false, bytes)
if token.Wait() && token.Error() != nil {
return token.Error()
}
}
bytes := make([]byte, 4)
binary.BigEndian.PutUint32(bytes, uint32(iw.count))
iw.count++
token := iw.client.Publish(iw.topic, 1, false, bytes)
if token.Wait() && token.Error() != nil {
return token.Error()
}
slog.Info("closed iot writer", "topic", iw.topic)
slog.Info("closed iot writer", "topic", iw.topic, "count", iw.count)
return nil
}

Expand Down Expand Up @@ -146,22 +146,17 @@ func (r *Reader) Read(m MQTT.Message) []ReadMsg {
r.buffer[requestID] = requestBuffer
}
id := int(binary.BigEndian.Uint32(payload[:4]))
if id == 99 {
data := string(payload[4:])
bucket, key, _ := strings.Cut(data, "|")
payload = payload[4:]
if bytes.HasPrefix(payload, []byte("blk")) {
data := string(payload)
bucket, key, _ := strings.Cut(data[3:], "|")
slog.Info("fetching from s3", "bucket", bucket, "key", key)
resp, _ := r.s3.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
body, _ := io.ReadAll(resp.Body)
return []ReadMsg{
{
Data: body,
ID: requestID,
},
}
payload, _ = io.ReadAll(resp.Body)
}
payload = payload[4:]
requestBuffer[id] = ReadMsg{
Data: payload,
ID: requestID,
Expand Down
5 changes: 2 additions & 3 deletions pkg/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ func SubscribeAll() chan interface{} {
}

func Publish(event interface{}) {
t := reflect.TypeOf(event)
slog.Info("publishing", "type", t)
bus.mu.RLock()
defer bus.mu.RUnlock()

t := reflect.TypeOf(event)

slog.Info("publishing", "type", t)
// Send to type-specific subscribers
if chans, found := bus.subscribers[t]; found {
for _, ch := range chans {
Expand Down

0 comments on commit ef260ef

Please sign in to comment.