|
| 1 | +module River::Driver |
| 2 | + # Provides a Sequel driver for River. |
| 3 | + # |
| 4 | + # Used in conjunction with a River client like: |
| 5 | + # |
| 6 | + # DB = Sequel.connect("postgres://...") |
| 7 | + # client = River::Client.new(River::Driver::Sequel.new(DB)) |
| 8 | + # |
| 9 | + class Sequel |
| 10 | + def initialize(db) |
| 11 | + @db = db |
| 12 | + |
| 13 | + # It's Ruby, so we can only define a model after Sequel's established a |
| 14 | + # connection because it's all dynamic. |
| 15 | + if !River::Driver::Sequel.const_defined?(:RiverJob) |
| 16 | + River::Driver::Sequel.const_set(:RiverJob, Class.new(::Sequel::Model(:river_job))) |
| 17 | + |
| 18 | + # Since we only define our model once, take advantage of knowing this is |
| 19 | + # our first initialization to add required extensions. |
| 20 | + db.extension(:pg_array) |
| 21 | + end |
| 22 | + end |
| 23 | + |
| 24 | + def insert(insert_params) |
| 25 | + # the call to `#compact` is important so that we remove nils and table |
| 26 | + # default values get picked up instead |
| 27 | + to_job_row( |
| 28 | + RiverJob.create( |
| 29 | + { |
| 30 | + args: insert_params.encoded_args, |
| 31 | + kind: insert_params.kind, |
| 32 | + max_attempts: insert_params.max_attempts, |
| 33 | + priority: insert_params.priority, |
| 34 | + queue: insert_params.queue, |
| 35 | + state: insert_params.state, |
| 36 | + scheduled_at: insert_params.scheduled_at, |
| 37 | + tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil |
| 38 | + }.compact |
| 39 | + ) |
| 40 | + ) |
| 41 | + end |
| 42 | + |
| 43 | + private def to_job_row(river_job) |
| 44 | + # needs to be accessed through values because Sequel shadows `errors` |
| 45 | + errors = river_job.values[:errors] |
| 46 | + |
| 47 | + River::JobRow.new( |
| 48 | + id: river_job.id, |
| 49 | + attempt: river_job.attempt, |
| 50 | + attempted_at: river_job.attempted_at, |
| 51 | + attempted_by: river_job.attempted_by, |
| 52 | + created_at: river_job.created_at, |
| 53 | + encoded_args: river_job.args, |
| 54 | + errors: errors ? JSON.parse(errors, symbolize_names: true).map { |e| |
| 55 | + River::AttemptError.new( |
| 56 | + at: Time.parse(e[:at]), |
| 57 | + attempt: e[:attempt], |
| 58 | + error: e[:error], |
| 59 | + trace: e[:trace] |
| 60 | + ) |
| 61 | + } : nil, |
| 62 | + finalized_at: river_job.finalized_at, |
| 63 | + kind: river_job.kind, |
| 64 | + max_attempts: river_job.max_attempts, |
| 65 | + priority: river_job.priority, |
| 66 | + queue: river_job.queue, |
| 67 | + scheduled_at: river_job.scheduled_at, |
| 68 | + state: river_job.state, |
| 69 | + tags: river_job.tags |
| 70 | + ) |
| 71 | + end |
| 72 | + end |
| 73 | +end |
0 commit comments