11import { resolve as pResolve } from 'node:path' ;
22import { Readable } from 'node:stream' ;
33import { mkdir , writeFile } from 'node:fs/promises' ;
4- import { configHandler , log } from '@contentstack/cli-utilities' ;
4+ import chunk from 'lodash/chunk' ;
5+ import { configHandler , log , FsUtility } from '@contentstack/cli-utilities' ;
56
67import type { CSAssetsAPIConfig , LinkedWorkspace } from '../types/cs-assets-api' ;
78import type { ExportContext } from '../types/export-types' ;
89import { CSAssetsExportAdapter } from './base' ;
9- import { getAssetItems , writeStreamToFile } from '../utils/export-helpers' ;
10- import { runInBatches } from '../utils/concurrent-batch' ;
10+ import { writeStreamToFile } from '../utils/export-helpers' ;
11+ import { forEachChunkedJsonStore } from '../utils/chunked-json-reader' ;
12+ import { withRetry , RetryableHttpError , isRetryableStatus , parseRetryAfterMs } from '../utils/retry' ;
13+ import type { CustomPromiseHandler } from '../utils/cs-assets-api-adapter' ;
1114import { PROCESS_NAMES , PROCESS_STATUS } from '../constants/index' ;
1215
/**
 * Key list handed to the chunked-JSON writer when the `assets.json` metadata store is created.
 * NOTE(review): presumably these are the per-asset fields kept in the chunk index — confirm
 * against the writer's contract.
 */
const ASSET_META_KEYS = ['uid', 'url', 'filename', 'file_name', 'parent_uid'];

/**
 * Subset of an asset row that the export reads when deciding whether, and how, to download
 * a binary. Every field is optional because rows are consumed as untyped API/JSON data.
 */
