diff --git a/src/BlockExchange/Bitswap.cs b/src/BlockExchange/Bitswap.cs index e42694f6..ef48b852 100644 --- a/src/BlockExchange/Bitswap.cs +++ b/src/BlockExchange/Bitswap.cs @@ -3,6 +3,7 @@ using PeerTalk; using PeerTalk.Protocols; using System; +using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -175,7 +176,7 @@ async void Swarm_ConnectionEstablished(object sender, PeerConnection connection) var peer = await connection.IdentityEstablished.Task.ConfigureAwait(false); // Fire and forget. - var _ = SendWantListAsync(peer, wants.Values, true); + var _ = SendWantListAsync(peer, wants.Keys, Enumerable.Empty(), true); } catch (Exception e) { @@ -214,7 +215,7 @@ public Task StopAsync() public IEnumerable PeerWants(MultiHash peer) { return wants.Values - .Where(w => w.Peers.Contains(peer)) + .Where(w => w.Tasks.Values.Contains(peer)) .Select(w => w.Id); } @@ -240,7 +241,7 @@ public IEnumerable PeerWants(MultiHash peer) /// someone will forward it to us. /// /// Besides using for cancellation, the - /// method will also cancel the operation. + /// method will also cancel the operation. /// /// public Task WantAsync(Cid id, MultiHash peer, CancellationToken cancel) @@ -253,27 +254,32 @@ public Task WantAsync(Cid id, MultiHash peer, CancellationToken canc var tsc = new TaskCompletionSource(); var want = wants.AddOrUpdate( id, - (key) => new WantedBlock - { - Id = id, - Consumers = new List> { tsc }, - Peers = new List { peer } + (key) => { + var block = new WantedBlock + { + Id = id, + Tasks = new ConcurrentDictionary, MultiHash>() + }; + block.Tasks.TryAdd(tsc, peer); + return block; }, (key, block) => { - block.Peers.Add(peer); - block.Consumers.Add(tsc); + block.Tasks.TryAdd(tsc, peer); return block; } ); // If cancelled, then the block is unwanted. - cancel.Register(() => Unwant(id)); + cancel.Register(() => Unwant(id, peer)); // If first time, tell other peers. - if (want.Consumers.Count == 1) + if (want.Tasks.Count == 1 && peer == Swarm.LocalPeer.Id) { - var _ = SendWantListToAllAsync(new[] { want }, full: false); + var _ = SendWantListToAllAsync( + new[] { id }, + Enumerable.Empty(), + full: false); BlockNeeded?.Invoke(this, new CidEventArgs { Id = want.Id }); } @@ -302,13 +308,80 @@ public void Unwant(Cid id) if (wants.TryRemove(id, out WantedBlock block)) { - foreach (var consumer in block.Consumers) + foreach (var task in block.Tasks.Keys) { - consumer.SetCanceled(); + task.TrySetCanceled(); } + + // Tell the swarm. + var _ = SendWantListToAllAsync( + Enumerable.Empty(), + new[] { block.Id }, + false); + } + } + + /// + /// Removes the block from the want list. + /// + /// + /// The CID of the block to remove from the want list. + /// + /// + /// The id of the peer that no longer wants the block. + /// + /// + /// Any tasks from the waiting for the block are cancelled. + /// + /// No exception is thrown if the is not + /// on the want list. + /// + /// + public void Unwant(Cid id, MultiHash peer) + { + if (log.IsDebugEnabled) + { + log.Debug($"Unwant {id} for {peer}"); + } + + // Short curcuit if id is not not wanted or not wanted by + // the peer. + if (!wants.TryGetValue(id, out WantedBlock want)) + { + return; + } + if (!want.Tasks.Values.Contains(peer)) + { + return; + } + + // Get the tasks that want the CID for the peer. + var tasks = want.Tasks + .Where(t => t.Value == peer) + .Select(t => t.Key) + .ToArray(); + foreach (var task in tasks) + { + log.Debug($"cancel {id} for {peer}"); + task.TrySetCanceled(); + want.Tasks.TryRemove(task, out _); + } + if (peer == Swarm.LocalPeer.Id) + { + log.Debug($"sending cancel {id}"); + // Tell the swarm. + var _ = SendWantListToAllAsync( + Enumerable.Empty(), + new[] { id }, + false); } - // TODO: Tell the swarm + // If no other peer wants the CID, then remove it + // from the want list. + if (want.Tasks.Count == 0) + { + wants.TryRemove(id, out _); + } } /// @@ -455,11 +528,18 @@ public int Found(IDataBlock block) { if (wants.TryRemove(block.Id, out WantedBlock want)) { - foreach (var consumer in want.Consumers) + foreach (var task in want.Tasks.Keys) { - consumer.SetResult(block); + task.SetResult(block); } - return want.Consumers.Count; + + // Tell the swarm. + var _ = SendWantListToAllAsync( + Enumerable.Empty(), + new[] { block.Id }, + false); + + return want.Tasks.Count; } return 0; @@ -468,7 +548,10 @@ public int Found(IDataBlock block) /// /// Send our want list to the connected peers. /// - async Task SendWantListToAllAsync(IEnumerable wants, bool full) + async Task SendWantListToAllAsync( + IEnumerable wants, + IEnumerable cancels, + bool full) { if (Swarm == null) return; @@ -477,7 +560,7 @@ async Task SendWantListToAllAsync(IEnumerable wants, bool full) { var tasks = Swarm.KnownPeers .Where(p => p.ConnectedAddress != null) - .Select(p => SendWantListAsync(p, wants, full)) + .Select(p => SendWantListAsync(p, wants, cancels, full)) .ToArray(); if (log.IsDebugEnabled) log.Debug($"Spamming {tasks.Count()} connected peers"); @@ -492,7 +575,11 @@ async Task SendWantListToAllAsync(IEnumerable wants, bool full) } } - async Task SendWantListAsync(Peer peer, IEnumerable wants, bool full) + async Task SendWantListAsync( + Peer peer, + IEnumerable wants, + IEnumerable cancels, + bool full) { log.Debug($"sending want list to {peer}"); @@ -504,7 +591,7 @@ async Task SendWantListAsync(Peer peer, IEnumerable wants, bool ful { using (var stream = await Swarm.DialAsync(peer, protocol.ToString()).ConfigureAwait(false)) { - await protocol.SendWantsAsync(stream, wants, full: full).ConfigureAwait(false); + await protocol.SendWantsAsync(stream, wants, cancels, full: full).ConfigureAwait(false); } return; } diff --git a/src/BlockExchange/Bitswap1.cs b/src/BlockExchange/Bitswap1.cs index c9433a46..d2a1976d 100644 --- a/src/BlockExchange/Bitswap1.cs +++ b/src/BlockExchange/Bitswap1.cs @@ -61,8 +61,7 @@ public override string ToString() Cid cid = s; if (entry.cancel) { - // TODO: Unwant specific to remote peer - Bitswap.Unwant(cid); + Bitswap.Unwant(cid, connection.RemotePeer.Id); } else { @@ -117,24 +116,27 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel) /// public async Task SendWantsAsync( Stream stream, - IEnumerable wants, + IEnumerable wants, + IEnumerable cancels, bool full = true, CancellationToken cancel = default(CancellationToken) ) { - log.Debug("Sending want list"); - + var entries = new List(); + foreach (var cid in wants) + { + entries.Add(new Entry { block = cid.ToArray() }); + } + foreach (var cid in cancels) + { + entries.Add(new Entry { block = cid.ToArray(), cancel = true }); + } var message = new Message { wantlist = new Wantlist { full = full, - entries = wants - .Select(w => new Entry - { - block = w.Id.Hash.ToArray() - }) - .ToArray() + entries = entries.ToArray(), } }; diff --git a/src/BlockExchange/Bitswap11.cs b/src/BlockExchange/Bitswap11.cs index 604227b3..faa577bb 100644 --- a/src/BlockExchange/Bitswap11.cs +++ b/src/BlockExchange/Bitswap11.cs @@ -57,10 +57,10 @@ public override string ToString() foreach (var entry in request.wantlist.entries) { var cid = Cid.Read(entry.block); + log.Debug($"entry {cid} cancel {entry.cancel}"); if (entry.cancel) { - // TODO: Unwant specific to remote peer - Bitswap.Unwant(cid); + Bitswap.Unwant(cid, connection.RemotePeer.Id); } else { @@ -126,24 +126,27 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel) /// public async Task SendWantsAsync( Stream stream, - IEnumerable wants, + IEnumerable wants, + IEnumerable cancels, bool full = true, CancellationToken cancel = default(CancellationToken) ) { + var entries = new List(); + foreach (var cid in wants) + { + entries.Add(new Entry { block = cid.ToArray() }); + } + foreach (var cid in cancels) + { + entries.Add(new Entry { block = cid.ToArray(), cancel = true }); + } var message = new Message { wantlist = new Wantlist { full = full, - entries = wants - .Select(w => { - return new Entry - { - block = w.Id.ToArray() - }; - }) - .ToArray() + entries = entries.ToArray(), }, payload = new List(0) }; diff --git a/src/BlockExchange/IBitswapProtocol.cs b/src/BlockExchange/IBitswapProtocol.cs index 6d30d6ff..48a440fe 100644 --- a/src/BlockExchange/IBitswapProtocol.cs +++ b/src/BlockExchange/IBitswapProtocol.cs @@ -21,7 +21,10 @@ public interface IBitswapProtocol : IPeerProtocol /// The destination of the want list. /// /// - /// A sequence of . + /// A sequence of that is wanted. + /// + /// + /// A sequence of that is not wanted. /// /// /// true if is the full want list. @@ -35,7 +38,8 @@ public interface IBitswapProtocol : IPeerProtocol Task SendWantsAsync ( Stream stream, - IEnumerable wants, + IEnumerable wants, + IEnumerable cancels, bool full = true, CancellationToken cancel = default(CancellationToken) ); diff --git a/src/BlockExchange/WantedBlock.cs b/src/BlockExchange/WantedBlock.cs index 24f1bca7..ec2cda4c 100644 --- a/src/BlockExchange/WantedBlock.cs +++ b/src/BlockExchange/WantedBlock.cs @@ -16,19 +16,18 @@ namespace Ipfs.Engine.BlockExchange public class WantedBlock { /// - /// The content ID of the block; + /// The content ID of the block. /// public Cid Id; - /// - /// The peers that want the block. - /// - public List Peers; - /// /// The consumers that are waiting for the block. /// - public List> Consumers; + /// + /// The keys is a TaskCompletionSource and the value is + /// the peer ID. + /// + public ConcurrentDictionary, MultiHash> Tasks; } } diff --git a/src/IpfsEngine.csproj b/src/IpfsEngine.csproj index e8888ebe..682ea25a 100644 --- a/src/IpfsEngine.csproj +++ b/src/IpfsEngine.csproj @@ -54,7 +54,7 @@ - + diff --git a/test/BlockExchange/BitswapTest .cs b/test/BlockExchange/BitswapTest .cs index 14e4f3b2..5dd279c7 100644 --- a/test/BlockExchange/BitswapTest .cs +++ b/test/BlockExchange/BitswapTest .cs @@ -17,6 +17,11 @@ public class BitswapTest Id = "QmXK9VBxaXFuuT29AaPUTgW3jBWZ9JgLVZYdMYTHC6LLAH", PublicKey = "CAASXjBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQCC5r4nQBtnd9qgjnG8fBN5+gnqIeWEIcUFUdCG4su/vrbQ1py8XGKNUBuDjkyTv25Gd3hlrtNJV3eOKZVSL8ePAgMBAAE=" }; + Peer other = new Peer + { + Id = "QmXK9VBxaXFuuT29AaPUTgW3jBWZ9JgLVZYdMYTHC6LLAX", + PublicKey = "CAASXjBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQCC5r4nQBtnd9qgjnG8fBN5+gnqIeWEIcUFUdCG4su/vrbQ1py8XGKNUBuDjkyTv25Gd3hlrtNJV3eOKZVSL8ePAgMBAAE=" + }; [TestMethod] public void WantList() @@ -88,6 +93,28 @@ public void Want_Unwant() CollectionAssert.DoesNotContain(bitswap.PeerWants(self.Id).ToArray(), cid); } + [TestMethod] + public void Want_Unwant_PeerSpecific() + { + var bitswap = new Bitswap { Swarm = new Swarm { LocalPeer = self } }; + var cid = new DagNode(Encoding.UTF8.GetBytes("Want_Unwant_PeerSpecific unknown block")).Id; + var cts = new CancellationTokenSource(); + var task1 = bitswap.WantAsync(cid, self.Id, cts.Token); + var cts2 = new CancellationTokenSource(); + var task2 = bitswap.WantAsync(cid, other.Id, cts.Token); + + CollectionAssert.Contains(bitswap.PeerWants(self.Id).ToArray(), cid); + CollectionAssert.Contains(bitswap.PeerWants(other.Id).ToArray(), cid); + + bitswap.Unwant(cid, self.Id); + CollectionAssert.DoesNotContain(bitswap.PeerWants(self.Id).ToArray(), cid); + CollectionAssert.Contains(bitswap.PeerWants(other.Id).ToArray(), cid); + + bitswap.Unwant(cid, other.Id); + CollectionAssert.DoesNotContain(bitswap.PeerWants(self.Id).ToArray(), cid); + CollectionAssert.DoesNotContain(bitswap.PeerWants(other.Id).ToArray(), cid); + } + [TestMethod] public void Found() { diff --git a/test/CoreApi/BitswapApiTest.cs b/test/CoreApi/BitswapApiTest.cs index 2d04f8f3..361357f2 100644 --- a/test/CoreApi/BitswapApiTest.cs +++ b/test/CoreApi/BitswapApiTest.cs @@ -1,6 +1,7 @@ using Ipfs.Engine.BlockExchange; using Microsoft.VisualStudio.TestTools.UnitTesting; using System; +using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; @@ -122,6 +123,163 @@ public async Task OnConnect_Sends_WantList() } } + [TestMethod] + public async Task Sends_Cancel_on_Unwant() + { + ipfs.Options.Discovery.DisableMdns = true; + ipfs.Options.Discovery.BootstrapPeers = new MultiAddress[0]; + await ipfs.StartAsync(); + + ipfsOther.Options.Discovery.DisableMdns = true; + ipfsOther.Options.Discovery.BootstrapPeers = new MultiAddress[0]; + await ipfsOther.StartAsync(); + try + { + var local = await ipfs.LocalPeer; + var remote = await ipfsOther.LocalPeer; + + var data = Guid.NewGuid().ToByteArray(); + var cid = new Cid { Hash = MultiHash.ComputeHash(data) }; + var cts = new CancellationTokenSource(); + var _ = ipfs.Block.GetAsync(cid, cts.Token); + await ipfs.Swarm.ConnectAsync(remote.Addresses.First()); + + // Send the want list. + var endTime = DateTime.Now.AddSeconds(10); + IEnumerable wants = null; + while (DateTime.Now < endTime && (wants == null || !wants.Contains(cid))) + { + wants = await ipfsOther.Bitswap.WantsAsync(local.Id); + await Task.Delay(200); + } + Assert.IsTrue(wants != null && wants.Contains(cid), "want list not sent"); + + // Send the cancel list. + //cts.Cancel(); + await ipfs.Bitswap.UnwantAsync(cid); + endTime = DateTime.Now.AddSeconds(10); + wants = null; + while (DateTime.Now < endTime && (wants == null || wants.Contains(cid))) + { + wants = await ipfsOther.Bitswap.WantsAsync(local.Id); + await Task.Delay(200); + } + Assert.IsTrue(wants != null && !wants.Contains(cid), "cancel list not sent"); + } + finally + { + await ipfsOther.StopAsync(); + await ipfs.StopAsync(); + + ipfs.Options.Discovery = new DiscoveryOptions(); + ipfsOther.Options.Discovery = new DiscoveryOptions(); + } + } + + [TestMethod] + public async Task Sends_Cancel_on_Cancellation() + { + ipfs.Options.Discovery.DisableMdns = true; + ipfs.Options.Discovery.BootstrapPeers = new MultiAddress[0]; + await ipfs.StartAsync(); + + ipfsOther.Options.Discovery.DisableMdns = true; + ipfsOther.Options.Discovery.BootstrapPeers = new MultiAddress[0]; + await ipfsOther.StartAsync(); + try + { + var local = await ipfs.LocalPeer; + var remote = await ipfsOther.LocalPeer; + + var data = Guid.NewGuid().ToByteArray(); + var cid = new Cid { Hash = MultiHash.ComputeHash(data) }; + var cts = new CancellationTokenSource(); + var _ = ipfs.Block.GetAsync(cid, cts.Token); + await ipfs.Swarm.ConnectAsync(remote.Addresses.First()); + + // Send the want list. + var endTime = DateTime.Now.AddSeconds(10); + IEnumerable wants = null; + while (DateTime.Now < endTime && (wants == null || !wants.Contains(cid))) + { + wants = await ipfsOther.Bitswap.WantsAsync(local.Id); + await Task.Delay(200); + } + Assert.IsTrue(wants != null && wants.Contains(cid), "want list not sent"); + + // Send the cancel list. + cts.Cancel(); + endTime = DateTime.Now.AddSeconds(10); + wants = null; + while (DateTime.Now < endTime && (wants == null || wants.Contains(cid))) + { + wants = await ipfsOther.Bitswap.WantsAsync(local.Id); + await Task.Delay(200); + } + Assert.IsTrue(wants != null && !wants.Contains(cid), "cancel list not sent"); + } + finally + { + await ipfsOther.StopAsync(); + await ipfs.StopAsync(); + + ipfs.Options.Discovery = new DiscoveryOptions(); + ipfsOther.Options.Discovery = new DiscoveryOptions(); + } + } + + [TestMethod] + public async Task Sends_Cancel_on_Found() + { + ipfs.Options.Discovery.DisableMdns = true; + ipfs.Options.Discovery.BootstrapPeers = new MultiAddress[0]; + await ipfs.StartAsync(); + + ipfsOther.Options.Discovery.DisableMdns = true; + ipfsOther.Options.Discovery.BootstrapPeers = new MultiAddress[0]; + await ipfsOther.StartAsync(); + try + { + var local = await ipfs.LocalPeer; + var remote = await ipfsOther.LocalPeer; + + var data = Guid.NewGuid().ToByteArray(); + var cid = new Cid { Hash = MultiHash.ComputeHash(data) }; + var cts = new CancellationTokenSource(); + var _ = ipfs.Block.GetAsync(cid, cts.Token); + await ipfs.Swarm.ConnectAsync(remote.Addresses.First()); + + // Send the want list. + var endTime = DateTime.Now.AddSeconds(10); + IEnumerable wants = null; + while (DateTime.Now < endTime && (wants == null || !wants.Contains(cid))) + { + wants = await ipfsOther.Bitswap.WantsAsync(local.Id); + await Task.Delay(200); + } + Assert.IsTrue(wants != null && wants.Contains(cid), "want list not sent"); + + // Send the cancel list. + await ipfs.Block.PutAsync(data); + endTime = DateTime.Now.AddSeconds(10); + wants = null; + while (DateTime.Now < endTime && (wants == null || wants.Contains(cid))) + { + wants = await ipfsOther.Bitswap.WantsAsync(local.Id); + await Task.Delay(200); + } + Assert.IsTrue(wants != null && !wants.Contains(cid), "cancel list not sent"); + } + finally + { + await ipfsOther.StopAsync(); + await ipfs.StopAsync(); + + ipfs.Options.Discovery = new DiscoveryOptions(); + ipfsOther.Options.Discovery = new DiscoveryOptions(); + } + } + [TestMethod] public async Task GetsBlock_OnConnect() { diff --git a/test/CoreApi/FileSystemApiTest.cs b/test/CoreApi/FileSystemApiTest.cs index 4586c198..8d433b85 100644 --- a/test/CoreApi/FileSystemApiTest.cs +++ b/test/CoreApi/FileSystemApiTest.cs @@ -796,7 +796,6 @@ public async Task Read_From_OtherNode() // Start bootstrap node. b.Options.Discovery.DisableMdns = true; - b.Options.Swarm.MinConnections = 0; b.Options.Swarm.PrivateNetworkKey = psk; b.Options.Discovery.BootstrapPeers = new MultiAddress[0]; await b.StartAsync(); @@ -808,7 +807,6 @@ public async Task Read_From_OtherNode() // Node that has the content. c.Options.Discovery.DisableMdns = true; - c.Options.Swarm.MinConnections = 0; c.Options.Swarm.PrivateNetworkKey = psk; c.Options.Discovery.BootstrapPeers = bootstrapPeers; await c.StartAsync(); @@ -820,10 +818,10 @@ public async Task Read_From_OtherNode() // Node that reads the content. a.Options.Discovery.DisableMdns = true; - a.Options.Swarm.MinConnections = 0; a.Options.Swarm.PrivateNetworkKey = psk; a.Options.Discovery.BootstrapPeers = bootstrapPeers; await a.StartAsync(); + await a.Swarm.ConnectAsync(bootstrapPeers[0]); Console.WriteLine($"A is {await a.LocalPeer}"); var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); var content = await a.FileSystem.ReadAllTextAsync(cid, cts.Token);