Skip to content

Commit f9bfeee

Browse files
committed
fix: harden turso session sync transport and timeouts
1 parent 2b3c496 commit f9bfeee

2 files changed

Lines changed: 147 additions & 16 deletions

File tree

src/sync/turso.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ import { describe, expect, it } from 'vitest';
22
import { normalizeSyncConfig } from './config.js';
33
import type { SyncLocations } from './paths.js';
44
import {
5+
estimateBase64EncodedLength,
6+
estimateSnapshotPayloadBase64Bytes,
57
extractHeadlessLoginHints,
68
extractRows,
79
isRetryableTursoError,
10+
isSnapshotPayloadSizeAllowed,
11+
MAX_TURSO_SNAPSHOT_BASE64_BYTES,
812
resolveSessionDbPaths,
913
resolveTursoCredentialPath,
1014
resolveTursoDatabaseName,
@@ -121,3 +125,27 @@ describe('extractRows', () => {
121125
expect(extractRows(null)).toEqual([]);
122126
});
123127
});
128+
129+
describe('snapshot payload sizing', () => {
130+
it('estimates base64 length for byte counts', () => {
131+
expect(estimateBase64EncodedLength(0)).toBe(0);
132+
expect(estimateBase64EncodedLength(1)).toBe(4);
133+
expect(estimateBase64EncodedLength(2)).toBe(4);
134+
expect(estimateBase64EncodedLength(3)).toBe(4);
135+
expect(estimateBase64EncodedLength(4)).toBe(8);
136+
});
137+
138+
it('estimates combined snapshot payload size', () => {
139+
const total = estimateSnapshotPayloadBase64Bytes({
140+
dbByteLength: 4,
141+
walByteLength: 3,
142+
shmByteLength: 1,
143+
});
144+
expect(total).toBe(16);
145+
});
146+
147+
it('checks whether snapshot payload size is within limit', () => {
148+
expect(isSnapshotPayloadSizeAllowed(MAX_TURSO_SNAPSHOT_BASE64_BYTES)).toBe(true);
149+
expect(isSnapshotPayloadSizeAllowed(MAX_TURSO_SNAPSHOT_BASE64_BYTES + 1)).toBe(false);
150+
});
151+
});

src/sync/turso.ts

