Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add finished handler to server config #865

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type processor struct {
errHandler ErrorHandler
shutdownTimeout time.Duration

finishedHandler FinishedHandler

// channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest

Expand Down Expand Up @@ -85,6 +87,7 @@ type processorParams struct {
queues map[string]int
strictPriority bool
errHandler ErrorHandler
finishedHandler FinishedHandler
shutdownTimeout time.Duration
starting chan<- *workerInfo
finished chan<- *base.TaskMessage
Expand Down Expand Up @@ -115,6 +118,7 @@ func newProcessor(params processorParams) *processor {
quit: make(chan struct{}),
abort: make(chan struct{}),
errHandler: params.errHandler,
finishedHandler: params.finishedHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
shutdownTimeout: params.shutdownTimeout,
starting: params.starting,
Expand Down Expand Up @@ -291,12 +295,21 @@ func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.MarkAsComplete(ctx, msg)
if err := p.broker.MarkAsComplete(ctx, msg); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}

func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
Expand All @@ -311,12 +324,21 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Done(ctx, msg)
if err := p.broker.Done(ctx, msg); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}

// SkipRetry is used as a return value from Handler.ProcessTask to indicate that
Expand Down Expand Up @@ -374,12 +396,21 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Archive(ctx, msg, e.Error())
if err := p.broker.Archive(ctx, msg, e.Error()); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}

// queues returns a list of queues to query.
Expand Down
22 changes: 21 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type Config struct {
// If BaseContext is nil, the default is context.Background().
// If this is defined, then it MUST return a non-nil context
BaseContext func() context.Context

// TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty.
//
// If unset, zero or a negative value, the interval is set to 1 second.
Expand Down Expand Up @@ -183,6 +183,11 @@ type Config struct {

ErrorHandler ErrorHandler

// FinishedHandler handles a task that has been processed.
//
// FinishedHandler is called when the task status becomes completed or archived.
FinishedHandler FinishedHandler

// Logger specifies the logger used by the server instance.
//
// If unset, default logger is used.
Expand Down Expand Up @@ -275,6 +280,20 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro
fn(ctx, task, err)
}

// An FinishedHandler handles a task that has been processed.
type FinishedHandler interface {
HandleFinished(task *TaskInfo)
}

// The FinishedHandlerFunc type is an adapter to allow the use of ordinary functions as a FinishedHandler.
// If f is a function with the appropriate signature, FinishedHandlerFunc(f) is a FinishedHandler that calls f.
type FinishedHandlerFunc func(task *TaskInfo)

// HandleFinished calls fn(ctx, task, err)
func (fn FinishedHandlerFunc) HandleFinished(task *TaskInfo) {
fn(task)
}

// RetryDelayFunc calculates the retry delay duration for a failed task given
// the retry count, error, and the task.
//
Expand Down Expand Up @@ -529,6 +548,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
queues: queues,
strictPriority: cfg.StrictPriority,
errHandler: cfg.ErrorHandler,
finishedHandler: cfg.FinishedHandler,
shutdownTimeout: shutdownTimeout,
starting: starting,
finished: finished,
Expand Down
Loading