Skip to content

Commit

Permalink
update log stream and rename fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Dec 17, 2024
1 parent f3e9435 commit ba29648
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 21 deletions.
2 changes: 1 addition & 1 deletion fetcher/getPlayerArchive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async function readArchivedPlayerMatches(
return arr;
}

export class PlayerMatchesFetcher extends PlayerFetcher<ParsedPlayerMatch[]> {
export class PlayerArchiveFetcher extends PlayerFetcher<ParsedPlayerMatch[]> {
readData = readArchivedPlayerMatches;
getOrFetchData = async (accountId: number) => {
await doArchivePlayerMatches(accountId);
Expand Down
4 changes: 3 additions & 1 deletion global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ type ReliableQueueRow = {
priority: number;
};

type ReliableQueueOptions = { attempts?: number; priority?: number };
// Metadata handed to reliable-queue processors alongside the job payload,
// read from the queue row (remaining attempts, enqueue time, priority).
type JobMetadata = { attempts: number; timestamp: Date; priority: number };

// Options accepted when enqueuing a reliable job; `caller` tags where the
// enqueue originated (e.g. 'web') so queue log lines can attribute it.
type ReliableQueueOptions = { attempts?: number; priority?: number; caller?: string };

type ProPlayer = {
name: string;
Expand Down
1 change: 1 addition & 0 deletions routes/spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,7 @@ Without a key, you can make 2,000 free calls per day at a rate limit of 60 reque
{
attempts: 1,
priority,
caller: 'web',
},
);
return res.json({
Expand Down
17 changes: 4 additions & 13 deletions store/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export async function runQueue(
export async function runReliableQueue(
queueName: QueueName,
parallelism: number,
processor: (job: any) => Promise<boolean>,
processor: (job: any, metadata: JobMetadata) => Promise<boolean>,
getCapacity?: () => Promise<number>,
) {
const executor = async (i: number) => {
Expand Down Expand Up @@ -82,7 +82,7 @@ export async function runReliableQueue(
const job = result && result.rows && result.rows[0];
if (job) {
try {
const success = await processor(job.data);
const success = await processor(job.data, { priority: job.priority, attempts: job.attempts, timestamp: job.timestamp });
// If the processor returns true, it's successful and we should delete the job and then commit
if (success || job.attempts <= 0) {
await consumer.query('DELETE FROM queue WHERE id = $1', [job.id]);
Expand All @@ -99,15 +99,6 @@ export async function runReliableQueue(
} else if (job.attempts === 0) {
result = 'attempted';
}
const message = c.blue(
`[${new Date().toISOString()}] [queue: ${queueName}] [${result}] [priority: ${
job.priority
}] [attempts: ${job.attempts}] [queued: ${moment(
job.timestamp,
).fromNow()}]`,
);
console.log(message);
redis.publish('queue', message);
} catch (e) {
// If the processor crashes unexpectedly, we should rollback the transaction to not consume an attempt
await consumer.query('ROLLBACK');
Expand Down Expand Up @@ -162,9 +153,9 @@ export async function addReliableJob(
const job = rows[0];
if (job) {
const message = c.magenta(
`[${new Date().toISOString()}] [queue] [add: ${name}] [priority: ${
`[${new Date().toISOString()}] [${options.caller}] [queue: ${name}] [pri: ${
job.priority
}] [attempts: ${job.attempts}] ${name === 'parse' ? data.match_id : ''}`,
}] [att: ${job.attempts}] ${name === 'parse' ? data.match_id : ''}`,
);
redis.publish('queue', message);
}
Expand Down
9 changes: 6 additions & 3 deletions svc/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import { ApiFetcher } from '../fetcher/getApiData';
import { ParsedFetcher } from '../fetcher/getParsedData';
import { GcdataFetcher } from '../fetcher/getGcData';
import { getPGroup } from '../util/pgroup';
import moment from 'moment';

const apiFetcher = new ApiFetcher();
const gcFetcher = new GcdataFetcher();
const parsedFetcher = new ParsedFetcher();
const { PARSER_PARALLELISM } = config;

async function parseProcessor(job: ParseJob) {
async function parseProcessor(job: ParseJob, metadata: JobMetadata) {
const start = Date.now();
let apiTime = 0;
let gcTime = 0;
Expand Down Expand Up @@ -149,9 +150,11 @@ async function parseProcessor(job: ParseJob) {
const message = c[colors[type]](
`[${new Date().toISOString()}] [parser] [${type}: ${
end - start
}ms] [api: ${apiTime}ms] [gcdata: ${gcTime}ms] [parse: ${parseTime}ms] ${
}ms] [api: ${apiTime}ms] [gcdata: ${gcTime}ms] [parse: ${parseTime}ms] [queued: ${moment(
metadata.timestamp,
).fromNow()}] [pri: ${metadata.priority}] [att: ${metadata.attempts}] ${
job.match_id
}: ${error ?? ''}`,
} ${error ?? ''}`,
);
redis.publish('parsed', message);
console.log(message);
Expand Down
6 changes: 3 additions & 3 deletions util/buildPlayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import { deserialize, pick, redisCount, redisCountDistinct } from './utility';
import { gzipSync, gunzipSync } from 'zlib';
import { cacheableCols } from '../routes/playerFields';
import { promises as fs } from 'fs';
import { PlayerMatchesFetcher } from '../fetcher/getPlayerArchive';
import { PlayerArchiveFetcher } from '../fetcher/getPlayerArchive';

const playerMatchesFetcher = new PlayerMatchesFetcher();
const playerArchiveFetcher = new PlayerArchiveFetcher();

export async function getPlayerMatches(
accountId: number,
Expand Down Expand Up @@ -65,7 +65,7 @@ export async function getPlayerMatchesWithMetadata(
// if dbLimit (recentMatches), don't use archive
const archivedMatches =
config.ENABLE_PLAYER_ARCHIVE && !queryObj.dbLimit
? await playerMatchesFetcher.readData(accountId)
? await playerArchiveFetcher.readData(accountId)
: [];
const localLength = localMatches.length;
const archivedLength = archivedMatches.length;
Expand Down
1 change: 1 addition & 0 deletions util/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ export async function insertMatch(
{
priority,
attempts,
caller: options.origin,
},
);
return job;
Expand Down

0 comments on commit ba29648

Please sign in to comment.