Skip to content

Commit 1ecc7ac

Browse files
fix(ConnectionManager): allow duplicate connections to a peer
1 parent a0d9b07 commit 1ecc7ac

File tree

4 files changed

+126
-120
lines changed

4 files changed

+126
-120
lines changed

PeerTalk/src/ConnectionManager.cs

+53-66
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@ public class ConnectionManager
2626
/// <summary>
2727
/// The connections to other peers. Key is the base58 hash of the peer ID.
2828
/// </summary>
29-
ConcurrentDictionary<string, PeerConnection> connections = new ConcurrentDictionary<string, PeerConnection>();
29+
ConcurrentDictionary<string, List<PeerConnection>> connections = new ConcurrentDictionary<string, List<PeerConnection>>();
3030

3131
string Key(Peer peer) => peer.Id.ToBase58();
32+
string Key(MultiHash id) => id.ToBase58();
3233

3334
/// <summary>
3435
/// Gets the current connections.
3536
/// </summary>
36-
public IEnumerable<PeerConnection> Connections => connections.Values;
37+
public IEnumerable<PeerConnection> Connections => connections.Values
38+
.SelectMany(c => c)
39+
.Where(c => c.Stream != null && c.Stream.CanRead && c.Stream.CanWrite);
3740

3841
/// <summary>
3942
/// Determines if a connection exists to the specified peer.
@@ -68,20 +71,17 @@ public bool IsConnected(Peer peer)
6871
/// </remarks>
6972
public bool TryGet(Peer peer, out PeerConnection connection)
7073
{
71-
if (!connections.TryGetValue(Key(peer), out connection))
74+
connection = null;
75+
if (!connections.TryGetValue(Key(peer), out List<PeerConnection> conns))
7276
{
7377
return false;
7478
}
7579

76-
// Is nolonger active.
77-
if (connection.Stream == null || !connection.Stream.CanRead || !connection.Stream.CanWrite)
78-
{
79-
Remove(connection);
80-
connection = null;
81-
return false;
82-
}
80+
connection = conns
81+
.Where(c => c.Stream != null && c.Stream.CanRead && c.Stream.CanWrite)
82+
.FirstOrDefault();
8383

84-
return true;
84+
return connection != null;
8585
}
8686

8787
/// <summary>
@@ -100,39 +100,20 @@ public bool TryGet(Peer peer, out PeerConnection connection)
100100
/// </remarks>
101101
public PeerConnection Add(PeerConnection connection)
102102
{
103-
if (connections.TryGetValue(Key(connection.RemotePeer), out PeerConnection existing))
104-
{
105-
// If the same connection.
106-
if (connection == existing)
103+
connections.AddOrUpdate(
104+
Key(connection.RemotePeer),
105+
(key) => new List<PeerConnection> { connection },
106+
(key, conns) =>
107107
{
108-
return connection;
108+
if (!conns.Contains(connection))
109+
{
110+
conns.Add(connection);
111+
}
112+
return conns;
109113
}
114+
);
110115

111-
// If existing is dead, then use current connection.
112-
if (existing.Stream == null || !existing.Stream.CanRead || !existing.Stream.CanWrite)
113-
{
114-
var address = connection.RemotePeer.ConnectedAddress;
115-
Remove(existing);
116-
connection.RemotePeer.ConnectedAddress = address;
117-
// fall thru to add logic
118-
}
119-
else
120-
{
121-
var address = existing.RemotePeer.ConnectedAddress;
122-
log.Debug($"duplicate {connection.RemoteAddress}, keeping {existing.RemoteAddress}");
123-
connection.Dispose();
124-
existing.RemotePeer.ConnectedAddress = address;
125-
return existing;
126-
}
127-
}
128-
129-
if (!connections.TryAdd(Key(connection.RemotePeer), connection))
130-
{
131-
// This case should not happen.
132-
connection.Dispose();
133-
return connections[Key(connection.RemotePeer)];
134-
}
135-
116+
connection.Closed += (s, e) => Remove(e);
136117
return connection;
137118
}
138119

@@ -156,29 +137,30 @@ public bool Remove(PeerConnection connection)
156137
return false;
157138
}
158139

159-
var q = connections.TryRemove(Key(connection.RemotePeer), out PeerConnection _);
160-
connection.Dispose();
161140

162-
return q;
163-
}
141+
if (!connections.TryGetValue(Key(connection.RemotePeer), out List<PeerConnection> conns))
142+
{
143+
connection.Dispose();
144+
return false;
145+
}
146+
if (!conns.Contains(connection))
147+
{
148+
connection.Dispose();
149+
return false;
150+
}
164151

