diff --git a/Gemfile b/Gemfile index d2d6db9..d2167cd 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,7 @@ gem "byebug" gem "sqlite3" gem "httparty" +gem "concurrent-ruby" gem "aws-sdk", "~> 2", require: false gem "google-cloud-storage", "~> 1.3", require: false diff --git a/Gemfile.lock b/Gemfile.lock index cce1d34..aa8d175 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -10,29 +10,29 @@ PATH GEM remote: https://rubygems.org/ specs: - actionpack (5.1.1) - actionview (= 5.1.1) - activesupport (= 5.1.1) + actionpack (5.1.2) + actionview (= 5.1.2) + activesupport (= 5.1.2) rack (~> 2.0) rack-test (~> 0.6.3) rails-dom-testing (~> 2.0) rails-html-sanitizer (~> 1.0, >= 1.0.2) - actionview (5.1.1) - activesupport (= 5.1.1) + actionview (5.1.2) + activesupport (= 5.1.2) builder (~> 3.1) erubi (~> 1.4) rails-dom-testing (~> 2.0) rails-html-sanitizer (~> 1.0, >= 1.0.3) - activejob (5.1.1) - activesupport (= 5.1.1) + activejob (5.1.2) + activesupport (= 5.1.2) globalid (>= 0.3.6) - activemodel (5.1.1) - activesupport (= 5.1.1) - activerecord (5.1.1) - activemodel (= 5.1.1) - activesupport (= 5.1.1) + activemodel (5.1.2) + activesupport (= 5.1.2) + activerecord (5.1.2) + activemodel (= 5.1.2) + activesupport (= 5.1.2) arel (~> 8.0) - activesupport (5.1.1) + activesupport (5.1.2) concurrent-ruby (~> 1.0, >= 1.0.2) i18n (~> 0.7) minitest (~> 5.1) @@ -41,26 +41,26 @@ GEM public_suffix (~> 2.0, >= 2.0.2) arel (8.0.0) ast (2.3.0) - aws-sdk (2.10.7) - aws-sdk-resources (= 2.10.7) - aws-sdk-core (2.10.7) + aws-sdk (2.10.11) + aws-sdk-resources (= 2.10.11) + aws-sdk-core (2.10.11) aws-sigv4 (~> 1.0) jmespath (~> 1.0) - aws-sdk-resources (2.10.7) - aws-sdk-core (= 2.10.7) - aws-sigv4 (1.0.0) + aws-sdk-resources (2.10.11) + aws-sdk-core (= 2.10.11) + aws-sigv4 (1.0.1) builder (3.2.3) byebug (9.0.6) concurrent-ruby (1.0.5) declarative (0.0.9) declarative-option (0.1.0) digest-crc (0.4.1) - erubi (1.6.0) + erubi (1.6.1) faraday (0.12.1) multipart-post (>= 1.2, < 3) globalid (0.4.0) activesupport (>= 4.2.0) - google-api-client (0.13.0) + google-api-client (0.13.1) addressable (~> 2.5, >= 2.5.1) googleauth (~> 0.5) httpclient (>= 2.8.1, < 3.0) @@ -87,7 +87,7 @@ GEM httparty (0.15.5) multi_xml (>= 0.5.2) httpclient (2.8.3) - i18n (0.8.4) + i18n (0.8.6) jmespath (1.3.1) jwt (1.5.6) little-plugger (1.1.4) @@ -100,13 +100,13 @@ GEM mime-types (3.1) mime-types-data (~> 3.2015) mime-types-data (3.2016.0521) - mini_portile2 (2.1.0) + mini_portile2 (2.2.0) minitest (5.10.2) multi_json (1.12.1) multi_xml (0.6.0) multipart-post (2.0.0) - nokogiri (1.7.2) - mini_portile2 (~> 2.1.0) + nokogiri (1.8.0) + mini_portile2 (~> 2.2.0) os (0.9.6) parallel (1.11.2) parser (2.4.0.0) @@ -157,6 +157,7 @@ DEPENDENCIES aws-sdk (~> 2) bundler (~> 1.15) byebug + concurrent-ruby google-cloud-storage (~> 1.3) httparty rake diff --git a/lib/active_storage/async_uploader.rb b/lib/active_storage/async_uploader.rb new file mode 100644 index 0000000..dd448a3 --- /dev/null +++ b/lib/active_storage/async_uploader.rb @@ -0,0 +1,34 @@ +require "concurrent/promise" + +class ActiveStorage::AsyncUploader + class << self + def result(uploaders) + promises = uploaders.map(&:promise) + Concurrent::Promise.zip(*promises).value! + end + end + + attr_reader :promise + + def initialize(service, key, checksum: nil) + @data = "" + @eof = false + @promise = Concurrent::Promise.execute do + until eof? do; end + service.upload key, StringIO.new(@data), checksum: checksum + end + end + + def eof? + @eof + end + + def write(chunk) + @data << chunk + @eof = false + end + + def close + @eof = true + end +end diff --git a/lib/active_storage/service/mirror_service.rb b/lib/active_storage/service/mirror_service.rb index 54465ca..d06f308 100644 --- a/lib/active_storage/service/mirror_service.rb +++ b/lib/active_storage/service/mirror_service.rb @@ -1,6 +1,9 @@ require "active_support/core_ext/module/delegation" +require "active_storage/async_uploader" +require "concurrent/promise" class ActiveStorage::Service::MirrorService < ActiveStorage::Service + CHUNK_SIZE = 1024 attr_reader :primary, :mirrors delegate :download, :exist?, :url, to: :primary @@ -17,13 +20,18 @@ def initialize(primary:, mirrors:) end def upload(key, io, checksum: nil) - each_service.collect do |service| - service.upload key, io.tap(&:rewind), checksum: checksum + uploaders = each_service.collect do |service| + ActiveStorage::AsyncUploader.new(service, key, checksum: checksum) end + io.rewind + while chunk = io.read(CHUNK_SIZE) + uploaders.each { |uploader| uploader.write(chunk) } + end + ActiveStorage::AsyncUploader.result(uploaders.each(&:close)) end def delete(key) - perform_across_services :delete, key + perform_async_across_services :delete, key end private @@ -31,10 +39,10 @@ def each_service(&block) [ primary, *mirrors ].each(&block) end - def perform_across_services(method, *args) - # FIXME: Convert to be threaded - each_service.collect do |service| - service.public_send method, *args + def perform_async_across_services(method, *args) + promises = each_service.collect do |service| + Concurrent::Promise.execute { service.public_send method, *args } end + Concurrent::Promise.zip(*promises).value! end end