Lines changed: 119 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import type { SyncLocations } from './paths.js';
1111
const SESSION_SYNC_TABLE = 'opencode_session_sync_snapshot';
1212
const CREDENTIAL_VERSION = 1;
1313
const TURSO_INSTALL_SCRIPT = 'curl -sSfL https://get.tur.so/install.sh | bash';
14+
const TURSO_SQL_TIMEOUT_MS = 30_000;
15+
const TURSO_PROCESS_KILL_GRACE_MS = 2_000;
16+
export const MAX_TURSO_SNAPSHOT_BASE64_BYTES = 8 * 1024 * 1024;
1417
const TURSO_EXECUTABLE_CANDIDATES = [
1518
'turso',
1619
'/opt/homebrew/bin/turso',
@@ -62,6 +65,11 @@ interface TursoCommandResult {
6265
timedOut: boolean;
6366
}
6467

68+
interface TimeoutSignalHandle {
69+
signal: AbortSignal;
70+
cleanup: () => void;
71+
}
72+
6573
export interface TursoSyncLogger {
6674
debug: (message: string, metadata?: Record<string, unknown>) => void;
6775
info: (message: string, metadata?: Record<string, unknown>) => void;
@@ -609,6 +617,29 @@ export function isRetryableTursoError(error: unknown): boolean {
609617
].some((token) => message.includes(token));
610618
}
611619

620+
export function estimateBase64EncodedLength(byteLength: number): number {
621+
if (!Number.isFinite(byteLength) || byteLength <= 0) {
622+
return 0;
623+
}
624+
return Math.ceil(byteLength / 3) * 4;
625+
}
626+
627+
export function estimateSnapshotPayloadBase64Bytes(input: {
628+
dbByteLength: number;
629+
walByteLength?: number | null;
630+
shmByteLength?: number | null;
631+
}): number {
632+
return (
633+
estimateBase64EncodedLength(input.dbByteLength) +
634+
estimateBase64EncodedLength(input.walByteLength ?? 0) +
635+
estimateBase64EncodedLength(input.shmByteLength ?? 0)
636+
);
637+
}
638+
639+
export function isSnapshotPayloadSizeAllowed(totalBase64Bytes: number): boolean {
640+
return totalBase64Bytes <= MAX_TURSO_SNAPSHOT_BASE64_BYTES;
641+
}
642+
612643
function sanitizeDatabaseName(input: string): string {
613644
const cleaned = input
614645
.trim()
@@ -712,7 +743,7 @@ async function ensureTursoExecutable(
712743
}
713744

714745
if (process.platform !== 'win32') {
715-
const installScript = await runCommand('bash', ['-lc', TURSO_INSTALL_SCRIPT], {
746+
const installScript = await runCommand('bash', ['-c', TURSO_INSTALL_SCRIPT], {
716747
timeoutMs: 120000,
717748
});
718749
if (installScript.code !== 0) {
@@ -914,11 +945,29 @@ async function runCommand(
914945
let stderr = '';
915946
let timedOut = false;
916947
let timeout: NodeJS.Timeout | null = null;
948+
let forceKillTimeout: NodeJS.Timeout | null = null;
949+
950+
const clearCommandTimers = (): void => {
951+
if (timeout) clearTimeout(timeout);
952+
if (forceKillTimeout) clearTimeout(forceKillTimeout);
953+
};
917954

918955
if (options.timeoutMs && options.timeoutMs > 0) {
919956
timeout = setTimeout(() => {
920957
timedOut = true;
921-
child.kill('SIGTERM');
958+
try {
959+
child.kill('SIGTERM');
960+
} catch {
961+
return;
962+
}
963+
forceKillTimeout = setTimeout(() => {
964+
if (child.exitCode !== null || child.signalCode !== null) return;
965+
try {
966+
child.kill('SIGKILL');
967+
} catch {
968+
// Process can already be gone by the time fallback runs.
969+
}
970+
}, TURSO_PROCESS_KILL_GRACE_MS);
922971
}, options.timeoutMs);
923972
}
924973

@@ -930,12 +979,12 @@ async function runCommand(
930979
});
931980

932981
child.on('error', (error) => {
933-
if (timeout) clearTimeout(timeout);
982+
clearCommandTimers();
934983
reject(error);
935984
});
936985

937986
child.on('close', (code) => {
938-
if (timeout) clearTimeout(timeout);
987+
clearCommandTimers();
939988
resolve({
940989
code: code ?? 1,
941990
stdout,
@@ -1002,7 +1051,7 @@ async function writeLocalSessionSnapshot(
10021051

10031052
async function writeBufferAtomically(targetPath: string, payload: Buffer): Promise<void> {
10041053
await fs.mkdir(path.dirname(targetPath), { recursive: true });
1005-
const tempPath = `${targetPath}.tmp-${process.pid}-${Date.now()}-${Math.random().toString(16).slice(2)}`;
1054+
const tempPath = `${targetPath}.tmp-${process.pid}-${Date.now()}-${crypto.randomUUID()}`;
10061055
await fs.writeFile(tempPath, payload, { mode: 0o600 });
10071056
await fs.rename(tempPath, targetPath);
10081057
}
@@ -1103,6 +1152,22 @@ async function upsertRemoteSnapshot(
11031152
snapshot: SessionSnapshot,
11041153
machineId: string
11051154
): Promise<void> {
1155+
const dbPayloadBase64 = snapshot.db.toString('base64');
1156+
const walPayloadBase64 = snapshot.wal ? snapshot.wal.toString('base64') : null;
1157+
const shmPayloadBase64 = snapshot.shm ? snapshot.shm.toString('base64') : null;
1158+
const payloadBase64Bytes = estimateSnapshotPayloadBase64Bytes({
1159+
dbByteLength: snapshot.db.byteLength,
1160+
walByteLength: snapshot.wal?.byteLength,
1161+
shmByteLength: snapshot.shm?.byteLength,
1162+
});
1163+
if (!isSnapshotPayloadSizeAllowed(payloadBase64Bytes)) {
1164+
throw new SyncCommandError(
1165+
`Session snapshot payload is too large for Turso upload ` +
1166+
`(${payloadBase64Bytes} base64 bytes; max ${MAX_TURSO_SNAPSHOT_BASE64_BYTES}). ` +
1167+
'Chunked uploads are not supported yet.'
1168+
);
1169+
}
1170+
11061171
const upsert = [
11071172
`INSERT INTO ${SESSION_SYNC_TABLE} (`,
11081173
'id, updated_at, machine_id, payload_sha256, payload_db_b64, payload_wal_b64, payload_shm_b64',
@@ -1120,9 +1185,9 @@ async function upsertRemoteSnapshot(
11201185
{ type: 'text', value: new Date().toISOString() },
11211186
{ type: 'text', value: machineId },
11221187
{ type: 'text', value: snapshot.sha256 },
1123-
{ type: 'text', value: snapshot.db.toString('base64') },
1124-
snapshot.wal ? { type: 'text', value: snapshot.wal.toString('base64') } : { type: 'null' },
1125-
snapshot.shm ? { type: 'text', value: snapshot.shm.toString('base64') } : { type: 'null' },
1188+
{ type: 'text', value: dbPayloadBase64 },
1189+
walPayloadBase64 ? { type: 'text', value: walPayloadBase64 } : { type: 'null' },
1190+
shmPayloadBase64 ? { type: 'text', value: shmPayloadBase64 } : { type: 'null' },
11261191
];
11271192

11281193
await executeSql(credential, upsert, args);
@@ -1172,14 +1237,26 @@ async function executeSql(
11721237
],
11731238
};
11741239

1175-
const response = await fetch(`${credential.httpUrl}/v2/pipeline`, {
1176-
method: 'POST',
1177-
headers: {
1178-
Authorization: `Bearer ${credential.token}`,
1179-
'Content-Type': 'application/json',
1180-
},
1181-
body: JSON.stringify(body),
1182-
});
1240+
const timeout = createTimeoutSignal(TURSO_SQL_TIMEOUT_MS);
1241+
let response: Response;
1242+
try {
1243+
response = await fetch(`${credential.httpUrl}/v2/pipeline`, {
1244+
method: 'POST',
1245+
headers: {
1246+
Authorization: `Bearer ${credential.token}`,
1247+
'Content-Type': 'application/json',
1248+
},
1249+
body: JSON.stringify(body),
1250+
signal: timeout.signal,
1251+
});
1252+
} catch (error) {
1253+
if (isAbortError(error)) {
1254+
throw new SyncCommandError(`Turso SQL request timed out after ${TURSO_SQL_TIMEOUT_MS}ms.`);
1255+
}
1256+
throw new SyncCommandError(`Turso SQL request failed: ${formatUnknownError(error)}`);
1257+
} finally {
1258+
timeout.cleanup();
1259+
}
11831260

11841261
const text = await response.text();
11851262
if (!response.ok) {
@@ -1264,3 +1341,29 @@ function decodeSqlCellToText(cell: unknown): string | null {
12641341
function trimTrailingSlash(value: string): string {
12651342
return value.replace(/\/+$/, '');
12661343
}
1344+
1345+
function createTimeoutSignal(timeoutMs: number): TimeoutSignalHandle {
1346+
const abortSignalWithTimeout = AbortSignal as typeof AbortSignal & {
1347+
timeout?: (_ms: number) => AbortSignal;
1348+
};
1349+
if (typeof abortSignalWithTimeout.timeout === 'function') {
1350+
return { signal: abortSignalWithTimeout.timeout(timeoutMs), cleanup: () => {} };
1351+
}
1352+
1353+
const controller = new AbortController();
1354+
const timeout = setTimeout(() => controller.abort(), timeoutMs);
1355+
return {
1356+
signal: controller.signal,
1357+
cleanup: () => {
1358+
clearTimeout(timeout);
1359+
},
1360+
};
1361+
}
1362+
1363+
function isAbortError(error: unknown): boolean {
1364+
return error instanceof Error && error.name === 'AbortError';
1365+
}
1366+
1367+
function formatUnknownError(error: unknown): string {
1368+
return error instanceof Error ? error.message : String(error);
1369+
}

0 commit comments

Comments
 (0)