165-
/// <summary>
166-
/// Remove the connection to the peer.
167-
/// </summary>
168-
/// <param name="peer">
169-
/// The peer to remove.
170-
/// </param>
171-
/// <returns>
172-
/// <b>true</b> if a connection was removed; otherwise, <b>false</b>.
173-
/// </returns>
174-
public bool Remove(Peer peer)
175-
{
176-
var connection = connections.Values.FirstOrDefault(c => c.RemotePeer.Id == peer.Id);
177-
return Remove(connection);
152+
connection.Dispose();
153+
conns.Remove(connection);
154+
if (conns.Count > 0)
155+
{
156+
var last = conns.Last();
157+
last.RemotePeer.ConnectedAddress = last.RemoteAddress;
158+
}
159+
return true;
178160
}
179161

180162
/// <summary>
181-
/// Remove the connection to the peer ID.
163+
/// Remove and close all connection tos the peer ID.
182164
/// </summary>
183165
/// <param name="id">
184166
/// The ID of a <see cref="Peer"/> to remove.
@@ -188,21 +170,26 @@ public bool Remove(Peer peer)
188170
/// </returns>
189171
public bool Remove(MultiHash id)
190172
{
191-
var connection = connections.Values.FirstOrDefault(c => c.RemotePeer.Id == id);
192-
return Remove(connection);
173+
if (!connections.TryRemove(Key(id), out List<PeerConnection> conns))
174+
{
175+
return false;
176+
}
177+
foreach (var conn in conns)
178+
{
179+
conn.Dispose();
180+
}
181+
return true;
193182
}
194183

195184
/// <summary>
196185
/// Removes and closes all connections.
197186
/// </summary>
198187
public void Clear()
199188
{
200-
201-
for (var connection = connections.Values.LastOrDefault();
202-
connection != null;
203-
connection = connections.Values.LastOrDefault())
189+
var conns = connections.Values.SelectMany(c => c).ToArray();
190+
foreach (var conn in conns)
204191
{
205-
Remove(connection);
192+
Remove(conn);
206193
}
207194
}
208195
}

PeerTalk/src/PeerConnection.cs

+32-32
Original file line numberDiff line numberDiff line change
@@ -327,47 +327,47 @@ public async void ReadMessages(Stream stream, CancellationToken cancel)
327327
/// <param name="disposing"></param>
328328
protected virtual void Dispose(bool disposing)
329329
{
330-
if (!disposedValue)
330+
if (disposedValue)
331+
return;
332+
disposedValue = true;
333+
334+
if (disposing)
331335
{
332-
if (disposing)
336+
log.Debug($"Closing connection to {RemoteAddress}");
337+
if (Stream != null)
333338
{
334-
log.Debug($"Closing connection to {RemoteAddress}");
335-
if (Stream != null)
339+
try
340+
{
341+
Stream.Dispose();
342+
}
343+
catch (ObjectDisposedException)
344+
{
345+
// ignore stream already closed.
346+
}
347+
catch (Exception e)
336348
{
337-
try
338-
{
339-
Stream.Dispose();
340-
}
341-
catch (ObjectDisposedException)
342-
{
343-
// ignore stream already closed.
344-
}
345-
catch (Exception e)
346-
{
347-
log.Warn($"Failed to close connection to {RemoteAddress}", e);
348-
// eat it.
349-
}
350-
finally
351-
{
352-
Stream = null;
353-
statsStream = null;
354-
}
349+
log.Warn($"Failed to close connection to {RemoteAddress}", e);
350+
// eat it.
355351
}
356-
if (RemotePeer != null && RemotePeer.ConnectedAddress == RemoteAddress)
352+
finally
357353
{
358-
RemotePeer.ConnectedAddress = null;
354+
Stream = null;
355+
statsStream = null;
359356
}
360-
SecurityEstablished.TrySetCanceled();
361-
IdentityEstablished.TrySetCanceled();
362-
IdentityEstablished.TrySetCanceled();
363-
Closed?.Invoke(this, this);
364357
}
358+
if (RemotePeer != null && RemotePeer.ConnectedAddress == RemoteAddress)
359+
{
360+
RemotePeer.ConnectedAddress = null;
361+
}
362+
SecurityEstablished.TrySetCanceled();
363+
IdentityEstablished.TrySetCanceled();
364+
IdentityEstablished.TrySetCanceled();
365+
Closed?.Invoke(this, this);
366+
}
365367

366-
// free unmanaged resources (unmanaged objects) and override a finalizer below.
367-
// set large fields to null.
368+
// free unmanaged resources (unmanaged objects) and override a finalizer below.
369+
// set large fields to null.
368370

369-
disposedValue = true;
370-
}
371371
}
372372