type AssetRecord = {
  uid?: string;
  _uid?: string;
  url?: string;
  filename?: string;
  file_name?: string;
};
19+
1320export default class ExportAssets extends CSAssetsExportAdapter {
/**
 * Creates the asset exporter; both arguments are forwarded unchanged to the base adapter.
 *
 * @param apiConfig - API configuration passed to the base adapter.
 * @param exportContext - Export context passed to the base adapter.
 */
constructor(apiConfig: CSAssetsAPIConfig, exportContext: ExportContext) {
  super(apiConfig, exportContext);
}
1724
/**
 * Tells whether an asset row carries enough information to attempt a binary download:
 * a truthy `url` and a truthy identifier (`uid`, falling back to `_uid` only when `uid`
 * is null or undefined).
 *
 * @param asset - Asset row to inspect; a null/undefined row is treated as not downloadable.
 * @returns `true` when both the URL and the resolved identifier are truthy.
 */
private isDownloadable(asset: AssetRecord): boolean {
  // `??` (not `||`) on purpose: an empty-string `uid` does not fall back to `_uid`.
  const identifier = asset?.uid ?? asset?._uid;
  return Boolean(asset?.url && identifier);
}
28+
1829 async start ( workspace : LinkedWorkspace , spaceDir : string ) : Promise < void > {
1930 await this . init ( ) ;
2031
@@ -25,113 +36,142 @@ export default class ExportAssets extends CSAssetsExportAdapter {
2536 await mkdir ( assetsDir , { recursive : true } ) ;
2637 log . debug ( `Assets directory ready: ${ assetsDir } ` , this . exportContext . context ) ;
2738
28- log . debug ( `Fetching folders and assets for space ${ workspace . space_uid } ` , this . exportContext . context ) ;
29-
30- const [ folders , assetsData ] = await Promise . all ( [
39+ // Stream asset metadata straight to chunked JSON as pages arrive — never hold the full set in
40+ // memory. The writer is created lazily so an empty space writes an empty index instead of chunks.
41+ let fsWriter : FsUtility | undefined ;
42+ let totalStreamed = 0 ;
43+ let downloadableCount = 0 ;
44+ const onPage = ( items : unknown [ ] ) => {
45+ if ( items . length === 0 ) return ;
46+ if ( ! fsWriter ) fsWriter = this . createChunkedJsonWriter ( assetsDir , 'assets.json' , 'assets' , ASSET_META_KEYS ) ;
47+ fsWriter . writeIntoFile ( items as Record < string , string > [ ] , { mapKeyVal : true } ) ;
48+ totalStreamed += items . length ;
49+ for ( const asset of items as AssetRecord [ ] ) if ( this . isDownloadable ( asset ) ) downloadableCount += 1 ;
50+ } ;
51+
52+ log . debug ( `Fetching folders and streaming assets for space ${ workspace . space_uid } ` , this . exportContext . context ) ;
53+ const [ folders ] = await Promise . all ( [
3154 this . getWorkspaceFolders ( workspace . space_uid , workspace . uid , this . apiPageSize , this . apiFetchConcurrency ) ,
32- this . getWorkspaceAssets ( workspace . space_uid , workspace . uid , this . apiPageSize , this . apiFetchConcurrency ) ,
55+ this . streamWorkspaceAssets ( workspace . space_uid , workspace . uid , onPage , this . apiPageSize , this . apiFetchConcurrency ) ,
3356 ] ) ;
3457
35- const assetItems = getAssetItems ( assetsData ) ;
36- const downloadableCount = assetItems . filter ( ( asset ) => Boolean ( asset . url && ( asset . uid ?? asset . _uid ) ) ) . length ;
58+ if ( fsWriter ) fsWriter . completeFile ( true ) ;
59+ else await this . writeEmptyChunkedJson ( assetsDir , 'assets.json' ) ;
60+ log . debug ( `Wrote chunked assets metadata (${ totalStreamed } item(s)) under ${ assetsDir } ` , this . exportContext . context ) ;
61+
3762 // Per-space total: 1 folder write + 1 metadata write + N per-asset downloads.
38- // The shared module-level total is just a placeholder before this point; update
39- // it now so the multibar row shows real progress as downloads tick in.
4063 this . progressOrParent ?. updateProcessTotal ?.( this . processName , 2 + downloadableCount ) ;
4164
4265 await writeFile ( pResolve ( assetsDir , 'folders.json' ) , JSON . stringify ( folders , null , 2 ) ) ;
4366 this . tick ( true , `folders: ${ workspace . space_uid } ` , null ) ;
4467 log . debug ( `Wrote folders.json for space ${ workspace . space_uid } ` , this . exportContext . context ) ;
4568
46- log . debug (
47- assetItems . length === 0
48- ? `No assets for space ${ workspace . space_uid } , wrote empty assets.json`
49- : `Writing ${ assetItems . length } assets metadata for space ${ workspace . space_uid } ` ,
50- this . exportContext . context ,
51- ) ;
52- await this . writeItemsToChunkedJson (
53- assetsDir ,
54- 'assets.json' ,
55- 'assets' ,
56- [ 'uid' , 'url' , 'filename' , 'file_name' , 'parent_uid' ] ,
57- assetItems ,
58- ) ;
59- log . debug (
60- `Finished writing chunked assets metadata (${ assetItems . length } item(s)) under ${ assetsDir } ` ,
61- this . exportContext . context ,
62- ) ;
6369 log . info (
64- assetItems . length === 0
70+ totalStreamed === 0
6571 ? `Wrote empty asset metadata for space ${ workspace . space_uid } `
66- : `Wrote ${ assetItems . length } asset metadata record(s) for space ${ workspace . space_uid } ` ,
72+ : `Wrote ${ totalStreamed } asset metadata record(s) for space ${ workspace . space_uid } ` ,
6773 this . exportContext . context ,
6874 ) ;
69- this . tick ( true , `metadata: ${ workspace . space_uid } (${ assetItems . length } )` , null ) ;
75+ this . tick ( true , `metadata: ${ workspace . space_uid } (${ totalStreamed } )` , null ) ;
7076
7177 log . debug ( `Starting binary downloads for space ${ workspace . space_uid } ` , this . exportContext . context ) ;
72- await this . downloadWorkspaceAssets ( assetsData , assetsDir , workspace . space_uid ) ;
78+ await this . downloadWorkspaceAssets ( assetsDir , workspace . space_uid , downloadableCount ) ;
7379 }
7480
75- private async downloadWorkspaceAssets ( assetsData : unknown , assetsDir : string , spaceUid : string ) : Promise < void > {
76- const items = getAssetItems ( assetsData ) ;
77- if ( items . length === 0 ) {
78- log . info ( `No asset files to download for space ${ spaceUid } ` , this . exportContext . context ) ;
79- log . debug ( 'No assets to download' , this . exportContext . context ) ;
80- return ;
81- }
82-
83- this . updateStatus ( PROCESS_STATUS [ PROCESS_NAMES . AM_DOWNLOADS ] . DOWNLOADING ) ;
84- log . info ( `Downloading asset files for space ${ spaceUid } (${ items . length } in metadata)` , this . exportContext . context ) ;
85- log . debug ( `Downloading ${ items . length } asset file(s) for space ${ spaceUid } ...` , this . exportContext . context ) ;
81+ /**
82+ * Download asset binaries by reading the just-written chunked `assets.json` back from disk
83+ * (one chunk at a time), so we never re-materialize the whole asset list in memory.
84+ */
85+ private async downloadWorkspaceAssets ( assetsDir : string , spaceUid : string , expectedDownloads : number ) : Promise < void > {
8686 const filesDir = pResolve ( assetsDir , 'files' ) ;
8787 await mkdir ( filesDir , { recursive : true } ) ;
88- log . debug ( `Asset files directory ready: ${ filesDir } ` , this . exportContext . context ) ;
8988
9089 const securedAssets = this . exportContext . securedAssets ?? false ;
9190 const authtoken = securedAssets ? configHandler . get ( 'authtoken' ) : null ;
9291 log . debug (
93- `Asset downloads: securedAssets=${ securedAssets } , concurrency=${ this . downloadAssetsBatchConcurrency } ` ,
92+ `Asset downloads: securedAssets=${ securedAssets } , concurrency=${ this . downloadAssetsBatchConcurrency } , expected= ${ expectedDownloads } ` ,
9493 this . exportContext . context ,
9594 ) ;
95+ this . updateStatus ( PROCESS_STATUS [ PROCESS_NAMES . AM_DOWNLOADS ] . DOWNLOADING ) ;
96+
9697 let downloadOk = 0 ;
9798 let downloadFail = 0 ;
9899
99- const validItems = items . filter ( ( asset ) => Boolean ( asset . url && ( asset . uid ?? asset . _uid ) ) ) ;
100- const skipped = items . length - validItems . length ;
101- if ( skipped > 0 ) {
102- log . debug (
103- `Skipping ${ skipped } asset row(s) without url or uid (${ validItems . length } file download(s) scheduled)` ,
100+ await forEachChunkedJsonStore < AssetRecord > (
101+ assetsDir ,
102+ 'assets.json' ,
103+ {
104+ context : this . exportContext . context ,
105+ chunkReadLogLabel : 'assets' ,
106+ onOpenError : ( err ) => log . debug ( `Could not open assets.json for download: ${ err } ` , this . exportContext . context ) ,
107+ onEmptyIndexer : ( ) => log . info ( `No asset files to download for space ${ spaceUid } ` , this . exportContext . context ) ,
108+ } ,
109+ async ( records ) => {
110+ const valid = records . filter ( ( asset ) => this . isDownloadable ( asset ) ) ;
111+ if ( valid . length === 0 ) return ;
112+ const apiBatches = chunk ( valid , this . downloadAssetsBatchConcurrency ) ;
113+ const promisifyHandler : CustomPromiseHandler = async ( { index, batchIndex } ) => {
114+ const asset = apiBatches [ batchIndex ] [ index ] as AssetRecord ;
115+ const uid = ( asset . uid ?? asset . _uid ) as string ;
116+ const url = asset . url as string ;
117+ const filename = asset . filename ?? asset . file_name ?? 'asset' ;
118+ if ( ! url || ! uid ) return ;
119+ try {
120+ const separator = url . includes ( '?' ) ? '&' : '?' ;
121+ const downloadUrl = securedAssets && authtoken ? `${ url } ${ separator } authtoken=${ authtoken } ` : url ;
122+ // Binary GET is idempotent — retry transient failures with backoff.
123+ const response = await withRetry (
124+ async ( ) => {
125+ let resp : Response ;
126+ try {
127+ resp = await fetch ( downloadUrl ) ;
128+ } catch ( e ) {
129+ throw new RetryableHttpError ( `download network error: ${ ( e as Error ) ?. message ?? String ( e ) } ` ) ;
130+ }
131+ if ( ! resp . ok ) {
132+ if ( isRetryableStatus ( resp . status ) ) {
133+ throw new RetryableHttpError ( `HTTP ${ resp . status } ` , resp . status , parseRetryAfterMs ( resp . headers . get ( 'retry-after' ) ) ) ;
134+ }
135+ throw new Error ( `HTTP ${ resp . status } ` ) ;
136+ }
137+ return resp ;
138+ } ,
139+ { context : this . exportContext . context , label : `download ${ filename } ` } ,
140+ ) ;
141+ const body = response . body ;
142+ if ( ! body ) throw new Error ( 'No response body' ) ;
143+ const nodeStream = Readable . fromWeb ( body as Parameters < typeof Readable . fromWeb > [ 0 ] ) ;
144+ const assetFolderPath = pResolve ( filesDir , uid ) ;
145+ await mkdir ( assetFolderPath , { recursive : true } ) ;
146+ const filePath = pResolve ( assetFolderPath , filename ) ;
147+ await writeStreamToFile ( nodeStream , filePath ) ;
148+ downloadOk += 1 ;
149+ // Per-asset tick so the per-space progress bar moves in real time.
150+ this . tick ( true , `asset: ${ filename } ` , null ) ;
151+ log . debug ( `Downloaded asset ${ uid } → ${ filePath } ` , this . exportContext . context ) ;
152+ } catch ( e ) {
153+ downloadFail += 1 ;
154+ const err = ( e as Error ) ?. message ?? PROCESS_STATUS [ PROCESS_NAMES . AM_DOWNLOADS ] . FAILED ;
155+ this . tick ( false , `asset: ${ filename } ` , err ) ;
156+ log . debug ( `Failed to download asset ${ uid } : ${ e } ` , this . exportContext . context ) ;
157+ }
158+ } ;
159+
160+ await this . makeConcurrentCall ( { apiBatches, module : 'asset downloads' } , promisifyHandler ) ;
161+ } ,
162+ ) ;
163+
164+ // Completeness check: a chunk that fails to read back is skipped (logged at debug) by
165+ // forEachChunkedJsonStore, which would silently drop those downloads. Reconcile attempts
166+ // (ok + failed) against what streaming counted as downloadable.
167+ const attempted = downloadOk + downloadFail ;
168+ if ( attempted < expectedDownloads ) {
169+ log . warn (
170+ `Asset downloads for space ${ spaceUid } incomplete: expected ${ expectedDownloads } , attempted ${ attempted } ` +
171+ ` — ${ expectedDownloads - attempted } asset(s) were never read back for download.` ,
104172 this . exportContext . context ,
105173 ) ;
106174 }
107- await runInBatches ( validItems , this . downloadAssetsBatchConcurrency , async ( asset ) => {
108- const uid = asset . uid ?? asset . _uid ;
109- const url = asset . url ;
110- const filename = asset . filename ?? asset . file_name ?? 'asset' ;
111- if ( ! url || ! uid ) return ;
112- try {
113- const separator = url . includes ( '?' ) ? '&' : '?' ;
114- const downloadUrl = securedAssets && authtoken ? `${ url } ${ separator } authtoken=${ authtoken } ` : url ;
115- const response = await fetch ( downloadUrl ) ;
116- if ( ! response . ok ) throw new Error ( `HTTP ${ response . status } ` ) ;
117- const body = response . body ;
118- if ( ! body ) throw new Error ( 'No response body' ) ;
119- const nodeStream = Readable . fromWeb ( body as Parameters < typeof Readable . fromWeb > [ 0 ] ) ;
120- const assetFolderPath = pResolve ( filesDir , uid ) ;
121- await mkdir ( assetFolderPath , { recursive : true } ) ;
122- const filePath = pResolve ( assetFolderPath , filename ) ;
123- await writeStreamToFile ( nodeStream , filePath ) ;
124- downloadOk += 1 ;
125- // Per-asset tick so the per-space progress bar moves in real time.
126- this . tick ( true , `asset: ${ filename } ` , null ) ;
127- log . debug ( `Downloaded asset ${ uid } → ${ filePath } ` , this . exportContext . context ) ;
128- } catch ( e ) {
129- downloadFail += 1 ;
130- const err = ( e as Error ) ?. message ?? PROCESS_STATUS [ PROCESS_NAMES . AM_DOWNLOADS ] . FAILED ;
131- this . tick ( false , `asset: ${ filename } ` , err ) ;
132- log . debug ( `Failed to download asset ${ uid } : ${ e } ` , this . exportContext . context ) ;
133- }
134- } ) ;
135175
136176 log . info (
137177 downloadFail === 0
0 commit comments