Skip to content

Commit 9ea934e

Browse files
fix: improve sessions implementation (#495)
Moves most common session code into an abstract superclass to remove duplication. - Sessions are created synchronously - The root CID of a session is filled on the first CID retrieval - Providers are found and queried for the root block directly, any that have it are added to the session - Providers that have errored (e.g. protocol selection failure) are excluded from the session - Bitswap only queries provider peers, not directly connected peers - HTTP Gatways are loaded from the routing - When providers are returned without multiaddrs we try to load them without blocking yielding of other providers --------- Co-authored-by: Russell Dempsey <[email protected]>
1 parent 9a10498 commit 9ea934e

31 files changed

+1665
-787
lines changed

Diff for: packages/bitswap/package.json

+1-2
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,12 @@
149149
},
150150
"dependencies": {
151151
"@helia/interface": "^4.0.0",
152+
"@helia/utils": "^0.1.0",
152153
"@libp2p/interface": "^1.1.2",
153154
"@libp2p/logger": "^4.0.5",
154155
"@libp2p/peer-collections": "^5.1.6",
155156
"@libp2p/utils": "^5.2.3",
156157
"@multiformats/multiaddr": "^12.1.14",
157-
"@multiformats/multiaddr-matcher": "^1.1.2",
158158
"any-signal": "^4.1.1",
159159
"debug": "^4.3.4",
160160
"interface-blockstore": "^5.2.9",
@@ -165,7 +165,6 @@
165165
"it-length-prefixed": "^9.0.0",
166166
"it-length-prefixed-stream": "^1.1.6",
167167
"it-map": "^3.0.5",
168-
"it-merge": "^3.0.3",
169168
"it-pipe": "^3.0.1",
170169
"it-take": "^3.0.1",
171170
"multiformats": "^13.0.1",

Diff for: packages/bitswap/src/bitswap.ts

+4-14
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
/* eslint-disable no-loop-func */
2-
import { DEFAULT_SESSION_MAX_PROVIDERS, DEFAULT_SESSION_MIN_PROVIDERS, DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY } from '@helia/interface'
32
import { setMaxListeners } from '@libp2p/interface'
43
import { anySignal } from 'any-signal'
54
import { Network } from './network.js'
65
import { PeerWantLists } from './peer-want-lists/index.js'
76
import { createBitswapSession } from './session.js'
87
import { Stats } from './stats.js'
98
import { WantList } from './want-list.js'
10-
import type { BitswapOptions, Bitswap as BitswapInterface, BitswapWantProgressEvents, BitswapNotifyProgressEvents, BitswapSession, WantListEntry, BitswapComponents, CreateBitswapSessionOptions } from './index.js'
9+
import type { BitswapOptions, Bitswap as BitswapInterface, BitswapWantProgressEvents, BitswapNotifyProgressEvents, WantListEntry, BitswapComponents } from './index.js'
10+
import type { BlockBroker, CreateSessionOptions } from '@helia/interface'
1111
import type { ComponentLogger, PeerId } from '@libp2p/interface'
1212
import type { Logger } from '@libp2p/logger'
1313
import type { AbortOptions } from '@multiformats/multiaddr'
@@ -62,22 +62,12 @@ export class Bitswap implements BitswapInterface {
6262
}, init)
6363
}
6464

65-
async createSession (root: CID, options?: CreateBitswapSessionOptions): Promise<BitswapSession> {
66-
const minProviders = options?.minProviders ?? DEFAULT_SESSION_MIN_PROVIDERS
67-
const maxProviders = options?.maxProviders ?? DEFAULT_SESSION_MAX_PROVIDERS
68-
65+
createSession (options: CreateSessionOptions = {}): Required<Pick<BlockBroker<BitswapWantProgressEvents>, 'retrieve'>> {
6966
return createBitswapSession({
7067
wantList: this.wantList,
7168
network: this.network,
7269
logger: this.logger
73-
}, {
74-
root,
75-
queryConcurrency: options?.providerQueryConcurrency ?? DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY,
76-
minProviders,
77-
maxProviders,
78-
connectedPeers: options?.queryConnectedPeers !== false ? [...this.wantList.peers.keys()] : [],
79-
signal: options?.signal
80-
})
70+
}, options)
8171
}
8272

