Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom redis prefix configuration #647

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions asynq.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
)

// Task represents a unit of work to be performed.
Expand Down Expand Up @@ -438,10 +439,11 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
//
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are:
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
//
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
Expand Down Expand Up @@ -545,3 +547,12 @@ func (w *ResultWriter) Write(data []byte) (n int, err error) {
func (w *ResultWriter) TaskID() string {
return w.id
}

var globalPrefixOnce sync.Once

// SetGlobalPrefix sets the global prefix for all redis keys used by asynq.
func SetGlobalPrefix(prefix string) {
globalPrefixOnce.Do(func() {
base.GlobalPrefix = prefix
})
}
2 changes: 1 addition & 1 deletion asynq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"strings"
"testing"

"github.com/redis/go-redis/v9"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/log"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
)

//============================================================================
Expand Down
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)

// A Client is responsible for scheduling tasks.
Expand Down Expand Up @@ -150,9 +150,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
// TTL duration must be greater than or equal to 1 second.
//
// Uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
// - Task Type
// - Task Payload
// - Queue Name
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
Expand Down
2 changes: 1 addition & 1 deletion inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)

// Inspector is a client interface to inspect and mutate the state of
Expand Down
10 changes: 5 additions & 5 deletions inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestInspectorQueues(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r)
for _, qname := range tc.queues {
if err := r.SAdd(context.Background(), base.AllQueues, qname).Err(); err != nil {
if err := r.SAdd(context.Background(), base.AllQueues(), qname).Err(); err != nil {
t.Fatalf("could not initialize all queue set: %v", err)
}
}
Expand Down Expand Up @@ -138,8 +138,8 @@ func TestInspectorDeleteQueue(t *testing.T) {
tc.qname, tc.force, err)
continue
}
if r.SIsMember(context.Background(), base.AllQueues, tc.qname).Val() {
t.Errorf("%q is a member of %q", tc.qname, base.AllQueues)
if r.SIsMember(context.Background(), base.AllQueues(), tc.qname).Val() {
t.Errorf("%q is a member of %q", tc.qname, base.AllQueues())
}
}
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func TestInspectorHistory(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r)

