
Conversation

jterapin
Contributor

Experimental prototype


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

  1. To make sure we include your contribution in the release notes, please add a description entry for your changes in the "unreleased changes" section of the CHANGELOG.md file (in the corresponding gem). The entry should fit on one line and start with Feature or Issue in the correct format.

  2. For generated code changes, please check out the instructions below first:
    https://github.com/aws/aws-sdk-ruby/blob/version-3/CONTRIBUTING.md

Thank you for your contribution!

github-actions bot commented Sep 9, 2025

Detected 1 possible performance regression:

  • aws-sdk-s3.gem_size_kb - z-score regression: 513.54 -> 517.0. Z-score: 26.42


@mullermp mullermp left a comment


I think this is on the right path but we should break this down into chunks. I think you should first refactor file uploader/downloader to use the executor, and we should release that, then add directory upload/download after that.

@queue = Queue.new
@max_threads = options[:max_threads] || 10
@pool = []
@running = true

Does running state default to true in the initializer for other implementations of this in concurrent-ruby?
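
For reference, a minimal check against concurrent-ruby (the require path is 'concurrent') suggests its pools do report running immediately after construction:

require 'concurrent'

pool = Concurrent::FixedThreadPool.new(2)
pool.running?  # => true, straight out of the initializer
pool.shutdown  # stops accepting tasks; queued tasks still drain
pool.wait_for_termination
pool.shutdown? # => true once all tasks have completed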

module Aws
  module S3
    # Raised when DirectoryDownloader fails to download objects from S3 bucket
    class DirectoryDownloadError < StandardError

By convention we were putting these in separate files right? If you want to promote the other two (multipart errors) to the files where they are used that's fine too, but let's stay consistent.
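
Following that convention, the error could live in its own file, something like (path hypothetical):

# lib/aws-sdk-s3/directory_download_error.rb
module Aws
  module S3
    # Raised when DirectoryDownloader fails to download objects from an S3 bucket
    class DirectoryDownloadError < StandardError; end
  end
end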

# @option options [Integer] :thread_count (DEFAULT_THREAD_COUNT)
def initialize(options = {})
  @client = options[:client] || Client.new
  @thread_count = options[:thread_count] || DEFAULT_THREAD_COUNT

The thread count wouldn't matter in this class anymore right?

parts = upload_parts(upload_id, source, file_size, options)
complete_upload(upload_id, parts, file_size, options)
ensure
@executor.shutdown if @executor.running? && @options[:executor].nil?

This seems odd. I would think the executor always exists because we provide a default - if it is nil we still provide a default. Also, wouldn't you check whether it is nil before you attempt to call a method on it?
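
One way to express that - a sketch where the initializer always falls back to a default executor and records ownership (names hypothetical):

def initialize(options = {})
  @own_executor = options[:executor].nil?
  @executor = options[:executor] || DefaultExecutor.new
end

def upload(source, options = {})
  # ... start the multipart upload, upload parts, complete ...
ensure
  # Only shut down an executor we created ourselves.
  @executor.shutdown if @own_executor
end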

end

download_opts = options.dup
@bucket = bucket

It's odd to set instance state like bucket in a method. Wouldn't you pass bucket, errors, configuration, etc. down to the relevant methods and return errors back?


Yeah, I'd agree. I'm not against this being a "one-shot" class - i.e., for each directory download you must create this object again - in which case bucket/destination would be set on initialization instead.

Either way, though, setting it as a member variable here is a little weird.
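
A sketch of the one-shot variant, with bucket and destination fixed at construction (the keyword signature is hypothetical):

class DirectoryDownloader
  def initialize(bucket:, destination:, client: Client.new)
    @bucket = bucket
    @destination = destination
    @client = client
  end

  def download(options = {})
    errors = [] # per-call state stays local and is returned, not stored on the class
    # ... enumerate objects, download each, collect errors ...
    build_result(errors)
  end
end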

downloads = process_download_queue(producer, downloader, download_opts)
build_result(downloads)
ensure
@executor.shutdown unless @options[:executor]

We should always assume an executor I think.


Since this is an internal/private api - I think I'd agree that its reasonable to always require an executor to be provided, and then we don't ever shut it down

end

def shutdown
  @running = false

Should we track state of running -> shutting down -> shutdown? There is a gap between calling shutdown and being fully shut down (where tasks can no longer be submitted but existing tasks are still completing).

Additionally - we should probably offer a "kill" method that will immediately kill all threads. And finally - shutdown methods usually offer the ability to specify a timeout after which remaining tasks are killed (I think you can use the limit argument of Thread#join for that).
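
A sketch of that lifecycle for the DefaultExecutor, assuming a @pool of worker threads fed by @queue (state names and details hypothetical):

def shutdown(timeout = nil)
  @state = :shuttingdown # no new tasks accepted; in-flight tasks keep completing
  @queue.close           # blocked pops on a closed queue return nil (Ruby >= 2.3)
  @pool.each { |thread| thread.join(timeout) } # per-thread limit, for simplicity
  kill if @pool.any?(&:alive?) # timeout elapsed - force-stop the stragglers
  @state = :shutdown
end

def kill
  @pool.each(&:kill)
  @state = :shutdown
end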

def process_download_queue(producer, downloader, opts)
  download_attempts = 0
  completion_queue = Queue.new
  queue_executor = DefaultExecutor.new

Do we need an executor here? I know this needs to be done async/in a thread, but would it work to just spawn a single thread here?


Edit: NVM, I understand why we're doing this to limit the max concurrent downloads at one time. Maybe add comments or have a parameter/constant for that?
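
Since the prototype's DefaultExecutor already accepts :max_threads, that could be as simple as (constant name hypothetical):

MAX_CONCURRENT_DOWNLOADS = 10

queue_executor = DefaultExecutor.new(max_threads: MAX_CONCURRENT_DOWNLOADS)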

@filter_callback = options[:filter_callback]
@errors = options[:errors]
@directory_downloader = options[:directory_downloader]
@object_queue = SizedQueue.new(100)

make this a constant?
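
For example (constant name hypothetical):

MAX_QUEUE_SIZE = 100

@object_queue = SizedQueue.new(MAX_QUEUE_SIZE)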

download_attempts = 0
completion_queue = Queue.new
queue_executor = DefaultExecutor.new
while (object = producer.object_queue.shift) != :done

Suggestion on the Producer interface - I think the object_queue should be an internal detail here. I'd lean towards having this implement Enumerable, so here you would just do:

producer.each do |object|
  break if @abort_download
  # rest of code
end

Calling each on the producer would start it (so there's no need to call run) and would handle yielding objects from the internal object_queue.
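
A sketch of that producer interface (start_listing is a hypothetical stand-in for whatever kicks off the background listing):

class Producer
  include Enumerable

  def each
    start_listing # starts the producer lazily; no separate run call needed
    while (object = @object_queue.shift) != :done
      yield object
    end
  end
end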

upload_attempts = 0
completion_queue = Queue.new
queue_executor = DefaultExecutor.new
while (file = producer.file_queue.shift) != :done

Ditto here - I think this producer could follow the same pattern.

uploader.upload(f[:path], opts.merge(key: f[:key]))
rescue StandardError => e
@errors << e
@abort_upload = true unless @ignore_failure

naming nit - what about upload_aborted instead, to make it clear that this is state rather than configuration?


upload_opts = options.dup
@ignore_failure = upload_opts.delete(:ignore_failure) || false
@errors = []

Same comment as on the DirectoryDownloader - I think we need to decide if these classes are "one shot" or reusable. If they are reusable, then we cannot set state on the class.

Because the file uploader/downloader are NOT one shot, I'd lean towards consistency here: have a DirectoryUploader that is reusable, and then I think the easiest way to manage this is to create a private class, DirectoryUpload, which is one-shot (i.e., it represents the state of a single directory upload).

As part of this - I think we would move management of the queue executor to the top-level object - that is, the same queue executor would be shared across any concurrent DirectoryUploads (so that effectively a configured max_concurrent_file_uploads setting would apply across all concurrent uploads started from the same DirectoryUploader).
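
A sketch of that split (all names hypothetical):

class DirectoryUploader
  def initialize(options = {})
    # Shared across every upload started from this instance, so a configured
    # max_concurrent_file_uploads limit applies across concurrent directory uploads.
    @queue_executor = DefaultExecutor.new(max_threads: options[:max_concurrent_file_uploads])
  end

  def upload(source, options = {})
    DirectoryUpload.new(@queue_executor).call(source, options)
  end

  # One-shot: holds the state of a single directory upload.
  class DirectoryUpload
    def initialize(executor)
      @executor = executor
      @errors = []
      @upload_aborted = false
    end

    def call(source, options)
      # ... enumerate files, submit uploads to @executor, collect @errors ...
    end
  end
  private_constant :DirectoryUpload
end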

File.rename(opts[:temp_path], destination) if opts[:temp_path]
ensure
File.delete(@temp_path) if @temp_path && File.exist?(@temp_path)
@executor.shutdown if @executor.running? && @options[:executor].nil?

I think there is an issue with this - the FileDownloader can be used to download multiple files (i.e., without creating a new one), right? So here, if we shut down the executor on the first download call, we'll never be able to call download again.

I think we'd probably want to call shutdown only when the downloader is GC'ed (and maybe at that point we would just call kill on it instead?)
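
A sketch of the finalizer approach; note the finalizer proc must not capture self, or the downloader never becomes eligible for GC (names hypothetical):

class FileDownloader
  def initialize(options = {})
    @executor = options[:executor] || DefaultExecutor.new
    # Only finalize an executor we created ourselves.
    ObjectSpace.define_finalizer(self, self.class.finalizer(@executor)) unless options[:executor]
  end

  def self.finalizer(executor)
    # Nothing should be in flight by GC time, so kill rather than a graceful shutdown.
    proc { executor.kill }
  end
end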
