From 894cbcb5e7120134a2b488a413aed5e85cc3dfd2 Mon Sep 17 00:00:00 2001 From: Stanislav Gospodinov Date: Fri, 7 Jul 2017 18:42:41 +0100 Subject: [PATCH 1/3] Concurrent deleting in mirror services --- lib/active_storage/service/mirror_service.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/active_storage/service/mirror_service.rb b/lib/active_storage/service/mirror_service.rb index 54465ca..fa48b68 100644 --- a/lib/active_storage/service/mirror_service.rb +++ b/lib/active_storage/service/mirror_service.rb @@ -1,4 +1,5 @@ require "active_support/core_ext/module/delegation" +require "concurrent/promise" class ActiveStorage::Service::MirrorService < ActiveStorage::Service attr_reader :primary, :mirrors @@ -32,9 +33,9 @@ def each_service(&block) end def perform_across_services(method, *args) - # FIXME: Convert to be threaded - each_service.collect do |service| - service.public_send method, *args + promises = services.collect do |service| + Concurrent::Promise.execute { service.public_send method, *args } end + Concurrent::Promise.zip(*promises).value end end From daa40ac17d58f8f3c00384ee93de8d9bfd2172ed Mon Sep 17 00:00:00 2001 From: Stanislav Gospodinov Date: Sun, 9 Jul 2017 16:30:21 +0100 Subject: [PATCH 2/3] Adding concurrent-ruby as a dependency --- Gemfile | 5 +++-- Gemfile.lock | 51 ++++++++++++++++++++++++++------------------------- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/Gemfile b/Gemfile index d2d6db9..f4d38fc 100644 --- a/Gemfile +++ b/Gemfile @@ -5,8 +5,9 @@ gemspec gem "rake" gem "byebug" -gem "sqlite3" -gem "httparty" +gem 'sqlite3' +gem 'httparty' +gem 'concurrent-ruby' gem "aws-sdk", "~> 2", require: false gem "google-cloud-storage", "~> 1.3", require: false diff --git a/Gemfile.lock b/Gemfile.lock index cce1d34..aa8d175 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -10,29 +10,29 @@ PATH GEM remote: https://rubygems.org/ specs: - actionpack (5.1.1) - actionview (= 5.1.1) - activesupport (= 5.1.1) + actionpack (5.1.2) + actionview (= 5.1.2) + activesupport (= 5.1.2) rack (~> 2.0) rack-test (~> 0.6.3) rails-dom-testing (~> 2.0) rails-html-sanitizer (~> 1.0, >= 1.0.2) - actionview (5.1.1) - activesupport (= 5.1.1) + actionview (5.1.2) + activesupport (= 5.1.2) builder (~> 3.1) erubi (~> 1.4) rails-dom-testing (~> 2.0) rails-html-sanitizer (~> 1.0, >= 1.0.3) - activejob (5.1.1) - activesupport (= 5.1.1) + activejob (5.1.2) + activesupport (= 5.1.2) globalid (>= 0.3.6) - activemodel (5.1.1) - activesupport (= 5.1.1) - activerecord (5.1.1) - activemodel (= 5.1.1) - activesupport (= 5.1.1) + activemodel (5.1.2) + activesupport (= 5.1.2) + activerecord (5.1.2) + activemodel (= 5.1.2) + activesupport (= 5.1.2) arel (~> 8.0) - activesupport (5.1.1) + activesupport (5.1.2) concurrent-ruby (~> 1.0, >= 1.0.2) i18n (~> 0.7) minitest (~> 5.1) @@ -41,26 +41,26 @@ GEM public_suffix (~> 2.0, >= 2.0.2) arel (8.0.0) ast (2.3.0) - aws-sdk (2.10.7) - aws-sdk-resources (= 2.10.7) - aws-sdk-core (2.10.7) + aws-sdk (2.10.11) + aws-sdk-resources (= 2.10.11) + aws-sdk-core (2.10.11) aws-sigv4 (~> 1.0) jmespath (~> 1.0) - aws-sdk-resources (2.10.7) - aws-sdk-core (= 2.10.7) - aws-sigv4 (1.0.0) + aws-sdk-resources (2.10.11) + aws-sdk-core (= 2.10.11) + aws-sigv4 (1.0.1) builder (3.2.3) byebug (9.0.6) concurrent-ruby (1.0.5) declarative (0.0.9) declarative-option (0.1.0) digest-crc (0.4.1) - erubi (1.6.0) + erubi (1.6.1) faraday (0.12.1) multipart-post (>= 1.2, < 3) globalid (0.4.0) activesupport (>= 4.2.0) - google-api-client (0.13.0) + google-api-client (0.13.1) addressable (~> 2.5, >= 2.5.1) googleauth (~> 0.5) httpclient (>= 2.8.1, < 3.0) @@ -87,7 +87,7 @@ GEM httparty (0.15.5) multi_xml (>= 0.5.2) httpclient (2.8.3) - i18n (0.8.4) + i18n (0.8.6) jmespath (1.3.1) jwt (1.5.6) little-plugger (1.1.4) @@ -100,13 +100,13 @@ GEM mime-types (3.1) mime-types-data (~> 3.2015) mime-types-data (3.2016.0521) - mini_portile2 (2.1.0) + mini_portile2 (2.2.0) minitest (5.10.2) multi_json (1.12.1) multi_xml (0.6.0) multipart-post (2.0.0) - nokogiri (1.7.2) - mini_portile2 (~> 2.1.0) + nokogiri (1.8.0) + mini_portile2 (~> 2.2.0) os (0.9.6) parallel (1.11.2) parser (2.4.0.0) @@ -157,6 +157,7 @@ DEPENDENCIES aws-sdk (~> 2) bundler (~> 1.15) byebug + concurrent-ruby google-cloud-storage (~> 1.3) httparty rake From 5929b0eabbb4dba2e349b193dbda49f3578ac8a1 Mon Sep 17 00:00:00 2001 From: Stanislav Gospodinov Date: Mon, 10 Jul 2017 15:46:07 +0100 Subject: [PATCH 3/3] Concurrent uploading in mirror services --- Gemfile | 6 ++-- lib/active_storage/async_uploader.rb | 34 ++++++++++++++++++++ lib/active_storage/service/mirror_service.rb | 19 +++++++---- 3 files changed, 50 insertions(+), 9 deletions(-) create mode 100644 lib/active_storage/async_uploader.rb diff --git a/Gemfile b/Gemfile index f4d38fc..d2167cd 100644 --- a/Gemfile +++ b/Gemfile @@ -5,9 +5,9 @@ gemspec gem "rake" gem "byebug" -gem 'sqlite3' -gem 'httparty' -gem 'concurrent-ruby' +gem "sqlite3" +gem "httparty" +gem "concurrent-ruby" gem "aws-sdk", "~> 2", require: false gem "google-cloud-storage", "~> 1.3", require: false diff --git a/lib/active_storage/async_uploader.rb b/lib/active_storage/async_uploader.rb new file mode 100644 index 0000000..dd448a3 --- /dev/null +++ b/lib/active_storage/async_uploader.rb @@ -0,0 +1,34 @@ +require "concurrent/promise" + +class ActiveStorage::AsyncUploader + class << self + def result(uploaders) + promises = uploaders.map(&:promise) + Concurrent::Promise.zip(*promises).value! + end + end + + attr_reader :promise + + def initialize(service, key, checksum: nil) + @data = "" + @eof = false + @promise = Concurrent::Promise.execute do + until eof? do; end + service.upload key, StringIO.new(@data), checksum: checksum + end + end + + def eof? + @eof + end + + def write(chunk) + @data << chunk + @eof = false + end + + def close + @eof = true + end +end diff --git a/lib/active_storage/service/mirror_service.rb b/lib/active_storage/service/mirror_service.rb index fa48b68..d06f308 100644 --- a/lib/active_storage/service/mirror_service.rb +++ b/lib/active_storage/service/mirror_service.rb @@ -1,7 +1,9 @@ require "active_support/core_ext/module/delegation" +require "active_storage/async_uploader" require "concurrent/promise" class ActiveStorage::Service::MirrorService < ActiveStorage::Service + CHUNK_SIZE = 1024 attr_reader :primary, :mirrors delegate :download, :exist?, :url, to: :primary @@ -18,13 +20,18 @@ def initialize(primary:, mirrors:) end def upload(key, io, checksum: nil) - each_service.collect do |service| - service.upload key, io.tap(&:rewind), checksum: checksum + uploaders = each_service.collect do |service| + ActiveStorage::AsyncUploader.new(service, key, checksum: checksum) end + io.rewind + while chunk = io.read(CHUNK_SIZE) + uploaders.each { |uploader| uploader.write(chunk) } + end + ActiveStorage::AsyncUploader.result(uploaders.each(&:close)) end def delete(key) - perform_across_services :delete, key + perform_async_across_services :delete, key end private @@ -32,10 +39,10 @@ def each_service(&block) [ primary, *mirrors ].each(&block) end - def perform_across_services(method, *args) - promises = services.collect do |service| + def perform_async_across_services(method, *args) + promises = each_service.collect do |service| Concurrent::Promise.execute { service.public_send method, *args } end - Concurrent::Promise.zip(*promises).value + Concurrent::Promise.zip(*promises).value! end end