From bdf73a67f4ce40c14d4aae7540dd70911ad60feb Mon Sep 17 00:00:00 2001
From: Jochen Ullrich
Date: Wed, 10 Aug 2022 10:28:56 +0200
Subject: [PATCH 1/4] fix: Fix object timestamp logic

This should make the s3 input compatible with more S3-compatible
backends, since the timestamps being compared are taken from the same
call. It also changes the logic to something I think makes more sense:
it shouldn't really matter whether a file has changed since listing,
only whether it has changed during downloading/processing. If it had
changed after listing but before the download, the exact same file
would be processed twice.
---
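Note: a minimal sketch of the reordered check. `bucket`, `fetched_at`,
`after_processing`, and `download_and_process` are illustrative
stand-ins, not the plugin's real identifiers; the point is that the
timestamp read right before the download is compared against a freshly
fetched one after processing, instead of against the listing entry:

    object = bucket.object(log.key)
    fetched_at = object.last_modified   # read right before the download

    download_and_process(object)        # hypothetical stand-in for the plugin's work

    after_processing = bucket.object(log.key).last_modified
    if fetched_at == after_processing
      # Unchanged while we worked on it: safe to back up/delete and record in sincedb.
    else
      # Modified mid-processing: skip cleanup so the next cycle picks it up again.
    end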
 lib/logstash/inputs/s3.rb | 9 +++++----
 spec/inputs/s3_spec.rb    | 4 +++-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/lib/logstash/inputs/s3.rb b/lib/logstash/inputs/s3.rb
index 36a1f65..b2513b6 100644
--- a/lib/logstash/inputs/s3.rb
+++ b/lib/logstash/inputs/s3.rb
@@ -375,19 +375,20 @@ def ignore_filename?(filename)
 
   def process_log(queue, log)
     @logger.debug("Processing", :bucket => @bucket, :key => log.key)
-    object = @s3bucket.object(log.key)
+    object = @s3bucket.object(log.key).load
 
     filename = File.join(temporary_directory, File.basename(log.key))
     if download_remote_file(object, filename)
       if process_local_log(queue, filename, object)
-        if object.last_modified == log.last_modified
+        refreshed_object = @s3bucket.object(log.key)
+        if object.last_modified == refreshed_object.last_modified
           backup_to_bucket(object)
           backup_to_dir(filename)
           delete_file_from_bucket(object)
           FileUtils.remove_entry_secure(filename, true)
-          sincedb.write(log.last_modified)
+          sincedb.write(object.last_modified)
         else
-          @logger.info("#{log.key} is updated at #{object.last_modified} and will process in the next cycle")
+          @logger.info("#{log.key} is updated at #{refreshed_object.last_modified} and will process in the next cycle")
         end
       end
     else
diff --git a/spec/inputs/s3_spec.rb b/spec/inputs/s3_spec.rb
index b34b682..753d9c2 100644
--- a/spec/inputs/s3_spec.rb
+++ b/spec/inputs/s3_spec.rb
@@ -583,7 +583,7 @@
     end
   end
 
-  context 's3 object updated after getting summary' do
+  context 's3 object updated during processing' do
     it 'should not update sincedb' do
       s3_summary = [
         double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
@@ -592,6 +592,8 @@
 
       s3_objects = [
         double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+        double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'),
+        double(:key => 'TODAY', :last_modified => Time.now.round - (cutoff * 10), :content_length => 5, :storage_class => 'STANDARD'),
         double(:key => 'TODAY_UPDATED', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD')
       ]
 

From fafd19dd3c6b21a81bd0f0b41e7d9a64a0028899 Mon Sep 17 00:00:00 2001
From: Jochen Ullrich
Date: Wed, 10 Aug 2022 10:36:09 +0200
Subject: [PATCH 2/4] doc: Add some more documentation for the changed behavior

---
 lib/logstash/inputs/s3.rb | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lib/logstash/inputs/s3.rb b/lib/logstash/inputs/s3.rb
index b2513b6..efbd6a3 100644
--- a/lib/logstash/inputs/s3.rb
+++ b/lib/logstash/inputs/s3.rb
@@ -375,12 +375,14 @@ def ignore_filename?(filename)
 
   def process_log(queue, log)
     @logger.debug("Processing", :bucket => @bucket, :key => log.key)
+    # Eager-loads the object data so the last_modified field is populated right before the download
     object = @s3bucket.object(log.key).load
 
     filename = File.join(temporary_directory, File.basename(log.key))
     if download_remote_file(object, filename)
       if process_local_log(queue, filename, object)
         refreshed_object = @s3bucket.object(log.key)
+        # If the object was modified during download and processing, do not backup/delete it and process it again during the next iteration
         if object.last_modified == refreshed_object.last_modified
           backup_to_bucket(object)
           backup_to_dir(filename)
@@ -388,7 +390,7 @@ def process_log(queue, log)
           FileUtils.remove_entry_secure(filename, true)
           sincedb.write(object.last_modified)
         else
-          @logger.info("#{log.key} is updated at #{refreshed_object.last_modified} and will process in the next cycle")
+          @logger.info("#{log.key} was updated at #{refreshed_object.last_modified} and will process in the next cycle")
         end
       end
     else

From c10166a074915d834700e6a0eadb6b28240c41f1 Mon Sep 17 00:00:00 2001
From: Jochen Ullrich
Date: Wed, 10 Aug 2022 10:50:19 +0200
Subject: [PATCH 3/4] fix: Fix rspec for new load behavior

---
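Note: with the real aws-sdk-ruby, `Aws::S3::Object#load` performs the
HEAD request and returns `self`, so the chained form from PATCH 1 is
valid against the real SDK. The specs, however, stub `#load` without a
return value, making the stubbed call return `nil`; splitting the call
keeps `object` holding the S3 object either way. A sketch of the
difference, assuming a real `bucket` handle:

    # Real SDK: #load returns self, so chaining is safe.
    object = bucket.object('some.log').load   # => Aws::S3::Object

    # Under `allow_any_instance_of(Aws::S3::Object).to receive(:load)`,
    # the stubbed #load returns nil, so the chained form would assign nil.
    object = bucket.object('some.log')   # keep the reference first...
    object.load                          # ...then eager-load; the return value is unused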
 lib/logstash/inputs/s3.rb | 3 ++-
 spec/inputs/s3_spec.rb    | 3 +++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/lib/logstash/inputs/s3.rb b/lib/logstash/inputs/s3.rb
index efbd6a3..8bf3fe1 100644
--- a/lib/logstash/inputs/s3.rb
+++ b/lib/logstash/inputs/s3.rb
@@ -375,8 +375,9 @@ def ignore_filename?(filename)
 
   def process_log(queue, log)
     @logger.debug("Processing", :bucket => @bucket, :key => log.key)
+    object = @s3bucket.object(log.key)
     # Eager-loads the object data so the last_modified field is populated right before the download
-    object = @s3bucket.object(log.key).load
+    object.load
 
     filename = File.join(temporary_directory, File.basename(log.key))
     if download_remote_file(object, filename)
diff --git a/spec/inputs/s3_spec.rb b/spec/inputs/s3_spec.rb
index 753d9c2..69fbdfa 100644
--- a/spec/inputs/s3_spec.rb
+++ b/spec/inputs/s3_spec.rb
@@ -407,6 +407,7 @@
       }
       allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects }
       allow_any_instance_of(Aws::S3::Bucket).to receive(:object).with(log.key) { log }
+      allow_any_instance_of(Aws::S3::Object).to receive(:load)
       expect(log).to receive(:get).with(instance_of(Hash)) do |arg|
         File.open(arg[:response_target], 'wb') { |s3file| s3file.write(data) }
       end
@@ -569,6 +570,7 @@
 
       allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_objects }
       allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects)
+      allow_any_instance_of(Aws::S3::Object).to receive(:load)
       expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original
       expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size)
       expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size)
@@ -601,6 +603,7 @@
 
       allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_summary }
       allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects)
+      allow_any_instance_of(Aws::S3::Object).to receive(:load)
       expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original
       expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size)
       expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size)

From b687c4e45bb15a6cffeb0af997e0d9cddf16d8e7 Mon Sep 17 00:00:00 2001
From: Jochen Ullrich
Date: Wed, 10 Aug 2022 10:58:19 +0200
Subject: [PATCH 4/4] fix: Move stub to the general before block

---
 spec/inputs/s3_spec.rb | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/spec/inputs/s3_spec.rb b/spec/inputs/s3_spec.rb
index 69fbdfa..beaee8f 100644
--- a/spec/inputs/s3_spec.rb
+++ b/spec/inputs/s3_spec.rb
@@ -32,6 +32,7 @@
     FileUtils.mkdir_p(sincedb_path)
     Aws.config[:stub_responses] = true
     Thread.abort_on_exception = true
+    allow_any_instance_of(Aws::S3::Object).to receive(:load)
   end
 
   context "when interrupting the plugin" do
@@ -407,7 +408,6 @@
       }
       allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { objects }
       allow_any_instance_of(Aws::S3::Bucket).to receive(:object).with(log.key) { log }
-      allow_any_instance_of(Aws::S3::Object).to receive(:load)
       expect(log).to receive(:get).with(instance_of(Hash)) do |arg|
         File.open(arg[:response_target], 'wb') { |s3file| s3file.write(data) }
       end
@@ -570,7 +570,6 @@
 
       allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_objects }
       allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects)
-      allow_any_instance_of(Aws::S3::Object).to receive(:load)
       expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original
       expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size)
       expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size)
@@ -603,7 +602,6 @@
 
       allow_any_instance_of(Aws::S3::Bucket).to receive(:objects) { s3_summary }
       allow_any_instance_of(Aws::S3::Bucket).to receive(:object).and_return(*s3_objects)
-      allow_any_instance_of(Aws::S3::Object).to receive(:load)
       expect(s3_plugin).to receive(:process_log).at_least(size).and_call_original
       expect(s3_plugin).to receive(:stop?).and_return(false).at_least(size)
       expect(s3_plugin).to receive(:download_remote_file).and_return(true).at_least(size)
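
Note on PATCH 4: hoisting the stub into the top-level `before` block
applies it to every example, so the individual contexts no longer need
to repeat it. A condensed sketch of the pattern — the example names are
illustrative, not taken from the actual spec file:

    describe LogStash::Inputs::S3 do
      before do
        Aws.config[:stub_responses] = true
        # One stub for every example: the eager #load becomes a no-op in specs.
        allow_any_instance_of(Aws::S3::Object).to receive(:load)
      end

      it 'processes a downloaded object' do
        # ...example body elided...
      end
    end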