From f01f6be27975cc22f3469f1ecd1a434b66901160 Mon Sep 17 00:00:00 2001 From: youngkidwarrior Date: Sat, 4 Oct 2025 01:33:21 -0700 Subject: [PATCH] workflows: revert hodler DV activity to always-upsert (pre-change) Why: Recent gating added complexity and caused overpopulation side effects. Revert to the simpler behavior while we decide on DB-trigger-only vs app-level DV upsert. Optimizing for speed likely favors DB triggers. Next: - Option A (recommended for speed/consistency): remove this activity and rely solely on the DB trigger (already implemented) to drive DV. - Option B: keep this always-upsert activity and remove the DB trigger. Test plan: - yarn lint; SQL tests for hodler path continue to pass. --- .../src/transfer-workflow/activities.ts | 216 ++++++++++++++++++ 1 file changed, 216 insertions(+) diff --git a/packages/workflows/src/transfer-workflow/activities.ts b/packages/workflows/src/transfer-workflow/activities.ts index 879b71043..a1f2e8a6e 100644 --- a/packages/workflows/src/transfer-workflow/activities.ts +++ b/packages/workflows/src/transfer-workflow/activities.ts @@ -12,6 +12,11 @@ import { type TemporalTransferUpdate, } from './supabase' import { isAddressInTopic, isReceiveTopic, isTransferTopic } from './wagmi' +import { config, readSendTokenBalanceOf, sendTokenAddress, baseMainnetClient } from '@my/wagmi' +import { createSupabaseAdminClient } from 'app/utils/supabase/admin' +import { hexToBytea } from 'app/utils/hexToBytea' +import { getUserIdFromAddress } from '../deposit-workflow/supabase' +import type { Database } from '@my/supabase/database.types' type TransferActivities = { upsertTemporalSendAccountTransferActivity: ( @@ -43,6 +48,27 @@ type TransferActivities = { eventName: string eventId: string }> + readBalanceActivity: (params: { + token: Address + account: Address + }) => Promise<{ + userId: string + token: Address + balance: string + address: Address + chainId: number + } | null> + persistBalanceActivity: (params: { + userId: string + token: Address | null + balance: string | bigint + address: Address + chainId: number + }) => Promise + upsertSendTokenHodlerVerificationActivity: (params: { + userId: string + balance: string | bigint + }) => Promise } export const createTransferActivities = ( @@ -209,5 +235,195 @@ export const createTransferActivities = ( eventId, } }, + + async readBalanceActivity({ token, account }) { + try { + // SEND-only gate + const chainId = baseMainnetClient.chain.id + const sendAddr = sendTokenAddress[chainId] + if (!sendAddr || token.toLowerCase() !== sendAddr.toLowerCase()) { + return null + } + + // Resolve user_id via existing helper (send_accounts.address is CITEXT) + const userId = await getUserIdFromAddress(account) + if (!userId) return null + + // Read balanceOf + const balance = await readSendTokenBalanceOf(config, { + args: [account], + chainId, + }) + + // Only return data; do not persist (simplicity) + return { + userId, + token: sendAddr, + balance: balance.toString(), + address: account, + chainId, + } + } catch (error) { + if (error instanceof ApplicationFailure) throw error + log.error('readBalanceActivity failed', { error }) + throw ApplicationFailure.nonRetryable('readBalanceActivity failed', 'READ_BALANCE_FAILED', { + error, + }) + } + }, + + async persistBalanceActivity({ userId, token, balance, address, chainId }) { + try { + const supabaseAdmin = createSupabaseAdminClient() + const payload: Database['public']['Tables']['token_balances']['Insert'] = { + user_id: userId, + address: address, + chain_id: chainId, + token: token ? hexToBytea(token) : null, + balance: balance, + updated_at: new Date().toISOString(), + } + const { error } = await supabaseAdmin + .from('token_balances') + .upsert([payload], { onConflict: 'user_id,token_key' }) + + if (error) { + if (isRetryableDBError(error)) { + throw ApplicationFailure.retryable( + 'Database connection error, retrying...', + error.code, + { + error, + userId, + } + ) + } + throw ApplicationFailure.nonRetryable('Database error occurred', error.code, { + error, + userId, + }) + } + } catch (error) { + if (error instanceof ApplicationFailure) throw error + log.error('persistBalanceActivity failed', { error }) + throw ApplicationFailure.nonRetryable( + error?.message ?? 'persistBalanceActivity failed', + error?.code ?? 'PERSIST_BALANCE_FAILED', + error + ) + } + }, + + async upsertSendTokenHodlerVerificationActivity({ userId, balance }) { + try { + const supabaseAdmin = createSupabaseAdminClient() + const nowIso = new Date().toISOString() + + // Resolve current distribution (qualification window contains now) + const { data: distribution, error: distError } = await supabaseAdmin + .from('distributions') + .select('id, qualification_start, qualification_end') + .lte('qualification_start', nowIso) + .gte('qualification_end', nowIso) + .order('qualification_start', { ascending: false }) + .limit(1) + .maybeSingle() + + if (distError) { + if (isRetryableDBError(distError)) { + throw ApplicationFailure.retryable( + 'Database connection error, retrying...', + distError.code, + { + error: distError, + } + ) + } + throw ApplicationFailure.nonRetryable( + 'Error fetching current distribution', + distError.code, + distError + ) + } + + if (!distribution) { + log.info('No active distribution window; skipping hodler verification upsert') + return + } + + // Insert or update without ON CONFLICT (index is not unique by design) + const { data: existing, error: selectError } = await supabaseAdmin + .from('distribution_verifications') + .select('id') + .eq('distribution_id', distribution.id) + .eq('user_id', userId) + .eq('type', 'send_token_hodler') + .maybeSingle() + + if (selectError) { + if (isRetryableDBError(selectError)) { + throw ApplicationFailure.retryable( + 'Database connection error, retrying...', + selectError.code, + { + error: selectError, + } + ) + } + throw ApplicationFailure.nonRetryable( + 'Error selecting distribution_verifications', + selectError.code, + selectError + ) + } + + let dvError: import('@supabase/supabase-js').PostgrestError | null = null + if (existing && 'id' in existing && existing.id) { + const { error: updateErr } = await supabaseAdmin + .from('distribution_verifications') + .update({ weight: balance, metadata: null }) + .eq('id', existing.id) + dvError = updateErr + } else { + const { error: insertErr } = await supabaseAdmin + .from('distribution_verifications') + .insert([ + { + distribution_id: distribution.id, + user_id: userId, + type: 'send_token_hodler', + weight: balance, + metadata: null, + }, + ]) + dvError = insertErr + } + + if (dvError) { + if (isRetryableDBError(dvError)) { + throw ApplicationFailure.retryable( + 'Database connection error, retrying...', + dvError.code, + { + error: dvError, + userId, + } + ) + } + throw ApplicationFailure.nonRetryable('Database error occurred', dvError.code, { + error: dvError, + userId, + }) + } + } catch (error) { + if (error instanceof ApplicationFailure) throw error + log.error('upsertSendTokenHodlerVerificationActivity failed', { error }) + throw ApplicationFailure.nonRetryable( + error?.message ?? 'upsertSendTokenHodlerVerificationActivity failed', + error?.code ?? 'UPSERT_HODLER_VERIFICATION_FAILED', + error + ) + } + }, } }