-
Notifications
You must be signed in to change notification settings - Fork 4
Add alternative provider retrieval check #132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
aedec80
233cc1f
83e7f31
afe30dd
23ee203
4bc1076
63424ff
8a94f4e
c4350b6
edfdef1
dbf0fd7
d33f276
97bee91
4b6d0bc
97fcc28
5121a49
4065784
74f06e9
ea8cce4
f9afe34
9959b50
a2da050
9759d80
820e8a3
5b13287
3c14f84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,3 +34,6 @@ export { | |
export { assertOkResponse } from 'https://cdn.skypack.dev/[email protected]/?dts' | ||
import pRetry from 'https://cdn.skypack.dev/[email protected]/?dts' | ||
export { pRetry } | ||
|
||
import Prando from 'https://cdn.jsdelivr.net/npm/[email protected]/+esm' | ||
export { Prando } | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
/* global Zinnia */ | ||
|
||
/** @import { Provider } from './ipni-client.js' */ | ||
import { ActivityState } from './activity-state.js' | ||
import { | ||
SPARK_VERSION, | ||
|
@@ -18,6 +19,7 @@ import { | |
CarBlockIterator, | ||
encodeHex, | ||
HashMismatchError, | ||
Prando, | ||
UnsupportedHashError, | ||
validateBlock, | ||
} from '../vendor/deno-deps.js' | ||
|
@@ -41,16 +43,17 @@ export default class Spark { | |
|
||
async getRetrieval() { | ||
const retrieval = await this.#tasker.next() | ||
if (retrieval) { | ||
if (retrieval.retrievalTask) { | ||
console.log({ retrieval }) | ||
} | ||
|
||
return retrieval | ||
} | ||
|
||
async executeRetrievalCheck(retrieval, stats) { | ||
console.log(`Calling Filecoin JSON-RPC to get PeerId of miner ${retrieval.minerId}`) | ||
async executeRetrievalCheck({ retrievalTask, stats, randomness }) { | ||
console.log(`Calling Filecoin JSON-RPC to get PeerId of miner ${retrievalTask.minerId}`) | ||
try { | ||
const peerId = await this.#getIndexProviderPeerId(retrieval.minerId) | ||
const peerId = await this.#getIndexProviderPeerId(retrievalTask.minerId) | ||
console.log(`Found peer id: ${peerId}`) | ||
stats.providerId = peerId | ||
} catch (err) { | ||
|
@@ -70,19 +73,39 @@ export default class Spark { | |
throw err | ||
} | ||
|
||
console.log(`Querying IPNI to find retrieval providers for ${retrieval.cid}`) | ||
const { indexerResult, provider } = await queryTheIndex(retrieval.cid, stats.providerId) | ||
console.log(`Querying IPNI to find retrieval providers for ${retrievalTask.cid}`) | ||
const { indexerResult, provider, alternativeProviders } = await queryTheIndex( | ||
retrievalTask.cid, | ||
stats.providerId, | ||
) | ||
stats.indexerResult = indexerResult | ||
|
||
const providerFound = indexerResult === 'OK' || indexerResult === 'HTTP_NOT_ADVERTISED' | ||
if (!providerFound) return | ||
const noValidAdvertisement = indexerResult === 'NO_VALID_ADVERTISEMENT' | ||
|
||
// In case index lookup failed we will not perform any retrieval | ||
if (!providerFound && !noValidAdvertisement) return | ||
|
||
// In case we fail to find a valid advertisement for the provider | ||
// we will try to perform network wide retrieval from other providers | ||
if (noValidAdvertisement) { | ||
console.log( | ||
'No valid advertisement found. Trying to retrieve from an alternative provider...', | ||
) | ||
stats.alternativeProviderCheck = await this.checkRetrievalFromAlternativeProvider({ | ||
alternativeProviders, | ||
randomness, | ||
cid: retrievalTask.cid, | ||
}) | ||
return | ||
} | ||
|
||
stats.protocol = provider.protocol | ||
stats.providerAddress = provider.address | ||
|
||
await this.fetchCAR(provider.protocol, provider.address, retrieval.cid, stats) | ||
await this.fetchCAR(provider.protocol, provider.address, retrievalTask.cid, stats) | ||
if (stats.protocol === 'http') { | ||
await this.testHeadRequest(provider.address, retrieval.cid, stats) | ||
await this.testHeadRequest(provider.address, retrievalTask.cid, stats) | ||
} | ||
} | ||
|
||
|
@@ -202,6 +225,31 @@ export default class Spark { | |
} | ||
} | ||
|
||
async checkRetrievalFromAlternativeProvider({ alternativeProviders, randomness, cid }) { | ||
if (!alternativeProviders.length) { | ||
console.info('No alternative providers found for this CID.') | ||
return | ||
} | ||
|
||
const randomProvider = pickRandomProvider(alternativeProviders, randomness) | ||
if (!randomProvider) { | ||
console.warn( | ||
'No providers serving the content via HTTP or Graphsync found. Skipping network-wide retrieval check.', | ||
) | ||
return | ||
} | ||
|
||
const alternativeProviderRetrievalStats = newAlternativeProviderCheckStats() | ||
await this.fetchCAR( | ||
randomProvider.protocol, | ||
randomProvider.address, | ||
cid, | ||
alternativeProviderRetrievalStats, | ||
) | ||
|
||
return alternativeProviderRetrievalStats | ||
} | ||
|
||
async submitMeasurement(task, stats) { | ||
console.log('Submitting measurement...') | ||
const payload = { | ||
|
@@ -228,17 +276,17 @@ export default class Spark { | |
} | ||
|
||
async nextRetrieval() { | ||
const retrieval = await this.getRetrieval() | ||
if (!retrieval) { | ||
const { retrievalTask, randomness } = await this.getRetrieval() | ||
if (!retrievalTask) { | ||
console.log('Completed all tasks for the current round. Waiting for the next round to start.') | ||
return | ||
} | ||
|
||
const stats = newStats() | ||
|
||
await this.executeRetrievalCheck(retrieval, stats) | ||
await this.executeRetrievalCheck({ retrievalTask, randomness, stats }) | ||
|
||
const measurementId = await this.submitMeasurement(retrieval, { ...stats }) | ||
const measurementId = await this.submitMeasurement(retrievalTask, { ...stats }) | ||
Zinnia.jobCompleted() | ||
return measurementId | ||
} | ||
|
@@ -315,6 +363,17 @@ export function newStats() { | |
carChecksum: null, | ||
statusCode: null, | ||
headStatusCode: null, | ||
alternativeProviderCheck: null, | ||
} | ||
} | ||
|
||
function newAlternativeProviderCheckStats() { | ||
return { | ||
statusCode: null, | ||
timeout: false, | ||
endAt: null, | ||
carTooLarge: false, | ||
providerId: null, | ||
} | ||
} | ||
|
||
|
@@ -395,3 +454,62 @@ function mapErrorToStatusCode(err) { | |
// Fallback code for unknown errors | ||
return 600 | ||
} | ||
|
||
/** | ||
* Picks a random provider based on their priority and supplied randomness. | ||
* | ||
* Providers are prioritized in the following order: | ||
* | ||
* 1. HTTP Providers with Piece Info advertised in their ContextID. | ||
* 2. Graphsync Providers with Piece Info advertised in their ContextID. | ||
* 3. HTTP Providers. | ||
* 4. Graphsync Providers. | ||
* | ||
* @param {Provider[]} providers | ||
* @param {number} randomness | ||
* @returns {Provider | undefined} | ||
*/ | ||
export function pickRandomProvider(providers, randomness) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
const rng = new Prando(randomness) | ||
|
||
const filterByProtocol = (items, protocol) => | ||
items.filter((provider) => provider.protocol === protocol) | ||
|
||
const pickRandomItem = (items) => { | ||
if (!items.length) return undefined | ||
return items[Math.floor(rng.next() * items.length)] | ||
} | ||
|
||
const providersWithPieceInfoContextID = providers.filter( | ||
(p) => p.contextId.startsWith('ghsA') && p.protocol !== 'bitswap', | ||
) | ||
|
||
// Priority 1: HTTP providers with ContextID containing PieceCID | ||
const httpProvidersWithPieceInfoContextID = filterByProtocol( | ||
providersWithPieceInfoContextID, | ||
'http', | ||
) | ||
if (httpProvidersWithPieceInfoContextID.length) { | ||
return pickRandomItem(httpProvidersWithPieceInfoContextID, randomness) | ||
} | ||
|
||
// Priority 2: Graphsync providers with ContextID containing PieceCID | ||
const graphsyncProvidersWithPieceInfoContextID = filterByProtocol( | ||
providersWithPieceInfoContextID, | ||
'graphsync', | ||
) | ||
if (graphsyncProvidersWithPieceInfoContextID.length) { | ||
return pickRandomItem(graphsyncProvidersWithPieceInfoContextID, randomness) | ||
} | ||
|
||
// Priority 3: HTTP providers | ||
const httpProviders = filterByProtocol(providers, 'http') | ||
if (httpProviders.length) return pickRandomItem(httpProviders, randomness) | ||
|
||
// Priority 4: Graphsync providers | ||
const graphsyncProviders = filterByProtocol(providers, 'graphsync') | ||
if (graphsyncProviders.length) return pickRandomItem(graphsyncProviders, randomness) | ||
|
||
// No valid providers found | ||
return undefined | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ export class Tasker { | |
#remainingRoundTasks | ||
#fetch | ||
#activity | ||
#randomness | ||
|
||
/** | ||
* @param {object} args | ||
|
@@ -35,11 +36,12 @@ export class Tasker { | |
} | ||
|
||
/** | ||
* @returns {Task | undefined} | ||
* @returns {{retrievalTask?: RetrievalTask; randomness: number; }} | ||
*/ | ||
async next() { | ||
await this.#updateCurrentRound() | ||
return this.#remainingRoundTasks.pop() | ||
const retrievalTask = this.#remainingRoundTasks.pop() | ||
return { retrievalTask, randomness: this.#randomness } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We somehow need to export the round randomness so I have opted for returning object with Maybe adding the |
||
} | ||
|
||
async #updateCurrentRound() { | ||
|
@@ -72,13 +74,13 @@ export class Tasker { | |
console.log(' %s retrieval tasks', retrievalTasks.length) | ||
this.maxTasksPerRound = maxTasksPerNode | ||
|
||
const randomness = await getRandomnessForSparkRound(round.startEpoch) | ||
console.log(' randomness: %s', randomness) | ||
this.#randomness = await getRandomnessForSparkRound(round.startEpoch) | ||
console.log(' randomness: %s', this.#randomness) | ||
|
||
this.#remainingRoundTasks = await pickTasksForNode({ | ||
tasks: retrievalTasks, | ||
maxTasksPerRound: this.maxTasksPerRound, | ||
randomness, | ||
randomness: this.#randomness, | ||
stationId: Zinnia.stationId, | ||
}) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have opted for using package instead of the custom implementation for the pRNG. There's lack of good packages for pRNG so I have settled in the end for Prando. I also wanted to use Deno's random package but from what I realize they have added it to newer versions of the std package which we don't use yet.
This may be a good thing to update in the future.