Skip to content

Commit

Permalink
recheck on each iteration
Browse files: browse the repository at this point in the history
  • Loading branch information
howardchung committed Dec 14, 2024
1 parent 94165bf commit 34ffecd
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 32 deletions.
15 changes: 7 additions & 8 deletions svc/backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ const blobArchive = new Archive('blob');

// We can stop at approximately 6330000000 (Jan 3 2024)
const stop = Number(process.env.BACKFILL_STOP) || 6330000000;
async function scanApi(seqNum: number) {
let nextSeqNum = seqNum;
async function scanApi() {
while (true) {
if (nextSeqNum > stop) {
const seqNum = Number(fs.readFileSync('./match_seq_num.txt')) || 0;
if (seqNum > stop) {
process.exit(0);
}
const begin = Date.now();
const container = generateJob('api_sequence', {
start_at_match_seq_num: nextSeqNum,
start_at_match_seq_num: seqNum,
});
let data = null;
try {
Expand All @@ -42,7 +42,7 @@ async function scanApi(seqNum: number) {
}
const resp =
data && data.result && data.result.matches ? data.result.matches : [];
console.log('[API] match_seq_num:%s, matches:%s', nextSeqNum, resp.length);
console.log('[API] match_seq_num:%s, matches:%s', seqNum, resp.length);
// write to blobstore, process the blob using same function as insertMatch
try {
const insertResult = await Promise.all(
Expand All @@ -58,7 +58,7 @@ async function scanApi(seqNum: number) {
}),
);
if (resp.length) {
nextSeqNum = resp[resp.length - 1].match_seq_num + 1;
const nextSeqNum = resp[resp.length - 1].match_seq_num + 1;
console.log('next_seq_num: %s', nextSeqNum);
// Completed inserting matches on this page so update
fs.writeFileSync('./match_seq_num.txt', nextSeqNum.toString());
Expand All @@ -76,7 +76,6 @@ async function scanApi(seqNum: number) {
}

async function start() {
let seqNum = Number(fs.readFileSync('./match_seq_num.txt')) || 0;
await scanApi(seqNum);
await scanApi();
}
start();
60 changes: 37 additions & 23 deletions svc/scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,25 @@ const PAGE_SIZE = 100;
const SCANNER_WAIT = 5000;
const isSecondary = Boolean(Number(config.SCANNER_OFFSET));

async function scanApi(seqNum: number) {
async function scanApi() {
const offset = Number(config.SCANNER_OFFSET);
let nextSeqNum = seqNum - offset;
while (true) {
const seqNum = await getCurrentSeqNum() - offset;
if (offset) {
const current = await getCurrentSeqNum();
if (nextSeqNum > current - offset) {
if (seqNum > current - offset) {
// Secondary scanner is catching up too much. Wait and try again
console.log('secondary scanner waiting', nextSeqNum, current, offset);
console.log('secondary scanner waiting', seqNum, current, offset);
await new Promise((resolve) => setTimeout(resolve, SCANNER_WAIT));
continue;
}
}
const start = Date.now();
const apiHosts = await getApiHosts();
const parallelism = Math.min(apiHosts.length, API_KEYS.length);
const scannerWaitCatchup = SCANNER_WAIT / parallelism;
const container = generateJob('api_sequence', {
start_at_match_seq_num: nextSeqNum,
start_at_match_seq_num: seqNum,
});
let data = null;
try {
Expand All @@ -43,13 +44,17 @@ async function scanApi(seqNum: number) {
});
} catch (err: any) {
console.log(err);
if (err?.result?.statusDetail === 'Error retrieving match data.') {
redisCount('skip_seq_num');
// Could increment nextSeqNum by 1 to avoid stalling (maybe after some number of retries?)
}
// failed, try the same number again
await new Promise((resolve) => setTimeout(resolve, SCANNER_WAIT));
continue;
}
const resp =
data && data.result && data.result.matches ? data.result.matches : [];
console.log('[API] match_seq_num:%s, matches:%s', nextSeqNum, resp.length);
console.log('[API] match_seq_num:%s, matches:%s', seqNum, resp.length);
console.time('insert');
await Promise.all(
resp.map(async (match: ApiMatch) => {
Expand Down Expand Up @@ -86,21 +91,24 @@ async function scanApi(seqNum: number) {
console.timeEnd('insert');
// Completed inserting matches on this page so update redis
if (resp.length) {
nextSeqNum = resp[resp.length - 1].match_seq_num + 1;
const nextSeqNum = resp[resp.length - 1].match_seq_num + 1;
console.log('next_seq_num: %s', nextSeqNum);
if (!isSecondary) {
// Only set match seq num on primary
await db.raw(
'INSERT INTO last_seq_num(match_seq_num) VALUES (?) ON CONFLICT DO NOTHING',
[nextSeqNum],
);
}
}
if (!isSecondary) {
// Only set match seq num on primary
await db.raw(
'INSERT INTO last_seq_num(match_seq_num) VALUES (?) ON CONFLICT DO NOTHING',
[nextSeqNum],
);
}
const end = Date.now();
const elapsed = end - start;
const adjustedWait = Math.max((resp.length < PAGE_SIZE ? SCANNER_WAIT : scannerWaitCatchup) - elapsed, 0);
// If not a full page, delay the next iteration
await new Promise((resolve) =>
setTimeout(
resolve,
resp.length < PAGE_SIZE ? SCANNER_WAIT : scannerWaitCatchup,
adjustedWait,
),
);
}
Expand All @@ -112,14 +120,20 @@ async function getCurrentSeqNum(): Promise<number> {
}

async function start() {
let numResult = await getCurrentSeqNum();
if (!numResult && config.NODE_ENV === 'development') {
// Never do this in production to avoid skipping sequence number if we didn't pull .env properly
const container = generateJob('api_history', {});
// Just get the approximate current seq num
const data = await getSteamAPIData({ url: container.url });
numResult = data.result.matches[0].match_seq_num;
if (config.NODE_ENV === 'development') {
let numResult = await getCurrentSeqNum();
if (!numResult) {
// Never do this in production to avoid skipping sequence number if we didn't pull .env properly
const container = generateJob('api_history', {});
// Just get the approximate current seq num
const data = await getSteamAPIData({ url: container.url });
numResult = data.result.matches[0].match_seq_num;
await db.raw(
'INSERT INTO last_seq_num(match_seq_num) VALUES (?) ON CONFLICT DO NOTHING',
[numResult],
);
}
}
await scanApi(numResult);
await scanApi();
}
start();
2 changes: 1 addition & 1 deletion util/buildStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export async function buildStatus() {
web_crash_last_day: async () => countDay('web_crash'),
secondary_scanner_last_day: async () => countDay('secondary_scanner'),
steam_api_backfill_last_day: async () => countDay('steam_api_backfill'),
// skip_seq_num_last_day: async () => countDay('skip_seq_num'),
skip_seq_num_last_day: async () => countDay('skip_seq_num'),
// gen_api_key_invalid_last_day: async () => getRedisCountDay('gen_api_key_invalid'),
};
return parallelPromise<Record<string, number>>(counts);
Expand Down

0 comments on commit 34ffecd

Please sign in to comment.