4 changes: 3 additions & 1 deletion .env.build
Expand Up @@ -20,4 +20,6 @@ ZBMATH_RELATED_SOURCE_TOKEN=
ZBMATH_AUTHOR_SOURCE_TOKEN=
ZBMATH_IDENTIFIER_SOURCE_TOKEN=
EVENTDATA_TOKEN=
STAFF_ADMIN_TOKEN=
STAFF_ADMIN_TOKEN=
CROSSREF_SOURCE_ID=
CROSSREF_SOURCE_TOKEN=
4 changes: 3 additions & 1 deletion .env.example
Expand Up @@ -11,4 +11,6 @@ AWS_SECRET_ACCESS_KEY=
# Can not be staff admin JWT must be valid user account
STAFF_PROFILES_ADMIN_TOKEN=

STAFF_ADMIN_TOKEN=
STAFF_ADMIN_TOKEN=
CROSSREF_SOURCE_ID=
CROSSREF_SOURCE_TOKEN=
48 changes: 20 additions & 28 deletions app/models/crossref.rb
Expand Up @@ -29,23 +29,21 @@ def source_id

def get_query_url(options = {})
params = {
source: "crossref",
"from-collected-date" => options[:from_date],
"until-collected-date" => options[:until_date],
"from-created-date" => options[:from_date],
"until-created-date" => options[:until_date],
mailto: "info@datacite.org",
scholix: true,
rows: options[:rows],
cursor: options[:cursor],
page: options[:page],
}.compact

"#{ENV['CROSSREF_QUERY_URL']}/v1/events?#{URI.encode_www_form(params)}"
"#{ENV['CROSSREF_QUERY_URL']}/beta/datacitations?#{URI.encode_www_form(params)}"
end

def get_total(options = {})
query_url = get_query_url(options.merge(rows: 0))
result = Maremma.get(query_url, options)
message = result.body.dig("data", "message").to_h
[message["total-results"].to_i, message["next-cursor"]]
message["total-results"].to_i
end

def queue_jobs(options = {})
Expand All @@ -57,18 +55,18 @@ def queue_jobs(options = {})
options[:until_date].presence || Time.now.to_date.iso8601
options[:content_type] = "json"

total, cursor = get_total(options)
total = get_total(options)

if total.positive?
# walk through results paginated via cursor
# walk through results paginated via page
total_pages = (total.to_f / job_batch_size).ceil
error_total = 0

(0...total_pages).each do |page|
options[:offset] = page * job_batch_size
(0...total_pages).each do |page_num|
options[:offset] = page_num * job_batch_size
options[:total] = total
options[:cursor] = cursor
count, cursor = process_data(options)
options[:page] = page_num
process_data(options)
Comment on lines +65 to +69
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

Crossref beta datacitations API pagination parameters

💡 Result:

The Crossref beta Data Citations API, accessible at https://api.crossref.org/beta/datacitations/, supports pagination using the standard Crossref REST API parameters: rows (default 20, max 1000) to specify the number of results per page, and cursor (start with cursor=* for the first page, then use the next-cursor from the response) for efficient pagination through large result sets. Stop paginating when the number of returned items is less than the requested rows. Cursors expire after 5 minutes of inactivity. Include mailto parameter during beta for contact. Full details, including all parameters, are in the official Swagger documentation referenced in the API docs.
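As a concrete illustration of those parameters, here is a minimal sketch of the first request in a cursor walk. The endpoint path and the `rows`/`cursor`/`mailto` parameters come from the API description above; the date filter names mirror the diff, and the specific date values are placeholders.

```ruby
require "uri"

# Build the first-page URL for a cursor walk against the beta endpoint.
# "cursor=*" requests the first page; later requests substitute the
# "next-cursor" value returned in the previous response's message.
params = {
  "from-created-date" => "2024-01-01",   # placeholder dates
  "until-created-date" => "2024-01-31",
  "mailto" => "info@datacite.org",
  "rows" => 1000,                        # documented maximum per page
  "cursor" => "*",
}
url = "https://api.crossref.org/beta/datacitations?#{URI.encode_www_form(params)}"
```

Stopping when a response returns fewer than `rows` items avoids an extra empty-page request at the end of the walk.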

🏁 Script executed:

# Find the crossref.rb file and examine the pagination implementation
find . -name "crossref.rb" -type f | head -5

Repository: datacite/levriero



🏁 Script executed:

# Once located, read the file to understand the full context of pagination
cat -n app/models/crossref.rb | head -100



🏁 Script executed:

# Also search for the get_query_url method implementation
rg "def get_query_url" -A 20 app/models/



🏁 Script executed:

# Search for process_data method
rg "def process_data" -A 15 app/models/



Replace page-based pagination with cursor-based pagination for Crossref /beta/datacitations API.

The Crossref /beta/datacitations API uses cursor-based pagination (with cursor and next-cursor parameters), not page-based pagination. The current code at lines 65-69 generates sequential page numbers (0, 1, 2, ...) and passes them via the page parameter, which the API does not recognize for this endpoint. This causes incorrect or missing results.

The loop should instead:

  1. Start with cursor: "*" for the first request
  2. Extract next-cursor from the API response
  3. Use that cursor for the next request
  4. Stop when results returned < rows

Additionally, options[:offset] (line 66) is calculated but never used—remove it.
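The cursor loop described above can be sketched as follows. This is a self-contained illustration, not the merged code: `fetch_page` stands in for the real HTTP call (`Maremma.get` against `/beta/datacitations`), and the response shape assumed here is just the `items`/`next-cursor` structure the finding refers to.

```ruby
# Walk a cursor-paginated result set: start at "*", follow next-cursor,
# stop when a page comes back shorter than the requested rows.
def paginate_with_cursor(rows:, fetch_page:)
  cursor = "*"              # Crossref convention: "*" starts a cursor walk
  collected = []
  loop do
    message = fetch_page.call(cursor: cursor, rows: rows)
    items = Array(message["items"])
    collected.concat(items)
    cursor = message["next-cursor"]
    # a short page (or a missing cursor) signals the final batch
    break if items.length < rows || cursor.nil?
  end
  collected
end

# Stub backend for demonstration: 5 items served 2 per page, with the
# cursor encoding the next offset.
ITEMS = (1..5).to_a
fake_fetch = lambda do |cursor:, rows:|
  start = cursor == "*" ? 0 : cursor.to_i
  { "items" => ITEMS[start, rows] || [], "next-cursor" => (start + rows).to_s }
end

results = paginate_with_cursor(rows: 2, fetch_page: fake_fetch)
# results collects all five items across three requests
```

In the model itself, `process_data` would need to return (or yield) the response message so the loop can read `next-cursor` between iterations.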

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `app/models/crossref.rb` around lines 65-69, the code uses page-based pagination, but Crossref /beta/datacitations requires cursor-based pagination. Replace the (0...total_pages) loop with a cursor loop: initialize options[:cursor] = "*" for the first request; remove options[:offset] and options[:page]; have process_data(options) return the API response (or modify it to yield the response); extract response['message']['next-cursor'] and set options[:cursor] to it for the next iteration; stop looping when the number of items returned is less than job_batch_size (rows). Ensure process_data uses options[:cursor] when making requests and propagates next-cursor back to the loop.

end
text = "Queued import for #{total} DOIs updated #{options[:from_date]} - #{options[:until_date]}."
else
Expand Down Expand Up @@ -99,34 +97,28 @@ def push_data(result, _options = {})
return result.body.fetch("errors") if result.body.fetch("errors",
nil).present?

items = result.body.dig("data", "message", "events")
items = result.body.dig("data", "message", "items")
# Rails.logger.info "Extracting related identifiers for #{items.size} DOIs updated from #{options[:from_date]} until #{options[:until_date]}."

Array.wrap(items).map do |item|
CrossrefImportJob.perform_later(item)
end

[items.length, result.body.dig("data", "message", "next-cursor")]
end

def self.push_item(item)
subj = cached_crossref_response(item["subj_id"])
obj = cached_datacite_response(item["obj_id"])
subj = cached_crossref_response(item["subject"]["id"])
obj = cached_datacite_response(item["object"]["id"])

data = {
"data" => {
"id" => item["id"],
"type" => "events",
"attributes" => {
"messageAction" => item["action"],
"subjId" => item["subj_id"],
"objId" => item["obj_id"],
"relationTypeId" => item["relation_type_id"].to_s.dasherize,
"sourceId" => item["source_id"].to_s.dasherize,
"sourceToken" => item["source_token"],
"occurredAt" => item["occurred_at"],
"subjId" => item["subject"]["id"],
"objId" => item["object"]["id"],
"relationTypeId" => item["relation"].to_s.dasherize,
"sourceId" => "crossref",
"sourceToken" => ENV["CROSSREF_SOURCE_TOKEN"],
"timestamp" => item["timestamp"],
"license" => item["license"],
"subj" => subj,
"obj" => obj,
},
Expand All @@ -135,6 +127,6 @@ def self.push_item(item)

send_event_import_message(data)

Rails.logger.info "[Event Data] #{item['subj_id']} #{item['relation_type_id']} #{item['obj_id']} sent to the events queue."
Rails.logger.info "[Event Data] #{item["subject"]["id"]} #{item["relation"]} #{item["object"]["id"]} sent to the events queue."
end
end
2 changes: 1 addition & 1 deletion config/application.rb
Expand Up @@ -40,7 +40,7 @@
ENV["LAGOTTINO_URL"] ||= "https://api.stage.datacite.org"
ENV["SASHIMI_QUERY_URL"] ||= "https://api.stage.datacite.org"
ENV["EVENTDATA_URL"] ||= "https://bus-staging.eventdata.crossref.org"
ENV["CROSSREF_QUERY_URL"] ||= "https://api.eventdata.crossref.org"
ENV["CROSSREF_QUERY_URL"] ||= "https://api.crossref.org"
ENV["TRUSTED_IP"] ||= "10.0.40.1"
ENV["SLACK_WEBHOOK_URL"] ||= ""
ENV["USER_AGENT"] ||= "Mozilla/5.0 (compatible; Maremma/#{Maremma::VERSION}; mailto:info@datacite.org)"
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

36 changes: 16 additions & 20 deletions spec/models/crossref_spec.rb
Expand Up @@ -60,28 +60,26 @@
it "returns a valid query URL with the given options" do
crossref = Crossref.new
query_url = crossref.get_query_url(from_date: from_date, until_date: until_date, rows: 10, cursor: "abc123")
expect(query_url).to include("source=crossref")
expect(query_url).to include("from-collected-date=#{from_date}")
expect(query_url).to include("until-collected-date=#{until_date}")
expect(query_url).to include("from-created-date=#{from_date}")
expect(query_url).to include("until-created-date=#{until_date}")
expect(query_url).to include("rows=10")
expect(query_url).to include("cursor=abc123")
end
end

describe "#queue_jobs" do
context "when there are DOIs to import" do
it "queues jobs and returns the total number of works queued" do
allow_any_instance_of(Crossref).to receive(:get_total).and_return([5, "next_cursor"])
allow_any_instance_of(Crossref).to receive(:process_data).and_return([5, "next_cursor"])
allow_any_instance_of(Crossref).to receive(:get_total).and_return(5)
allow_any_instance_of(Crossref).to receive(:process_data).and_return(5)

response = Crossref.new.queue_jobs(from_date: from_date, until_date: until_date)

expect(response).to eq(5)
end

it "sends a Slack notification when slack_webhook_url is present" do
allow_any_instance_of(Crossref).to receive(:get_total).and_return([1, "cursor"])
allow_any_instance_of(Crossref).to receive(:process_data).and_return([1, "cursor"])
allow_any_instance_of(Crossref).to receive(:get_total).and_return(1)
allow_any_instance_of(Crossref).to receive(:process_data).and_return(1)

allow(Rails.logger).to receive(:info)

Expand All @@ -96,7 +94,7 @@

context "when there are no DOIs to import" do
it "returns 0 and logs a message when there are no DOIs to import" do
allow_any_instance_of(Crossref).to receive(:get_total).and_return([0, nil])
allow_any_instance_of(Crossref).to receive(:get_total).and_return(0)

# Spy on Rails.logger
logger_spy = spy("logger")
Expand All @@ -119,22 +117,20 @@
allow(Rails.logger).to(receive(:info))

item = {
"id" => "example_id",
"action" => "example_action",
"subj_id" => "example_subj_id",
"obj_id" => "example_obj_id",
"relation_type_id" => "example_relation_type_id",
"source_id" => "example_source_id",
"source_token" => "example_source_token",
"occurred_at" => "2023-01-05T12:00:00Z",
"timestamp" => 1641379200,
"license" => "example_license",
"timestamp" => "2002-07-25T03:18:25Z",
"relation" => "example_relation_type",
"subject" => {
"id" => "example_subj_id"
},
"object" => {
"id" => "example_obj_id"
}
}

Crossref.push_item(item)

expect(Crossref).to(have_received(:send_event_import_message).once)
expect(Rails.logger).to(have_received(:info).with("[Event Data] example_subj_id example_relation_type_id example_obj_id sent to the events queue."))
expect(Rails.logger).to(have_received(:info).with("[Event Data] example_subj_id example_relation_type example_obj_id sent to the events queue."))
end
end
end
2 changes: 1 addition & 1 deletion spec/rails_helper.rb
Expand Up @@ -55,7 +55,7 @@ def fixture_path
c.filter_sensitive_data("<STAFF_ADMIN_TOKEN>") { ENV["STAFF_ADMIN_TOKEN"] }
c.filter_sensitive_data("<SLACK_WEBHOOK_URL>") { ENV["SLACK_WEBHOOK_URL"] }
c.configure_rspec_metadata!
c.default_cassette_options = { match_requests_on: %i[method uri] }
c.default_cassette_options = { record: vcr_mode, match_requests_on: %i[method uri] }
end

def capture_stdout
Expand Down