Skip to content

Commit

Permalink
feat: add finished handler to server
Browse files Browse the repository at this point in the history
  • Loading branch information
hungtcs committed Apr 19, 2024
1 parent 8df0bfa commit 1e13014
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
37 changes: 34 additions & 3 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type processor struct {
errHandler ErrorHandler
shutdownTimeout time.Duration

finishedHandler FinishedHandler

// channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest

Expand Down Expand Up @@ -85,6 +87,7 @@ type processorParams struct {
queues map[string]int
strictPriority bool
errHandler ErrorHandler
finishedHandler FinishedHandler
shutdownTimeout time.Duration
starting chan<- *workerInfo
finished chan<- *base.TaskMessage
Expand Down Expand Up @@ -115,6 +118,7 @@ func newProcessor(params processorParams) *processor {
quit: make(chan struct{}),
abort: make(chan struct{}),
errHandler: params.errHandler,
finishedHandler: params.finishedHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
shutdownTimeout: params.shutdownTimeout,
starting: params.starting,
Expand Down Expand Up @@ -291,12 +295,21 @@ func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.MarkAsComplete(ctx, msg)
if err := p.broker.MarkAsComplete(ctx, msg); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}

func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
Expand All @@ -311,12 +324,21 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Done(ctx, msg)
if err := p.broker.Done(ctx, msg); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}

// SkipRetry is used as a return value from Handler.ProcessTask to indicate that
Expand Down Expand Up @@ -374,12 +396,21 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Archive(ctx, msg, e.Error())
if err := p.broker.Archive(ctx, msg, e.Error()); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}

// queues returns a list of queues to query.
Expand Down
22 changes: 21 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type Config struct {
// If BaseContext is nil, the default is context.Background().
// If this is defined, then it MUST return a non-nil context
BaseContext func() context.Context

// TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty.
//
// If unset, zero or a negative value, the interval is set to 1 second.
Expand Down Expand Up @@ -183,6 +183,11 @@ type Config struct {

ErrorHandler ErrorHandler

// FinishedHandler handles a task that has been processed.
//
// FinishedHandler is called when the task status becomes completed or archived.
FinishedHandler FinishedHandler

// Logger specifies the logger used by the server instance.
//
// If unset, default logger is used.
Expand Down Expand Up @@ -275,6 +280,20 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro
fn(ctx, task, err)
}

// An FinishedHandler handles a task that has been processed.
type FinishedHandler interface {
HandleFinished(task *TaskInfo)
}

// The FinishedHandlerFunc type is an adapter to allow the use of ordinary functions as a FinishedHandler.
// If f is a function with the appropriate signature, FinishedHandlerFunc(f) is a FinishedHandler that calls f.
type FinishedHandlerFunc func(task *TaskInfo)

// HandleFinished calls fn(ctx, task, err)
func (fn FinishedHandlerFunc) HandleFinished(task *TaskInfo) {
fn(task)
}

// RetryDelayFunc calculates the retry delay duration for a failed task given
// the retry count, error, and the task.
//
Expand Down Expand Up @@ -529,6 +548,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
queues: queues,
strictPriority: cfg.StrictPriority,
errHandler: cfg.ErrorHandler,
finishedHandler: cfg.FinishedHandler,
shutdownTimeout: shutdownTimeout,
starting: starting,
finished: finished,
Expand Down

0 comments on commit 1e13014

Please sign in to comment.