8373
async want (cid: CID, options: WantOptions = {}): Promise<Uint8Array> {

Diff for: packages/bitswap/src/index.ts

+2-43
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
import { Bitswap as BitswapClass } from './bitswap.js'
1010
import type { BitswapNetworkNotifyProgressEvents, BitswapNetworkWantProgressEvents } from './network.js'
1111
import type { WantType } from './pb/message.js'
12-
import type { CreateSessionOptions } from '@helia/interface'
12+
import type { BlockBroker, CreateSessionOptions } from '@helia/interface'
1313
import type { Routing } from '@helia/interface/routing'
1414
import type { Libp2p, AbortOptions, Startable, ComponentLogger, Metrics, PeerId } from '@libp2p/interface'
15-
import type { PeerSet } from '@libp2p/peer-collections'
1615
import type { Blockstore } from 'interface-blockstore'
1716
import type { CID } from 'multiformats/cid'
1817
import type { MultihashHasher } from 'multiformats/hashes/interface'
@@ -29,52 +28,12 @@ export type BitswapWantBlockProgressEvents =
2928
ProgressEvent<'bitswap:want-block:block', CID> |
3029
BitswapNetworkWantProgressEvents
3130

32-
/**
33-
* A bitswap session is a network overlay consisting of peers that all have the
34-
* first block in a file. Subsequent requests will only go to these peers.
35-
*/
36-
export interface BitswapSession {
37-
/**
38-
* The peers in this session
39-
*/
40-
peers: PeerSet
41-
42-
/**
43-
* Fetch an additional CID from this DAG
44-
*/
45-
want(cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantProgressEvents>): Promise<Uint8Array>
46-
}
47-
4831
export interface WantListEntry {
4932
cid: CID
5033
priority: number
5134
wantType: WantType
5235
}
5336

54-
export interface CreateBitswapSessionOptions extends CreateSessionOptions<BitswapWantProgressEvents> {
55-
/**
56-
* If true, query connected peers before searching for providers via
57-
* Helia routers
58-
*
59-
* @default true
60-
*/
61-
queryConnectedPeers?: boolean
62-
63-
/**
64-
* If true, search for providers via Helia routers to query for the root CID
65-
*
66-
* @default true
67-
*/
68-
queryRoutingPeers?: boolean
69-
70-
/**
71-
* The priority to use when querying availability of the root CID
72-
*
73-
* @default 1
74-
*/
75-
priority?: number
76-
}
77-
7837
export interface Bitswap extends Startable {
7938
/**
8039
* Returns the current state of the wantlist
@@ -100,7 +59,7 @@ export interface Bitswap extends Startable {
10059
/**
10160
* Start a session to retrieve a file from the network
10261
*/
103-
createSession(root: CID, options?: AbortOptions & ProgressOptions<BitswapWantProgressEvents>): Promise<BitswapSession>
62+
createSession(options?: CreateSessionOptions<BitswapWantProgressEvents>): Required<Pick<BlockBroker<BitswapWantProgressEvents>, 'retrieve'>>
10463
}
10564

10665
export interface MultihashHasherLoader {

Diff for: packages/bitswap/src/network.ts

+11-28
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { CodeError, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
22
import { PeerQueue, type PeerQueueJobOptions } from '@libp2p/utils/peer-queue'
3-
import { Circuit } from '@multiformats/multiaddr-matcher'
43
import { anySignal } from 'any-signal'
54
import debug from 'debug'
65
import drain from 'it-drain'
@@ -129,14 +128,14 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
129128
this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT
130129
this.runOnTransientConnections = init.runOnTransientConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS
131130
this.metrics = {
132-
blocksSent: components.libp2p.metrics?.registerCounter('ipfs_bitswap_sent_blocks_total'),
133-
dataSent: components.libp2p.metrics?.registerCounter('ipfs_bitswap_sent_data_bytes_total')
131+
blocksSent: components.libp2p.metrics?.registerCounter('helia_bitswap_sent_blocks_total'),
132+
dataSent: components.libp2p.metrics?.registerCounter('helia_bitswap_sent_data_bytes_total')
134133
}
135134

136135
this.sendQueue = new PeerQueue({
137136
concurrency: init.messageSendConcurrency ?? DEFAULT_MESSAGE_SEND_CONCURRENCY,
138137
metrics: components.libp2p.metrics,
139-
metricName: 'ipfs_bitswap_message_send_queue'
138+
metricName: 'helia_bitswap_message_send_queue'
140139
})
141140
this.sendQueue.addEventListener('error', (evt) => {
142141
this.log.error('error sending wantlist to peer', evt.detail)
@@ -263,29 +262,12 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
263262
options?.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:network:find-providers', cid))
264263

265264
for await (const provider of this.routing.findProviders(cid, options)) {
266-
// unless we explicitly run on transient connections, skip peers that only
267-
// have circuit relay addresses as bitswap won't run over them
268-
if (!this.runOnTransientConnections) {
269-
let hasDirectAddress = false
270-
271-
for (let ma of provider.multiaddrs) {
272-
if (ma.getPeerId() == null) {
273-
ma = ma.encapsulate(`/p2p/${provider.id}`)
274-
}
275-
276-
if (!Circuit.exactMatch(ma)) {
277-
hasDirectAddress = true
278-
break
279-
}
280-
}
281-
282-
if (!hasDirectAddress) {
283-
continue
284-
}
285-
}
265+
// make sure we can dial the provider
266+
const dialable = await this.libp2p.isDialable(provider.multiaddrs, {
267+
runOnTransientConnection: this.runOnTransientConnections
268+
})
286269

287-
// ignore non-bitswap providers
288-
if (provider.protocols?.includes('transport-bitswap') === false) {
270+
if (!dialable) {
289271
continue
290272
}
291273

@@ -327,8 +309,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
327309
pendingBytes: msg.pendingBytes ?? 0
328310
}
329311

330-
const signal = anySignal([AbortSignal.timeout(this.messageSendTimeout), options?.signal])
331-
setMaxListeners(Infinity, signal)
312+
const timeoutSignal = AbortSignal.timeout(this.messageSendTimeout)
313+
const signal = anySignal([timeoutSignal, options?.signal])
314+
setMaxListeners(Infinity, timeoutSignal, signal)
332315

333316
try {
334317
const existingJob = this.sendQueue.queue.find(job => {

Diff for: packages/bitswap/src/peer-want-lists/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export class PeerWantLists {
4545
this.log = components.logger.forComponent('helia:bitswap:peer-want-lists')
4646

4747
this.ledgerMap = trackedPeerMap({
48-
name: 'ipfs_bitswap_ledger_map',
48+
name: 'helia_bitswap_ledger_map',
4949
metrics: components.libp2p.metrics
5050
})
5151

0 commit comments

Comments
 (0)