Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ if err != nil {

#### Inspect

[Protocol Doc](https://github.com/iamduo/workq/blob/master/doc/protocol.md#inspect)
[Protocol Doc](https://github.com/iamduo/workq/blob/master/doc/protocol.md#inspect) | [Go Doc](https://godoc.org/github.com/iamduo/go-workq#Client.InspectJobs)

Inspect commands not yet supported yet.
##### Inspect foreground or background jobs by name

```go
// Inspect jobs with name "ping" starting from cursor offset 0 and limiting results to 10.
jobs, err = client.InspectJobs("ping", 0, 10)
if err != nil {
// ...
}
// Print jobs as table
for _, job := range jobs {
fmt.Printf("%s\t%d\t%s\n", job.ID, job.Priority, job.Created.Local())
}
```
190 changes: 186 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@ import (
"strings"

"github.com/satori/go.uuid"
"time"
)

var (
// ErrMalformed is returned when responses from workq can not be parsed
// due to unrecognized responses.
ErrMalformed = errors.New("Malformed response")
ErrMalformed = errors.New("Malformed response")
ErrPayloadMustFollowSize = errors.New("Payload must immediately follow payload size when inspecting jobs")
)

const (
// Max Data Block that can be read within a response, 1 MiB.
maxDataBlock = 1048576

// Line terminator in string form.
crnl = "\r\n"
termLen = 2
crnl = "\r\n"
termLen = 2
// prefix of payload line when inspecting jobs
payloadKey = "payload "

// Time format for any date times. Compatible with time.Format.
TimeFormat = "2006-01-02T15:04:05Z"
Expand Down Expand Up @@ -296,6 +300,33 @@ func (c *Client) Delete(id string) error {
return c.parser.parseOk()
}

// "inspect jobs" command: https://github.com/iamduo/workq/blob/master/doc/protocol.md#inspect-foreground-or-background-jobs-by-name
//
// Inspect foreground or background jobs by name, @see PROTOCOL_DOC
// Returns ResponseError for Workq response errors.
// Returns NetError on any network errors.
// Returns ErrMalformed if response can't be parsed.
// Returns ErrPayloadMustFollowSize if payload is not directly preceded by payload size in key value list.
func (c *Client) InspectJobs(name string, cursorOffset int, limit int) ([]*InspectedJob, error) {
r := []byte(fmt.Sprintf(
"inspect jobs %s %d %d"+crnl,
name,
cursorOffset,
limit,
))
_, err := c.conn.Write(r)
if err != nil {
return nil, NewNetError(err.Error())
}

count, err := c.parser.parseOkWithReply()
if err != nil {
return nil, err
}

return c.parser.readInspectedJobs(count)
}

type responseParser struct {
rdr *bufio.Reader
}
Expand Down Expand Up @@ -437,7 +468,7 @@ func (p *responseParser) readResult() (*JobResult, error) {

// Read leased job consisting of 2 separate terminated lines.
// "<id> <name> <payload-length>\r\n
// <payload-block\r\n"
// <payload-block>\r\n"
func (p *responseParser) readLeasedJob() (*LeasedJob, error) {
line, err := p.readLine()
split := strings.Split(string(line), " ")
Expand Down Expand Up @@ -476,6 +507,157 @@ func (p *responseParser) readLeasedJob() (*LeasedJob, error) {
return j, nil
}

// Read inspected jobs.
// <id> <key-count>\r\n
// <key> <value>\r\n
// ... Repeats up to <key-count>
// ... Repeats up to <reply-count>
func (p *responseParser) readInspectedJobs(replyCount int) ([]*InspectedJob, error) {
var jobs []*InspectedJob
for i := 0; i < replyCount; i++ {
job, err := p.parseInspectedJob()
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}

// Check for unexpected trailing bytes
block := make([]byte, 1)
_, err := io.ReadAtLeast(p.rdr, block, 1)
if err == nil {
return nil, ErrMalformed
}

return jobs, nil
}

// Parse a single job from an inspected job response.
// <id> <key-count>\r\n
// <key> <value>\r\n
// ... Repeats up to <key-count>
func (p *responseParser) parseInspectedJob() (*InspectedJob, error) {
line, err := p.readLine()
if err != nil {
return nil, ErrMalformed
}
split := strings.Split(string(line), " ")
if len(split) != 2 {
return nil, ErrMalformed
}

j := &InspectedJob{}

j.ID, err = idFromString(split[0])
if err != nil {
return nil, err
}

keyCount, err := strconv.Atoi(split[1])
if err != nil {
return nil, ErrMalformed
}

for k := 0; k < keyCount; k++ {
line, err := p.readLine()
if err != nil {
return nil, ErrMalformed
}

split := strings.Split(string(line), " ")
if len(split) != 2 {
return nil, ErrMalformed
}

switch split[0] {
case "name":
j.Name, err = nameFromString(split[1])
if err != nil {
return nil, ErrMalformed
}
case "ttr":
ttr, err := strconv.ParseUint(split[1], 10, 32)
if err != nil {
return nil, ErrMalformed
}
j.TTR = int(ttr)
case "ttl":
ttl, err := strconv.ParseUint(split[1], 10, 64)
if err != nil {
return nil, ErrMalformed
}
j.TTL = int(ttl)
case "payload":
// The payload line is always read by the payload-size section.
// Encountering it here means that the order of keys is incorrect.
return nil, ErrPayloadMustFollowSize
case "payload-size":
payloadSize, err := strconv.ParseUint(split[1], 10, 64)
if err != nil {
return nil, ErrMalformed
}
// Payload line has to immediately follow payload-size line because the entire
// payload must be read as bytes regardless of the newlines it may contain.
b := make([]byte, len(payloadKey))
n, err := p.rdr.Read(b)
if err != nil || n != len(payloadKey) || string(b) != payloadKey {
return nil, ErrPayloadMustFollowSize
}
j.Payload, err = p.readBlock(int(payloadSize))
if err != nil {
return nil, err
}
k++ // because payload line has been processed outside of loop
case "max-attempts":
maxAttempts, err := strconv.ParseUint(split[1], 10, 8)
if err != nil {
return nil, ErrMalformed
}
j.MaxAttempts = int(maxAttempts)
case "attempts":
attempts, err := strconv.ParseUint(split[1], 10, 8)
if err != nil {
return nil, ErrMalformed
}
j.Attempts = int(attempts)
case "max-fails":
maxFails, err := strconv.ParseUint(split[1], 10, 8)
if err != nil {
return nil, ErrMalformed
}
j.MaxFails = int(maxFails)
case "fails":
fails, err := strconv.ParseUint(split[1], 10, 8)
if err != nil {
return nil, ErrMalformed
}
j.Fails = int(fails)
case "priority":
priority, err := strconv.ParseInt(split[1], 10, 32)
if err != nil {
return nil, ErrMalformed
}
j.Priority = int(priority)
case "state":
state, err := strconv.ParseUint(split[1], 10, 8)
if err != nil {
return nil, ErrMalformed
}
j.State = int(state)
case "created":
var created time.Time
created, err = time.Parse(time.RFC3339, split[1])
if err != nil {
return nil, ErrMalformed
}
j.Created = created
default:
return nil, ErrMalformed
}
}
return j, nil
}

// Parse an error from "-CODE TEXT"
func (p *responseParser) errorFromLine(line []byte) (error, bool) {
split := strings.SplitN(string(line), " ", 2)
Expand Down
Loading