r.SAdd(context.Background(), base.AllQueues, tc.qname)
r.SAdd(context.Background(), base.AllQueues(), tc.qname)
// populate last n days data
for i := 0; i < tc.n; i++ {
ts := now.Add(-time.Duration(i) * 24 * time.Hour)
Expand Down Expand Up @@ -1196,7 +1196,7 @@ func TestInspectorListAggregatingTasks(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedTasks(t, r, fxt.tasks)
h.SeedRedisSet(t, r, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r, fxt.allGroups)
h.SeedRedisZSets(t, r, fxt.groups)

Expand Down
42 changes: 28 additions & 14 deletions internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"sync"
"time"

"github.com/redis/go-redis/v9"
"github.com/golang/protobuf/ptypes"
"github.com/hibiken/asynq/internal/errors"
pb "github.com/hibiken/asynq/internal/proto"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
)

Expand All @@ -31,14 +31,28 @@ const DefaultQueueName = "default"
// DefaultQueue is the redis key for the default queue.
var DefaultQueue = PendingKey(DefaultQueueName)

// Global Redis keys.
const (
AllServers = "asynq:servers" // ZSET
AllWorkers = "asynq:workers" // ZSET
AllSchedulers = "asynq:schedulers" // ZSET
AllQueues = "asynq:queues" // SET
CancelChannel = "asynq:cancel" // PubSub channel
)
// GlobalPrefix is the prefix for all redis keys used by asynq.
var GlobalPrefix = "asynq"

func AllServers() string {
return fmt.Sprintf("%s:servers", GlobalPrefix) // ZSET
}

func AllWorkers() string {
return fmt.Sprintf("%s:workers", GlobalPrefix) // ZSET
}

func AllSchedulers() string {
return fmt.Sprintf("%s:schedulers", GlobalPrefix) // ZSET
}

func AllQueues() string {
return fmt.Sprintf("%s:queues", GlobalPrefix) // SET
}

func CancelChannel() string {
return fmt.Sprintf("%s:cancel", GlobalPrefix) // PubSub channel
}

// TaskState denotes the state of a task.
type TaskState int
Expand Down Expand Up @@ -104,7 +118,7 @@ func ValidateQueueName(qname string) error {

// QueueKeyPrefix returns a prefix for all keys in the given queue.
func QueueKeyPrefix(qname string) string {
return fmt.Sprintf("asynq:{%s}:", qname)
return fmt.Sprintf("%s:{%s}:", GlobalPrefix, qname)
}

// TaskKeyPrefix returns a prefix for task key.
Expand Down Expand Up @@ -178,22 +192,22 @@ func FailedKey(qname string, t time.Time) string {

// ServerInfoKey returns a redis key for process info.
func ServerInfoKey(hostname string, pid int, serverID string) string {
return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, serverID)
return fmt.Sprintf("%s:servers:{%s:%d:%s}", GlobalPrefix, hostname, pid, serverID)
}

// WorkersKey returns a redis key for the workers given hostname, pid, and server ID.
func WorkersKey(hostname string, pid int, serverID string) string {
return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, serverID)
return fmt.Sprintf("%s:workers:{%s:%d:%s}", GlobalPrefix, hostname, pid, serverID)
}

// SchedulerEntriesKey returns a redis key for the scheduler entries given scheduler ID.
func SchedulerEntriesKey(schedulerID string) string {
return fmt.Sprintf("asynq:schedulers:{%s}", schedulerID)
return fmt.Sprintf("%s:schedulers:{%s}", GlobalPrefix, schedulerID)
}

// SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.
func SchedulerHistoryKey(entryID string) string {
return fmt.Sprintf("asynq:scheduler_history:%s", entryID)
return fmt.Sprintf("%s:scheduler_history:%s", GlobalPrefix, entryID)
}

// UniqueKey returns a redis key with the given type, payload, and queue name.
Expand Down
14 changes: 7 additions & 7 deletions internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/redis/go-redis/v9"
"github.com/spf13/cast"
)

// AllQueues returns a list of all queue names.
func (r *RDB) AllQueues() ([]string, error) {
return r.client.SMembers(context.Background(), base.AllQueues).Result()
return r.client.SMembers(context.Background(), base.AllQueues()).Result()
}

