diff --git a/README.md b/README.md index d60ca10..e3e75b4 100644 --- a/README.md +++ b/README.md @@ -192,6 +192,18 @@ if err != nil { #### Inspect -[Protocol Doc](https://github.com/iamduo/workq/blob/master/doc/protocol.md#inspect) +[Protocol Doc](https://github.com/iamduo/workq/blob/master/doc/protocol.md#inspect) | [Go Doc](https://godoc.org/github.com/iamduo/go-workq#Client.InspectJobs) -Inspect commands not yet supported yet. +##### Inspect foreground or background jobs by name + +```go +// Inspect jobs with name "ping" starting from cursor offset 0 and limiting results to 10. +jobs, err = client.InspectJobs("ping", 0, 10) +if err != nil { + // ... +} +// Print jobs as table +for _, job := range jobs { + fmt.Printf("%s\t%d\t%s\n", job.ID, job.Priority, job.Created.Local()) +} +``` diff --git a/client.go b/client.go index a776270..ffe411b 100644 --- a/client.go +++ b/client.go @@ -13,12 +13,14 @@ import ( "strings" "github.com/satori/go.uuid" + "time" ) var ( // ErrMalformed is returned when responses from workq can not be parsed // due to unrecognized responses. - ErrMalformed = errors.New("Malformed response") + ErrMalformed = errors.New("Malformed response") + ErrPayloadMustFollowSize = errors.New("Payload must immediately follow payload size when inspecting jobs") ) const ( @@ -26,8 +28,10 @@ const ( maxDataBlock = 1048576 // Line terminator in string form. - crnl = "\r\n" - termLen = 2 + crnl = "\r\n" + termLen = 2 + // prefix of payload line when inspecting jobs + payloadKey = "payload " // Time format for any date times. Compatible with time.Format. TimeFormat = "2006-01-02T15:04:05Z" @@ -296,6 +300,33 @@ func (c *Client) Delete(id string) error { return c.parser.parseOk() } +// "inspect jobs" command: https://github.com/iamduo/workq/blob/master/doc/protocol.md#inspect-foreground-or-background-jobs-by-name +// +// Inspect foreground or background jobs by name, @see PROTOCOL_DOC +// Returns ResponseError for Workq response errors. +// Returns NetError on any network errors. +// Returns ErrMalformed if response can't be parsed. +// Returns ErrPayloadMustFollowSize if payload is not directly preceded by payload size in key value list. +func (c *Client) InspectJobs(name string, cursorOffset int, limit int) ([]*InspectedJob, error) { + r := []byte(fmt.Sprintf( + "inspect jobs %s %d %d"+crnl, + name, + cursorOffset, + limit, + )) + _, err := c.conn.Write(r) + if err != nil { + return nil, NewNetError(err.Error()) + } + + count, err := c.parser.parseOkWithReply() + if err != nil { + return nil, err + } + + return c.parser.readInspectedJobs(count) +} + type responseParser struct { rdr *bufio.Reader } @@ -437,7 +468,7 @@ func (p *responseParser) readResult() (*JobResult, error) { // Read leased job consisting of 2 separate terminated lines. // " \r\n -// \r\n" func (p *responseParser) readLeasedJob() (*LeasedJob, error) { line, err := p.readLine() split := strings.Split(string(line), " ") @@ -476,6 +507,157 @@ func (p *responseParser) readLeasedJob() (*LeasedJob, error) { return j, nil } +// Read inspected jobs. +// \r\n +// \r\n +// ... Repeats up to +// ... Repeats up to +func (p *responseParser) readInspectedJobs(replyCount int) ([]*InspectedJob, error) { + var jobs []*InspectedJob + for i := 0; i < replyCount; i++ { + job, err := p.parseInspectedJob() + if err != nil { + return nil, err + } + jobs = append(jobs, job) + } + + // Check for unexpected trailing bytes + block := make([]byte, 1) + _, err := io.ReadAtLeast(p.rdr, block, 1) + if err == nil { + return nil, ErrMalformed + } + + return jobs, nil +} + +// Parse a single job from an inspected job response. +// \r\n +// \r\n +// ... Repeats up to +func (p *responseParser) parseInspectedJob() (*InspectedJob, error) { + line, err := p.readLine() + if err != nil { + return nil, ErrMalformed + } + split := strings.Split(string(line), " ") + if len(split) != 2 { + return nil, ErrMalformed + } + + j := &InspectedJob{} + + j.ID, err = idFromString(split[0]) + if err != nil { + return nil, err + } + + keyCount, err := strconv.Atoi(split[1]) + if err != nil { + return nil, ErrMalformed + } + + for k := 0; k < keyCount; k++ { + line, err := p.readLine() + if err != nil { + return nil, ErrMalformed + } + + split := strings.Split(string(line), " ") + if len(split) != 2 { + return nil, ErrMalformed + } + + switch split[0] { + case "name": + j.Name, err = nameFromString(split[1]) + if err != nil { + return nil, ErrMalformed + } + case "ttr": + ttr, err := strconv.ParseUint(split[1], 10, 32) + if err != nil { + return nil, ErrMalformed + } + j.TTR = int(ttr) + case "ttl": + ttl, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + return nil, ErrMalformed + } + j.TTL = int(ttl) + case "payload": + // The payload line is always read by the payload-size section. + // Encountering it here means that the order of keys is incorrect. + return nil, ErrPayloadMustFollowSize + case "payload-size": + payloadSize, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + return nil, ErrMalformed + } + // Payload line has to immediately follow payload-size line because the entire + // payload must be read as bytes regardless of the newlines it may contain. + b := make([]byte, len(payloadKey)) + n, err := p.rdr.Read(b) + if err != nil || n != len(payloadKey) || string(b) != payloadKey { + return nil, ErrPayloadMustFollowSize + } + j.Payload, err = p.readBlock(int(payloadSize)) + if err != nil { + return nil, err + } + k++ // because payload line has been processed outside of loop + case "max-attempts": + maxAttempts, err := strconv.ParseUint(split[1], 10, 8) + if err != nil { + return nil, ErrMalformed + } + j.MaxAttempts = int(maxAttempts) + case "attempts": + attempts, err := strconv.ParseUint(split[1], 10, 8) + if err != nil { + return nil, ErrMalformed + } + j.Attempts = int(attempts) + case "max-fails": + maxFails, err := strconv.ParseUint(split[1], 10, 8) + if err != nil { + return nil, ErrMalformed + } + j.MaxFails = int(maxFails) + case "fails": + fails, err := strconv.ParseUint(split[1], 10, 8) + if err != nil { + return nil, ErrMalformed + } + j.Fails = int(fails) + case "priority": + priority, err := strconv.ParseInt(split[1], 10, 32) + if err != nil { + return nil, ErrMalformed + } + j.Priority = int(priority) + case "state": + state, err := strconv.ParseUint(split[1], 10, 8) + if err != nil { + return nil, ErrMalformed + } + j.State = int(state) + case "created": + var created time.Time + created, err = time.Parse(time.RFC3339, split[1]) + if err != nil { + return nil, ErrMalformed + } + j.Created = created + default: + return nil, ErrMalformed + } + } + return j, nil +} + // Parse an error from "-CODE TEXT" func (p *responseParser) errorFromLine(line []byte) (error, bool) { split := strings.SplitN(string(line), " ", 2) diff --git a/client_test.go b/client_test.go index 1e577ef..08bb0df 100644 --- a/client_test.go +++ b/client_test.go @@ -464,7 +464,7 @@ func TestResult(t *testing.T) { } if !bytes.Equal([]byte("a"), result.Result) { - t.Fatalf("Resullt mismatch") + t.Fatalf("Result mismatch") } expWrite := []byte( @@ -820,6 +820,468 @@ func TestDeleteBadConnError(t *testing.T) { } } +func TestInspectJobs(t *testing.T) { + conn := &TestConn{ + rdr: bytes.NewBuffer([]byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T01:50:51Z\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c6 12\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T02:00:17Z\r\n", + )), + wrt: bytes.NewBuffer([]byte("")), + } + client := NewClient(conn) + jobs, err := client.InspectJobs("ping", 0, 10) + if err != nil { + t.Fatalf("Response mismatch, err=%s", err) + } + + if len(jobs) != 2 { + t.Fatalf("Reply count mismatch") + } + + if jobs[0].ID != "6ba7b810-9dad-11d1-80b4-00c04fd430c4" { + t.Fatalf("ID mismatch") + } + if jobs[1].ID != "6ba7b810-9dad-11d1-80b4-00c04fd430c6" { + t.Fatalf("ID mismatch") + } + if jobs[0].Name!= "ping" { + t.Fatalf("Name mismatch") + } + if jobs[0].TTR!= 1000 { + t.Fatalf("TTR mismatch") + } + if jobs[0].TTL!= 60000 { + t.Fatalf("TTL mismatch") + } + if !bytes.Equal([]byte("ping"), jobs[0].Payload) { + t.Fatalf("Payload mismatch") + } + if jobs[0].MaxAttempts!= 0 { + t.Fatalf("MaxAttempts mismatch") + } + if jobs[0].Attempts!= 0 { + t.Fatalf("Attempts mismatch") + } + if jobs[0].MaxFails!= 0 { + t.Fatalf("MaxFails mismatch") + } + if jobs[0].Fails!= 0 { + t.Fatalf("Fails mismatch") + } + if jobs[0].Priority!= 0 { + t.Fatalf("Fails mismatch") + } + if jobs[0].State!= 0 { + t.Fatalf("Fails mismatch") + } + timeRef := time.Date(2016, time.August, 22, 1, 50, 51, 0, time.UTC) + if !time.Time.Equal(jobs[0].Created, timeRef) { // 2016-08-22T02:00:17Z + t.Fatalf("Creation date mismatch: %s != %s", jobs[0].Created, timeRef) + } + + expWrite := []byte( + "inspect jobs ping 0 10\r\n", + ) + if !bytes.Equal(expWrite, conn.wrt.Bytes()) { + t.Fatalf("Write mismatch, act=%s", conn.wrt.Bytes()) + } +} + +func TestInspectJobsErrors(t *testing.T) { + tests := []RespErrTestCase{ + // Invalid reply-count + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n"+ + "name ping\r\n"+ + "ttr 1000\r\n"+ + "ttl 60000\r\n"+ + "payload-size 4\r\n"+ + "payload ping\r\n"+ + "max-attempts 0\r\n"+ + "attempts 0\r\n"+ + "max-fails 0\r\n"+ + "fails 0\r\n"+ + "priority 0\r\n"+ + "state 0\r\n"+ + "created 2016-08-22T01:50:51Z\r\n", + ), + expErr: ErrMalformed, + }, + // payload after payload size but with other keys in between + { + resp: []byte( + "+OK 1\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n"+ + "name ping\r\n"+ + "ttr 1000\r\n"+ + "ttl 60000\r\n"+ + "payload-size 4\r\n"+ + "max-attempts 0\r\n"+ + "payload ping\r\n"+ + "attempts 0\r\n"+ + "max-fails 0\r\n"+ + "fails 0\r\n"+ + "priority 0\r\n"+ + "state 0\r\n"+ + "created 2016-08-22T01:50:51Z\r\n", + ), + expErr: ErrPayloadMustFollowSize, + }, + // payload before payload-size + { + resp: []byte( + "+OK 1\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n"+ + "name ping\r\n"+ + "ttr 1000\r\n"+ + "payload ping\r\n"+ + "ttl 60000\r\n"+ + "payload-size 4\r\n"+ + "max-attempts 0\r\n"+ + "attempts 0\r\n"+ + "max-fails 0\r\n"+ + "fails 0\r\n"+ + "priority 0\r\n"+ + "state 0\r\n"+ + "created 2016-08-22T01:50:51Z\r\n", + ), + expErr: ErrPayloadMustFollowSize, + }, + // Invalid key-count (too small) + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 11\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T01:50:51Z\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c6 12\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T02:00:17Z\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid key-count (too small) on last reply + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T01:50:51Z\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c6 11\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T02:00:17Z\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid key-count (too large) + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 13\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T01:50:51Z\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c6 12\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T02:00:17Z\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid key-count (too large) on last reply + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T01:50:51Z\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c6 13\r\n" + + "name ping\r\n" + + "ttr 1000\r\n" + + "ttl 60000\r\n" + + "payload-size 4\r\n" + + "payload ping\r\n" + + "max-attempts 0\r\n" + + "attempts 0\r\n" + + "max-fails 0\r\n" + + "fails 0\r\n" + + "priority 0\r\n" + + "state 0\r\n" + + "created 2016-08-22T02:00:17Z\r\n", + ), + expErr: ErrMalformed, + }, + // Malformed " " line (no spaces) + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4\r\n", + ), + expErr: ErrMalformed, + }, + // Malformed " " line (too many spaces) + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12 abc\r\n", + ), + expErr: ErrMalformed, + }, + // Malformed " " line (key count not a number) + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 xy\r\n", + ), + expErr: ErrMalformed, + }, + // Malformed " " line (no spaces) + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "nameping\r\n", + ), + expErr: ErrMalformed, + }, + // Malformed " " line (spaces in value) + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "name pi ng\r\n", + ), + expErr: ErrMalformed, + }, + // Malformed "name " line (illegal characters in value) + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "name pi*ng\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid TTR + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "ttr -1\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid TTL + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "ttl -1\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid payload-size + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "payload-size -1\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid payload + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "payload-size 10\r\n" + + "payload abc\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid max-attempts + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "max-attempts -1\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid attempts + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "attempts -1\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid max-fails + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "max-fails -1\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid fails + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "fails -1\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid priority + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "priority xy\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid state + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "state xy\r\n", + ), + expErr: ErrMalformed, + }, + // Invalid created + { + resp: []byte( + "+OK 2\r\n" + + "6ba7b810-9dad-11d1-80b4-00c04fd430c4 12\r\n" + + "created 20invalid16-08-22T02:00:17Z\r\n", + ), + expErr: ErrMalformed, + }, + } + tests = append(tests, invalidCommonErrorTests()...) + + for _, tt := range tests { + conn := &TestConn{ + rdr: bytes.NewBuffer(tt.resp), + wrt: bytes.NewBuffer([]byte("")), + } + client := NewClient(conn) + j, err := client.InspectJobs("ping", 0,10) + if j != nil || err == nil || tt.expErr == nil || err.Error() != tt.expErr.Error() { + t.Fatalf("Response mismatch, err=%q, expErr=%q", err, tt.expErr) + } + } +} + +func TestInspectJobsBadConnError(t *testing.T) { + conn := &TestBadWriteConn{} + client := NewClient(conn) + _, err := client.InspectJobs("ping", 0, 10) + if _, ok := err.(*NetError); !ok { + t.Fatalf("Error mismatch, err=%+v", err) + } +} + type RespErrTestCase struct { resp []byte expErr error diff --git a/job.go b/job.go index b57a2cb..adceb01 100644 --- a/job.go +++ b/job.go @@ -1,5 +1,7 @@ package workq +import "time" + // FgJob is executed by the "run" command. // Describes a foreground job specification. type FgJob struct { @@ -51,3 +53,12 @@ type JobResult struct { Success bool Result []byte } + +// InspectedJob is returned by the "inspect jobs" command. +type InspectedJob struct { + BgJob + Attempts int // Number of already made attempts. + Fails int // Number of already occured fails. + State int // Current state of the job. + Created time.Time // Time of job creation +} \ No newline at end of file