Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/jobs/enrichment_batch_process_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class EnrichmentBatchProcessJob < ApplicationJob

queue_as :enrichment_batch_process_job

def perform(lines, filename)
def perform(lines, filename, source_id)
log_prefix = "EnrichmentBatchProcessJob (#{filename})"

# We will process the lines in parallel to speed up ingestion.
Expand Down Expand Up @@ -43,6 +43,7 @@ def perform(lines, filename)
enrichment = Enrichment.new(
filename: filename,
doi: uid,
source_id: source_id,
contributors: parsed_line["contributors"],
resources: parsed_line["resources"],
field: parsed_line["field"],
Expand Down
7 changes: 6 additions & 1 deletion app/models/enrichment.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
class Enrichment < ApplicationRecord
before_validation :set_defaults
before_validation :normalize_source_id

validates :uuid, presence: true, uniqueness: true

validates :source_id, presence: true
validate :validate_json_schema

belongs_to :doi_record,
Expand Down Expand Up @@ -31,6 +32,10 @@ def set_defaults
self.uuid = SecureRandom.uuid if uuid.blank?
end

# Canonicalizes +source_id+ in place: surrounding whitespace is removed and
# the value is upper-cased. A nil +source_id+ is left as nil.
#
# Returns the value that was assigned.
def normalize_source_id
  raw = source_id
  self.source_id = raw.nil? ? nil : raw.strip.upcase
end

def validate_json_schema
doc = to_enrichment_hash
error_list = self.class.enrichment_schemer.validate(doc).to_a
Expand Down
7 changes: 7 additions & 0 deletions db/migrate/20260310084109_add_source_id_to_enrichments.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Adds the +source_id+ column to the +enrichments+ table.
#
# The column is a string capped at 255 characters and is declared NOT NULL
# with no default value.
class AddSourceIdToEnrichments < ActiveRecord::Migration[7.2]
  # NOTE(review): presumably opts this migration out of the Departure gem's
  # ALTER TABLE handling -- confirm against the gem's documentation.
  disable_departure!

  def change
    change_table :enrichments do |t|
      t.string :source_id, limit: 255, null: false
    end
  end
end
3 changes: 2 additions & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.2].define(version: 2026_03_03_053712) do
ActiveRecord::Schema[7.2].define(version: 2026_03_10_084109) do
create_table "active_storage_attachments", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "name", limit: 191, null: false
t.string "record_type", null: false
Expand Down Expand Up @@ -251,6 +251,7 @@
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.string "uuid", limit: 36, null: false
t.string "source_id", null: false
t.index ["doi", "updated_at", "id"], name: "index_enrichments_on_doi_and_updated_at_and_id", order: { updated_at: :desc, id: :desc }
t.index ["uuid"], name: "index_enrichments_on_uuid", unique: true
end
Expand Down
7 changes: 4 additions & 3 deletions lib/tasks/enrichment.rake
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

namespace :enrichment do
desc "Process JSONL from S3 and enqueue batches sized by bytes (256KB message size limit)"
# "Example command: bundle exec rake enrichment:batch_process_file KEY=02022026_test_ingestion_file.jsonl
# bundle exec rake enrichment:batch_process_file KEY=preprint_matching_enrichments_datacite_format_1000.jsonl
# Example command: bundle exec rake enrichment:batch_process_file KEY=02022026_test_ingestion_file.jsonl SOURCE_ID=DATACITE.COMET
task batch_process_file: :environment do
bucket = ENV["ENRICHMENTS_INGESTION_FILES_BUCKET_NAME"]
key = ENV["KEY"]
source_id = ENV["SOURCE_ID"]

abort("ENRICHMENTS_INGESTION_FILES_BUCKET_NAME is not set") if bucket.blank?
abort("KEY is not set") if key.blank?
abort("SOURCE_ID is not set") if source_id.blank?

# SQS limit is 256KB so we'll set the batch size to be more conservative to allow for some
# overhead and ensure we don't exceed limits.
Expand All @@ -28,7 +29,7 @@ namespace :enrichment do
flush = lambda do
return if batch_lines.empty?

EnrichmentBatchProcessJob.perform_later(batch_lines, key)
EnrichmentBatchProcessJob.perform_later(batch_lines, key, source_id)

batch_lines.clear
batch_bytes = 0
Expand Down