// Stats represents a state of queues at a certain time.
Expand Down Expand Up @@ -804,7 +804,7 @@ func (r *RDB) ListAggregating(qname, gname string, pgn Pagination) ([]*base.Task

// Reports whether a queue with the given name exists.
func (r *RDB) queueExists(qname string) (bool, error) {
return r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
return r.client.SIsMember(context.Background(), base.AllQueues(), qname).Result()
}

// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
Expand Down Expand Up @@ -1829,7 +1829,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
}
switch n {
case 1:
if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil {
if err := r.client.SRem(context.Background(), base.AllQueues(), qname).Err(); err != nil {
return errors.E(op, errors.Unknown, err)
}
return nil
Expand All @@ -1852,7 +1852,7 @@ return keys`)
// ListServers returns the list of server info.
func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
now := r.clock.Now()
res, err := listServerKeysCmd.Run(context.Background(), r.client, []string{base.AllServers}, now.Unix()).Result()
res, err := listServerKeysCmd.Run(context.Background(), r.client, []string{base.AllServers()}, now.Unix()).Result()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1886,7 +1886,7 @@ return keys`)
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
var op errors.Op = "rdb.ListWorkers"
now := r.clock.Now()
res, err := listWorkersCmd.Run(context.Background(), r.client, []string{base.AllWorkers}, now.Unix()).Result()
res, err := listWorkersCmd.Run(context.Background(), r.client, []string{base.AllWorkers()}, now.Unix()).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, err)
}
Expand Down Expand Up @@ -1921,7 +1921,7 @@ return keys`)
// ListSchedulerEntries returns the list of scheduler entries.
func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
now := r.clock.Now()
res, err := listSchedulerKeysCmd.Run(context.Background(), r.client, []string{base.AllSchedulers}, now.Unix()).Result()
res, err := listSchedulerKeysCmd.Run(context.Background(), r.client, []string{base.AllSchedulers()}, now.Unix()).Result()
if err != nil {
return nil, err
}
Expand Down
26 changes: 13 additions & 13 deletions internal/rdb/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestAllQueues(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, qname := range tc.queues {
if err := r.client.SAdd(context.Background(), base.AllQueues, qname).Err(); err != nil {
if err := r.client.SAdd(context.Background(), base.AllQueues(), qname).Err(); err != nil {
t.Fatalf("could not initialize all queue set: %v", err)
}
}
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestCurrentStats(t *testing.T) {
t.Fatal(err)
}
}
h.SeedRedisSet(t, r.client, base.AllQueues, tc.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), tc.allQueues)
h.SeedRedisSets(t, r.client, tc.allGroups)
h.SeedTasks(t, r.client, tc.tasks)
h.SeedRedisLists(t, r.client, tc.pending)
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestHistoricalStats(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)

r.client.SAdd(context.Background(), base.AllQueues, tc.qname)
r.client.SAdd(context.Background(), base.AllQueues(), tc.qname)
// populate last n days data
for i := 0; i < tc.n; i++ {
ts := now.Add(-time.Duration(i) * 24 * time.Hour)
Expand Down Expand Up @@ -1637,7 +1637,7 @@ func TestListAggregating(t *testing.T) {

for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r.client, fxt.allGroups)
h.SeedTasks(t, r.client, fxt.tasks)
h.SeedRedisZSets(t, r.client, fxt.groups)
Expand Down Expand Up @@ -1751,7 +1751,7 @@ func TestListAggregatingPagination(t *testing.T) {

for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r.client, fxt.allGroups)
h.SeedTasks(t, r.client, fxt.tasks)
h.SeedRedisZSets(t, r.client, fxt.groups)
Expand Down Expand Up @@ -2072,7 +2072,7 @@ func TestRunAggregatingTask(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedTasks(t, r.client, fxt.tasks)
h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r.client, fxt.allGroups)
h.SeedRedisZSets(t, r.client, fxt.groups)

Expand Down Expand Up @@ -2764,7 +2764,7 @@ func TestRunAllAggregatingTasks(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedTasks(t, r.client, fxt.tasks)
h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r.client, fxt.allGroups)
h.SeedRedisZSets(t, r.client, fxt.groups)

Expand Down Expand Up @@ -3077,7 +3077,7 @@ func TestArchiveAggregatingTask(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedTasks(t, r.client, fxt.tasks)
h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r.client, fxt.allGroups)
h.SeedRedisZSets(t, r.client, fxt.groups)

Expand Down Expand Up @@ -3564,7 +3564,7 @@ func TestArchiveAllAggregatingTasks(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedTasks(t, r.client, fxt.tasks)
h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r.client, fxt.allGroups)
h.SeedRedisZSets(t, r.client, fxt.groups)

Expand Down Expand Up @@ -4190,7 +4190,7 @@ func TestDeleteAggregatingTask(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedTasks(t, r.client, fxt.tasks)
h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r.client, fxt.allGroups)
h.SeedRedisZSets(t, r.client, fxt.groups)

Expand Down Expand Up @@ -4825,7 +4825,7 @@ func TestDeleteAllAggregatingTasks(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedTasks(t, r.client, fxt.tasks)
h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues)
h.SeedRedisSet(t, r.client, base.AllQueues(), fxt.allQueues)
h.SeedRedisSets(t, r.client, fxt.allGroups)
h.SeedRedisZSets(t, r.client, fxt.groups)

Expand Down Expand Up @@ -5013,8 +5013,8 @@ func TestRemoveQueue(t *testing.T) {
tc.qname, tc.force, err)
continue
}
if r.client.SIsMember(context.Background(), base.AllQueues, tc.qname).Val() {
t.Errorf("%q is a member of %q", tc.qname, base.AllQueues)
if r.client.SIsMember(context.Background(), base.AllQueues(), tc.qname).Val() {
t.Errorf("%q is a member of %q", tc.qname, base.AllQueues())
}

keys := []string{
Expand Down
Loading