Skip to content

Commit

Permalink
Cleanup queuePublished in RemoveQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
pior committed Oct 29, 2024
1 parent 2e99b71 commit 24da63e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"strings"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/redis/go-redis/v9"
"github.com/spf13/cast"
)

Expand Down Expand Up @@ -1832,6 +1832,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil {
return errors.E(op, errors.Unknown, err)
}
r.queuesPublished.Delete(qname)
return nil
case -1:
return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname})
Expand Down
29 changes: 29 additions & 0 deletions internal/rdb/rdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,35 @@ func TestEnqueueQueueCache(t *testing.T) {
if _, ok := r.queuesPublished.Load(t1.Queue); !ok {
t.Fatalf("%q is not cached in queuesPublished", t1.Queue)
}

t.Run("remove-queue", func(t *testing.T) {
err := r.RemoveQueue(t1.Queue, true)
if err != nil {
t.Errorf("(*RDB).RemoveQueue(%q, %t) = %v, want nil", t1.Queue, true, err)
}

if _, ok := r.queuesPublished.Load(t1.Queue); ok {
t.Fatalf("%q is still cached in queuesPublished", t1.Queue)
}

if r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
t.Fatalf("%q is a member of SET %q", t1.Queue, base.AllQueues)
}

err = r.Enqueue(context.Background(), t1)
if err != nil {
t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err)
}

// Check queue is in the AllQueues set.
if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues)
}

if _, ok := r.queuesPublished.Load(t1.Queue); !ok {
t.Fatalf("%q is not cached in queuesPublished", t1.Queue)
}
})
}

func TestEnqueueUnique(t *testing.T) {
Expand Down

0 comments on commit 24da63e

Please sign in to comment.