Skip to content
This repository was archived by the owner on Nov 9, 2017. It is now read-only.

Commit cd3063d

Browse files
committed
Concurrent uploading in mirror services
1 parent 2520ab9 commit cd3063d

File tree

2 files changed

+46
-6
lines changed

2 files changed

+46
-6
lines changed

lib/active_storage/async_uploader.rb

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
require "concurrent/promise"
2+
3+
class ActiveStorage::AsyncUploader
4+
class << self
5+
def result uploaders
6+
promises = uploaders.map(&:promise)
7+
Concurrent::Promise.zip(*promises).value!
8+
end
9+
end
10+
11+
attr_reader :promise
12+
13+
def initialize service, key, checksum: nil
14+
@data = ""
15+
@eof = false
16+
@promise = Concurrent::Promise.execute do
17+
until eof? do; end
18+
service.upload key, StringIO.new(@data), checksum: checksum
19+
end
20+
end
21+
22+
def eof?
23+
@eof
24+
end
25+
26+
def write chunk
27+
@data << chunk
28+
@eof = false
29+
end
30+
31+
def close
32+
@eof = true
33+
end
34+
end

lib/active_storage/service/mirror_service.rb

+12-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require "active_support/core_ext/module/delegation"
2+
require "active_storage/async_uploader"
23
require "concurrent/promise"
34

45
class ActiveStorage::Service::MirrorService < ActiveStorage::Service
@@ -18,24 +19,29 @@ def initialize(primary:, mirrors:)
1819
end
1920

2021
def upload(key, io, checksum: nil)
21-
each_service.collect do |service|
22-
service.upload key, io.tap(&:rewind), checksum: checksum
22+
uploaders = each_service.collect do |service|
23+
ActiveStorage::AsyncUploader.new(service, key, checksum: checksum)
2324
end
25+
io.rewind
26+
while chunk = io.read(1024)
27+
uploaders.each { |uploader| uploader.write(chunk) }
28+
end
29+
ActiveStorage::AsyncUploader.result(uploaders.each(&:close))
2430
end
2531

2632
def delete(key)
27-
perform_across_services :delete, key
33+
perform_async_across_services :delete, key
2834
end
2935

3036
private
3137
def each_service(&block)
3238
[ primary, *mirrors ].each(&block)
3339
end
3440

35-
def perform_across_services(method, *args)
36-
promises = services.collect do |service|
41+
def perform_async_across_services(method, *args)
42+
promises = each_service.collect do |service|
3743
Concurrent::Promise.execute { service.public_send method, *args }
3844
end
39-
Concurrent::Promise.zip(*promises).value
45+
Concurrent::Promise.zip(*promises).value!
4046
end
4147
end

0 commit comments

Comments
 (0)