Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out IO-selection code, prepare for PR #2466 #2762

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 2 additions & 17 deletions Library/Homebrew/cask/lib/hbc/system_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,16 @@ def each_output_line(&b)

write_input_to(raw_stdin)
raw_stdin.close_write
each_line_from [raw_stdout, raw_stderr], &b

::Utils::IOSelector
.each_line_from(stdout: raw_stdout, stderr: raw_stderr, &b)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this (and the method parameter) be &block too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I figure that should be &block too; however that should be left to another PR.

I have left &b untouched because the corresponding method signature does not really relate to this issue. I’d prefer to keep commits focused, and diffs too ☺️

@processed_status = raw_wait_thr.value
end

def write_input_to(raw_stdin)
[*options[:input]].each { |line| raw_stdin.print line }
end

def each_line_from(sources)
loop do
readable_sources = IO.select(sources)[0]
readable_sources.delete_if(&:eof?).first(1).each do |source|
type = ((source == sources[0]) ? :stdout : :stderr)
begin
yield(type, source.readline_nonblock || "")
rescue IO::WaitReadable, EOFError
next
end
end
break if readable_sources.empty?
end
sources.each(&:close_read)
end

def result
Result.new(command,
processed_output[:stdout],
Expand Down
1 change: 1 addition & 0 deletions Library/Homebrew/test/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "rspec/its"
require "rspec/wait"
require "set"
require "English"

if ENV["HOMEBREW_TESTS_COVERAGE"]
require "simplecov"
Expand Down
176 changes: 176 additions & 0 deletions Library/Homebrew/test/utils/io_selector_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
require "utils/io_selector"

describe Utils::IOSelector do
describe "given text streams" do
let(:first_pipe) { IO.pipe }
let(:second_pipe) { IO.pipe }

let(:first_reader) { first_pipe[0] }
let(:second_reader) { second_pipe[0] }

let(:first_writer) { first_pipe[1] }
let(:second_writer) { second_pipe[1] }

let(:queues) { { first: Queue.new, second: Queue.new } }

let(:write_first!) do
thread = Thread.new(first_writer, queues[:second]) do |io, queue|
io.puts "Lorem"
wait(1).for { queue.pop }.to end_with("\n")
io.puts "dolor"
io.close
end
thread.abort_on_exception = true
thread
end

let(:write_second!) do
thread = Thread.new(second_writer, queues[:first]) do |io, queue|
wait(1).for { queue.pop }.to end_with("\n")
io.puts "ipsum"
wait(1).for { queue.pop }.to end_with("\n")
io.puts "sit"
io.puts "amet"
io.close
end
thread.abort_on_exception = true
thread
end

let(:queue_feeder) do
lambda do |proc_under_test, *args, &block|
proc_under_test.call(*args) do |tag, string_received|
queues[tag] << string_received
block.call(tag, string_received)
end
end
end

before do
allow_any_instance_of(Utils::IOSelector)
.to receive(:each_line_nonblock)
.and_wrap_original do |*args, &block|
queue_feeder.call(*args, &block)
end

write_first!
write_second!
end

after do
write_first!.exit
write_second!.exit
end

describe "::each_line_from" do
subject do
line_hash = { first: "", second: "", full_text: "" }

Utils::IOSelector.each_line_from(
first: first_reader,
second: second_reader,
) do |tag, string_received|
line_hash[tag] << string_received
line_hash[:full_text] << string_received
end
line_hash
end

before { wait(1).for(subject) }

its([:first]) { is_expected.to eq("Lorem\ndolor\n") }
its([:second]) { is_expected.to eq("ipsum\nsit\namet\n") }

its([:full_text]) {
is_expected.to eq("Lorem\nipsum\ndolor\nsit\namet\n")
}
end

describe "::new" do
let(:selector) do
Utils::IOSelector.new(
first: first_reader,
second: second_reader,
)
end
subject { selector }

describe "pre-read" do
its(:pending_streams) {
are_expected.to eq([first_reader, second_reader])
}

its(:separator) {
is_expected.to eq($INPUT_RECORD_SEPARATOR)
}
end

describe "post-read" do
before do
wait(1).for {
subject.each_line_nonblock {}
true
}.to be true
end

after { expect(selector.all_streams).to all be_closed }

its(:pending_streams) { are_expected.to be_empty }
its(:separator) {
is_expected.to eq($INPUT_RECORD_SEPARATOR)
}
end

describe "#each_line_nonblock" do
subject do
line_hash = { first: "", second: "", full_text: "" }
super().each_line_nonblock do |tag, string_received|
line_hash[tag] << string_received
line_hash[:full_text] << string_received
end
line_hash
end

before { wait(1).for(subject) }
after { expect(selector.all_streams).to all be_closed }

its([:first]) { is_expected.to eq("Lorem\ndolor\n") }
its([:second]) { is_expected.to eq("ipsum\nsit\namet\n") }

its([:full_text]) {
is_expected.to eq("Lorem\nipsum\ndolor\nsit\namet\n")
}
end

its(:all_streams) {
are_expected.to eq([first_reader, second_reader])
}

its(:all_tags) { are_expected.to eq([:first, :second]) }

describe "#tag_of" do
subject do
{
"first tag" => super().tag_of(first_reader),
"second tag" => super().tag_of(second_reader),
}
end

its(["first tag"]) { is_expected.to eq(:first) }
its(["second tag"]) { is_expected.to eq(:second) }
end
end

describe "::new with a custom separator" do
subject do
tagged_streams = {
first: first_reader,
second: second_reader,
}
Utils::IOSelector.new(tagged_streams, ",")
end

its(:separator) { is_expected.to eq(",") }
end
end
end
1 change: 1 addition & 0 deletions Library/Homebrew/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require "utils/github"
require "utils/hash"
require "utils/inreplace"
require "utils/io_selector"
require "utils/link"
require "utils/popen"
require "utils/svn"
Expand Down
79 changes: 79 additions & 0 deletions Library/Homebrew/utils/io_selector.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
require "delegate"
require "English"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be required; it's part of global now. It may not be needed in the spec_helper either.


require "extend/io"

module Utils
#
# The class `IOSelector` is a wrapper for `IO::select` with the
# added benefit that it spans the streams' lifetimes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand why this is a benefit, can you elaborate.

#
# The class accepts multiple IOs which must be open for reading.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean they must be already open? When would they not be open?

# It then notifies the client as data becomes available
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the client? What is the notification?

# per-stream.
#
# Its main use is to allow a client to read both `stdout` and
# `stderr` of a subprocess in a way that avoids buffer-related
# deadlocks.
#
# For a more in-depth explanation, see:
# https://github.com/Homebrew/brew/pull/2466
#
class IOSelector < DelegateClass(Hash)
attr_reader :separator

alias all_streams keys
alias all_tags values
alias tag_of fetch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the aliases needed?


def self.each_line_from(streams = {},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not obvious what the streams input hash format should be.

separator = $INPUT_RECORD_SEPARATOR, &block)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you indent this further in; this looks currently like it's a line in the function.

new(streams, separator).each_line_nonblock(&block)
end

def initialize(streams = {},
separator = $INPUT_RECORD_SEPARATOR)
super(streams.invert.compare_by_identity)
@separator = separator
end

def each_line_nonblock
Copy link
Contributor

@apjanke apjanke Jun 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is each_line_nonblock the right name for this method? It calls readline_nonblock, which is nonblocking, but each_readable_stream_until_eof is calling IO::select, which may block if there is no input on any stream, so this method may block. Maybe each_line_each_stream or each_line_all_streams or each_line_dont_deadlock?

each_readable_stream_until_eof do |stream|
line = stream.readline_nonblock(separator) || ""
yield(tag_of(stream), line)
end
close_streams
end

def pending_streams
@pending_streams ||= all_streams.dup
end

private

def each_readable_stream_until_eof(&block)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe read_io_until_eofs or multiplex_streams_until_eof or similar would be a better name? To me, in Ruby, a name each_X suggests iterating over the Xs, processing each once in turn. This method doesn't iterate over the streams; it multiplexes over them in an arbitrary order determined by external state, with possible repeats. (The readable_streams.each does iterate over some streams, but the streams in each call are an arbitrary sublist of the streams passed in to the IOSelector object, and doesn't correspond to the behavior visible to the caller.)

loop do
readable_streams.each do |stream|
pending_streams.delete(stream) if stream.eof?
yield_gracefully(stream, &block)
end
break if pending_streams.empty?
end
end

def readable_streams
IO.select(pending_streams)[0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd favour .first here

end

def yield_gracefully(stream)
yield(stream)
rescue IO::WaitReadable, IO::WaitWritable, EOFError
# We'll be back until/unless EOF
return
end

def close_streams
all_streams.each(&:close_read)
end
end
end