-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathprocess.rb
173 lines (141 loc) · 4.23 KB
/
process.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2020-2022, by Samuel Williams.
require_relative 'channel'
require_relative 'error'
require_relative 'notify/pipe'
module Async
module Container
# Represents a running child process from the point of view of the parent container.
class Process < Channel
# Represents a running child process from the point of view of the child process.
class Instance < Notify::Pipe
# Build the child-side {Instance} for a {Process}, intended to be called from within the forked child.
# The instance is created on `process.out`, the read side of the process channel is then closed, and finally the process name is copied across via {#name=}.
# @parameter process [Process] The process instance to wrap.
# @returns [Instance] The newly created instance.
def self.for(process)
	self.new(process.out).tap do |wrapper|
		# The child process won't be reading from the channel:
		process.close_read
		
		wrapper.name = process.name
	end
end
# Create the instance on the given IO; the name starts out unset.
# @parameter io [Object] Forwarded unchanged to the superclass initializer (presumably the write side of the channel, since {Instance.for} passes `process.out` - confirm against {Notify::Pipe}).
def initialize(io)
	super(io)
	
	@name = nil
end
# Remember the given name and, when one is supplied, apply it as the process title.
# @parameter value [String] The name of the process.
def name= value
	@name = value
	
	# A nil (or false) name is stored but leaves the process title untouched:
	::Process.setproctitle(value) if value
end
# The name most recently assigned to this instance, or `nil` if none has been set.
# @returns [String | Nil]
def name
	return @name
end
# Replace the current child process with a different one. Forwards arguments and options to {::Process.exec}.
# This method replaces the child process with the new executable, thus this method never returns.
# @parameter arguments [Array] The positional arguments for {::Process.exec}.
# @parameter ready [Boolean] When truthy, `ready!` is invoked with the status `"(exec)"` before replacing the process; otherwise `before_spawn` is invoked with the arguments and options.
# @parameter options [Hash] The options for {::Process.exec}.
def exec(*arguments, ready: true, **options)
	if ready
		self.ready!(status: "(exec)")
	else
		# The same array and hash objects are handed over, so any changes made by `before_spawn` are visible to the exec below.
		self.before_spawn(arguments, options)
	end
	
	# TODO prefer **options... but it doesn't support redirections on < 2.7
	::Process.exec(*arguments, options)
end
end
# Fork a child process appropriate for a container.
# Inside the child, `INT` and `TERM` are trapped and re-raised on the current thread as `Interrupt` and `Terminate`, then the given block is invoked with an {Instance} wrapping the process.
# An `Interrupt` escaping the block ends the child quietly; any other exception is logged and the child exits immediately with status 1.
# @parameter options [Hash] Keyword options forwarded to {#initialize}.
# @returns [Process]
def self.fork(**options)
	self.new(**options) do |process|
		::Process.fork do
			# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
			Signal.trap(:INT) do
				::Thread.current.raise(Interrupt)
			end
			
			Signal.trap(:TERM) do
				::Thread.current.raise(Terminate)
			end
			
			begin
				child = Instance.for(process)
				
				yield child
			rescue Interrupt
				# Graceful exit.
			rescue Exception => exception
				Console.logger.error(self) {exception}
				
				exit!(1)
			end
		end
	end
end
# def self.spawn(*arguments, name: nil, **options)
# self.new(name: name) do |process|
# unless options.key?(:out)
# options[:out] = process.out
# end
#
# ::Process.spawn(*arguments, **options)
# end
# end
# Initialize the process.
# The given block is invoked with this object and is expected to start the child; whatever it returns is recorded as the pid.
# @parameter name [String] The name to use for the child process.
def initialize(name: nil)
	super()
	
	@name, @status = name, nil
	
	# Explicitly no pid while the block runs; the block's result becomes the pid.
	@pid = nil
	@pid = yield self
	
	# The parent process won't be writing to the channel:
	close_write
end
# Set the name of the process.
# Invokes {::Process.setproctitle} if invoked in the child process, which is detected by no pid having been recorded yet.
# A `nil` (or `false`) name is stored but never passed to {::Process.setproctitle}, which would otherwise raise a `TypeError`; this matches the behaviour of {Instance#name=}.
# @parameter value [String | Nil] The name of the process.
def name= value
	@name = value
	
	# If we are the child process, and there is actually a title to apply:
	::Process.setproctitle(@name) if @pid.nil? && @name
end
# The name of the process, as given at construction or last assigned through {#name=}.
# @attribute [String]
attr_reader :name
# A human readable representation of the process, made of the class name followed by the process name.
# @returns [String]
def to_s
	format('#<%s %s>', self.class, @name)
end
# Invoke {#terminate!} and then {#wait} for the child process to exit.
# The superclass `close` is always invoked afterwards, even if terminating or waiting raises.
def close
	begin
		terminate!
		wait
	ensure
		super
	end
end
# Send `SIGINT` to the child process.
# Does nothing once an exit status has been recorded.
def interrupt!
	return if @status
	
	::Process.kill(:INT, @pid)
end
# Send `SIGTERM` to the child process.
# Does nothing once an exit status has been recorded.
def terminate!
	return if @status
	
	::Process.kill(:TERM, @pid)
end
# Wait for the child process to exit.
# The child is first polled without blocking; if it has not exited, it is polled once more after a 10ms pause, and only then is a warning logged and a blocking wait performed.
# When there is no pid, or a status has already been recorded, the recorded status is returned as is.
# @returns [::Process::Status] The process exit status.
def wait
	return @status unless @pid && @status.nil?
	
	# First attempt: non-blocking poll.
	_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
	
	if @status.nil?
		# Give the child a brief moment to exit, then poll again.
		sleep(0.01)
		_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
	end
	
	if @status.nil?
		Console.logger.warn(self) {"Process #{@pid} is blocking, has it exited?"}
		
		# Last resort: block until the child exits.
		_, @status = ::Process.wait2(@pid)
	end
	
	return @status
end
end
end
end