Skip to content

Commit

Permalink
feat: sample retrieval tasks from eligible_deals (#486)
Browse files Browse the repository at this point in the history
Stop using the table `retrievable_deals`.

Signed-off-by: Miroslav Bajtoš <[email protected]>
  • Loading branch information
bajtos authored Jan 10, 2025
1 parent 0a002a7 commit 09259e0
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 27 deletions.
8 changes: 4 additions & 4 deletions api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ const redirect = (res, location) => {
const getSummaryOfEligibleDealsForMiner = async (_req, res, client, minerId) => {
/** @type {{rows: {client_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT client_id, COUNT(cid)::INTEGER as deal_count FROM retrievable_deals
SELECT client_id, COUNT(payload_cid)::INTEGER as deal_count FROM eligible_deals
WHERE miner_id = $1 AND expires_at > now()
GROUP BY client_id
ORDER BY deal_count DESC, client_id ASC
Expand All @@ -340,7 +340,7 @@ const getSummaryOfEligibleDealsForMiner = async (_req, res, client, minerId) =>
const getSummaryOfEligibleDealsForClient = async (_req, res, client, clientId) => {
/** @type {{rows: {miner_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT miner_id, COUNT(cid)::INTEGER as deal_count FROM retrievable_deals
SELECT miner_id, COUNT(payload_cid)::INTEGER as deal_count FROM eligible_deals
WHERE client_id = $1 AND expires_at > now()
GROUP BY miner_id
ORDER BY deal_count DESC, miner_id ASC
Expand All @@ -365,9 +365,9 @@ const getSummaryOfEligibleDealsForClient = async (_req, res, client, clientId) =
const getSummaryOfEligibleDealsForAllocator = async (_req, res, client, allocatorId) => {
/** @type {{rows: {client_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT ac.client_id, COUNT(cid)::INTEGER as deal_count
SELECT ac.client_id, COUNT(payload_cid)::INTEGER as deal_count
FROM allocator_clients ac
LEFT JOIN retrievable_deals rd ON ac.client_id = rd.client_id
LEFT JOIN eligible_deals rd ON ac.client_id = rd.client_id
WHERE ac.allocator_id = $1 AND expires_at > now()
GROUP BY ac.client_id
ORDER BY deal_count DESC, ac.client_id ASC
Expand Down
14 changes: 7 additions & 7 deletions api/lib/round-tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,18 @@ async function defineTasksForRound (pgClient, sparkRoundNumber, taskCount) {
await pgClient.query(`
INSERT INTO retrieval_tasks (round_id, cid, miner_id, clients)
WITH selected AS (
SELECT cid, miner_id
FROM retrievable_deals
SELECT payload_cid, miner_id
FROM eligible_deals
WHERE expires_at > now()
ORDER BY random()
LIMIT $2
)
SELECT $1 as round_id, selected.cid, selected.miner_id, array_agg(client_id) as clients
SELECT $1 as round_id, selected.payload_cid as cid, selected.miner_id, array_agg(client_id) as clients
FROM selected
LEFT JOIN retrievable_deals
ON selected.cid = retrievable_deals.cid AND selected.miner_id = retrievable_deals.miner_id
WHERE retrievable_deals.expires_at > now()
GROUP BY selected.cid, selected.miner_id;
LEFT JOIN eligible_deals
ON selected.payload_cid = eligible_deals.payload_cid AND selected.miner_id = eligible_deals.miner_id
WHERE eligible_deals.expires_at > now()
GROUP BY selected.payload_cid, selected.miner_id;
`, [
sparkRoundNumber,
taskCount
Expand Down
16 changes: 10 additions & 6 deletions api/test/db.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@ describe('spark-api database', () => {

it('allows multiple storage deals for the same CID', async () => {
const DUMMY_CID = 'bafyone'
await client.query('DELETE FROM retrievable_deals WHERE cid = $1', [DUMMY_CID])
await client.query('DELETE FROM eligible_deals WHERE payload_cid = $1', [DUMMY_CID])

await client.query(`
INSERT INTO retrievable_deals (cid, miner_id, client_id, expires_at)
VALUES ($1, $2, 'f099', $3), ($1, $4, 'f099', $3)
INSERT INTO eligible_deals
(miner_id, client_id, piece_cid, piece_size, payload_cid, expires_at)
VALUES
($1, 'f099', $3, 256, $4, $5),
($2, 'f099', $3, 256, $4, $5)
`, [
DUMMY_CID,
'f010',
new Date(),
'f020'
'f020',
'baga12345',
DUMMY_CID,
new Date()
])
})
})
2 changes: 1 addition & 1 deletion api/test/round-tracker.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe('Round Tracker', () => {
pgClient = await pgPool.connect()
await migrate(pgClient)
await pgClient.query(`
UPDATE retrievable_deals SET expires_at = NOW() + INTERVAL '1 year'
UPDATE eligible_deals SET expires_at = NOW() + INTERVAL '1 year'
`)
})

Expand Down
19 changes: 10 additions & 9 deletions api/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -660,16 +660,17 @@ describe('Routes', () => {
describe('summary of eligible deals', () => {
before(async () => {
await client.query(`
INSERT INTO retrievable_deals (cid, miner_id, client_id, expires_at)
INSERT INTO eligible_deals
(payload_cid, miner_id, client_id, piece_cid, piece_size, expires_at)
VALUES
('bafyone', 'f0210', 'f0800', '2100-01-01'),
('bafyone', 'f0220', 'f0800', '2100-01-01'),
('bafytwo', 'f0220', 'f0810', '2100-01-01'),
('bafyone', 'f0230', 'f0800', '2100-01-01'),
('bafytwo', 'f0230', 'f0800', '2100-01-01'),
('bafythree', 'f0230', 'f0810', '2100-01-01'),
('bafyfour', 'f0230', 'f0820', '2100-01-01'),
('bafyexpired', 'f0230', 'f0800', '2020-01-01')
('bafyone', 'f0210', 'f0800', 'bagaone', 256, '2100-01-01'),
('bafyone', 'f0220', 'f0800', 'bagaone', 256, '2100-01-01'),
('bafytwo', 'f0220', 'f0810', 'bagatwo', 256, '2100-01-01'),
('bafyone', 'f0230', 'f0800', 'bagaone', 256, '2100-01-01'),
('bafytwo', 'f0230', 'f0800', 'bagatwo', 256, '2100-01-01'),
('bafythree', 'f0230', 'f0810', 'bagathree', 256, '2100-01-01'),
('bafyfour', 'f0230', 'f0820', 'bagafour', 256, '2100-01-01'),
('bafyexpired', 'f0230', 'f0800', 'bagaexpired', 256, '2020-01-01')
ON CONFLICT DO NOTHING
`)

Expand Down

0 comments on commit 09259e0

Please sign in to comment.