Skip to content

Commit 722bca4

Browse files
committed
Tidy up transaction and connection handling.
1 parent 219e5ab commit 722bca4

File tree

4 files changed

+70
-16
lines changed

4 files changed

+70
-16
lines changed

lib/async/bus/protocol/connection.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ def next_id
5656

5757
attr :transactions
5858

59+
# Bind a local object to a name, such that it could be accessed remotely.
60+
#
61+
# @returns [String] The (unique) name of the object.
5962
def proxy(object)
6063
name = "<#{object.class}@#{next_id.to_s(16)}>".freeze
6164

@@ -94,6 +97,8 @@ def invoke(name, arguments, options = {}, &block)
9497
@transactions[id] = transaction
9598

9699
transaction.invoke(name, arguments, options, &block)
100+
ensure
101+
transaction&.close
97102
end
98103

99104
def run
@@ -123,6 +128,7 @@ def run
123128
Async do
124129
transaction.accept(object, *message)
125130
ensure
131+
# This will also delete the transaction from @transactions:
126132
transaction.close
127133
end
128134
else

lib/async/bus/protocol/transaction.rb

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,26 @@ def read
2929
end
3030

3131
def write(*arguments)
32-
@connection.packer.write([id, *arguments])
33-
@connection.packer.flush
32+
if @connection
33+
@connection.packer.write([id, *arguments])
34+
@connection.packer.flush
35+
else
36+
raise RuntimeError, "Transaction is closed!"
37+
end
3438
end
3539

3640
def close
37-
if @connection
38-
@received.enqueue(nil)
39-
40-
connection = @connection
41+
if connection = @connection
4142
@connection = nil
43+
@received.enqueue(nil)
4244

4345
connection.transactions.delete(@id)
4446
end
4547
end
4648

4749
# Invoke a remote procedure.
4850
def invoke(name, arguments, options, &block)
49-
Console.logger.debug(self) {[name, arguments, options, block]}
51+
Console.debug(self) {[name, arguments, options, block]}
5052

5153
self.write(:invoke, name, arguments, options, block_given?)
5254

@@ -68,8 +70,8 @@ def invoke(name, arguments, options, &block)
6870
end
6971
end
7072

71-
# ensure
72-
# self.write(:close)
73+
# ensure
74+
# self.write(:close)
7375
end
7476

7577
# Accept a remote procedure invokation.
@@ -97,8 +99,8 @@ def accept(object, arguments, options, block)
9799
self.write(:throw, error.tag)
98100
rescue => error
99101
self.write(:error, error)
100-
# ensure
101-
# self.write(:close)
102+
# ensure
103+
# self.write(:close)
102104
end
103105
end
104106
end

lib/async/bus/server.rb

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,16 @@ module Bus
1111
class Server
1212
def initialize(endpoint = nil)
1313
@endpoint = endpoint || Protocol.local_endpoint
14-
@connected = {}
1514
end
1615

17-
attr :connected
18-
1916
def accept
2017
@endpoint.accept do |peer|
2118
connection = Protocol::Connection.server(peer)
22-
@connected[peer] = connection
2319

2420
yield connection
2521

2622
connection.run
2723
ensure
28-
connection = @connected.delete(peer)
2924
connection&.close
3025
end
3126
end

test/async/bus/server.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,20 @@
55

66
require "async/bus/a_server"
77

8+
class MyArray
9+
def initialize
10+
@values = []
11+
end
12+
13+
def sum
14+
@values.sum
15+
end
16+
17+
def <<(value)
18+
@values << value
19+
end
20+
end
21+
822
describe Async::Bus::Server do
923
include Async::Bus::AServer
1024

@@ -116,4 +130,41 @@ def after(error = nil)
116130
end
117131
end
118132
end
133+
134+
with "a bound Hash instance" do
135+
let(:hash) {Hash.new}
136+
137+
def before
138+
super
139+
140+
@server_task = Async do
141+
server.accept do |connection|
142+
connection.bind(:hash, hash)
143+
connection.bind(:sum_key, proc{|key| hash[key].sum})
144+
end
145+
end
146+
end
147+
148+
def after(error = nil)
149+
@server_task.stop
150+
end
151+
152+
it "can assign a local object to the hash" do
153+
# We can't use a primitive type here, because it will be serialized and deserialized (e.g. copied), losing the object reference.
154+
array = MyArray.new
155+
156+
client.connect do |connection|
157+
# As an array is a primitve type, we must force it to be a proxy:
158+
connection[:hash][:array] = array
159+
160+
expect(connection[:sum_key].call(:array)).to be == 0
161+
162+
array << 1 << 2 << 3
163+
164+
expect(connection[:sum_key].call(:array)).to be == 6
165+
166+
expect(connection.transactions).to be(:empty?)
167+
end
168+
end
169+
end
119170
end

0 commit comments

Comments
 (0)