-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathgroups.go
161 lines (135 loc) · 3.78 KB
/
groups.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package tasqueue
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/vmihailenco/msgpack/v5"
)
// Group is a set of jobs that are enqueued together and tracked under a
// single group ID.
type Group struct {
// Jobs are the jobs that make up the group; each one is enqueued
// individually when the group is enqueued.
Jobs []Job
// Opts holds the options for the group (currently only its ID).
Opts GroupOpts
}
// GroupOpts holds the options that can be set on a Group.
type GroupOpts struct {
// ID is the identifier of the group. When left empty, a UUID is
// generated for it the first time the group is converted to a message.
ID string
}
// GroupMeta contains fields related to a group job. These are updated when a task is consumed.
// Within this file, Status and JobStatus are also refreshed and persisted by GetGroup.
type GroupMeta struct {
// ID is the identifier of the group.
ID string
// Status is the overall status of the group, derived from the
// statuses of its jobs.
Status string
// JobStatus is a map of job id -> status
JobStatus map[string]string
}
// GroupMessage is a wrapper over Group, containing meta info such as status, id.
// A GroupMessage is stored in the results store.
type GroupMessage struct {
// GroupMeta is embedded, so ID, Status and JobStatus are accessible
// directly on the message.
GroupMeta
// Group is the group this message was created from.
Group *Group
}
// message wraps the group in a GroupMessage, ready to be enqueued/stored.
//
// If the group has no ID yet, a new UUID is generated and written back to
// t.Opts.ID, so the receiver is mutated. The returned message starts with
// StatusProcessing, an empty (non-nil) job status map, and a pointer back to
// the receiver.
func (t *Group) message() GroupMessage {
	// Lazily assign an ID so callers may leave Opts.ID blank.
	if len(t.Opts.ID) == 0 {
		t.Opts.ID = uuid.NewString()
	}

	meta := GroupMeta{
		ID:        t.Opts.ID,
		Status:    StatusProcessing,
		JobStatus: map[string]string{},
	}

	return GroupMessage{GroupMeta: meta, Group: t}
}
// NewGroup builds a Group from the given jobs and options.
// The slice is stored as passed (not copied) and the returned error is
// always nil.
func NewGroup(j []Job, opts GroupOpts) (Group, error) {
	var g Group
	g.Jobs = j
	g.Opts = opts

	return g, nil
}
// EnqueueGroup enqueues every job of the group and returns the group's ID.
// The following steps take place:
//  1. The group is converted into a group message, which assigns an ID (if
//     none was set) and an initial status of StatusProcessing.
//  2. Each job of the group is enqueued; the ID returned for it is recorded in
//     the message's job status map with StatusStarted.
//  3. The group message is written to the results store.
//
// If enqueueing a job fails, a wrapped error is returned immediately: the
// group message is not stored and jobs enqueued before the failure are not
// rolled back. An error from storing the group message is returned as-is.
func (s *Server) EnqueueGroup(ctx context.Context, t Group) (string, error) {
	msg := t.message()

	for i := range t.Jobs {
		jobID, enqErr := s.Enqueue(ctx, t.Jobs[i])
		if enqErr != nil {
			return "", fmt.Errorf("could not enqueue group : %w", enqErr)
		}
		msg.JobStatus[jobID] = StatusStarted
	}

	if storeErr := s.setGroupMessage(ctx, msg); storeErr != nil {
		return "", storeErr
	}

	return msg.ID, nil
}
// GetGroup fetches the stored group message for the given group ID and
// refreshes its status.
//
// If the stored group status is already final (StatusDone or StatusFailed),
// the message is returned as stored. Otherwise every job whose recorded status
// is not final is looked up again with GetJob, the overall status is
// recomputed with getGroupStatus, and the refreshed message is written back to
// the results store before being returned.
//
// An error from reading the group, looking up a job, or writing the group back
// is returned unchanged together with a zero GroupMessage.
func (s *Server) GetGroup(ctx context.Context, id string) (GroupMessage, error) {
	g, err := s.getGroupMessage(ctx, id)
	if err != nil {
		return GroupMessage{}, err
	}

	// A final group status cannot change any more, so skip the per-job
	// lookups and the write-back.
	if g.Status == StatusDone || g.Status == StatusFailed {
		return g, nil
	}

	// jobStatus holds the refreshed map of job id -> status.
	jobStatus := make(map[string]string, len(g.JobStatus))
	for jobID, status := range g.JobStatus {
		// Jobs with a final status stay as they are and need no lookup.
		if status == StatusFailed || status == StatusDone {
			jobStatus[jobID] = status
			continue
		}

		// Every other status (started, processing, retrying, or one this
		// function does not know about) is treated as intermediate and
		// re-read. Handling "anything not final" here, instead of listing
		// the known intermediate statuses, guarantees that no job is
		// silently dropped from the map that gets persisted below.
		j, err := s.GetJob(ctx, jobID)
		if err != nil {
			return GroupMessage{}, err
		}
		jobStatus[jobID] = j.Status
	}

	// Update the overall group status, based on the individual jobs.
	g.JobStatus = jobStatus
	g.Status = getGroupStatus(jobStatus)

	// Persist the refreshed group so later calls can return early once the
	// group has reached a final status.
	if err := s.setGroupMessage(ctx, g); err != nil {
		return GroupMessage{}, err
	}

	return g, nil
}
func getGroupStatus(jobStatus map[string]string) string {
status := StatusDone
for _, st := range jobStatus {
if st == StatusFailed {
return StatusFailed
}
if st != StatusDone {
status = StatusProcessing
}
}
return status
}
// groupPrefix is prepended to a group ID to form the key under which the
// group's message is written to and read from the results store.
const groupPrefix = "group:msg:"
// setGroupMessage encodes g with msgpack and writes it to the results store
// under the key groupPrefix + g.ID. Encoding and store errors are returned
// unchanged.
func (s *Server) setGroupMessage(ctx context.Context, g GroupMessage) error {
	encoded, err := msgpack.Marshal(g)
	if err != nil {
		return err
	}

	key := groupPrefix + g.ID
	return s.results.Set(ctx, key, encoded)
}
// getGroupMessage reads the value stored under groupPrefix + id via GetResult
// and decodes it from msgpack into a GroupMessage. On a read or decode error
// it returns a zero GroupMessage together with that error.
func (s *Server) getGroupMessage(ctx context.Context, id string) (GroupMessage, error) {
	key := groupPrefix + id

	raw, err := s.GetResult(ctx, key)
	if err != nil {
		return GroupMessage{}, err
	}

	var decoded GroupMessage
	if err = msgpack.Unmarshal(raw, &decoded); err != nil {
		// Discard whatever was partially decoded.
		return GroupMessage{}, err
	}

	return decoded, nil
}