From 34ffecdc10de122ab059e219e946a47e9fea3f49 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 14 Dec 2024 15:16:58 +0000 Subject: [PATCH] recheck on each iteration --- svc/backfill.ts | 15 ++++++------ svc/scanner.ts | 60 ++++++++++++++++++++++++++++----------------- util/buildStatus.ts | 2 +- 3 files changed, 45 insertions(+), 32 deletions(-) diff --git a/svc/backfill.ts b/svc/backfill.ts index 3235018e4..db41009c0 100644 --- a/svc/backfill.ts +++ b/svc/backfill.ts @@ -18,15 +18,15 @@ const blobArchive = new Archive('blob'); // We can stop at approximately 6330000000 (Jan 3 2024) const stop = Number(process.env.BACKFILL_STOP) || 6330000000; -async function scanApi(seqNum: number) { - let nextSeqNum = seqNum; +async function scanApi() { while (true) { - if (nextSeqNum > stop) { + const seqNum = Number(fs.readFileSync('./match_seq_num.txt')) || 0; + if (seqNum > stop) { process.exit(0); } const begin = Date.now(); const container = generateJob('api_sequence', { - start_at_match_seq_num: nextSeqNum, + start_at_match_seq_num: seqNum, }); let data = null; try { @@ -42,7 +42,7 @@ async function scanApi(seqNum: number) { } const resp = data && data.result && data.result.matches ? data.result.matches : []; - console.log('[API] match_seq_num:%s, matches:%s', nextSeqNum, resp.length); + console.log('[API] match_seq_num:%s, matches:%s', seqNum, resp.length); // write to blobstore, process the blob using same function as insertMatch try { const insertResult = await Promise.all( @@ -58,7 +58,7 @@ async function scanApi(seqNum: number) { }), ); if (resp.length) { - nextSeqNum = resp[resp.length - 1].match_seq_num + 1; + const nextSeqNum = resp[resp.length - 1].match_seq_num + 1; console.log('next_seq_num: %s', nextSeqNum); // Completed inserting matches on this page so update fs.writeFileSync('./match_seq_num.txt', nextSeqNum.toString()); @@ -76,7 +76,6 @@ async function scanApi(seqNum: number) { } async function start() { - let seqNum = Number(fs.readFileSync('./match_seq_num.txt')) || 0; - await scanApi(seqNum); + await scanApi(); } start(); diff --git a/svc/scanner.ts b/svc/scanner.ts index 6de526624..b6e7cca06 100755 --- a/svc/scanner.ts +++ b/svc/scanner.ts @@ -16,24 +16,25 @@ const PAGE_SIZE = 100; const SCANNER_WAIT = 5000; const isSecondary = Boolean(Number(config.SCANNER_OFFSET)); -async function scanApi(seqNum: number) { +async function scanApi() { const offset = Number(config.SCANNER_OFFSET); - let nextSeqNum = seqNum - offset; while (true) { + const seqNum = await getCurrentSeqNum() - offset; if (offset) { const current = await getCurrentSeqNum(); - if (nextSeqNum > current - offset) { + if (seqNum > current - offset) { // Secondary scanner is catching up too much. Wait and try again - console.log('secondary scanner waiting', nextSeqNum, current, offset); + console.log('secondary scanner waiting', seqNum, current, offset); await new Promise((resolve) => setTimeout(resolve, SCANNER_WAIT)); continue; } } + const start = Date.now(); const apiHosts = await getApiHosts(); const parallelism = Math.min(apiHosts.length, API_KEYS.length); const scannerWaitCatchup = SCANNER_WAIT / parallelism; const container = generateJob('api_sequence', { - start_at_match_seq_num: nextSeqNum, + start_at_match_seq_num: seqNum, }); let data = null; try { @@ -43,13 +44,17 @@ async function scanApi(seqNum: number) { }); } catch (err: any) { console.log(err); + if (err?.result?.statusDetail === 'Error retrieving match data.') { + redisCount('skip_seq_num'); + // Could increment nextSeqNum by 1 to avoid stalling (maybe after some number of retries?) + } // failed, try the same number again await new Promise((resolve) => setTimeout(resolve, SCANNER_WAIT)); continue; } const resp = data && data.result && data.result.matches ? data.result.matches : []; - console.log('[API] match_seq_num:%s, matches:%s', nextSeqNum, resp.length); + console.log('[API] match_seq_num:%s, matches:%s', seqNum, resp.length); console.time('insert'); await Promise.all( resp.map(async (match: ApiMatch) => { @@ -86,21 +91,24 @@ async function scanApi(seqNum: number) { console.timeEnd('insert'); // Completed inserting matches on this page so update redis if (resp.length) { - nextSeqNum = resp[resp.length - 1].match_seq_num + 1; + const nextSeqNum = resp[resp.length - 1].match_seq_num + 1; console.log('next_seq_num: %s', nextSeqNum); + if (!isSecondary) { + // Only set match seq num on primary + await db.raw( + 'INSERT INTO last_seq_num(match_seq_num) VALUES (?) ON CONFLICT DO NOTHING', + [nextSeqNum], + ); + } } - if (!isSecondary) { - // Only set match seq num on primary - await db.raw( - 'INSERT INTO last_seq_num(match_seq_num) VALUES (?) ON CONFLICT DO NOTHING', - [nextSeqNum], - ); - } + const end = Date.now(); + const elapsed = end - start; + const adjustedWait = Math.max((resp.length < PAGE_SIZE ? SCANNER_WAIT : scannerWaitCatchup) - elapsed, 0); // If not a full page, delay the next iteration await new Promise((resolve) => setTimeout( resolve, - resp.length < PAGE_SIZE ? SCANNER_WAIT : scannerWaitCatchup, + adjustedWait, ), ); } @@ -112,14 +120,20 @@ async function getCurrentSeqNum(): Promise { } async function start() { - let numResult = await getCurrentSeqNum(); - if (!numResult && config.NODE_ENV === 'development') { - // Never do this in production to avoid skipping sequence number if we didn't pull .env properly - const container = generateJob('api_history', {}); - // Just get the approximate current seq num - const data = await getSteamAPIData({ url: container.url }); - numResult = data.result.matches[0].match_seq_num; + if (config.NODE_ENV === 'development') { + let numResult = await getCurrentSeqNum(); + if (!numResult) { + // Never do this in production to avoid skipping sequence number if we didn't pull .env properly + const container = generateJob('api_history', {}); + // Just get the approximate current seq num + const data = await getSteamAPIData({ url: container.url }); + numResult = data.result.matches[0].match_seq_num; + await db.raw( + 'INSERT INTO last_seq_num(match_seq_num) VALUES (?) ON CONFLICT DO NOTHING', + [numResult], + ); + } } - await scanApi(numResult); + await scanApi(); } start(); diff --git a/util/buildStatus.ts b/util/buildStatus.ts index 5cc1c6fd2..adcbe4774 100644 --- a/util/buildStatus.ts +++ b/util/buildStatus.ts @@ -177,7 +177,7 @@ export async function buildStatus() { web_crash_last_day: async () => countDay('web_crash'), secondary_scanner_last_day: async () => countDay('secondary_scanner'), steam_api_backfill_last_day: async () => countDay('steam_api_backfill'), - // skip_seq_num_last_day: async () => countDay('skip_seq_num'), + skip_seq_num_last_day: async () => countDay('skip_seq_num'), // gen_api_key_invalid_last_day: async () => getRedisCountDay('gen_api_key_invalid'), }; return parallelPromise>(counts);