diff --git a/gems/aws-sdk-s3/CHANGELOG.md b/gems/aws-sdk-s3/CHANGELOG.md index 7fd43ebaea8..9b595d882c0 100644 --- a/gems/aws-sdk-s3/CHANGELOG.md +++ b/gems/aws-sdk-s3/CHANGELOG.md @@ -1,6 +1,8 @@ Unreleased Changes ------------------ +* Feature - TODO + 1.199.1 (2025-09-25) ------------------ diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb index c0dba64b79c..a85bfd6e14c 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb @@ -6,6 +6,7 @@ module S3 autoload :BucketRegionCache, 'aws-sdk-s3/bucket_region_cache' autoload :Encryption, 'aws-sdk-s3/encryption' autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2' + autoload :DefaultExecutor, 'aws-sdk-s3/default_executor' autoload :FilePart, 'aws-sdk-s3/file_part' autoload :FileUploader, 'aws-sdk-s3/file_uploader' autoload :FileDownloader, 'aws-sdk-s3/file_downloader' @@ -18,6 +19,8 @@ module S3 autoload :ObjectMultipartCopier, 'aws-sdk-s3/object_multipart_copier' autoload :PresignedPost, 'aws-sdk-s3/presigned_post' autoload :Presigner, 'aws-sdk-s3/presigner' + autoload :DirectoryUploader, 'aws-sdk-s3/directory_uploader' + autoload :DirectoryDownloader, 'aws-sdk-s3/directory_downloader' autoload :TransferManager, 'aws-sdk-s3/transfer_manager' # s3 express session auth diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb index 0a9a9b9d3ca..69e7d6905b9 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb @@ -459,11 +459,17 @@ def upload_stream(options = {}, &block) # @see Client#upload_part def upload_file(source, options = {}) uploading_options = options.dup - uploader = FileUploader.new(multipart_threshold: uploading_options.delete(:multipart_threshold), client: client) + executor = DefaultExecutor.new(max_threads: uploading_options.delete(:thread_count)) + uploader = FileUploader.new( + client: client, + executor: executor, + multipart_threshold: uploading_options.delete(:multipart_threshold) + ) response = Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do uploader.upload(source, uploading_options.merge(bucket: bucket_name, key: key)) end yield response if block_given? 
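# Editor's note (illustrative sketch, not part of the diff): from the resource interface the
# calling convention is unchanged; :thread_count now sizes the DefaultExecutor that backs the
# transfer instead of spawning threads directly. Bucket, key and paths are placeholders.
client = Aws::S3::Client.new
obj = Aws::S3::Object.new('example-bucket', 'example-key', client: client)
obj.upload_file('/tmp/payload.bin',
                thread_count: 8,                        # becomes DefaultExecutor max_threads
                multipart_threshold: 16 * 1024 * 1024)  # multipart upload above 16 MB
obj.download_file('/tmp/payload.bin', thread_count: 8)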
+ executor.shutdown true end deprecated(:upload_file, use: 'Aws::S3::TransferManager#upload_file', version: 'next major version') @@ -539,10 +545,12 @@ def upload_file(source, options = {}) # @see Client#get_object # @see Client#head_object def download_file(destination, options = {}) - downloader = FileDownloader.new(client: client) + executor = DefaultExecutor.new(max_threads: options[:thread_count]) + downloader = FileDownloader.new(client: client, executor: executor) Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do downloader.download(destination, options.merge(bucket: bucket_name, key: key)) end + executor.shutdown true end deprecated(:download_file, use: 'Aws::S3::TransferManager#download_file', version: 'next major version') diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb new file mode 100644 index 00000000000..38158b343d7 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb @@ -0,0 +1,92 @@ +# frozen_string_literal: true + +module Aws + module S3 + # @api private + class DefaultExecutor + RUNNING = :running + SHUTTING_DOWN = :shutting_down + SHUTDOWN = :shutdown + + def initialize(options = {}) + @max_threads = options[:max_threads] || 10 + @state = RUNNING + @queue = Queue.new + @pool = [] + @mutex = Mutex.new + end + + def post(*args, &block) + raise 'Executor is not accepting new tasks' unless @state == RUNNING + + @queue << [args, block] + ensure_worker_available + end + + def kill + @mutex.synchronize do + @state = SHUTDOWN + @pool.each(&:kill) + @pool.clear + @queue.clear + end + true + end + + def shutdown(timeout = nil) + @mutex.synchronize do + return true if @state == SHUTDOWN + + @state = SHUTTING_DOWN + end + + @max_threads.times { @queue << :shutdown } + + if timeout + @pool.each { |thread| thread.join(timeout) } + @pool.select(&:alive?).each(&:kill) + else + @pool.each(&:join) + end + + @pool.clear + @state = SHUTDOWN + true + end + + def running? + @state == RUNNING + end + + def shutting_down? + @state == SHUTTING_DOWN + end + + def shutdown? + @state == SHUTDOWN + end + + private + + def ensure_worker_available + @mutex.synchronize do + return unless @state == RUNNING + + @pool.select!(&:alive?) 
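# Editor's note (illustrative sketch, not part of the diff): DefaultExecutor exposes only #post,
# #shutdown, #kill and state predicates; callers that need to wait for posted work keep their own
# completion queue, which is the pattern the transfer classes below use.
executor = Aws::S3::DefaultExecutor.new(max_threads: 4)
completion_queue = Queue.new
items = %w[a b c] # placeholder work items
items.each do |item|
  executor.post(item) do |i|
    # ... per-item work goes here; the block receives the arguments passed to #post ...
    completion_queue << :done
  end
end
items.size.times { completion_queue.pop } # wait for every posted task to finish
executor.shutdown                         # drain workers; pass a timeout to kill stragglers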
+ @pool << spawn_worker if @pool.size < @max_threads + end + end + + def spawn_worker + Thread.new do + while (job = @queue.shift) + break if job == :shutdown + + args, block = job + block.call(*args) + end + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb new file mode 100644 index 00000000000..86d849f72fd --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb @@ -0,0 +1,155 @@ +# frozen_string_literal: true + +module Aws + module S3 + # Raised when DirectoryDownloader fails to download objects from S3 bucket + class DirectoryDownloadError < StandardError + def initialize(message, errors = []) + @errors = errors + super(message) + end + + # @return [Array] The list of errors encountered when downloading objects + attr_reader :errors + end + + # @api private + class DirectoryDownloader + def initialize(options = {}) + @client = options[:client] || Client.new + @executor = options[:executor] || DefaultExecutor.new + @options = options + @abort_download = false + end + + attr_reader :client, :abort_download + + # TODO: need to add progress tracker + def download(destination, bucket:, **options) + if File.exist?(destination) + raise ArgumentError, 'invalid destination, expected a directory' unless File.directory?(destination) + else + FileUtils.mkdir_p(destination) + end + + download_opts = options.dup + @bucket = bucket + @ignore_failure = download_opts.delete(:ignore_failure) || false + @errors = [] + + downloader = FileDownloader.new(client: client, executor: @executor) + producer = ObjectProducer.new(destination, build_producer_opts(download_opts)) + producer.run + + downloads = process_download_queue(producer, downloader, download_opts) + build_result(downloads) + ensure + @executor.shutdown unless @options[:executor] + end + + def build_producer_opts(opts) + { + directory_downloader: self, + client: @client, + bucket: @bucket, + s3_prefix: opts.delete(:s3_prefix), + ignore_failure: @ignore_failure, + filter_callback: opts.delete(:filter_callback), + errors: @errors + } + end + + def build_result(download_count) + downloads = [download_count - @errors.count, 0].max + + if @abort_download + msg = "failed to download directory: downloaded #{downloads} files " \ + "and failed to download #{@errors.count} files." + raise DirectoryDownloadError.new(msg, @errors) + else + result = { completed_downloads: downloads, failed_downloads: @errors.count } + result[:errors] = @errors if @errors.any?
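# Editor's note (illustrative sketch, not part of the diff): DirectoryDownloader is @api private
# and is expected to be reached through the forthcoming TransferManager#download_directory. On
# success it returns the result hash built above; with ignore_failure: false any per-object error
# aborts the transfer and raises. Names and counts are placeholders.
downloader = Aws::S3::DirectoryDownloader.new(client: Aws::S3::Client.new)
begin
  result = downloader.download('/tmp/backup', bucket: 'example-bucket',
                               s3_prefix: 'photos/', ignore_failure: true)
  # => { completed_downloads: 120, failed_downloads: 2, errors: [...] }
rescue Aws::S3::DirectoryDownloadError => e
  e.errors.each { |error| warn(error.message) } # the per-object failures that caused the abort
end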
+ result + end + end + + def process_download_queue(producer, downloader, opts) + download_attempts = 0 + completion_queue = Queue.new + queue_executor = DefaultExecutor.new + while (object = producer.object_queue.shift) != :done + break if @abort_download + + download_attempts += 1 + queue_executor.post(object) do |o| + dir_path = File.dirname(o[:path]) + FileUtils.mkdir_p(dir_path) unless dir_path == @destination || Dir.exist?(dir_path) + + downloader.download(o[:path], opts.merge(bucket: @bucket, key: o[:key])) + rescue StandardError => e + @errors << e + @abort_download = true unless @ignore_failure + ensure + completion_queue << :done + end + end + download_attempts.times { completion_queue.pop } + download_attempts + ensure + queue_executor.shutdown + end + + # @api private + class ObjectProducer + def initialize(destination_dir, options = {}) + @destination_dir = destination_dir + @client = options[:client] + @bucket = options[:bucket] + @s3_prefix = options[:s3_prefix] + @ignore_failure = options[:ignore_failure] + @filter_callback = options[:filter_callback] + @errors = options[:errors] + @directory_downloader = options[:directory_downloader] + @object_queue = SizedQueue.new(100) + end + + attr_reader :object_queue + + def run + Thread.new do + stream_objects + @object_queue << :done + end + end + + private + + def build_object_entry(key) + { path: File.join(@destination_dir, normalize_key(key)), key: key } + end + + # TODO: need to add filter callback, double check handling of objects that ends with / + def stream_objects(continuation_token: nil) + resp = @client.list_objects_v2(bucket: @bucket, continuation_token: continuation_token) + resp.contents.each do |o| + break if @directory_downloader.abort_download + next if o.key.end_with?('/') + + @object_queue << build_object_entry(o.key) + rescue StandardError => e + @errors << e + @abort_download = true unless @ignore_failure + end + stream_objects(continuation_token: resp.next_continuation_token) if resp.next_continuation_token + end + + def normalize_key(key) + key = key.delete_prefix(@s3_prefix) if @s3_prefix + return key if File::SEPARATOR == '/' + + key.tr('/', File::SEPARATOR) + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb new file mode 100644 index 00000000000..d7d87a58311 --- /dev/null +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb @@ -0,0 +1,215 @@ +# frozen_string_literal: true + +require 'set' + +module Aws + module S3 + # Raised when DirectoryUploader fails to upload files to S3 bucket + class DirectoryUploadError < StandardError + def initialize(message, errors = []) + @errors = errors + super(message) + end + + # @return [Array] The list of errors encountered when uploading files + attr_reader :errors + end + + # @api private + class DirectoryUploader + def initialize(options = {}) + @client = options[:client] || Client.new + @executor = options[:executor] || DefaultExecutor.new + @options = options + @abort_upload = false + end + + # @return [Client] + attr_reader :client + + # @return [Boolean] + attr_accessor :abort_upload + + # TODO: Need to add progress tracker + def upload(source, bucket:, **options) + raise ArgumentError, 'Invalid directory' unless Dir.exist?(source) + + upload_opts = options.dup + @ignore_failure = upload_opts.delete(:ignore_failure) || false + @errors = [] + + uploader = FileUploader.new( + multipart_threshold: upload_opts.delete(:multipart_threshold), + client: @client, + 
executor: @executor + ) + producer = FileProducer.new(source, build_producer_opts(upload_opts)) + producer.run + uploads = process_upload_queue(producer, uploader, upload_opts.merge(bucket: bucket)) + build_result(uploads) + ensure + @executor.shutdown unless @options[:executor] + end + + private + + def build_producer_opts(opts) + { + directory_uploader: self, + s3_prefix: opts.delete(:s3_prefix), + recursive: opts.delete(:recursive), + follow_symlinks: opts.delete(:follow_symlinks), + filter_callback: opts.delete(:filter_callback), + ignore_failure: @ignore_failure, + errors: @errors + } + end + + def build_result(upload_count) + uploads = [upload_count - @errors.count, 0].max + + if @abort_upload + msg = "failed to upload directory: uploaded #{uploads} files " \ + "and failed to upload #{@errors.count} files." + raise DirectoryUploadError.new(msg, @errors) + else + result = { completed_uploads: uploads, failed_uploads: @errors.count } + result[:errors] = @errors if @errors.any? + result + end + end + + def process_upload_queue(producer, uploader, opts) + upload_attempts = 0 + completion_queue = Queue.new + queue_executor = DefaultExecutor.new + while (file = producer.file_queue.shift) != :done + break if @abort_upload + + upload_attempts += 1 + queue_executor.post(file) do |f| + uploader.upload(f[:path], opts.merge(key: f[:key])) + rescue StandardError => e + @errors << e + @abort_upload = true unless @ignore_failure + ensure + completion_queue << :done + end + end + upload_attempts.times { completion_queue.pop } + upload_attempts + ensure + queue_executor.shutdown + end + + + # @api private + class FileProducer + def initialize(source_dir, options = {}) + @source_dir = source_dir + @s3_prefix = options[:s3_prefix] + @recursive = options[:recursive] || false + @follow_symlinks = options[:follow_symlinks] || false + @ignore_failure = options[:ignore_failure] + @filter_callback = options[:filter_callback] + @errors = options[:errors] + @directory_uploader = options[:directory_uploader] + @file_queue = SizedQueue.new(100) + end + + attr_accessor :file_queue + + def run + Thread.new do + if @recursive + find_recursively + else + find_directly + end + @file_queue << :done + end + end + + private + + def build_file_entry(file_path, key) + normalized_key = @s3_prefix ? 
File.join(@s3_prefix, key) : key + { path: file_path, key: normalized_key } + end + + def find_directly + Dir.each_child(@source_dir) do |entry| + break if @directory_uploader.abort_upload + + entry_path = File.join(@source_dir, entry) + next if File.directory?(entry_path) || skip_symlink?(entry_path) + next unless include_file?(entry_path, entry) + next unless valid_file_type?(entry_path) + + @file_queue << build_file_entry(entry_path, entry) + rescue StandardError => e + @errors << e + @directory_uploader.abort_upload = true unless @ignore_failure + end + end + + def find_recursively + if @follow_symlinks + visited = Set.new + visited << File.stat(@source_dir).ino + scan_directory(@source_dir, visited: visited) + else + scan_directory(@source_dir) + end + end + + def valid_file_type?(path) + File.file?(path) || File.symlink?(path) + end + + def skip_symlink?(path) + !@follow_symlinks && File.symlink?(path) + end + + def include_file?(file_path, file_name) + return true unless @filter_callback + + @filter_callback.call(file_path, file_name) + end + + def scan_directory(dir_path, key_prefix: '', visited: nil) + return if @directory_uploader.abort_upload + + Dir.each_child(dir_path) do |entry| + break if @directory_uploader.abort_upload + + full_path = File.join(dir_path, entry) + next unless include_file?(full_path, entry) + next if !@follow_symlinks && File.symlink?(full_path) + + if File.directory?(full_path) + handle_directory(full_path, entry, key_prefix, visited) + elsif valid_file_type?(full_path) + key = key_prefix.empty? ? entry : File.join(key_prefix, entry) + @file_queue << build_file_entry(full_path, key) + end + rescue StandardError => e + @errors << e + @directory_uploader.abort_upload = true unless @ignore_failure + end + end + + def handle_directory(dir_path, dir_name, key_prefix, visited) + if @follow_symlinks && visited + stat = File.stat(dir_path) + return if visited.include?(stat.ino) + + visited << stat.ino + end + new_prefix = key_prefix.empty? ? dir_name : File.join(key_prefix, dir_name) + scan_directory(dir_path, key_prefix: new_prefix, visited: visited) + end + end + end + end +end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb index 517a06dabea..4181c7eaeac 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_downloader.rb @@ -8,184 +8,243 @@ module Aws module S3 # @api private class FileDownloader - MIN_CHUNK_SIZE = 5 * 1024 * 1024 MAX_PARTS = 10_000 + HEAD_OPTIONS = Set.new(Client.api.operation(:head_object).input.shape.member_names) + GET_OPTIONS = Set.new(Client.api.operation(:get_object).input.shape.member_names) def initialize(options = {}) @client = options[:client] || Client.new + @executor = options[:executor] end # @return [Client] attr_reader :client def download(destination, options = {}) - valid_types = [String, Pathname, File, Tempfile] - unless valid_types.include?(destination.class) - raise ArgumentError, "Invalid destination, expected #{valid_types.join(', ')} but got: #{destination.class}" - end - - @destination = destination - @mode = options.delete(:mode) || 'auto' - @thread_count = options.delete(:thread_count) || 10 - @chunk_size = options.delete(:chunk_size) - @on_checksum_validated = options.delete(:on_checksum_validated) - @progress_callback = options.delete(:progress_callback) - @params = options - validate! 
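# Editor's note (illustrative sketch, not part of the diff): usage of the DirectoryUploader added
# above. It is @api private and expected to be reached through the forthcoming
# TransferManager#upload_directory; names are placeholders.
uploader = Aws::S3::DirectoryUploader.new(client: Aws::S3::Client.new)
result = uploader.upload(
  '/var/photos',                # local source directory (must exist)
  bucket: 'example-bucket',
  s3_prefix: 'photos',          # keys become photos/<path relative to source>
  recursive: true,              # walk subdirectories
  follow_symlinks: false,
  ignore_failure: true,         # collect per-file errors instead of aborting
  filter_callback: ->(path, name) { !name.start_with?('.') } # skip dotfiles
)
result # => { completed_uploads: ..., failed_uploads: ..., errors: [...] }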
+ validate_destination!(destination) + opts = build_download_opts(destination, options.dup) + validate_opts!(opts) Aws::Plugins::UserAgent.metric('S3_TRANSFER') do - case @mode - when 'auto' then multipart_download - when 'single_request' then single_request - when 'get_range' - raise ArgumentError, 'In get_range mode, :chunk_size must be provided' unless @chunk_size - - resp = @client.head_object(@params) - multithreaded_get_by_ranges(resp.content_length, resp.etag) - else - raise ArgumentError, "Invalid mode #{@mode} provided, :mode should be single_request, get_range or auto" + case opts[:mode] + when 'auto' then multipart_download(opts) + when 'single_request' then single_request(opts) + when 'get_range' then range_request(opts) end end - File.rename(@temp_path, @destination) if @temp_path + File.rename(opts[:temp_path], destination) if opts[:temp_path] ensure - File.delete(@temp_path) if @temp_path && File.exist?(@temp_path) + cleanup_temp_file!(opts) end private - def validate! - return unless @on_checksum_validated && !@on_checksum_validated.respond_to?(:call) + def build_download_opts(destination, opts) + { + destination: destination, + mode: opts.delete(:mode) || 'auto', + chunk_size: opts.delete(:chunk_size), + on_checksum_validated: opts.delete(:on_checksum_validated), + progress_callback: opts.delete(:progress_callback), + params: opts, + temp_path: nil + } + end + + def cleanup_temp_file!(opts) + return unless opts + + temp_file = opts[:temp_path] + File.delete(temp_file) if temp_file && File.exist?(temp_file) + end + + def download_with_executor(part_list, total_size, opts) + download_attempts = 0 + completion_queue = Queue.new + abort_download = false + error = nil + progress = + if (progress_callback = opts[:progress_callback]) + MultipartProgress.new(part_list, total_size, progress_callback) + end + + while (part = part_list.shift) + break if abort_download + + download_attempts += 1 + @executor.post(part) do |p| + update_progress(progress, p) if progress + + resp = @client.get_object(p.params) + range = extract_range(resp.content_range) + validate_range(range, p.params[:range]) if p.params[:range] + write(resp.body, range, opts) + + if opts[:on_checksum_validated] && resp.checksum_validated + opts[:on_checksum_validated].call(resp.checksum_validated, resp) + end + rescue StandardError => e + abort_download = true + error = e + ensure + completion_queue << :done + end + end + + download_attempts.times { completion_queue.pop } + raise error unless error.nil? + end + + def get_opts(opts) + GET_OPTIONS.each_with_object({}) { |k, h| h[k] = opts[k] if opts.key?(k) } + end + + def head_opts(opts) + HEAD_OPTIONS.each_with_object({}) { |k, h| h[k] = opts[k] if opts.key?(k) } + end - raise ArgumentError, ':on_checksum_validated must be callable' + def compute_chunk(chunk_size, file_size) + raise ArgumentError, ":chunk_size shouldn't exceed total file size." 
if chunk_size && chunk_size > file_size + + chunk_size || [(file_size.to_f / MAX_PARTS).ceil, MIN_CHUNK_SIZE].max.to_i end - def multipart_download - resp = @client.head_object(@params.merge(part_number: 1)) + def compute_mode(file_size, total_parts, etag, opts) + chunk_size = compute_chunk(opts[:chunk_size], file_size) + part_size = (file_size.to_f / total_parts).ceil + + resolve_temp_path(opts) + if chunk_size < part_size + multithreaded_get_by_ranges(file_size, etag, opts) + else + multithreaded_get_by_parts(total_parts, file_size, etag, opts) + end + end + + def extract_range(value) + value.match(%r{bytes (?\d+-\d+)/\d+})[:range] + end + + def multipart_download(opts) + resp = @client.head_object(head_opts(opts[:params].merge(part_number: 1))) count = resp.parts_count if count.nil? || count <= 1 if resp.content_length <= MIN_CHUNK_SIZE - single_request + single_request(opts) else - multithreaded_get_by_ranges(resp.content_length, resp.etag) + resolve_temp_path(opts) + multithreaded_get_by_ranges(resp.content_length, resp.etag, opts) end else # covers cases when given object is not uploaded via UploadPart API - resp = @client.head_object(@params) # partNumber is an option + resp = @client.head_object(head_opts(opts[:params])) # partNumber is an option if resp.content_length <= MIN_CHUNK_SIZE - single_request + single_request(opts) else - compute_mode(resp.content_length, count, resp.etag) + compute_mode(resp.content_length, count, resp.etag, opts) end end end - def compute_mode(file_size, count, etag) - chunk_size = compute_chunk(file_size) - part_size = (file_size.to_f / count).ceil - if chunk_size < part_size - multithreaded_get_by_ranges(file_size, etag) - else - multithreaded_get_by_parts(count, file_size, etag) + def multithreaded_get_by_parts(total_parts, file_size, etag, opts) + parts = (1..total_parts).map do |part| + params = get_opts(opts[:params].merge(part_number: part, if_match: etag)) + Part.new(part_number: part, params: params) end + download_with_executor(PartList.new(parts), file_size, opts) end - def compute_chunk(file_size) - raise ArgumentError, ":chunk_size shouldn't exceed total file size." 
if @chunk_size && @chunk_size > file_size - - @chunk_size || [(file_size.to_f / MAX_PARTS).ceil, MIN_CHUNK_SIZE].max.to_i - end - - def multithreaded_get_by_ranges(file_size, etag) + def multithreaded_get_by_ranges(file_size, etag, opts) offset = 0 - default_chunk_size = compute_chunk(file_size) + default_chunk_size = compute_chunk(opts[:chunk_size], file_size) chunks = [] part_number = 1 # parts start at 1 while offset < file_size progress = offset + default_chunk_size progress = file_size if progress > file_size - params = @params.merge(range: "bytes=#{offset}-#{progress - 1}", if_match: etag) + params = get_opts(opts[:params].merge(range: "bytes=#{offset}-#{progress - 1}", if_match: etag)) chunks << Part.new(part_number: part_number, size: (progress - offset), params: params) part_number += 1 offset = progress end - download_in_threads(PartList.new(chunks), file_size) - end - - def multithreaded_get_by_parts(n_parts, total_size, etag) - parts = (1..n_parts).map do |part| - Part.new(part_number: part, params: @params.merge(part_number: part, if_match: etag)) - end - download_in_threads(PartList.new(parts), total_size) - end - - def download_in_threads(pending, total_size) - threads = [] - progress = MultipartProgress.new(pending, total_size, @progress_callback) if @progress_callback - unless [File, Tempfile].include?(@destination.class) - @temp_path = "#{@destination}.s3tmp.#{SecureRandom.alphanumeric(8)}" - end - @thread_count.times do - thread = Thread.new do - begin - while (part = pending.shift) - if progress - part.params[:on_chunk_received] = - proc do |_chunk, bytes, total| - progress.call(part.part_number, bytes, total) - end - end - resp = @client.get_object(part.params) - range = extract_range(resp.content_range) - validate_range(range, part.params[:range]) if part.params[:range] - write(resp.body, range) - if @on_checksum_validated && resp.checksum_validated - @on_checksum_validated.call(resp.checksum_validated, resp) - end - end - nil - rescue StandardError => e - pending.clear! 
# keep other threads from downloading other parts - raise e - end - end - threads << thread - end - threads.map(&:value).compact + download_with_executor(PartList.new(chunks), file_size, opts) end - def extract_range(value) - value.match(%r{bytes (?\d+-\d+)/\d+})[:range] + def range_request(opts) + resp = @client.head_object(head_opts(opts[:params])) + resolve_temp_path(opts) + multithreaded_get_by_ranges(resp.content_length, resp.etag, opts) end - def validate_range(actual, expected) - return if actual == expected.match(/bytes=(?\d+-\d+)/)[:range] + def resolve_temp_path(opts) + return if [File, Tempfile].include?(opts[:destination].class) - raise MultipartDownloadError, "multipart download failed: expected range of #{expected} but got #{actual}" + opts[:temp_path] ||= "#{opts[:destination]}.s3tmp.#{SecureRandom.alphanumeric(8)}" end - def write(body, range) - path = @temp_path || @destination - File.write(path, body.read, range.split('-').first.to_i) - end - - def single_request - params = @params.merge(response_target: @destination) - params[:on_chunk_received] = single_part_progress if @progress_callback + def single_request(opts) + params = get_opts(opts[:params]).merge(response_target: opts[:destination]) + params[:on_chunk_received] = single_part_progress(opts) if opts[:progress_callback] resp = @client.get_object(params) - return resp unless @on_checksum_validated + return resp unless opts[:on_checksum_validated] - @on_checksum_validated.call(resp.checksum_validated, resp) if resp.checksum_validated + opts[:on_checksum_validated].call(resp.checksum_validated, resp) if resp.checksum_validated resp end - def single_part_progress + def single_part_progress(opts) proc do |_chunk, bytes_read, total_size| - @progress_callback.call([bytes_read], [total_size], total_size) + opts[:progress_callback].call([bytes_read], [total_size], total_size) + end + end + + def update_progress(progress, part) + part.params[:on_chunk_received] = + proc do |_chunk, bytes, total| + progress.call(part.part_number, bytes, total) + end + end + + def validate_destination!(destination) + valid_types = [String, Pathname, File, Tempfile] + return if valid_types.include?(destination.class) + + raise ArgumentError, "Invalid destination, expected #{valid_types.join(', ')} but got: #{destination.class}" + end + + def validate_opts!(opts) + if opts[:on_checksum_validated] && !opts[:on_checksum_validated].respond_to?(:call) + raise ArgumentError, ':on_checksum_validated must be callable' + end + + valid_modes = %w[auto get_range single_request] + unless valid_modes.include?(opts[:mode]) + msg = "Invalid mode #{opts[:mode]} provided, :mode should be single_request, get_range or auto" + raise ArgumentError, msg + end + + if opts[:mode] == 'get_range' && opts[:chunk_size].nil? 
+ raise ArgumentError, 'In get_range mode, :chunk_size must be provided' + end + + if opts[:chunk_size] && opts[:chunk_size] <= 0 + raise ArgumentError, ':chunk_size must be positive' end end + def validate_range(actual, expected) + return if actual == expected.match(/bytes=(?\d+-\d+)/)[:range] + + raise MultipartDownloadError, "multipart download failed: expected range of #{expected} but got #{actual}" + end + + def write(body, range, opts) + path = opts[:temp_path] || opts[:destination] + File.write(path, body.read, range.split('-').first.to_i) + end + # @api private class Part < Struct.new(:part_number, :size, :params) include Aws::Structure diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb index 587066551ea..62dbb07f8c3 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/file_uploader.rb @@ -13,8 +13,8 @@ class FileUploader # @option options [Client] :client # @option options [Integer] :multipart_threshold (104857600) def initialize(options = {}) - @options = options @client = options[:client] || Client.new + @executor = options[:executor] @multipart_threshold = options[:multipart_threshold] || DEFAULT_MULTIPART_THRESHOLD end @@ -36,11 +36,9 @@ def initialize(options = {}) # @return [void] def upload(source, options = {}) Aws::Plugins::UserAgent.metric('S3_TRANSFER') do - if File.size(source) >= multipart_threshold - MultipartFileUploader.new(@options).upload(source, options) + if File.size(source) >= @multipart_threshold + MultipartFileUploader.new(client: @client, executor: @executor).upload(source, options) else - # remove multipart parameters not supported by put_object - options.delete(:thread_count) put_object(source, options) end end @@ -48,9 +46,9 @@ def upload(source, options = {}) private - def open_file(source) - if String === source || Pathname === source - File.open(source, 'rb') { |file| yield(file) } + def open_file(source, &block) + if source.is_a?(String) || source.is_a?(Pathname) + File.open(source, 'rb', &block) else yield(source) end diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb index bcc05f1fc9e..bab023e1bf7 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/multipart_file_uploader.rb @@ -7,7 +7,6 @@ module Aws module S3 # @api private class MultipartFileUploader - MIN_PART_SIZE = 5 * 1024 * 1024 # 5MB MAX_PARTS = 10_000 DEFAULT_THREAD_COUNT = 10 @@ -21,10 +20,9 @@ class MultipartFileUploader ) # @option options [Client] :client - # @option options [Integer] :thread_count (DEFAULT_THREAD_COUNT) def initialize(options = {}) @client = options[:client] || Client.new - @thread_count = options[:thread_count] || DEFAULT_THREAD_COUNT + @executor = options[:executor] end # @return [Client] @@ -38,11 +36,12 @@ def initialize(options = {}) # It will be invoked with [bytes_read], [total_sizes] # @return [Seahorse::Client::Response] - the CompleteMultipartUploadResponse def upload(source, options = {}) - raise ArgumentError, 'unable to multipart upload files smaller than 5MB' if File.size(source) < MIN_PART_SIZE + file_size = File.size(source) + raise ArgumentError, 'unable to multipart upload files smaller than 5MB' if file_size < MIN_PART_SIZE upload_id = initiate_upload(options) - parts = upload_parts(upload_id, source, options) - complete_upload(upload_id, parts, source, options) + parts = upload_parts(upload_id, source, 
file_size, options) + complete_upload(upload_id, parts, file_size, options) end private @@ -51,22 +50,22 @@ def initiate_upload(options) @client.create_multipart_upload(create_opts(options)).upload_id end - def complete_upload(upload_id, parts, source, options) + def complete_upload(upload_id, parts, file_size, options) @client.complete_multipart_upload( **complete_opts(options).merge( upload_id: upload_id, multipart_upload: { parts: parts }, - mpu_object_size: File.size(source) + mpu_object_size: file_size ) ) rescue StandardError => e abort_upload(upload_id, options, [e]) end - def upload_parts(upload_id, source, options) + def upload_parts(upload_id, source, file_size, options) completed = PartList.new - pending = PartList.new(compute_parts(upload_id, source, options)) - errors = upload_in_threads(pending, completed, options) + pending = PartList.new(compute_parts(upload_id, source, file_size, options)) + errors = upload_with_executor(pending, completed, options) if errors.empty? completed.to_a.sort_by { |part| part[:part_number] } else @@ -86,17 +85,20 @@ def abort_upload(upload_id, options, errors) raise MultipartUploadError.new(msg, errors + [e]) end - def compute_parts(upload_id, source, options) - size = File.size(source) - default_part_size = compute_default_part_size(size) + def compute_parts(upload_id, source, file_size, options) + default_part_size = compute_default_part_size(file_size) offset = 0 part_number = 1 parts = [] - while offset < size + while offset < file_size parts << upload_part_opts(options).merge( upload_id: upload_id, part_number: part_number, - body: FilePart.new(source: source, offset: offset, size: part_size(size, default_part_size, offset)) + body: FilePart.new( + source: source, + offset: offset, + size: part_size(file_size, default_part_size, offset) + ) ) part_number += 1 offset += default_part_size @@ -108,24 +110,20 @@ def checksum_key?(key) CHECKSUM_KEYS.include?(key) end - def has_checksum_key?(keys) + def has_checksum_keys?(keys) keys.any? 
{ |key| checksum_key?(key) } end def create_opts(options) opts = { checksum_algorithm: Aws::Plugins::ChecksumAlgorithm::DEFAULT_CHECKSUM } - opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys) - CREATE_OPTIONS.each_with_object(opts) do |key, hash| - hash[key] = options[key] if options.key?(key) - end + opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_keys?(options.keys) + CREATE_OPTIONS.each_with_object(opts) { |k, h| h[k] = options[k] if options.key?(k) } end def complete_opts(options) opts = {} - opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys) - COMPLETE_OPTIONS.each_with_object(opts) do |key, hash| - hash[key] = options[key] if options.key?(key) - end + opts[:checksum_type] = 'FULL_OBJECT' if has_checksum_keys?(options.keys) + COMPLETE_OPTIONS.each_with_object(opts) { |k, h| h[k] = options[k] if options.key?(k) } end def upload_part_opts(options) @@ -135,43 +133,43 @@ def upload_part_opts(options) end end - def upload_in_threads(pending, completed, options) - threads = [] + def upload_with_executor(pending, completed, options) + upload_attempts = 0 + completion_queue = Queue.new + abort_upload = false + errors = [] + if (callback = options[:progress_callback]) progress = MultipartProgress.new(pending, callback) end - options.fetch(:thread_count, @thread_count).times do - thread = Thread.new do - begin - while (part = pending.shift) - if progress - part[:on_chunk_sent] = - proc do |_chunk, bytes, _total| - progress.call(part[:part_number], bytes) - end - end - resp = @client.upload_part(part) - part[:body].close - completed_part = { etag: resp.etag, part_number: part[:part_number] } - algorithm = resp.context.params[:checksum_algorithm] - k = "checksum_#{algorithm.downcase}".to_sym - completed_part[k] = resp.send(k) - completed.push(completed_part) - end - nil - rescue StandardError => e - # keep other threads from uploading other parts - pending.clear! - e - end + + while (part = pending.shift) + break if abort_upload + + upload_attempts += 1 + @executor.post(part) do |p| + update_progress(progress, p) if progress + resp = @client.upload_part(p) + p[:body].close + completed_part = { etag: resp.etag, part_number: p[:part_number] } + algorithm = resp.context.params[:checksum_algorithm].downcase + k = "checksum_#{algorithm}".to_sym + completed_part[k] = resp.send(k) + completed.push(completed_part) + rescue StandardError => e + abort_upload = true + errors << e + ensure + completion_queue << :done end - threads << thread end - threads.map(&:value).compact + + upload_attempts.times { completion_queue.pop } + errors end - def compute_default_part_size(source_size) - [(source_size.to_f / MAX_PARTS).ceil, MIN_PART_SIZE].max.to_i + def compute_default_part_size(file_size) + [(file_size.to_f / MAX_PARTS).ceil, MIN_PART_SIZE].max.to_i end def part_size(total_size, part_size, offset) @@ -182,6 +180,13 @@ def part_size(total_size, part_size, offset) end end + def update_progress(progress, part) + part[:on_chunk_sent] = + proc do |_chunk, bytes, _total| + progress.call(part[:part_number], bytes) + end + end + # @api private class PartList def initialize(parts = []) diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb index 03bd249d5e2..7f925cc0ba1 100644 --- a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb +++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb @@ -18,11 +18,15 @@ class TransferManager # will be created automatically. 
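# Editor's note (illustrative sketch, not part of the diff): a TransferManager can now be built
# around a shared executor; when one is supplied the caller owns its lifecycle and shuts it down
# when finished. Bucket, keys and paths are placeholders.
executor = Aws::S3::DefaultExecutor.new(max_threads: 16)
tm = Aws::S3::TransferManager.new(client: Aws::S3::Client.new, executor: executor)
tm.upload_file('/tmp/report.pdf', bucket: 'example-bucket', key: 'reports/report.pdf')
tm.download_file('/tmp/report-copy.pdf', bucket: 'example-bucket', key: 'reports/report.pdf')
executor.shutdown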
def initialize(options = {}) @client = options.delete(:client) || Client.new + @executor = options.delete(:executor) end # @return [S3::Client] attr_reader :client + # @return [Object] executor + attr_reader :executor + # Downloads a file in S3 to a path on disk. # # # small files (< 5MB) are downloaded in a single API call @@ -107,6 +111,11 @@ def download_file(destination, bucket:, key:, **options) true end + # TODO: Docs + def download_directory(destination, bucket:, **options) + # TODO + end + # Uploads a file from disk to S3. # # # a small file are uploaded with PutObject API @@ -178,13 +187,19 @@ def upload_file(source, bucket:, key:, **options) uploading_options = options.dup uploader = FileUploader.new( multipart_threshold: uploading_options.delete(:multipart_threshold), - client: @client + client: @client, + executor: @executor ) response = uploader.upload(source, uploading_options.merge(bucket: bucket, key: key)) yield response if block_given? true end + # TODO: Docs + def upload_directory(source, bucket:, **options) + # TODO + end + # Uploads a stream in a streaming fashion to S3. # # Passed chunks automatically split into multipart upload parts and the parts are uploaded in parallel. @@ -248,5 +263,6 @@ def upload_stream(bucket:, key:, **options, &block) true end end + end end diff --git a/gems/aws-sdk-s3/spec/file_downloader_spec.rb b/gems/aws-sdk-s3/spec/file_downloader_spec.rb index 0f3dd8fb2cf..2632377b981 100644 --- a/gems/aws-sdk-s3/spec/file_downloader_spec.rb +++ b/gems/aws-sdk-s3/spec/file_downloader_spec.rb @@ -7,7 +7,8 @@ module Aws module S3 describe FileDownloader do let(:client) { S3::Client.new(stub_responses: true) } - let(:subject) { FileDownloader.new(client: client) } + let(:executor) { DefaultExecutor.new } + let(:subject) { FileDownloader.new(client: client, executor: executor) } let(:tmpdir) { Dir.tmpdir } describe '#initialize' do @@ -198,7 +199,6 @@ module S3 it 'raises when checksum validation fails on multipart object' do client.stub_responses(:get_object, { body: 'body', checksum_sha1: 'invalid' }) - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, parts_params) }.to raise_error(Aws::Errors::ChecksumError) end @@ -208,7 +208,6 @@ module S3 expect(ctx.params[:if_match]).to eq('test-etag') 'PreconditionFailed' }) - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, range_params.merge(chunk_size: one_meg, mode: 'get_range')) } .to raise_error(Aws::S3::Errors::PreconditionFailed) end @@ -219,8 +218,6 @@ module S3 expect(ctx.params[:if_match]).to eq('test-etag') 'PreconditionFailed' }) - - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, parts_params) }.to raise_error(Aws::S3::Errors::PreconditionFailed) end @@ -246,7 +243,6 @@ module S3 it 'raises when range validation fails' do client.stub_responses(:get_object, { body: 'body', content_range: 'bytes 0-3/4' }) - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, range_params.merge(mode: 'get_range', chunk_size: one_meg)) } .to raise_error(Aws::S3::MultipartDownloadError) end @@ -263,7 +259,6 @@ module S3 responses[context.params[:range]] }) - expect(Thread).to receive(:new).and_yield.and_return(double(value: nil)) expect { subject.download(path, range_params.merge(chunk_size: 5 * one_meg, mode: 'get_range')) } .to raise_error(Aws::S3::MultipartDownloadError) expect(File.exist?(path)).to 
be(true) diff --git a/gems/aws-sdk-s3/spec/file_uploader_spec.rb b/gems/aws-sdk-s3/spec/file_uploader_spec.rb index 64dbaf7cdd2..dc3ab94b052 100644 --- a/gems/aws-sdk-s3/spec/file_uploader_spec.rb +++ b/gems/aws-sdk-s3/spec/file_uploader_spec.rb @@ -81,12 +81,6 @@ module S3 subject.upload(ten_meg_file.path, params) end - - it 'does not fail when given :thread_count' do - expect(client).to receive(:put_object).with(params.merge(body: ten_meg_file)) - - subject.upload(ten_meg_file, params.merge(thread_count: 1)) - end end end end diff --git a/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb b/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb index 82e147986e3..54b50a13a01 100644 --- a/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb +++ b/gems/aws-sdk-s3/spec/multipart_file_uploader_spec.rb @@ -7,7 +7,8 @@ module Aws module S3 describe MultipartFileUploader do let(:client) { S3::Client.new(stub_responses: true) } - let(:subject) { MultipartFileUploader.new(client: client) } + let(:executor) { DefaultExecutor.new } + let(:subject) { MultipartFileUploader.new(client: client, executor: executor) } let(:params) { { bucket: 'bucket', key: 'key' } } describe '#initialize' do @@ -85,7 +86,6 @@ module S3 end it 'reports progress for multipart uploads' do - allow(Thread).to receive(:new).and_yield.and_return(double(value: nil)) client.stub_responses(:create_multipart_upload, upload_id: 'id') client.stub_responses(:complete_multipart_upload) expect(client).to receive(:upload_part).exactly(24).times do |args| @@ -127,10 +127,6 @@ module S3 end it 'reports when it is unable to abort a failed multipart upload' do - allow(Thread).to receive(:new) do |_, &block| - double(value: block.call) - end - client.stub_responses( :upload_part, [
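# Editor's note (illustrative sketch, not part of the diff): the refactored FileDownloader above
# keeps the three download modes. It is @api private, so these options normally travel through
# Object#download_file or TransferManager#download_file; names and sizes are placeholders.
client = Aws::S3::Client.new
executor = Aws::S3::DefaultExecutor.new(max_threads: 8)
downloader = Aws::S3::FileDownloader.new(client: client, executor: executor)

# 'auto' (default): HEAD the object, then a single GET for small objects or parallel
# ranged/part-numbered GETs for large ones.
downloader.download('/tmp/object.bin', bucket: 'example-bucket', key: 'large-object')

# 'get_range': explicit ranged GETs; :chunk_size is required and must be positive.
downloader.download('/tmp/object.bin', bucket: 'example-bucket', key: 'large-object',
                    mode: 'get_range', chunk_size: 8 * 1024 * 1024)

# 'single_request': one GET regardless of object size.
downloader.download('/tmp/object.bin', bucket: 'example-bucket', key: 'small-object',
                    mode: 'single_request')

executor.shutdown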