Draft — changes from 55 commits

Commits (56):
fbbe19e  add KV cache for C1 index (juliangruber, Oct 1, 2025)
de444fb  add passing test (juliangruber, Oct 2, 2025)
0d3f60a  add passing test (juliangruber, Oct 2, 2025)
69291bf  add missing wrangler config (juliangruber, Oct 2, 2025)
a093922  add test, fix implementation (juliangruber, Oct 2, 2025)
e8608ef  fix order (juliangruber, Oct 2, 2025)
8abd8ef  add passing test (juliangruber, Oct 2, 2025)
c504896  add passing test (juliangruber, Oct 2, 2025)
8f879bf  fix indentation (juliangruber, Oct 2, 2025)
6b5f822  fix unrelated test (juliangruber, Oct 2, 2025)
624b184  add passing test (juliangruber, Oct 2, 2025)
b66e758  Merge branch 'main' into add/index-cache (juliangruber, Oct 2, 2025)
5f18618  update KV ids (juliangruber, Oct 2, 2025)
19cb854  Update piece-retriever/bin/piece-retriever.js (juliangruber, Oct 6, 2025)
ed6c067  fix lint (juliangruber, Oct 6, 2025)
4acf720  Merge branch 'main' into add/index-cache (juliangruber, Oct 7, 2025)
7101bb0  fix lint & types (juliangruber, Oct 7, 2025)
fe5d246  refactor `sqlPlaceholders` (juliangruber, Oct 7, 2025)
d149614  cache bad bits separately (wip) (juliangruber, Oct 7, 2025)
7158b9d  Merge branch 'main' into add/index-cache (juliangruber, Oct 15, 2025)
f4115c4  implement strategy (juliangruber, Oct 15, 2025)
738399e  fmt (juliangruber, Oct 15, 2025)
fd35178  fix type errors (juliangruber, Oct 15, 2025)
93b0acc  fix test (juliangruber, Oct 15, 2025)
d6b095f  clean up bad bits tables (juliangruber, Oct 15, 2025)
117f878  fix test (juliangruber, Oct 17, 2025)
982b25a  fix test (juliangruber, Oct 17, 2025)
d4d1fec  move bad bits to kv (juliangruber, Oct 20, 2025)
d850cfa  chore: fix TypeScript errors (bajtos, Oct 20, 2025)
9107a5d  fix: remove apply-migrations.js reference from vitest.config.js (pyropy, Oct 20, 2025)
56e807d  Merge branch 'update/move-bad-bits-to-kv' into add/index-cache (juliangruber, Oct 20, 2025)
720fb95  move `getAllBadBitHashes` (juliangruber, Oct 21, 2025)
41ecdb9  only write etag if was capped (juliangruber, Oct 21, 2025)
bc780ed  give bad bits its own kv namespace (juliangruber, Oct 21, 2025)
c158b88  refactor `persistUpdates()` (juliangruber, Oct 21, 2025)
1a72ebe  allow `latest-hashes:*` to shrink (juliangruber, Oct 21, 2025)
112e8dd  Apply suggestion from @pyropy (juliangruber, Oct 21, 2025)
a818ad3  Apply suggestion from @pyropy (juliangruber, Oct 21, 2025)
eb3c03f  Apply suggestion from @pyropy (juliangruber, Oct 21, 2025)
174d6bd  refactor using `Set#difference()` (juliangruber, Oct 21, 2025)
97332cf  fmt (juliangruber, Oct 21, 2025)
30cd11b  Merge branch 'main' into update/move-bad-bits-to-kv (juliangruber, Oct 21, 2025)
17de33a  Merge branch 'update/move-bad-bits-to-kv' into add/index-cache (juliangruber, Oct 21, 2025)
3b2a846  rename `KV` to `INDEX_CACHE_KV` (juliangruber, Oct 21, 2025)
2270be5  refactor (juliangruber, Oct 21, 2025)
6e1e6cd  test: reliably clear all kv data (juliangruber, Oct 22, 2025)
d786f26  fix `wasCapped` (juliangruber, Oct 22, 2025)
0b891ef  refactor (juliangruber, Oct 22, 2025)
8f694c5  Update bad-bits/lib/store.js (juliangruber, Oct 22, 2025)
7d84c55  use r2 for `latest-hashes` (juliangruber, Oct 22, 2025)
e141fbe  Merge branch 'update/move-bad-bits-to-kv' of https://github.com/filbe… (juliangruber, Oct 22, 2025)
14c9fbc  Merge branch 'main' into update/move-bad-bits-to-kv (juliangruber, Oct 22, 2025)
9aed565  Merge branch 'update/move-bad-bits-to-kv' into add/index-cache (juliangruber, Oct 22, 2025)
248a038  fix test (juliangruber, Oct 22, 2025)
48597ec  Merge branch 'main' into add/index-cache (juliangruber, Oct 22, 2025)
041f09f  rebuild (juliangruber, Oct 22, 2025)
4 changes: 4 additions & 0 deletions bad-bits/worker-configuration.d.ts
@@ -465,7 +465,11 @@
deleteAlarm(options?: DurableObjectSetAlarmOptions): Promise<void>;
sync(): Promise<void>;
sql: SqlStorage;
<<<<<<< HEAD

Check failure on line 468 in bad-bits/worker-configuration.d.ts (GitHub Actions / test): Merge conflict marker encountered.
INDEX_CACHE_KV: SyncKvStorage;
=======

Check failure on line 470 in bad-bits/worker-configuration.d.ts (GitHub Actions / test): Merge conflict marker encountered.
kv: SyncKvStorage;
>>>>>>> main

Check failure on line 472 in bad-bits/worker-configuration.d.ts (GitHub Actions / test): Merge conflict marker encountered.
transactionSync<T>(closure: () => T): T;
getCurrentBookmark(): Promise<string>;
getBookmarkForTime(timestamp: number | Date): Promise<string>;
2 changes: 1 addition & 1 deletion indexer/bin/indexer.js
@@ -25,7 +25,7 @@ import { CID } from 'multiformats/cid'
* WALLET_SCREENING_BATCH_SIZE: 1 | 10
* WALLET_SCREENING_STALE_THRESHOLD_MS: 86400000 | 21600000
* DB: D1Database
* KV: KVNamespace
* INDEX_CACHE_KV: KVNamespace
* RETRY_QUEUE: Queue
* SECRET_HEADER_KEY: string
* SECRET_HEADER_VALUE: string
25 changes: 24 additions & 1 deletion indexer/lib/fwss-handlers.js
@@ -64,7 +64,7 @@ export async function handleFWSSDataSetCreated(
/**
* Handle Filecoin Warm Storage Service service termination
*
* @param {{ DB: D1Database }} env
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {any} payload
* @throws {Error}
*/
@@ -78,4 +78,27 @@ export async function handleFWSSServiceTerminated(env, payload) {
)
.bind(String(payload.data_set_id))
.run()
await clearDataSetIndexCache(env, payload.data_set_id)
}

/**
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {string | number} dataSetId
*/
async function clearDataSetIndexCache(env, dataSetId) {
const { results } = await env.DB.prepare(
`
SELECT data_sets.payer_address AS payerAddress, pieces.cid AS pieceCID
FROM data_sets
INNER JOIN pieces ON pieces.data_set_id = data_sets.id
WHERE data_sets.id = ?
`,
)
.bind(String(dataSetId))
.run()
await Promise.all(
results.map(async ({ payerAddress, pieceCID }) => {
await env.INDEX_CACHE_KV.delete(`${payerAddress}/${pieceCID}`)
}),
Comment on lines +100 to +102

bajtos (Contributor), Oct 22, 2025:

Can this run into the limit of KV calls we can make per worker invocation? (I vaguely remember the number 1000.)

I think it's not likely for a long time, so we don't need to worry about that too much right now.

But it would be nice to have some visibility, so that we know early when we have a user approaching 1000 pieces stored. For example, we can have a Grafana chart with an alert on the value returned by a SQL query like the following one:

SELECT MAX(piece_count)
FROM (
  SELECT COUNT(*) AS piece_count
  FROM pieces INNER JOIN data_sets ON pieces.data_set_id = data_sets.id
  GROUP BY payer_address
)

I propose to open a follow-up tech-debt issue.

The question is whether we need this for the GA launch, and I don't think so.

Thoughts?

juliangruber (Member, Author):

Oh right, it can happen when there are at least 1000 pieces in a data set, for example. I don't see this case as unlikely.

I see two options going forward:

  • use queues
  • use the REST API, which has higher batch limits

I will evaluate both tomorrow.

bajtos (Contributor):

Yes, on second thought, I also concluded that the limit of 1000 pieces per dataset is too low, and we need to explore other options.

Considering the complexities, maybe we should put this performance optimisation on hold until the GA launch. WDYT?

juliangruber (Member, Author):

Sounds good, let's reevaluate.

)
}
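
Following up on the thread above, here is a minimal sketch of the queue-based option. It is not part of this PR: the `CACHE_PURGE_QUEUE` binding, the `enqueueDataSetIndexCachePurge` name, and the consumer wiring are all assumptions for illustration. Cloudflare Queues accept at most 100 messages per `sendBatch()` call, so the producer chunks the keys, and the consumer then deletes only one batch worth of KV keys per invocation instead of one KV call per piece inside a single event handler.

// Illustrative sketch only — not part of this PR. Assumes a hypothetical
// CACHE_PURGE_QUEUE producer binding plus a queue consumer in the same worker.

/**
 * @param {{ DB: D1Database; CACHE_PURGE_QUEUE: Queue<string> }} env
 * @param {string | number} dataSetId
 */
async function enqueueDataSetIndexCachePurge(env, dataSetId) {
  const { results } = await env.DB.prepare(
    `
      SELECT data_sets.payer_address AS payerAddress, pieces.cid AS pieceCID
      FROM data_sets
      INNER JOIN pieces ON pieces.data_set_id = data_sets.id
      WHERE data_sets.id = ?
    `,
  )
    .bind(String(dataSetId))
    .run()
  const keys = results.map(
    ({ payerAddress, pieceCID }) => `${payerAddress}/${pieceCID}`,
  )
  // sendBatch() accepts at most 100 messages, so chunk the key list
  for (let i = 0; i < keys.length; i += 100) {
    await env.CACHE_PURGE_QUEUE.sendBatch(
      keys.slice(i, i + 100).map((key) => ({ body: key })),
    )
  }
}

// Matching consumer: each invocation deletes a bounded number of keys,
// staying well below the per-invocation KV operation limit
export default {
  /**
   * @param {MessageBatch<string>} batch
   * @param {{ INDEX_CACHE_KV: KVNamespace }} env
   */
  async queue(batch, env) {
    for (const message of batch.messages) {
      await env.INDEX_CACHE_KV.delete(message.body)
      message.ack()
    }
  },
}
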
41 changes: 36 additions & 5 deletions indexer/lib/pdp-verifier-handlers.js
@@ -27,20 +27,51 @@ export async function insertDataSetPiece(
}

/**
* @param {{ DB: D1Database }} env
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {number | string} dataSetId
* @param {(number | string)[]} pieceIds
*/
export async function removeDataSetPieces(env, dataSetId, pieceIds) {
await clearDataSetPiecesIndexCache(env, dataSetId, pieceIds)
await env.DB.prepare(
`
DELETE FROM pieces
WHERE data_set_id = ? AND id IN (${new Array(pieceIds.length)
.fill(null)
.map(() => '?')
.join(', ')})
WHERE data_set_id = ? AND id IN (${sqlPlaceholders(pieceIds.length)})
`,
)
.bind(String(dataSetId), ...pieceIds.map(String))
.run()
}

/**
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {number | string} dataSetId
* @param {(number | string)[]} pieceIds
*/
async function clearDataSetPiecesIndexCache(env, dataSetId, pieceIds) {
const { results } = await env.DB.prepare(
`
SELECT data_sets.payer_address AS payerAddress, pieces.cid AS pieceCID
FROM data_sets
INNER JOIN pieces ON pieces.data_set_id = data_sets.id
WHERE data_sets.id = ? AND pieces.id IN (${sqlPlaceholders(pieceIds.length)})
`,
)
.bind(String(dataSetId), ...pieceIds.map(String))
.run()
await Promise.all(
results.map(async ({ payerAddress, pieceCID }) => {
await env.INDEX_CACHE_KV.delete(`${payerAddress}/${pieceCID}`)
}),
)
}

/**
* @param {number} count
* @returns {string}
*/
const sqlPlaceholders = (count) =>
new Array(count)
.fill(null)
.map(() => '?')
.join(', ')
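
For reference, a tiny usage sketch (illustrative, not from the diff): `sqlPlaceholders` only expands to the placeholder list, so the values still flow through `.bind()` and the statement stays parameterised.

// sqlPlaceholders(3) === '?, ?, ?'
const pieceIds = ['10', '11', '12']
const sql = `DELETE FROM pieces WHERE data_set_id = ? AND id IN (${sqlPlaceholders(pieceIds.length)})`
// expands to: DELETE FROM pieces WHERE data_set_id = ? AND id IN (?, ?, ?)
// the values are bound separately, so nothing user-controlled is interpolated into the SQL
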
35 changes: 30 additions & 5 deletions indexer/lib/service-provider-registry-handlers.js
@@ -3,7 +3,7 @@ import validator from 'validator'
const PRODUCT_TYPE_PDP = 0

/**
* @param {{ DB: D1Database }} env
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {string | number} providerId
* @param {string | number} productType
* @param {string} serviceUrl
@@ -34,7 +34,7 @@ export async function handleProductAdded(
}

/**
* @param {{ DB: D1Database }} env
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {string | number} providerId
* @param {string | number} productType
* @param {string} serviceUrl
@@ -65,7 +65,7 @@ export async function handleProductUpdated(
}

/**
* @param {{ DB: D1Database }} env
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {string | number} providerId
* @param {string | number} productType
* @returns {Promise<Response>}
@@ -85,6 +85,7 @@ export async function handleProductRemoved(env, providerId, productType) {
return new Response('OK', { status: 200 })
}

await clearServiceProviderIndexCache(env, providerId)
const result = await env.DB.prepare(
`
DELETE FROM service_providers WHERE id = ?
@@ -99,7 +100,7 @@ export async function handleProductRemoved(env, providerId, productType) {
}

/**
* @param {{ DB: D1Database }} env
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {string | number} providerId
* @returns {Promise<Response>}
*/
@@ -111,6 +112,7 @@ export async function handleProviderRemoved(env, providerId) {
return new Response('Bad Request', { status: 400 })
}

await clearServiceProviderIndexCache(env, providerId)
const result = await env.DB.prepare(
`
DELETE FROM service_providers WHERE id = ?
@@ -125,7 +127,7 @@ export async function handleProviderRemoved(env, providerId) {
}

/**
* @param {{ DB: D1Database }} env
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {string | number} providerId
* @param {string} serviceUrl
* @returns {Promise<Response>}
@@ -141,6 +143,7 @@ async function handleProviderServiceUrlUpdate(env, providerId, serviceUrl) {
`Provider service url updated (providerId=${providerId}, serviceUrl=${serviceUrl})`,
)

await clearServiceProviderIndexCache(env, providerId)
await env.DB.prepare(
`
INSERT INTO service_providers (
@@ -156,3 +159,25 @@ async function handleProviderServiceUrlUpdate(env, providerId, serviceUrl) {
.run()
return new Response('OK', { status: 200 })
}

/**
* @param {{ DB: D1Database; INDEX_CACHE_KV: KVNamespace }} env
* @param {string | number} providerId
*/
async function clearServiceProviderIndexCache(env, providerId) {
const { results } = await env.DB.prepare(
`
SELECT data_sets.payer_address AS payerAddress, pieces.cid AS pieceCID
FROM data_sets
INNER JOIN pieces ON pieces.data_set_id = data_sets.id
WHERE data_sets.service_provider_id = ?
`,
)
.bind(String(providerId))
.run()
await Promise.all(
results.map(async ({ payerAddress, pieceCID }) => {
await env.INDEX_CACHE_KV.delete(`${payerAddress}/${pieceCID}`)
}),
)
}
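
All three cache-clearing helpers in this PR delete keys shaped as `${payerAddress}/${pieceCID}`. For context, a hedged sketch of the read side such a scheme implies; `getCachedIndexEntry` and the injected `loadFromD1` callback are hypothetical and not taken from this PR:

/**
 * Hypothetical read-through helper illustrating the cache key scheme above.
 * @param {{ INDEX_CACHE_KV: KVNamespace }} env
 * @param {string} payerAddress
 * @param {string} pieceCID
 * @param {() => Promise<unknown>} loadFromD1 - caller-supplied D1 lookup (assumed)
 */
async function getCachedIndexEntry(env, payerAddress, pieceCID, loadFromD1) {
  const key = `${payerAddress}/${pieceCID}`
  // serve from KV when possible; fall back to D1 and repopulate the cache
  const cached = await env.INDEX_CACHE_KV.get(key, 'json')
  if (cached !== null) return cached
  const fresh = await loadFromD1()
  if (fresh != null) {
    await env.INDEX_CACHE_KV.put(key, JSON.stringify(fresh))
  }
  return fresh
}
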