Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
8 changes: 8 additions & 0 deletions constants/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const RECALCULATE = {
LAST_NOT_MARKED_AS_PROCESSED_BATCH: 3,
FIRST_ASTROPORT_CORRECT_EXCHANGE_RATE_HEIGHT: 13464964,
FIRST_MARS_LP_CORRECT_HEIGHT: 14036905,
FIRST_DEMEX_CORRECT_HEIGHT: 15843825,
};

export { RECALCULATE };
174 changes: 159 additions & 15 deletions crawler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { getSigningCosmWasmClient } from '../lib/stargate';
import { validateOnChainContractInfo } from '../lib/validations/config';
import { getValidData } from '../types/utils';
import { dropletRuleSchema } from '../types/config/dropletRule';
import { RECALCULATE } from '../constants';

const program = new Command();
program.option('--config <config>', 'Config file path', 'config.toml');
Expand Down Expand Up @@ -183,6 +184,11 @@ program
.argument('<protocol_id>', 'protocol to crawl')
.description('Process the specified protocol')
.option('-b --batch_id <batch_id>', 'Batch ID to process')
.option(
'-r --recalculate',
'Mark if publish needs to recalculate points',
false,
)
.action(async (protocolId: string, options) => {
// Get the batch ID and height of the task
const { batchId, height } = (() => {
Expand Down Expand Up @@ -231,7 +237,10 @@ program
// Processing the source
const sourceObj = new sources[
config.protocols[protocolId].source as keyof typeof sources
](config.protocols[protocolId].rpc, logger, config.protocols[protocolId]);
](config.protocols[protocolId].rpc, logger, {
...config.protocols[protocolId],
recalculate: options.recalculate,
});
await sourceObj.getUsersBalances(
height,
multipliers,
Expand Down Expand Up @@ -270,8 +279,13 @@ program
program
.command('finish')
.description('Calculate points for users and finish the task')
.option('-b, --batch_id <batch_id>', 'batch ID to finish')
.option('-b, --batch_id <batch_id>', 'Batch ID to finish')
.option('-p --publish', 'Publish the points to the blockchain')
.option(
'-r --recalculate',
'Mark if publish needs to recalculate points',
false,
)
.action((options) => {
const batchId = (() => {
if (options.batch_id === undefined) {
Expand Down Expand Up @@ -358,6 +372,33 @@ program
[batchId],
);

if (options.recalculate) {
const batchChangesQuery = db.query<
{
address: string;
batch_id: number;
points: number;
},
[number]
>(
'SELECT address, batch_id, points FROM changes WHERE batch_id = ? AND points <> 0',
);
const batchChanges = batchChangesQuery.all(batchId);
if (batchChanges) {
batchChanges.forEach((change) => {
db.exec(`
INSERT INTO user_points (batch_id, address, asset_id, points)
VALUES (${batchId}, '${change.address}', 'dATOM', ${change.points})
ON CONFLICT (batch_id, address, asset_id) DO UPDATE SET
points = user_points.points + excluded.points
`);
});
logger.debug(
`${batchChanges.length} addresses user points were updated manually while recalculating`,
);
}
}

db.exec<[number]>(
'UPDATE tasks SET status = "processed" WHERE batch_id = ?',
[batchId],
Expand All @@ -372,7 +413,18 @@ program
const all = query.all('new');
const batchIds = all.map((row) => row.batch_id);
logger.debug('Batch IDs: %s', batchIds.join(','));
const firstTs = all[0].ts;

let firstTs = all[0].ts;
const lastBatchId = batchIds.at(-1);
if (
options.recalculate &&
lastBatchId &&
lastBatchId <= RECALCULATE.LAST_NOT_MARKED_AS_PROCESSED_BATCH
) {
firstTs = all[lastBatchId - 1].ts;
logger.debug('Last batch id is %s', lastBatchId);
logger.debug('First ts is %s', firstTs);
}

db.exec(`UPDATE user_points_public SET change = 0`);

Expand Down Expand Up @@ -400,27 +452,28 @@ program
INSERT OR IGNORE INTO user_points_public
(address, asset_id, points, "change", prev_points_l1, prev_points_l2, points_l1, points_l2, place, prev_place)
SELECT
r.referrer address,
COALESCE(als.address, r.referrer) address,
CASE
WHEN INSTR(s.asset_id, '_') > 0
THEN SUBSTR(s.asset_id, 1, INSTR(s.asset_id, '_') - 1)
ELSE s.asset_id END AS asset_id,
0 points,
0 change,
0 prev_points_l1,
0 prev_points_l2,
0 points_l1,
0 points_l2,
0 place,
0 prev_place
0 prev_points_l2,
0 points_l1,
0 points_l2,
0 place,
0 prev_place
FROM referrals r
LEFT JOIN aliases als ON (als.alias = r.referrer)
LEFT JOIN schedule s
GROUP BY address;
`,
);

// calc L1, L2 points
const stmt = db.prepare<null, { $ts: number }>(
let stmt = db.prepare<null, { $ts: number }>(
`
UPDATE
user_points_public
Expand All @@ -432,10 +485,11 @@ program
FLOOR(SUM(upp1.change) * ${config.l1_percent / 100})
FROM
referrals r
LEFT JOIN aliases als ON (als.alias = r.referrer)
LEFT JOIN user_points_public upp1 ON (upp1.address = r.referral AND r.ts <= $ts)
LEFT JOIN user_kyc k ON (k.address = r.referrer AND k.ts <= $ts)
LEFT JOIN user_kyc k ON (k.address = COALESCE(als.address, r.referrer) AND k.ts <= $ts)
WHERE
r.referrer = user_points_public.address AND
COALESCE(als.address, r.referrer) = user_points_public.address AND
k.address IS NOT NULL
),0),
points_l2 = COALESCE(points_l2,0) + COALESCE((
Expand All @@ -444,14 +498,58 @@ program
FROM
referrals r2
LEFT JOIN referrals r3 ON (r3.referrer = r2.referral AND r3.ts <= $ts)
LEFT JOIN aliases als ON (als.alias = r2.referrer)
LEFT JOIN user_points_public upp2 ON (upp2.address = r3.referral AND r3.ts <= $ts)
LEFT JOIN user_kyc k2 ON (k2.address = r2.referrer AND k2.ts <= $ts)
LEFT JOIN user_kyc k2 ON (k2.address = COALESCE(als.address, r2.referrer) AND k2.ts <= $ts)
WHERE
r2.referrer = user_points_public.address AND
COALESCE(als.address, r2.referrer) = user_points_public.address AND
k2.address IS NOT NULL
),0)
`,
);

if (options.recalculate) {
const query = db.query<{ height: number }, number>(
`SELECT height FROM tasks WHERE ts = $ts LIMIT 1`,
);
const batchHeight = query.all(firstTs)[0].height;

stmt = db.prepare<null, { $ts: number }>(
`
UPDATE
user_points_public
SET
prev_points_l1 = points_l1,
prev_points_l2 = points_l2,
points_l1 = COALESCE(points_l1,0) + COALESCE((
SELECT
FLOOR(SUM(upp1.change) * ${config.l1_percent / 100})
FROM
referrals r
LEFT JOIN aliases als ON (als.alias = r.referrer)
LEFT JOIN user_points_public upp1 ON (upp1.address = r.referral AND r.height <= ${batchHeight})
LEFT JOIN user_kyc k ON (k.address = COALESCE(als.address, r.referrer) AND k.ts <= $ts)
WHERE
COALESCE(als.address, r.referrer) = user_points_public.address AND
k.address IS NOT NULL
),0),
points_l2 = COALESCE(points_l2,0) + COALESCE((
SELECT
FLOOR(SUM(upp2.change) * ${config.l2_percent / 100})
FROM
referrals r2
LEFT JOIN referrals r3 ON (r3.referrer = r2.referral AND r3.height <= ${batchHeight})
LEFT JOIN aliases als ON (als.alias = r2.referrer)
LEFT JOIN user_points_public upp2 ON (upp2.address = r3.referral AND r3.height <= ${batchHeight})
LEFT JOIN user_kyc k2 ON (k2.address = COALESCE(als.address, r2.referrer) AND k2.ts <= $ts)
WHERE
COALESCE(als.address, r2.referrer) = user_points_public.address AND
k2.address IS NOT NULL
),0)
`,
);
}

stmt.run({ $ts: firstTs });

db.exec(
Expand All @@ -473,9 +571,55 @@ program
place = (SELECT place FROM ranked WHERE address = user_points_public.address)`,
);

let processedBatchesIds = batchIds;
if (options.recalculate) {
const lastBatchId = processedBatchesIds.at(-1);
if (lastBatchId) {
if (lastBatchId < RECALCULATE.LAST_NOT_MARKED_AS_PROCESSED_BATCH)
processedBatchesIds = [];
if (lastBatchId >= RECALCULATE.LAST_NOT_MARKED_AS_PROCESSED_BATCH)
processedBatchesIds = processedBatchesIds.concat(
Array.from(
{ length: RECALCULATE.LAST_NOT_MARKED_AS_PROCESSED_BATCH },
(_, i) => i,
),
);
}
}
db.exec(
`UPDATE batches SET status="processed" WHERE batch_id IN (${batchIds.join(',')})`,
`UPDATE batches SET status="processed" WHERE batch_id IN (${processedBatchesIds})`,
);

if (options.recalculate) {
const batchChangesQuery = db.query<
{
address: string;
batch_id: number;
points_l1: number;
points_l2: number;
},
[number]
>(
'SELECT address, batch_id, points_l1, points_l2 FROM changes WHERE batch_id = ? AND (points_l1 <> 0 OR points_l2 <> 0)',
);
const batchChanges = batchChangesQuery.all(batchId);
if (batchChanges) {
batchChanges.forEach((change) => {
db.exec(`
UPDATE
user_points_public
SET
points_l1 = COALESCE(points_l1,0) + COALESCE(${change.points_l1}, 0),
points_l2 = COALESCE(points_l2,0) + COALESCE(${change.points_l2}, 0)
WHERE
address = '${change.address}'
`);
});
logger.debug(
`${batchChanges.length} addresses referral points were updated manually while recalculating`,
);
}
}
}
});

Expand Down
11 changes: 10 additions & 1 deletion lib/sources/astroport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import {
QuerySupplyOfResponse,
} from 'cosmjs-types/cosmos/bank/v1beta1/query';
import { PageRequest } from 'cosmjs-types/cosmos/base/query/v1beta1/pagination';
import { RECALCULATE } from '../../../constants';

export default class AstroportSource implements SourceInterface {
rpc: string;
recalculate: boolean;
concurrencyLimit: number;
paginationLimit: number;
logger: Logger<never>;
Expand Down Expand Up @@ -41,6 +43,8 @@ export default class AstroportSource implements SourceInterface {
}
this.assets = params.assets;

this.recalculate = params.recalculate || false;

this.rpc = rpc;
this.concurrencyLimit = parseInt(params.concurrency_limit || '3', 10);
this.paginationLimit = parseInt(params.paginationLimit || '30', 10);
Expand Down Expand Up @@ -205,9 +209,14 @@ export default class AstroportSource implements SourceInterface {
this.logger.debug(`Exchange rate ${exchangeRate}`);
let nextKey: undefined | Uint8Array = undefined;
do {
const multiplier =
this.recalculate &&
height < RECALCULATE.FIRST_ASTROPORT_CORRECT_EXCHANGE_RATE_HEIGHT
? multipliers[assetId] || 1
: (multipliers[assetId] || 1) * exchangeRate;
const { results, nextKey: newNextKey } = await this.getDenomBalances(
lpToken,
(multipliers[assetId] || 1) * exchangeRate,
multiplier,
height,
nextKey,
);
Expand Down
18 changes: 14 additions & 4 deletions lib/sources/demex/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import { Logger } from 'pino';
import { SourceInterface } from '../../../types/sources/source';
import { Tendermint37Client } from '@cosmjs/tendermint-rpc';
import { CbOnUserBalances } from '../../../types/sources/cbOnUserBalances';
import { RECALCULATE } from '../../../constants';

type DemexCoin = { denom: string; amount: number };

export default class DemexSource implements SourceInterface {
rpc: string;
recalculate: boolean;
insightsApi: string;
logger: Logger<never>;
assets: Record<string, { denom: string }> = {};
Expand All @@ -25,6 +27,7 @@ export default class DemexSource implements SourceInterface {
throw new Error('No assets configured in params');
}
this.assets = params.assets;
this.recalculate = params.recalculate || false;
this.rpc = rpc;
this.insightsApi = params.insights_api;
logger.info('Demex source initialized');
Expand Down Expand Up @@ -93,10 +96,17 @@ export default class DemexSource implements SourceInterface {
}

const borrowed = entry.borrowed.filter((o) => o.amount > 0).length > 0;
const balance =
supplied.amount *
(borrowed || asset.includes('_') ? multiplier : 1) *
1_000_000;
let finMultiplier = borrowed || asset.includes('_') ? multiplier : 1;
if (
this.recalculate &&
height < RECALCULATE.FIRST_ASTROPORT_CORRECT_EXCHANGE_RATE_HEIGHT
) {
finMultiplier =
entry.borrowed.filter((o) => o.amount > 0).length > 0
? multiplier
: 1;
}
const balance = supplied.amount * finMultiplier * 1_000_000;
if (balance > 0) {
out.push({
address,
Expand Down
Loading