Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apis/api-journeys-modern/src/lib/google/sheets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ export async function readValues({
return values
}

// Escape a sheet title for use in A1 notation ranges.
// Per Google Sheets: wrap in single quotes, doubling any internal single quotes.
export function escapeSheetName(name: string): string {
return `'${name.replace(/'/g, "''")}'`
}

// Convert a 0-based column index to A1 column letters
export function columnIndexToA1(colIndexZeroBased: number): string {
let n = colIndexZeroBased + 1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import { Job } from 'bullmq'
import { Logger } from 'pino'

import { prismaMock } from '../../../../test/prismaMock'
import { getIntegrationGoogleAccessToken } from '../../../lib/google/googleAuth'
import {
clearSheet,
ensureSheet,
readValues,
writeValues
} from '../../../lib/google/sheets'
import { GoogleSheetsSyncBackfillJobData } from '../queue'

import { backfillService } from './backfill'

jest.mock('../../../lib/google/googleAuth', () => ({
getIntegrationGoogleAccessToken: jest.fn()
}))

jest.mock('../../../lib/google/sheets', () => {
const actual = jest.requireActual('../../../lib/google/sheets')
return {
...actual,
clearSheet: jest.fn(),
ensureSheet: jest.fn(),
readValues: jest.fn(),
writeValues: jest.fn()
}
})

const mockGetIntegrationGoogleAccessToken =
getIntegrationGoogleAccessToken as jest.MockedFunction<
typeof getIntegrationGoogleAccessToken
>
const mockEnsureSheet = ensureSheet as jest.MockedFunction<typeof ensureSheet>
const mockReadValues = readValues as jest.MockedFunction<typeof readValues>
const mockClearSheet = clearSheet as jest.MockedFunction<typeof clearSheet>
const mockWriteValues = writeValues as jest.MockedFunction<typeof writeValues>

const backfillJob: Job<GoogleSheetsSyncBackfillJobData> = {
name: 'google-sheets-sync-backfill',
data: {
type: 'backfill',
journeyId: 'journey-id',
teamId: 'team-id',
syncId: 'sync-id',
spreadsheetId: 'spreadsheet-id',
sheetName: 'Sheet1',
timezone: 'UTC',
integrationId: 'integration-id'
}
} as unknown as Job<GoogleSheetsSyncBackfillJobData>

describe('backfillService', () => {
let logger: Logger

beforeEach(() => {
jest.clearAllMocks()
logger = {
info: jest.fn(),
warn: jest.fn(),
error: jest.fn()
} as unknown as Logger

prismaMock.googleSheetsSync.findFirst.mockResolvedValue({
id: 'sync-id'
} as any)
prismaMock.journey.findUnique.mockResolvedValue({
id: 'journey-id',
blocks: []
} as any)
prismaMock.event.findMany.mockResolvedValue([] as any)
prismaMock.journeyVisitor.findMany.mockResolvedValue([] as any)

mockGetIntegrationGoogleAccessToken.mockResolvedValue({
accessToken: 'access-token',
accountEmail: '[email protected]'
})
mockEnsureSheet.mockResolvedValue(undefined)
})

it('skips clear/write when sheet content is unchanged', async () => {
mockReadValues.mockResolvedValue([['Visitor ID', 'Date']])

await backfillService(backfillJob, logger)

expect(mockReadValues).toHaveBeenCalledWith({
accessToken: 'access-token',
spreadsheetId: 'spreadsheet-id',
range: "'Sheet1'!A:B"
})
expect(mockClearSheet).not.toHaveBeenCalled()
expect(mockWriteValues).not.toHaveBeenCalled()
})

it('clears and rewrites when sheet content changed', async () => {
mockReadValues.mockResolvedValue([['Visitor ID', 'Date (Old)']])

await backfillService(backfillJob, logger)

expect(mockClearSheet).toHaveBeenCalledWith({
accessToken: 'access-token',
spreadsheetId: 'spreadsheet-id',
sheetTitle: 'Sheet1'
})
expect(mockWriteValues).toHaveBeenCalledWith({
accessToken: 'access-token',
spreadsheetId: 'spreadsheet-id',
sheetTitle: 'Sheet1',
values: [['Visitor ID', 'Date']],
append: false
})
})

it('clears and rewrites when existing sheet has more rows than new data', async () => {
mockReadValues.mockResolvedValue([
['Visitor ID', 'Date'],
['visitor-1', '2026-01-01']
])

await backfillService(backfillJob, logger)

expect(mockClearSheet).toHaveBeenCalled()
expect(mockWriteValues).toHaveBeenCalled()
})

it('clears and rewrites when existing sheet is empty', async () => {
mockReadValues.mockResolvedValue([])

await backfillService(backfillJob, logger)

expect(mockClearSheet).toHaveBeenCalled()
expect(mockWriteValues).toHaveBeenCalled()
})

it('falls through to write when readValues fails', async () => {
mockReadValues.mockRejectedValue(new Error('API rate limit exceeded'))

await backfillService(backfillJob, logger)

expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({ err: expect.any(Error) }),
'Failed to read existing sheet values, proceeding with full write'
)
expect(mockClearSheet).toHaveBeenCalled()
expect(mockWriteValues).toHaveBeenCalled()
})

it('skips backfill when sync is not found', async () => {
prismaMock.googleSheetsSync.findFirst.mockResolvedValue(null)

await backfillService(backfillJob, logger)

expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({ syncId: 'sync-id' }),
'Sync not found or deleted, skipping backfill'
)
expect(mockEnsureSheet).not.toHaveBeenCalled()
expect(mockReadValues).not.toHaveBeenCalled()
})

it('skips backfill when journey is not found', async () => {
prismaMock.journey.findUnique.mockResolvedValue(null)

await backfillService(backfillJob, logger)

expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({ journeyId: 'journey-id' }),
'Journey not found, skipping backfill'
)
expect(mockEnsureSheet).not.toHaveBeenCalled()
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import { Prisma, prisma } from '@core/prisma/journeys/client'
import { getIntegrationGoogleAccessToken } from '../../../lib/google/googleAuth'
import {
clearSheet,
columnIndexToA1,
ensureSheet,
escapeSheetName,
readValues,
writeValues
} from '../../../lib/google/sheets'
import { computeConnectedBlockIds } from '../../../schema/journeyVisitor/export/connectivity'
Expand Down Expand Up @@ -44,6 +47,30 @@ interface JourneyVisitorExportRow {
[key: string]: string
}

function isSheetContentUnchanged({
nextValues,
existingValues,
writeWidth
}: {
nextValues: (string | null)[][]
existingValues: (string | null)[][]
writeWidth: number
}): boolean {
if (existingValues.length !== nextValues.length) return false

return nextValues.every((nextRow, rowIndex) => {
const existingRow = existingValues[rowIndex] ?? []

for (let colIndex = 0; colIndex < writeWidth; colIndex++) {
const nextCell = String(nextRow[colIndex] ?? '')
const existingCell = String(existingRow[colIndex] ?? '')
if (nextCell !== existingCell) return false
}

return true
})
}

async function* getJourneyVisitors(
journeyId: string,
eventWhere: Prisma.EventWhereInput,
Expand Down Expand Up @@ -281,9 +308,6 @@ export async function backfillService(
// Ensure sheet exists
await ensureSheet({ accessToken, spreadsheetId, sheetTitle: sheetName })

// Clear existing data
await clearSheet({ accessToken, spreadsheetId, sheetTitle: sheetName })

// Build data rows
const sanitizedHeaderRow = headerRow.map((cell) =>
sanitizeGoogleSheetsCell(cell)
Expand All @@ -303,14 +327,46 @@ export async function backfillService(
values.push(aligned)
}

// Write all data at once
await writeValues({
accessToken,
spreadsheetId,
sheetTitle: sheetName,
values,
append: false
})
const writeWidth = finalHeader.length
const lastColumnA1 = columnIndexToA1(writeWidth - 1)
const escapedName = escapeSheetName(sheetName)
const fullRange = `${escapedName}!A:${lastColumnA1}`

let existingValues: (string | null)[][] = []
let readFailed = false
try {
existingValues = await readValues({
accessToken,
spreadsheetId,
range: fullRange
})
} catch (err) {
readFailed = true
logger?.warn(
{ err, range: fullRange },
'Failed to read existing sheet values, proceeding with full write'
)
}

const contentUnchanged =
!readFailed &&
isSheetContentUnchanged({
nextValues: values,
existingValues,
writeWidth
})

if (!contentUnchanged) {
await clearSheet({ accessToken, spreadsheetId, sheetTitle: sheetName })

await writeValues({
accessToken,
spreadsheetId,
sheetTitle: sheetName,
values,
append: false
})
}

// Update exportOrder on blocks that don't have it set yet.
// This ensures columns maintain their positions for future syncs.
Expand Down
Loading