From 68a1561ed5273d2704d4cb7ca814c3d81d371a49 Mon Sep 17 00:00:00 2001 From: Isaque Date: Mon, 14 Jul 2025 00:44:29 -0300 Subject: [PATCH] refact: using JobRow type --- client.go | 95 ++++++++++++++++++++---------- producer.go | 9 ++- storage/postgresql/postgresql.go | 14 +++-- storage/postgresql/queries/data.go | 24 +++++--- storage/storage.go | 6 +- types/types.go | 23 ++++++++ 6 files changed, 121 insertions(+), 50 deletions(-) diff --git a/client.go b/client.go index bfbb487..a109b58 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "crypto/rand" "database/sql" "encoding/json" + "errors" "log/slog" "sync" "time" @@ -70,11 +71,11 @@ type Client interface { Stop() // Insert add a job into the queue to be processed. - Insert(queue string, params JobArgs) (*int64, error) + Insert(queue string, params JobArgs, opts ...*types.InsertOptions) (*int64, error) // InsertTx adds a job into the specified queue within the context of the provided // transaction, allowing the operation to be part of an atomic database transaction. - InsertTx(tx *sql.Tx, queue string, params JobArgs) (*int64, error) + InsertTx(tx *sql.Tx, queue string, params JobArgs, opts ...*types.InsertOptions) (*int64, error) } // NewClient creates a new instance of worker with the provided context and options. @@ -122,6 +123,7 @@ func NewClient(ctx context.Context, opts ...Option) Client { for queue, config := range clt.cfg.queues { logger := clt.cfg.logger.WithGroup("producer").With(slog.String("queue", queue)) clt.producers[queue] = &producer{ + clientID: &clt.id, logger: logger, workers: clt.cfg.workers, storage: clt.cfg.storage, @@ -157,58 +159,87 @@ func (c *IClient) Stop() { } // Insert add a job into the queue to be processed. -func (c *IClient) Insert(queue string, params JobArgs) (*int64, error) { +func (c *IClient) Insert(queue string, params JobArgs, options ...*types.InsertOptions) (*int64, error) { args, err := json.Marshal(params) if err != nil { return nil, err } - id, err := c.cfg.storage.Insert(queue, params.Kind(), args) - if err != nil { - c.cfg.logger.ErrorContext(c.ctx, "failed to insert job into queue", - slog.String("error", err.Error()), - slog.String("queue", queue), - slog.String("kind", params.Kind()), - slog.Any("args", params), - ) + opts := &types.InsertOptions{} + if len(options) > 0 { + opts = options[0] + } + + if opts.Priority == 0 { + opts.Priority = 4 + } + + if opts.Priority > 4 { + return nil, errors.New("priority must be between 1 and 4") + } + + state := types.JobStateAvailable + if !opts.ScheduledAt.IsZero() { + state = types.JobStateScheduled + } + + if opts.ScheduledAt.IsZero() || opts.ScheduledAt.Before(time.Now()) { + opts.ScheduledAt = time.Now().UTC() + } + + if opts.MaxRetries == 0 { + opts.MaxRetries = 7 + } + + if opts.Pending { + state = types.JobStatePending + } + + job := &types.JobRow{ + Kind: params.Kind(), + Queue: queue, + Args: args, + State: state, + Options: opts, + } + + var jobID *int64 + if jobID, err = c.cfg.storage.Insert(job); err != nil { + c.cfg.logger.ErrorContext(c.ctx, "failed to insert job into queue", slog.String("error", err.Error()), + slog.String("queue", queue), slog.String("kind", params.Kind()), slog.Any("args", params)) return nil, err } - c.cfg.logger.DebugContext(c.ctx, "job inserted into queue", - slog.String("queue", queue), - slog.Int64("job_id", *id), - slog.String("kind", params.Kind()), - slog.Any("args", params), - ) + c.cfg.logger.DebugContext(c.ctx, "job inserted into queue", slog.String("queue", queue), + slog.Int64("job_id", *jobID), slog.String("kind", params.Kind()), slog.Any("args", params)) - return id, nil + return jobID, nil } // InsertTx adds a job into the specified queue within the context of the provided // transaction, allowing the operation to be part of an atomic database transaction. -func (c *IClient) InsertTx(tx *sql.Tx, queue string, params JobArgs) (*int64, error) { +func (c *IClient) InsertTx(tx *sql.Tx, queue string, params JobArgs, options ...*types.InsertOptions) (*int64, error) { args, err := json.Marshal(params) if err != nil { return nil, err } - id, err := c.cfg.storage.InsertTx(tx, queue, params.Kind(), args) + jobRow := &types.JobRow{ + Kind: params.Kind(), + Queue: queue, + Args: args, + State: types.JobStateAvailable, + } + + id, err := c.cfg.storage.InsertTx(tx, jobRow) if err != nil { - c.cfg.logger.ErrorContext(c.ctx, "failed to insert job into queue within transaction", - slog.String("error", err.Error()), - slog.String("queue", queue), - slog.String("kind", params.Kind()), - slog.Any("args", params), - ) + c.cfg.logger.ErrorContext(c.ctx, "failed to insert job into queue within transaction", slog.String("error", err.Error()), + slog.String("queue", queue), slog.String("kind", params.Kind()), slog.Any("args", params)) return nil, err } - c.cfg.logger.DebugContext(c.ctx, "job inserted into queue within transaction", - slog.String("queue", queue), - slog.Int64("job_id", *id), - slog.String("kind", params.Kind()), - slog.Any("args", params), - ) + c.cfg.logger.DebugContext(c.ctx, "job inserted into queue within transaction", slog.String("queue", queue), + slog.Int64("job_id", *id), slog.String("kind", params.Kind()), slog.Any("args", params)) return id, nil } diff --git a/producer.go b/producer.go index c8b3151..7acb8a8 100644 --- a/producer.go +++ b/producer.go @@ -10,9 +10,12 @@ import ( "github.com/isaqueveras/synk/storage" "github.com/isaqueveras/synk/types" + "github.com/oklog/ulid/v2" ) type producer struct { + clientID *ulid.ULID + logger *slog.Logger jobsChannel chan *types.JobRow config *producerConfig @@ -37,7 +40,7 @@ type producerConfig struct { func (p *producer) process(ctx context.Context, jobs chan []*types.JobRow) { limit := int32(p.config.maxWorkerCount) - p.numJobsActive.Load() - go p.getJobAvailable(jobs, limit) + go p.getJobAvailable(jobs, limit, p.clientID) for { select { case jobs := <-jobs: @@ -156,8 +159,8 @@ func (p *producer) handleWorkerDone(job *types.JobRow) { p.jobsChannel <- job } -func (p *producer) getJobAvailable(jobs chan<- []*types.JobRow, limit int32) { - items, err := p.storage.GetJobAvailable(p.config.queueName, limit) +func (p *producer) getJobAvailable(jobs chan<- []*types.JobRow, limit int32, clientID *ulid.ULID) { + items, err := p.storage.GetJobAvailable(p.config.queueName, limit, clientID) if err != nil { panic(err) } diff --git a/storage/postgresql/postgresql.go b/storage/postgresql/postgresql.go index 88c5840..7a8eb8e 100644 --- a/storage/postgresql/postgresql.go +++ b/storage/postgresql/postgresql.go @@ -8,6 +8,8 @@ import ( "github.com/isaqueveras/synk/storage" "github.com/isaqueveras/synk/storage/postgresql/queries" "github.com/isaqueveras/synk/types" + + "github.com/oklog/ulid/v2" ) // New creates a new instance of the storage repository using the provided @@ -40,7 +42,7 @@ func (pg *postgres) Ping() error { } // GetJobAvailable retrieves a list of available jobs from the specified queue with a limit on the number of jobs. -func (pg *postgres) GetJobAvailable(queue string, limit int32) (items []*types.JobRow, err error) { +func (pg *postgres) GetJobAvailable(queue string, limit int32, clientID *ulid.ULID) (items []*types.JobRow, err error) { ctx, cancel := context.WithTimeout(pg.ctx, pg.timeout) defer cancel() @@ -50,7 +52,7 @@ func (pg *postgres) GetJobAvailable(queue string, limit int32) (items []*types.J } defer tx.Rollback() // nolint - if items, err = pg.queries.GetJobAvailable(ctx, tx, queue, limit); err != nil { + if items, err = pg.queries.GetJobAvailable(ctx, tx, queue, limit, clientID); err != nil { return nil, err } @@ -62,7 +64,7 @@ func (pg *postgres) GetJobAvailable(queue string, limit int32) (items []*types.J } // Insert inserts a new job into the specified queue with the given kind and arguments. -func (pg *postgres) Insert(queue, kind string, args []byte) (*int64, error) { +func (pg *postgres) Insert(params *types.JobRow) (*int64, error) { ctx, cancel := context.WithTimeout(pg.ctx, pg.timeout) defer cancel() @@ -73,7 +75,7 @@ func (pg *postgres) Insert(queue, kind string, args []byte) (*int64, error) { defer tx.Rollback() var id *int64 - if id, err = pg.queries.Insert(ctx, tx, queue, kind, args); err != nil { + if id, err = pg.queries.Insert(ctx, tx, params); err != nil { return nil, err } @@ -87,10 +89,10 @@ func (pg *postgres) Insert(queue, kind string, args []byte) (*int64, error) { // InsertTx inserts a new job into the specified queue with the given kind and arguments // within the context of the provided transaction. // This allows the operation to be part of an atomic database transaction. -func (pg *postgres) InsertTx(tx *sql.Tx, queue, kind string, args []byte) (*int64, error) { +func (pg *postgres) InsertTx(tx *sql.Tx, params *types.JobRow) (*int64, error) { ctx, cancel := context.WithTimeout(pg.ctx, pg.timeout) defer cancel() - return pg.queries.Insert(ctx, tx, queue, kind, args) + return pg.queries.Insert(ctx, tx, params) } // UpdateJobState updates the state, finalized_at, and error message of a job. diff --git a/storage/postgresql/queries/data.go b/storage/postgresql/queries/data.go index 9133476..f9534bf 100644 --- a/storage/postgresql/queries/data.go +++ b/storage/postgresql/queries/data.go @@ -7,6 +7,8 @@ import ( "time" "github.com/isaqueveras/synk/types" + + "github.com/oklog/ulid/v2" ) // Queries represents a collection of methods to interact with the PostgreSQL database. @@ -23,8 +25,9 @@ WITH jobs AS ( SELECT id, args, kind FROM synk.job WHERE state = 'available' AND queue = $1::TEXT + AND scheduled_at <= COALESCE($4::TIMESTAMPTZ, NOW()) ORDER BY priority ASC, scheduled_at ASC, id ASC - LIMIT $2::integer + LIMIT $2::INTEGER FOR UPDATE SKIP LOCKED ) UPDATE synk.job SET state = 'running', @@ -36,8 +39,8 @@ WHERE job.id = jobs.id RETURNING job.id, job.args, job.kind` // GetJobAvailable retrieves available jobs from the database and updates their state to 'running'. -func (q *Queries) GetJobAvailable(ctx context.Context, tx *sql.Tx, queue string, limit int32) ([]*types.JobRow, error) { - rows, err := tx.QueryContext(ctx, getJobAvailableSQL, queue, limit, "01JK7753BK0C8PY75K0JVXFYY0") +func (q *Queries) GetJobAvailable(ctx context.Context, tx *sql.Tx, queue string, limit int32, clientID *ulid.ULID) ([]*types.JobRow, error) { + rows, err := tx.QueryContext(ctx, getJobAvailableSQL, queue, limit, clientID.String(), nil) if err != nil { return nil, err } @@ -63,12 +66,19 @@ func (q *Queries) GetJobAvailable(ctx context.Context, tx *sql.Tx, queue string, return jobs, nil } -const insertSQL = "INSERT INTO synk.job (queue, kind, args, max_attempts) VALUES ($1, $2, $3::jsonb, 3) RETURNING id" +const insertSQL = ` +INSERT INTO synk.job (queue, kind, args, max_attempts, state, scheduled_at) +VALUES ($1, $2, $3::jsonb, $4, $5, $6) RETURNING id` // Insert inserts a new job into the database with the specified queue, kind, and arguments. -func (q *Queries) Insert(ctx context.Context, tx *sql.Tx, queue, kind string, args []byte) (id *int64, err error) { - err = tx.QueryRowContext(ctx, insertSQL, queue, kind, args).Scan(&id) - return id, err +func (q *Queries) Insert(ctx context.Context, tx *sql.Tx, params *types.JobRow) (id *int64, err error) { + if err = tx. + QueryRowContext(ctx, insertSQL, params.Queue, params.Kind, params.Args, params.Options.MaxRetries, + params.State, params.Options.ScheduledAt). + Scan(&id); err != nil { + return nil, err + } + return id, nil } const updateJobStateSQLNoError = `UPDATE synk.job SET state = $1, finalized_at = $2 WHERE id = $3` diff --git a/storage/storage.go b/storage/storage.go index d5029c7..4b5b3ec 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -5,6 +5,8 @@ import ( "time" "github.com/isaqueveras/synk/types" + + "github.com/oklog/ulid/v2" ) // Storage is an interface that defines methods for interacting with job storage. @@ -22,11 +24,11 @@ type Storage interface { // Insert adds a new job to the specified queue with the given kind and arguments. // It takes the name of the queue, the kind of job, and the arguments as a byte slice. // It returns a pointer to the job ID and an error if the insertion fails. - Insert(queue, kind string, args []byte) (*int64, error) + Insert(params *types.JobRow) (*int64, error) // InsertTx adds a new job to the specified queue with the given kind and arguments // within the context of the provided transaction. - InsertTx(tx *sql.Tx, queue, kind string, args []byte) (*int64, error) + InsertTx(tx *sql.Tx, params *types.JobRow) (*int64, error) // UpdateJobState updates the state of a job identified by its ID. // It takes the job ID, the new state, an optional finalized time, and an diff --git a/types/types.go b/types/types.go index d6344a8..5c6d749 100644 --- a/types/types.go +++ b/types/types.go @@ -15,6 +15,28 @@ type JobRow struct { Args []byte State JobState Errors []AttemptError + Options *InsertOptions +} + +// InsertOptions represents options for inserting a job into the queue. +type InsertOptions struct { + // ScheduledAt is the time at which the job should be scheduled to run. + ScheduledAt time.Time + + // Priority is the priority of the job, which can be used to determine the order + // in which jobs are processed. Higher values indicate higher priority. + Priority int + + // Pending indicates whether the job is pending execution. + // If true, the job is considered pending and will not be executed until it is marked + // as ready. If false, the job is ready to be executed. + Pending bool + + // Retryable indicates whether the job can be retried if it fails. + Retryable bool + + // MaxRetries is the maximum number of times the job can be retried if it fails. + MaxRetries int } // JobState represents the status of a job. @@ -28,6 +50,7 @@ const ( JobStateRetryable JobState = "retryable" JobStateRunning JobState = "running" JobStateScheduled JobState = "scheduled" + JobStatePending JobState = "pending" ) // AttemptError represents an error that occurred during a job attempt.