Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions packages/workflows/src/transfer-workflow/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import {
type TemporalTransferUpdate,
} from './supabase'
import { isAddressInTopic, isReceiveTopic, isTransferTopic } from './wagmi'
import { config, readSendTokenBalanceOf, sendTokenAddress, baseMainnetClient } from '@my/wagmi'
import { createSupabaseAdminClient } from 'app/utils/supabase/admin'
import { hexToBytea } from 'app/utils/hexToBytea'
import { getUserIdFromAddress } from '../deposit-workflow/supabase'
import type { Database } from '@my/supabase/database.types'

type TransferActivities = {
upsertTemporalSendAccountTransferActivity: (
Expand Down Expand Up @@ -43,6 +48,27 @@ type TransferActivities = {
eventName: string
eventId: string
}>
readBalanceActivity: (params: {
token: Address
account: Address
}) => Promise<{
userId: string
token: Address
balance: string
address: Address
chainId: number
} | null>
persistBalanceActivity: (params: {
userId: string
token: Address | null
balance: string | bigint
address: Address
chainId: number
}) => Promise<void>
upsertSendTokenHodlerVerificationActivity: (params: {
userId: string
balance: string | bigint
}) => Promise<void>
}

export const createTransferActivities = (
Expand Down Expand Up @@ -209,5 +235,195 @@ export const createTransferActivities = (
eventId,
}
},

async readBalanceActivity({ token, account }) {
try {
// SEND-only gate
const chainId = baseMainnetClient.chain.id
const sendAddr = sendTokenAddress[chainId]
if (!sendAddr || token.toLowerCase() !== sendAddr.toLowerCase()) {
return null
}

// Resolve user_id via existing helper (send_accounts.address is CITEXT)
const userId = await getUserIdFromAddress(account)
if (!userId) return null

// Read balanceOf
const balance = await readSendTokenBalanceOf(config, {
args: [account],
chainId,
})

// Only return data; do not persist (simplicity)
return {
userId,
token: sendAddr,
balance: balance.toString(),
address: account,
chainId,
}
} catch (error) {
if (error instanceof ApplicationFailure) throw error
log.error('readBalanceActivity failed', { error })
throw ApplicationFailure.nonRetryable('readBalanceActivity failed', 'READ_BALANCE_FAILED', {
error,
})
}
},

async persistBalanceActivity({ userId, token, balance, address, chainId }) {
try {
const supabaseAdmin = createSupabaseAdminClient()
const payload: Database['public']['Tables']['token_balances']['Insert'] = {
user_id: userId,
address: address,
chain_id: chainId,
token: token ? hexToBytea(token) : null,
balance: balance,
updated_at: new Date().toISOString(),
}
const { error } = await supabaseAdmin
.from('token_balances')
.upsert([payload], { onConflict: 'user_id,token_key' })

if (error) {
if (isRetryableDBError(error)) {
throw ApplicationFailure.retryable(
'Database connection error, retrying...',
error.code,
{
error,
userId,
}
)
}
throw ApplicationFailure.nonRetryable('Database error occurred', error.code, {
error,
userId,
})
}
} catch (error) {
if (error instanceof ApplicationFailure) throw error
log.error('persistBalanceActivity failed', { error })
throw ApplicationFailure.nonRetryable(
error?.message ?? 'persistBalanceActivity failed',
error?.code ?? 'PERSIST_BALANCE_FAILED',
error
)
}
},

async upsertSendTokenHodlerVerificationActivity({ userId, balance }) {
try {
const supabaseAdmin = createSupabaseAdminClient()
const nowIso = new Date().toISOString()

// Resolve current distribution (qualification window contains now)
const { data: distribution, error: distError } = await supabaseAdmin
.from('distributions')
.select('id, qualification_start, qualification_end')
.lte('qualification_start', nowIso)
.gte('qualification_end', nowIso)
.order('qualification_start', { ascending: false })
.limit(1)
.maybeSingle()

if (distError) {
if (isRetryableDBError(distError)) {
throw ApplicationFailure.retryable(
'Database connection error, retrying...',
distError.code,
{
error: distError,
}
)
}
throw ApplicationFailure.nonRetryable(
'Error fetching current distribution',
distError.code,
distError
)
}

if (!distribution) {
log.info('No active distribution window; skipping hodler verification upsert')
return
}

// Insert or update without ON CONFLICT (index is not unique by design)
const { data: existing, error: selectError } = await supabaseAdmin
.from('distribution_verifications')
.select('id')
.eq('distribution_id', distribution.id)
.eq('user_id', userId)
.eq('type', 'send_token_hodler')
.maybeSingle()

if (selectError) {
if (isRetryableDBError(selectError)) {
throw ApplicationFailure.retryable(
'Database connection error, retrying...',
selectError.code,
{
error: selectError,
}
)
}
throw ApplicationFailure.nonRetryable(
'Error selecting distribution_verifications',
selectError.code,
selectError
)
}

let dvError: import('@supabase/supabase-js').PostgrestError | null = null
if (existing && 'id' in existing && existing.id) {
const { error: updateErr } = await supabaseAdmin
.from('distribution_verifications')
.update({ weight: balance, metadata: null })
.eq('id', existing.id)
dvError = updateErr
} else {
const { error: insertErr } = await supabaseAdmin
.from('distribution_verifications')
.insert([
{
distribution_id: distribution.id,
user_id: userId,
type: 'send_token_hodler',
weight: balance,
metadata: null,
},
])
dvError = insertErr
}

if (dvError) {
if (isRetryableDBError(dvError)) {
throw ApplicationFailure.retryable(
'Database connection error, retrying...',
dvError.code,
{
error: dvError,
userId,
}
)
}
throw ApplicationFailure.nonRetryable('Database error occurred', dvError.code, {
error: dvError,
userId,
})
}
} catch (error) {
if (error instanceof ApplicationFailure) throw error
log.error('upsertSendTokenHodlerVerificationActivity failed', { error })
throw ApplicationFailure.nonRetryable(
error?.message ?? 'upsertSendTokenHodlerVerificationActivity failed',
error?.code ?? 'UPSERT_HODLER_VERIFICATION_FAILED',
error
)
}
},
}
}
Loading