Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .talismanrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
fileignoreconfig:
- filename: pnpm-lock.yaml
checksum: 7ec6345eb15ed0be001753ee49733421a8a07096dc8a18465cdad1b82562fed8
checksum: 1215ef6c0dfe2200186bb5d842d7fce7ee7d7ea5954224a38553e9c574b9f9da
version: '1.0'
4 changes: 3 additions & 1 deletion packages/contentstack-asset-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
],
"license": "MIT",
"dependencies": {
"@contentstack/cli-utilities": "~2.0.0-beta.9"
"@contentstack/cli-utilities": "~2.0.0-beta.9",
"lodash": "^4.17.21"
},
"oclif": {
"commands": "./lib/commands",
Expand All @@ -42,6 +43,7 @@
},
"devDependencies": {
"@types/chai": "^4.3.11",
"@types/lodash": "^4.17.0",
"@types/mocha": "^10.0.6",
"@types/node": "^20.17.50",
"@types/sinon": "^17.0.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export const FALLBACK_AM_CHUNK_FILE_SIZE_MB = 1;
export const FALLBACK_AM_API_CONCURRENCY = 5;
/** @deprecated Use FALLBACK_AM_API_CONCURRENCY */
export const DEFAULT_AM_API_CONCURRENCY = FALLBACK_AM_API_CONCURRENCY;
/** Page size used when the export context does not supply `pageSize`. */
export const FALLBACK_AM_API_PAGE_SIZE = 100;
/** Fetch concurrency used when the export context does not supply `fetchConcurrency`. */
export const FALLBACK_AM_API_FETCH_CONCURRENCY = 5;

/** Fallback strip lists when import options omit `fieldsImportInvalidKeys` / `assetTypesImportInvalidKeys`. */
export const FALLBACK_FIELDS_IMPORT_INVALID_KEYS = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export default class ExportAssetTypes extends CSAssetsExportAdapter {

log.debug('Starting shared asset types export process...', this.exportContext.context);

const assetTypesData = await this.getWorkspaceAssetTypes(spaceUid);
const assetTypesData = await this.getWorkspaceAssetTypes(spaceUid, this.apiPageSize, this.apiFetchConcurrency);
const items = getArrayFromResponse(assetTypesData, 'asset_types');
const dir = this.getAssetTypesDir();
if (items.length === 0) {
Expand Down
198 changes: 119 additions & 79 deletions packages/contentstack-asset-management/src/export/assets.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
import { resolve as pResolve } from 'node:path';
import { Readable } from 'node:stream';
import { mkdir, writeFile } from 'node:fs/promises';
import { configHandler, log } from '@contentstack/cli-utilities';
import chunk from 'lodash/chunk';
import { configHandler, log, FsUtility } from '@contentstack/cli-utilities';

import type { CSAssetsAPIConfig, LinkedWorkspace } from '../types/cs-assets-api';
import type { ExportContext } from '../types/export-types';
import { CSAssetsExportAdapter } from './base';
import { getAssetItems, writeStreamToFile } from '../utils/export-helpers';
import { runInBatches } from '../utils/concurrent-batch';
import { writeStreamToFile } from '../utils/export-helpers';
import { forEachChunkedJsonStore } from '../utils/chunked-json-reader';
import { withRetry, RetryableHttpError, isRetryableStatus, parseRetryAfterMs } from '../utils/retry';
import type { CustomPromiseHandler } from '../utils/cs-assets-api-adapter';
import { PROCESS_NAMES, PROCESS_STATUS } from '../constants/index';

/** Keys passed as `metaPickKeys` when creating the chunked `assets.json` writer. */
const ASSET_META_KEYS = ['uid', 'url', 'filename', 'file_name', 'parent_uid'];

/**
 * Minimal shape of an asset row as read by the export code in this file. Every field is
 * optional: the identifier is read from `uid` with `_uid` as the fallback, and the file name
 * from `filename` with `file_name` as the fallback.
 */
type AssetRecord = { uid?: string; _uid?: string; url?: string; filename?: string; file_name?: string };

export default class ExportAssets extends CSAssetsExportAdapter {
/**
 * Forwards the API config and export context to the base export adapter unchanged.
 *
 * @param apiConfig - API configuration handed to the base adapter.
 * @param exportContext - Export context handed to the base adapter.
 */
constructor(apiConfig: CSAssetsAPIConfig, exportContext: ExportContext) {
super(apiConfig, exportContext);
}

/**
 * Tells whether an asset row can be downloaded: it needs a truthy `url` and a truthy
 * identifier, taken from `uid` or, when `uid` is null/undefined, from `_uid`.
 *
 * @param asset - Asset metadata row; a nullish value yields `false`.
 * @returns `true` when both the URL and an identifier are present.
 */
private isDownloadable(asset: AssetRecord): boolean {
  const hasUrlAndId = asset?.url && (asset?.uid ?? asset?._uid);
  return Boolean(hasUrlAndId);
}

async start(workspace: LinkedWorkspace, spaceDir: string): Promise<void> {
await this.init();

Expand All @@ -25,113 +36,142 @@ export default class ExportAssets extends CSAssetsExportAdapter {
await mkdir(assetsDir, { recursive: true });
log.debug(`Assets directory ready: ${assetsDir}`, this.exportContext.context);

log.debug(`Fetching folders and assets for space ${workspace.space_uid}`, this.exportContext.context);

const [folders, assetsData] = await Promise.all([
this.getWorkspaceFolders(workspace.space_uid, workspace.uid),
this.getWorkspaceAssets(workspace.space_uid, workspace.uid),
// Stream asset metadata straight to chunked JSON as pages arrive — never hold the full set in
// memory. The writer is created lazily so an empty space writes an empty index instead of chunks.
let fsWriter: FsUtility | undefined;
let totalStreamed = 0;
let downloadableCount = 0;
const onPage = (items: unknown[]) => {
if (items.length === 0) return;
if (!fsWriter) fsWriter = this.createChunkedJsonWriter(assetsDir, 'assets.json', 'assets', ASSET_META_KEYS);
fsWriter.writeIntoFile(items as Record<string, string>[], { mapKeyVal: true });
totalStreamed += items.length;
for (const asset of items as AssetRecord[]) if (this.isDownloadable(asset)) downloadableCount += 1;
};

log.debug(`Fetching folders and streaming assets for space ${workspace.space_uid}`, this.exportContext.context);
const [folders] = await Promise.all([
this.getWorkspaceFolders(workspace.space_uid, workspace.uid, this.apiPageSize, this.apiFetchConcurrency),
this.streamWorkspaceAssets(workspace.space_uid, workspace.uid, onPage, this.apiPageSize, this.apiFetchConcurrency),
]);

const assetItems = getAssetItems(assetsData);
const downloadableCount = assetItems.filter((asset) => Boolean(asset.url && (asset.uid ?? asset._uid))).length;
if (fsWriter) fsWriter.completeFile(true);
else await this.writeEmptyChunkedJson(assetsDir, 'assets.json');
log.debug(`Wrote chunked assets metadata (${totalStreamed} item(s)) under ${assetsDir}`, this.exportContext.context);

// Per-space total: 1 folder write + 1 metadata write + N per-asset downloads.
// The shared module-level total is just a placeholder before this point; update
// it now so the multibar row shows real progress as downloads tick in.
this.progressOrParent?.updateProcessTotal?.(this.processName, 2 + downloadableCount);

await writeFile(pResolve(assetsDir, 'folders.json'), JSON.stringify(folders, null, 2));
this.tick(true, `folders: ${workspace.space_uid}`, null);
log.debug(`Wrote folders.json for space ${workspace.space_uid}`, this.exportContext.context);

log.debug(
assetItems.length === 0
? `No assets for space ${workspace.space_uid}, wrote empty assets.json`
: `Writing ${assetItems.length} assets metadata for space ${workspace.space_uid}`,
this.exportContext.context,
);
await this.writeItemsToChunkedJson(
assetsDir,
'assets.json',
'assets',
['uid', 'url', 'filename', 'file_name', 'parent_uid'],
assetItems,
);
log.debug(
`Finished writing chunked assets metadata (${assetItems.length} item(s)) under ${assetsDir}`,
this.exportContext.context,
);
log.info(
assetItems.length === 0
totalStreamed === 0
? `Wrote empty asset metadata for space ${workspace.space_uid}`
: `Wrote ${assetItems.length} asset metadata record(s) for space ${workspace.space_uid}`,
: `Wrote ${totalStreamed} asset metadata record(s) for space ${workspace.space_uid}`,
this.exportContext.context,
);
this.tick(true, `metadata: ${workspace.space_uid} (${assetItems.length})`, null);
this.tick(true, `metadata: ${workspace.space_uid} (${totalStreamed})`, null);

log.debug(`Starting binary downloads for space ${workspace.space_uid}`, this.exportContext.context);
await this.downloadWorkspaceAssets(assetsData, assetsDir, workspace.space_uid);
await this.downloadWorkspaceAssets(assetsDir, workspace.space_uid, downloadableCount);
}

private async downloadWorkspaceAssets(assetsData: unknown, assetsDir: string, spaceUid: string): Promise<void> {
const items = getAssetItems(assetsData);
if (items.length === 0) {
log.info(`No asset files to download for space ${spaceUid}`, this.exportContext.context);
log.debug('No assets to download', this.exportContext.context);
return;
}

this.updateStatus(PROCESS_STATUS[PROCESS_NAMES.AM_DOWNLOADS].DOWNLOADING);
log.info(`Downloading asset files for space ${spaceUid} (${items.length} in metadata)`, this.exportContext.context);
log.debug(`Downloading ${items.length} asset file(s) for space ${spaceUid}...`, this.exportContext.context);
/**
* Download asset binaries by reading the just-written chunked `assets.json` back from disk
* (one chunk at a time), so we never re-materialize the whole asset list in memory.
*/
private async downloadWorkspaceAssets(assetsDir: string, spaceUid: string, expectedDownloads: number): Promise<void> {
const filesDir = pResolve(assetsDir, 'files');
await mkdir(filesDir, { recursive: true });
log.debug(`Asset files directory ready: ${filesDir}`, this.exportContext.context);

const securedAssets = this.exportContext.securedAssets ?? false;
const authtoken = securedAssets ? configHandler.get('authtoken') : null;
log.debug(
`Asset downloads: securedAssets=${securedAssets}, concurrency=${this.downloadAssetsBatchConcurrency}`,
`Asset downloads: securedAssets=${securedAssets}, concurrency=${this.downloadAssetsBatchConcurrency}, expected=${expectedDownloads}`,
this.exportContext.context,
);
this.updateStatus(PROCESS_STATUS[PROCESS_NAMES.AM_DOWNLOADS].DOWNLOADING);

let downloadOk = 0;
let downloadFail = 0;

const validItems = items.filter((asset) => Boolean(asset.url && (asset.uid ?? asset._uid)));
const skipped = items.length - validItems.length;
if (skipped > 0) {
log.debug(
`Skipping ${skipped} asset row(s) without url or uid (${validItems.length} file download(s) scheduled)`,
await forEachChunkedJsonStore<AssetRecord>(
assetsDir,
'assets.json',
{
context: this.exportContext.context,
chunkReadLogLabel: 'assets',
onOpenError: (err) => log.debug(`Could not open assets.json for download: ${err}`, this.exportContext.context),
onEmptyIndexer: () => log.info(`No asset files to download for space ${spaceUid}`, this.exportContext.context),
},
async (records) => {
const valid = records.filter((asset) => this.isDownloadable(asset));
if (valid.length === 0) return;
const apiBatches = chunk(valid, this.downloadAssetsBatchConcurrency);
const promisifyHandler: CustomPromiseHandler = async ({ index, batchIndex }) => {
const asset = apiBatches[batchIndex][index] as AssetRecord;
const uid = (asset.uid ?? asset._uid) as string;
const url = asset.url as string;
const filename = asset.filename ?? asset.file_name ?? 'asset';
if (!url || !uid) return;
try {
const separator = url.includes('?') ? '&' : '?';
const downloadUrl = securedAssets && authtoken ? `${url}${separator}authtoken=${authtoken}` : url;
// Binary GET is idempotent — retry transient failures with backoff.
const response = await withRetry(
async () => {
let resp: Response;
try {
resp = await fetch(downloadUrl);
} catch (e) {
throw new RetryableHttpError(`download network error: ${(e as Error)?.message ?? String(e)}`);
}
if (!resp.ok) {
if (isRetryableStatus(resp.status)) {
throw new RetryableHttpError(`HTTP ${resp.status}`, resp.status, parseRetryAfterMs(resp.headers.get('retry-after')));
}
throw new Error(`HTTP ${resp.status}`);
}
return resp;
},
{ context: this.exportContext.context, label: `download ${filename}` },
);
const body = response.body;
if (!body) throw new Error('No response body');
const nodeStream = Readable.fromWeb(body as Parameters<typeof Readable.fromWeb>[0]);
const assetFolderPath = pResolve(filesDir, uid);
await mkdir(assetFolderPath, { recursive: true });
const filePath = pResolve(assetFolderPath, filename);
await writeStreamToFile(nodeStream, filePath);
downloadOk += 1;
// Per-asset tick so the per-space progress bar moves in real time.
this.tick(true, `asset: ${filename}`, null);
log.debug(`Downloaded asset ${uid} → ${filePath}`, this.exportContext.context);
} catch (e) {
downloadFail += 1;
const err = (e as Error)?.message ?? PROCESS_STATUS[PROCESS_NAMES.AM_DOWNLOADS].FAILED;
this.tick(false, `asset: ${filename}`, err);
log.debug(`Failed to download asset ${uid}: ${e}`, this.exportContext.context);
}
};

await this.makeConcurrentCall({ apiBatches, module: 'asset downloads' }, promisifyHandler);
},
);

// Completeness check: a chunk that fails to read back is skipped (logged at debug) by
// forEachChunkedJsonStore, which would silently drop those downloads. Reconcile attempts
// (ok + failed) against what streaming counted as downloadable.
const attempted = downloadOk + downloadFail;
if (attempted < expectedDownloads) {
log.warn(
`Asset downloads for space ${spaceUid} incomplete: expected ${expectedDownloads}, attempted ${attempted}` +
` — ${expectedDownloads - attempted} asset(s) were never read back for download.`,
this.exportContext.context,
);
}
await runInBatches(validItems, this.downloadAssetsBatchConcurrency, async (asset) => {
const uid = asset.uid ?? asset._uid;
const url = asset.url;
const filename = asset.filename ?? asset.file_name ?? 'asset';
if (!url || !uid) return;
try {
const separator = url.includes('?') ? '&' : '?';
const downloadUrl = securedAssets && authtoken ? `${url}${separator}authtoken=${authtoken}` : url;
const response = await fetch(downloadUrl);
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const body = response.body;
if (!body) throw new Error('No response body');
const nodeStream = Readable.fromWeb(body as Parameters<typeof Readable.fromWeb>[0]);
const assetFolderPath = pResolve(filesDir, uid);
await mkdir(assetFolderPath, { recursive: true });
const filePath = pResolve(assetFolderPath, filename);
await writeStreamToFile(nodeStream, filePath);
downloadOk += 1;
// Per-asset tick so the per-space progress bar moves in real time.
this.tick(true, `asset: ${filename}`, null);
log.debug(`Downloaded asset ${uid} → ${filePath}`, this.exportContext.context);
} catch (e) {
downloadFail += 1;
const err = (e as Error)?.message ?? PROCESS_STATUS[PROCESS_NAMES.AM_DOWNLOADS].FAILED;
this.tick(false, `asset: ${filename}`, err);
log.debug(`Failed to download asset ${uid}: ${e}`, this.exportContext.context);
}
});

log.info(
downloadFail === 0
Expand Down
42 changes: 30 additions & 12 deletions packages/contentstack-asset-management/src/export/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { FsUtility, log, CLIProgressManager, configHandler } from '@contentstack
import type { CSAssetsAPIConfig } from '../types/cs-assets-api';
import type { ExportContext } from '../types/export-types';
import { CSAssetsAdapter } from '../utils/cs-assets-api-adapter';
import { CS_ASSETS_MAIN_PROCESS_NAME, FALLBACK_AM_API_CONCURRENCY, FALLBACK_AM_CHUNK_FILE_SIZE_MB } from '../constants/index';
import { CS_ASSETS_MAIN_PROCESS_NAME, FALLBACK_AM_API_CONCURRENCY, FALLBACK_AM_API_FETCH_CONCURRENCY, FALLBACK_AM_API_PAGE_SIZE, FALLBACK_AM_CHUNK_FILE_SIZE_MB } from '../constants/index';

export type { ExportContext };

Expand Down Expand Up @@ -82,6 +82,14 @@ export class CSAssetsExportAdapter extends CSAssetsAdapter {
return this.exportContext.downloadAssetsConcurrency ?? this.apiConcurrency;
}

/**
 * Resolves the API page size: the export context's `pageSize` when it is set,
 * otherwise `FALLBACK_AM_API_PAGE_SIZE`.
 */
protected get apiPageSize(): number {
  const { pageSize } = this.exportContext;
  return pageSize ?? FALLBACK_AM_API_PAGE_SIZE;
}

/**
 * Resolves the API fetch concurrency: the export context's `fetchConcurrency` when it is set,
 * otherwise `FALLBACK_AM_API_FETCH_CONCURRENCY`.
 */
protected get apiFetchConcurrency(): number {
  const { fetchConcurrency } = this.exportContext;
  return fetchConcurrency ?? FALLBACK_AM_API_FETCH_CONCURRENCY;
}

/**
 * Builds the path of the `asset_types` directory under the export's spaces root.
 *
 * @returns The resolved `<spacesRootPath>/asset_types` path.
 */
protected getAssetTypesDir(): string {
  const { spacesRootPath } = this.exportContext;
  return pResolve(spacesRootPath, 'asset_types');
}
Expand All @@ -90,6 +98,25 @@ export class CSAssetsExportAdapter extends CSAssetsAdapter {
return pResolve(this.exportContext.spacesRootPath, 'fields');
}

/**
 * Creates an `FsUtility` configured for incremental (streaming) chunked-JSON writes.
 * The chunk size comes from the export context's `chunkFileSizeMb`, falling back to
 * `FALLBACK_AM_CHUNK_FILE_SIZE_MB`. The caller is responsible for calling `completeFile(true)`.
 *
 * @param dir - Base directory the writer targets.
 * @param indexFileName - Name of the index file.
 * @param moduleName - Module name handed to the writer.
 * @param metaPickKeys - Keys handed to the writer as `metaPickKeys`.
 * @returns The configured writer.
 */
protected createChunkedJsonWriter(dir: string, indexFileName: string, moduleName: string, metaPickKeys: string[]): FsUtility {
  const chunkFileSize = this.exportContext.chunkFileSizeMb ?? FALLBACK_AM_CHUNK_FILE_SIZE_MB;
  const writer = new FsUtility({
    basePath: dir,
    indexFileName,
    chunkFileSize,
    moduleName,
    fileExt: 'json',
    metaPickKeys,
    keepMetadata: true,
  });
  return writer;
}

/**
 * Writes an index file containing only `{}`, the layout used for a store with zero records.
 *
 * @param dir - Directory that receives the index file.
 * @param indexFileName - Name of the index file to write.
 */
protected async writeEmptyChunkedJson(dir: string, indexFileName: string): Promise<void> {
  const indexPath = pResolve(dir, indexFileName);
  await writeFile(indexPath, '{}');
}

protected async writeItemsToChunkedJson(
dir: string,
indexFileName: string,
Expand All @@ -98,19 +125,10 @@ export class CSAssetsExportAdapter extends CSAssetsAdapter {
items: unknown[],
): Promise<void> {
if (items.length === 0) {
await writeFile(pResolve(dir, indexFileName), '{}');
await this.writeEmptyChunkedJson(dir, indexFileName);
return;
}
const chunkMb = this.exportContext.chunkFileSizeMb ?? FALLBACK_AM_CHUNK_FILE_SIZE_MB;
const fs = new FsUtility({
basePath: dir,
indexFileName,
chunkFileSize: chunkMb,
moduleName,
fileExt: 'json',
metaPickKeys,
keepMetadata: true,
});
const fs = this.createChunkedJsonWriter(dir, indexFileName, moduleName, metaPickKeys);
fs.writeIntoFile(items as Record<string, string>[], { mapKeyVal: true });
fs.completeFile(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export default class ExportFields extends CSAssetsExportAdapter {

log.debug('Starting shared fields export process...', this.exportContext.context);

const fieldsData = await this.getWorkspaceFields(spaceUid);
const fieldsData = await this.getWorkspaceFields(spaceUid, this.apiPageSize, this.apiFetchConcurrency);
const items = getArrayFromResponse(fieldsData, 'fields');
const dir = this.getFieldsDir();
if (items.length === 0) {
Expand Down
2 changes: 2 additions & 0 deletions packages/contentstack-asset-management/src/export/spaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ export class ExportSpaces {
chunkFileSizeMb,
apiConcurrency: this.options.apiConcurrency,
downloadAssetsConcurrency: this.options.downloadAssetsConcurrency,
pageSize: this.options.pageSize,
fetchConcurrency: this.options.fetchConcurrency,
};

const sharedFieldsDir = pResolve(spacesRootPath, 'fields');
Expand Down
Loading
Loading