Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send cancels to peers #148

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 110 additions & 23 deletions src/BlockExchange/Bitswap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using PeerTalk;
using PeerTalk.Protocols;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -175,7 +176,7 @@ async void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
var peer = await connection.IdentityEstablished.Task.ConfigureAwait(false);

// Fire and forget.
var _ = SendWantListAsync(peer, wants.Values, true);
var _ = SendWantListAsync(peer, wants.Keys, Enumerable.Empty<Cid>(), true);
}
catch (Exception e)
{
Expand Down Expand Up @@ -214,7 +215,7 @@ public Task StopAsync()
public IEnumerable<Cid> PeerWants(MultiHash peer)
{
return wants.Values
.Where(w => w.Peers.Contains(peer))
.Where(w => w.Tasks.Values.Contains(peer))
.Select(w => w.Id);
}

Expand All @@ -240,7 +241,7 @@ public IEnumerable<Cid> PeerWants(MultiHash peer)
/// someone will forward it to us.
/// <para>
/// Besides using <paramref name="cancel"/> for cancellation, the
/// <see cref="Unwant"/> method will also cancel the operation.
/// <see cref="Unwant(Cid, MultiHash)"/> method will also cancel the operation.
/// </para>
/// </remarks>
public Task<IDataBlock> WantAsync(Cid id, MultiHash peer, CancellationToken cancel)
Expand All @@ -253,27 +254,32 @@ public Task<IDataBlock> WantAsync(Cid id, MultiHash peer, CancellationToken canc
var tsc = new TaskCompletionSource<IDataBlock>();
var want = wants.AddOrUpdate(
id,
(key) => new WantedBlock
{
Id = id,
Consumers = new List<TaskCompletionSource<IDataBlock>> { tsc },
Peers = new List<MultiHash> { peer }
(key) => {
var block = new WantedBlock
{
Id = id,
Tasks = new ConcurrentDictionary<TaskCompletionSource<IDataBlock>, MultiHash>()
};
block.Tasks.TryAdd(tsc, peer);
return block;
},
(key, block) =>
{
block.Peers.Add(peer);
block.Consumers.Add(tsc);
block.Tasks.TryAdd(tsc, peer);
return block;
}
);

// If cancelled, then the block is unwanted.
cancel.Register(() => Unwant(id));
cancel.Register(() => Unwant(id, peer));

// If first time, tell other peers.
if (want.Consumers.Count == 1)
if (want.Tasks.Count == 1 && peer == Swarm.LocalPeer.Id)
{
var _ = SendWantListToAllAsync(new[] { want }, full: false);
var _ = SendWantListToAllAsync(
new[] { id },
Enumerable.Empty<Cid>(),
full: false);
BlockNeeded?.Invoke(this, new CidEventArgs { Id = want.Id });
}

Expand Down Expand Up @@ -302,13 +308,80 @@ public void Unwant(Cid id)

if (wants.TryRemove(id, out WantedBlock block))
{
foreach (var consumer in block.Consumers)
foreach (var task in block.Tasks.Keys)
{
consumer.SetCanceled();
task.TrySetCanceled();
}

// Tell the swarm.
var _ = SendWantListToAllAsync(
Enumerable.Empty<Cid>(),
new[] { block.Id },
false);
}
}

/// <summary>
/// Removes the block from the want list.
/// </summary>
/// <param name="id">
/// The CID of the block to remove from the want list.
/// </param>
/// <param name="peer">
/// The id of the peer that no longer wants the block.
/// </param>
/// <remarks>
/// Any tasks from the <paramref name="peer"/> waiting for the block are cancelled.
/// <para>
/// No exception is thrown if the <paramref name="id"/> is not
/// on the want list.
/// </para>
/// </remarks>
public void Unwant(Cid id, MultiHash peer)
{
if (log.IsDebugEnabled)
{
log.Debug($"Unwant {id} for {peer}");
}

// Short curcuit if id is not not wanted or not wanted by
// the peer.
if (!wants.TryGetValue(id, out WantedBlock want))
{
return;
}
if (!want.Tasks.Values.Contains(peer))
{
return;
}

// Get the tasks that want the CID for the peer.
var tasks = want.Tasks
.Where(t => t.Value == peer)
.Select(t => t.Key)
.ToArray();
foreach (var task in tasks)
{
log.Debug($"cancel {id} for {peer}");
task.TrySetCanceled();
want.Tasks.TryRemove(task, out _);
}
if (peer == Swarm.LocalPeer.Id)
{
log.Debug($"sending cancel {id}");
// Tell the swarm.
var _ = SendWantListToAllAsync(
Enumerable.Empty<Cid>(),
new[] { id },
false);
}

// TODO: Tell the swarm
// If no other peer wants the CID, then remove it
// from the want list.
if (want.Tasks.Count == 0)
{
wants.TryRemove(id, out _);
}
}

/// <summary>
Expand Down Expand Up @@ -455,11 +528,18 @@ public int Found(IDataBlock block)
{
if (wants.TryRemove(block.Id, out WantedBlock want))
{
foreach (var consumer in want.Consumers)
foreach (var task in want.Tasks.Keys)
{
consumer.SetResult(block);
task.SetResult(block);
}
return want.Consumers.Count;

// Tell the swarm.
var _ = SendWantListToAllAsync(
Enumerable.Empty<Cid>(),
new[] { block.Id },
false);

return want.Tasks.Count;
}

return 0;
Expand All @@ -468,7 +548,10 @@ public int Found(IDataBlock block)
/// <summary>
/// Send our want list to the connected peers.
/// </summary>
async Task SendWantListToAllAsync(IEnumerable<WantedBlock> wants, bool full)
async Task SendWantListToAllAsync(
IEnumerable<Cid> wants,
IEnumerable<Cid> cancels,
bool full)
{
if (Swarm == null)
return;
Expand All @@ -477,7 +560,7 @@ async Task SendWantListToAllAsync(IEnumerable<WantedBlock> wants, bool full)
{
var tasks = Swarm.KnownPeers
.Where(p => p.ConnectedAddress != null)
.Select(p => SendWantListAsync(p, wants, full))
.Select(p => SendWantListAsync(p, wants, cancels, full))
.ToArray();
if (log.IsDebugEnabled)
log.Debug($"Spamming {tasks.Count()} connected peers");
Expand All @@ -492,7 +575,11 @@ async Task SendWantListToAllAsync(IEnumerable<WantedBlock> wants, bool full)
}
}

async Task SendWantListAsync(Peer peer, IEnumerable<WantedBlock> wants, bool full)
async Task SendWantListAsync(
Peer peer,
IEnumerable<Cid> wants,
IEnumerable<Cid> cancels,
bool full)
{
log.Debug($"sending want list to {peer}");

Expand All @@ -504,7 +591,7 @@ async Task SendWantListAsync(Peer peer, IEnumerable<WantedBlock> wants, bool ful
{
using (var stream = await Swarm.DialAsync(peer, protocol.ToString()).ConfigureAwait(false))
{
await protocol.SendWantsAsync(stream, wants, full: full).ConfigureAwait(false);
await protocol.SendWantsAsync(stream, wants, cancels, full: full).ConfigureAwait(false);
}
return;
}
Expand Down
24 changes: 13 additions & 11 deletions src/BlockExchange/Bitswap1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ public override string ToString()
Cid cid = s;
if (entry.cancel)
{
// TODO: Unwant specific to remote peer
Bitswap.Unwant(cid);
Bitswap.Unwant(cid, connection.RemotePeer.Id);
}
else
{
Expand Down Expand Up @@ -117,24 +116,27 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
/// <inheritdoc />
public async Task SendWantsAsync(
Stream stream,
IEnumerable<WantedBlock> wants,
IEnumerable<Cid> wants,
IEnumerable<Cid> cancels,
bool full = true,
CancellationToken cancel = default(CancellationToken)
)
{
log.Debug("Sending want list");

var entries = new List<Entry>();
foreach (var cid in wants)
{
entries.Add(new Entry { block = cid.ToArray() });
}
foreach (var cid in cancels)
{
entries.Add(new Entry { block = cid.ToArray(), cancel = true });
}
var message = new Message
{
wantlist = new Wantlist
{
full = full,
entries = wants
.Select(w => new Entry
{
block = w.Id.Hash.ToArray()
})
.ToArray()
entries = entries.ToArray(),
}
};

Expand Down
25 changes: 14 additions & 11 deletions src/BlockExchange/Bitswap11.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ public override string ToString()
foreach (var entry in request.wantlist.entries)
{
var cid = Cid.Read(entry.block);
log.Debug($"entry {cid} cancel {entry.cancel}");
if (entry.cancel)
{
// TODO: Unwant specific to remote peer
Bitswap.Unwant(cid);
Bitswap.Unwant(cid, connection.RemotePeer.Id);
}
else
{
Expand Down Expand Up @@ -126,24 +126,27 @@ async Task GetBlockAsync(Cid cid, Peer remotePeer, CancellationToken cancel)
/// <inheritdoc />
public async Task SendWantsAsync(
Stream stream,
IEnumerable<WantedBlock> wants,
IEnumerable<Cid> wants,
IEnumerable<Cid> cancels,
bool full = true,
CancellationToken cancel = default(CancellationToken)
)
{
var entries = new List<Entry>();
foreach (var cid in wants)
{
entries.Add(new Entry { block = cid.ToArray() });
}
foreach (var cid in cancels)
{
entries.Add(new Entry { block = cid.ToArray(), cancel = true });
}
var message = new Message
{
wantlist = new Wantlist
{
full = full,
entries = wants
.Select(w => {
return new Entry
{
block = w.Id.ToArray()
};
})
.ToArray()
entries = entries.ToArray(),
},
payload = new List<Block>(0)
};
Expand Down
8 changes: 6 additions & 2 deletions src/BlockExchange/IBitswapProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ public interface IBitswapProtocol : IPeerProtocol
/// The destination of the want list.
/// </param>
/// <param name="wants">
/// A sequence of <see cref="WantedBlock"/>.
/// A sequence of <see cref="Cid"/> that is wanted.
/// </param>
/// <param name="cancels">
/// A sequence of <see cref="Cid"/> that is not wanted.
/// </param>
/// <param name="full">
/// <b>true</b> if <paramref name="wants"/> is the full want list.
Expand All @@ -35,7 +38,8 @@ public interface IBitswapProtocol : IPeerProtocol
Task SendWantsAsync
(
Stream stream,
IEnumerable<WantedBlock> wants,
IEnumerable<Cid> wants,
IEnumerable<Cid> cancels,
bool full = true,
CancellationToken cancel = default(CancellationToken)
);
Expand Down
13 changes: 6 additions & 7 deletions src/BlockExchange/WantedBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@ namespace Ipfs.Engine.BlockExchange
public class WantedBlock
{
/// <summary>
/// The content ID of the block;
/// The content ID of the block.
/// </summary>
public Cid Id;

/// <summary>
/// The peers that want the block.
/// </summary>
public List<MultiHash> Peers;

/// <summary>
/// The consumers that are waiting for the block.
/// </summary>
public List<TaskCompletionSource<IDataBlock>> Consumers;
/// <remarks>
/// The keys is a TaskCompletionSource and the value is
/// the peer ID.
/// </remarks>
public ConcurrentDictionary<TaskCompletionSource<IDataBlock>, MultiHash> Tasks;
}

}
2 changes: 1 addition & 1 deletion src/IpfsEngine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<PackageReference Include="Makaretu.Dns.Unicast" Version="0.11.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="Nito.AsyncEx.Coordination" Version="5.0.0" />
<PackageReference Include="PeerTalk" Version="0.20.1" />
<PackageReference Include="PeerTalk" Version="0.20.2" />
<PackageReference Include="PeterO.Cbor" Version="3.1.0" />
<PackageReference Include="Portable.BouncyCastle" Version="1.8.5" />
<PackageReference Include="protobuf-net" Version="2.4.0" />
Expand Down
Loading