373373
// TODO: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.

PeerTalk/test/ConnectionManagerTest.cs

+39-20
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,15 @@ public void Add_Duplicate()
5858
Assert.AreEqual(1, manager.Connections.Count());
5959
Assert.IsNotNull(a.Stream);
6060

61-
Assert.AreSame(a, manager.Add(b));
61+
Assert.AreSame(b, manager.Add(b));
6262
Assert.IsTrue(manager.IsConnected(peer));
63-
Assert.AreEqual(1, manager.Connections.Count());
63+
Assert.AreEqual(2, manager.Connections.Count());
6464
Assert.IsNotNull(a.Stream);
65+
Assert.IsNotNull(b.Stream);
66+
67+
manager.Clear();
68+
Assert.AreEqual(0, manager.Connections.Count());
69+
Assert.IsNull(a.Stream);
6570
Assert.IsNull(b.Stream);
6671
}
6772

@@ -99,7 +104,38 @@ public void Add_Duplicate_PeerConnectedAddress()
99104
Assert.IsNotNull(a.Stream);
100105
Assert.AreEqual(address, peer.ConnectedAddress);
101106

102-
Assert.AreSame(a, manager.Add(b));
107+
Assert.AreSame(b, manager.Add(b));
108+
Assert.IsTrue(manager.IsConnected(peer));
109+
Assert.AreEqual(2, manager.Connections.Count());
110+
Assert.IsNotNull(a.Stream);
111+
Assert.IsNotNull(b.Stream);
112+
Assert.AreEqual(address, peer.ConnectedAddress);
113+
}
114+
115+
[TestMethod]
116+
public void Remove_Duplicate_PeerConnectedAddress()
117+
{
118+
var address = "/ip6/::1/tcp/4007";
119+
120+
var manager = new ConnectionManager();
121+
var peer = new Peer { Id = aId, ConnectedAddress = address };
122+
var a = new PeerConnection { RemotePeer = peer, RemoteAddress = address, Stream = Stream.Null };
123+
var b = new PeerConnection { RemotePeer = peer, RemoteAddress = address, Stream = Stream.Null };
124+
125+
Assert.AreSame(a, manager.Add(a));
126+
Assert.IsTrue(manager.IsConnected(peer));
127+
Assert.AreEqual(1, manager.Connections.Count());
128+
Assert.IsNotNull(a.Stream);
129+
Assert.AreEqual(address, peer.ConnectedAddress);
130+
131+
Assert.AreSame(b, manager.Add(b));
132+
Assert.IsTrue(manager.IsConnected(peer));
133+
Assert.AreEqual(2, manager.Connections.Count());
134+
Assert.IsNotNull(a.Stream);
135+
Assert.IsNotNull(b.Stream);
136+
Assert.AreEqual(address, peer.ConnectedAddress);
137+
138+
Assert.IsTrue(manager.Remove(b));
103139
Assert.IsTrue(manager.IsConnected(peer));
104140
Assert.AreEqual(1, manager.Connections.Count());
105141
Assert.IsNotNull(a.Stream);
@@ -173,23 +209,6 @@ public void Remove_Connection()
173209
Assert.IsNull(a.Stream);
174210
}
175211

176-
[TestMethod]
177-
public void Remove_Peer()
178-
{
179-
var manager = new ConnectionManager();
180-
var peer = new Peer { Id = aId };
181-
var a = new PeerConnection { RemotePeer = peer, Stream = Stream.Null };
182-
183-
manager.Add(a);
184-
Assert.IsTrue(manager.IsConnected(peer));
185-
Assert.AreEqual(1, manager.Connections.Count());
186-
Assert.IsNotNull(a.Stream);
187-
188-
Assert.IsTrue(manager.Remove(peer));
189-
Assert.IsFalse(manager.IsConnected(peer));
190-
Assert.AreEqual(0, manager.Connections.Count());
191-
Assert.IsNull(a.Stream);
192-
}
193212

194213
[TestMethod]
195214
public void Remove_PeerId()

mung.cmd

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
dotnet build -c Release --framework net461 ./test
1+
dotnet build -c Release --framework netcoreapp2.1 ./test
22

33
:Loop
4-
dotnet test --logger "console;verbosity=normal" -c Release --no-restore --no-build --framework net461 ./test
4+
dotnet test --logger "console;verbosity=normal" -c Release --no-restore --no-build --framework netcoreapp2.1 ./test --filter BitswapApiTest
55
if %errorlevel% equ 0 goto :Loop
66
echo Connection established

0 commit comments

Comments
 (0)