Skip to content

Commit 1ef2b1f

Browse files
committed
feature: JOBS v3, []byte payloads instead of string
Signed-off-by: Valery Piashchynski <[email protected]>
1 parent 67f2cfc commit 1ef2b1f

File tree

6 files changed

+184
-5
lines changed

6 files changed

+184
-5
lines changed

build/jobs/v1/jobs.pb.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/v3/jobs/driver.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package jobs
2+
3+
import (
4+
"context"
5+
)
6+
7+
// Driver represents the interface for a single jobs driver
8+
type Driver interface {
9+
// Push pushes the job to the underlying driver
10+
Push(ctx context.Context, msg Message) error
11+
// Run starts consuming the pipeline
12+
Run(ctx context.Context, pipeline Pipeline) error
13+
// Stop stops the consumer and closes the underlying connection
14+
Stop(ctx context.Context) error
15+
// Pause pauses the jobs consuming (while still allowing job pushing)
16+
Pause(ctx context.Context, pipeline string) error
17+
// Resume resumes the consumer
18+
Resume(ctx context.Context, pipeline string) error
19+
// State returns information about the driver state
20+
State(ctx context.Context) (*State, error)
21+
}
22+
23+
// Commander provides the ability to send a command to the Jobs plugin
24+
type Commander interface {
25+
// Command returns the command name
26+
Command() Command
27+
// Pipeline returns the associated command pipeline
28+
Pipeline() string
29+
}
30+
31+
// Constructor constructs Consumer interface. Endure abstraction.
32+
type Constructor interface {
33+
// Name returns the name of the driver
34+
Name() string
35+
// DriverFromConfig constructs a driver (e.g. kafka, amqp) from the configuration using the provided configKey
36+
DriverFromConfig(configKey string, queue Queue, pipeline Pipeline, cmder chan<- Commander) (Driver, error)
37+
// DriverFromPipeline constructs a driver (e.g. kafka, amqp) from the pipeline. All configuration is provided by the pipeline
38+
DriverFromPipeline(pipe Pipeline, queue Queue, cmder chan<- Commander) (Driver, error)
39+
}

plugins/v3/jobs/job.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package jobs
2+
3+
import (
4+
pq "github.com/roadrunner-server/api/v4/plugins/v3/priority_queue"
5+
)
6+
7+
// Queue represents JOBS plugin queue with it's elements types inside
8+
type Queue interface {
9+
// Remove removes element with provided ID (if exists) and returns that elements
10+
Remove(id string) []Job
11+
// Insert adds an item to the queue
12+
Insert(item Job)
13+
// ExtractMin returns the item with the highest priority (less value is the highest priority)
14+
ExtractMin() Job
15+
// Len returns the number of items in the queue
16+
Len() uint64
17+
}
18+
19+
// Job represents a binary heap item
20+
type Job interface {
21+
pq.Item
22+
// Ack acknowledges the item after processing
23+
Ack() error
24+
// Nack discards the item
25+
Nack() error
26+
// Requeue puts the message back to the queue with an optional delay
27+
Requeue(headers map[string][]string, delay int64) error
28+
// Body returns the payload associated with the item
29+
Body() []byte
30+
// Context returns any meta-information associated with the item
31+
Context() ([]byte, error)
32+
// Headers returns the metadata for the item
33+
Headers() map[string][]string
34+
}
35+
36+
// Message represents the protobuf message received from the RPC call
37+
type Message interface {
38+
pq.Item
39+
KafkaOptions
40+
// Name returns the name of the Job
41+
Name() string
42+
// Payload returns the data associated with the job
43+
Payload() []byte
44+
// Delay returns the delay time for the Job (not supported by all drivers)
45+
Delay() int64
46+
// AutoAck returns the autocommit status for the Job
47+
AutoAck() bool
48+
// UpdatePriority sets the priority of the Job. Priority is optional but cannot be set to 0.
49+
// The default priority is 10
50+
UpdatePriority(int64)
51+
// Headers returns the metadata for the item
52+
Headers() map[string][]string
53+
}
54+
55+
// KAFKA options (leave them empty for other drivers)
56+
type KafkaOptions interface {
57+
// Offset returns the offset associated with the Job
58+
Offset() int64
59+
// Partition returns the partition associated with the Job
60+
Partition() int32
61+
// Topic returns the topic associated with the Job
62+
Topic() string
63+
// Metadata returns the metadata associated with the Job
64+
Metadata() string
65+
}
66+
67+
type Pipeline interface {
68+
// With sets a pipeline value
69+
With(name string, value interface{})
70+
// Name returns the pipeline name.
71+
Name() string
72+
// Driver returns the driver associated with the pipeline.
73+
Driver() string
74+
// Has checks if a value is present in the pipeline.
75+
Has(name string) bool
76+
// String returns the value of an option as a string or the default value.
77+
String(name string, d string) string
78+
// Int returns the value of an option as an int or the default value.
79+
Int(name string, d int) int
80+
// Bool returns the value of an option as a bool or the default value.
81+
Bool(name string, d bool) bool
82+
// Map returns the nested map value or an empty config.
83+
// This might be used for SQS attributes or tags, for example
84+
Map(name string, out map[string]string) error
85+
// Priority returns the default pipeline priority
86+
Priority() int64
87+
// Get is used to retrieve the data associated with a key
88+
Get(key string) interface{}
89+
}

plugins/v3/jobs/state.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package jobs
2+
3+
// constant keys to pack/unpack messages from different drivers
4+
const (
5+
RRID string = "rr_id"
6+
RRJob string = "rr_job"
7+
RRHeaders string = "rr_headers"
8+
RRPipeline string = "rr_pipeline"
9+
RRDelay string = "rr_delay"
10+
RRPriority string = "rr_priority"
11+
RRAutoAck string = "rr_auto_ack"
12+
)
13+
14+
type Command string
15+
16+
const (
17+
Stop Command = "stop"
18+
)
19+
20+
// State represents job's state
21+
type State struct {
22+
// Pipeline name
23+
Pipeline string
24+
// Driver name
25+
Driver string
26+
// Queue name (tube for the beanstalk)
27+
Queue string
28+
// Active jobs which are consumed from the driver but not handled by the PHP worker yet
29+
Active int64
30+
// Delayed jobs
31+
Delayed int64
32+
// Reserved jobs which are in the driver but not consumed yet
33+
Reserved int64
34+
// Status - 1 Ready, 0 - Paused
35+
Ready bool
36+
// New in 2.10.5, pipeline priority
37+
Priority uint64
38+
// ErrorMessage New in 2023.1
39+
ErrorMessage string
40+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package priority_queue
2+
3+
// Item interface represents the base meta-information which any priority queue message must have
4+
type Item interface {
5+
// ID returns a unique identifier for the item
6+
ID() string
7+
// GroupID returns the group associated with the item, used to remove all items with the same groupID
8+
GroupID() string
9+
// Priority returns the priority level used to sort the item
10+
Priority() int64
11+
}

proto/jobs/v1/jobs.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ message Job {
2727
// unique job id
2828
string id = 2;
2929
// payload, might be embedded json or just byte-string
30-
string payload = 3;
30+
bytes payload = 3;
3131
// headers map[string][]string
3232
map<string, HeaderValue> headers = 4;
3333
// job options, contains common and driver specific fields

0 commit comments

Comments
 (0)