@@ -1989,8 +1989,7 @@ public Map<String, Long> executeTableExecute(ConnectorSession session, Connector
19891989 executeExpireSnapshots (session , executeHandle );
19901990 return ImmutableMap .of ();
19911991 case REMOVE_ORPHAN_FILES :
1992- executeRemoveOrphanFiles (session , executeHandle );
1993- return ImmutableMap .of ();
1992+ return executeRemoveOrphanFiles (session , executeHandle );
19941993 case ADD_FILES :
19951994 executeAddFiles (session , executeHandle );
19961995 return ImmutableMap .of ();
@@ -2118,7 +2117,7 @@ private static void validateTableExecuteParameters(
21182117 sessionMinRetentionParameterName );
21192118 }
21202119
2121- public void executeRemoveOrphanFiles (ConnectorSession session , IcebergTableExecuteHandle executeHandle )
2120+ public Map < String , Long > executeRemoveOrphanFiles (ConnectorSession session , IcebergTableExecuteHandle executeHandle )
21222121 {
21232122 IcebergRemoveOrphanFilesHandle removeOrphanFilesHandle = (IcebergRemoveOrphanFilesHandle ) executeHandle .procedureHandle ();
21242123
@@ -2135,14 +2134,14 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu
21352134
21362135 if (table .currentSnapshot () == null ) {
21372136 log .debug ("Skipping remove_orphan_files procedure for empty table %s" , table );
2138- return ;
2137+ return ImmutableMap . of () ;
21392138 }
21402139
21412140 Instant expiration = session .getStart ().minusMillis (retention .toMillis ());
2142- removeOrphanFiles (table , session , executeHandle .schemaTableName (), expiration , executeHandle .fileIoProperties ());
2141+ return removeOrphanFiles (table , session , executeHandle .schemaTableName (), expiration , executeHandle .fileIoProperties ());
21432142 }
21442143
2145- private void removeOrphanFiles (Table table , ConnectorSession session , SchemaTableName schemaTableName , Instant expiration , Map <String , String > fileIoProperties )
2144+ private Map < String , Long > removeOrphanFiles (Table table , ConnectorSession session , SchemaTableName schemaTableName , Instant expiration , Map <String , String > fileIoProperties )
21462145 {
21472146 Set <String > processedManifestFilePaths = new HashSet <>();
21482147 // Similarly to issues like https://github.com/trinodb/trino/issues/13759, equivalent paths may have different String
@@ -2205,7 +2204,18 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl
22052204 // Ensure any futures still running are canceled in case of failure
22062205 manifestScanFutures .forEach (future -> future .cancel (true ));
22072206 }
2208- scanAndDeleteInvalidFiles (table , session , schemaTableName , expiration , validFileNames , fileIoProperties );
2207+ ScanAndDeleteResult result = scanAndDeleteInvalidFiles (table , session , schemaTableName , expiration , validFileNames , fileIoProperties );
2208+ log .info ("remove_orphan_files for table %s processed %d manifest files, found %d active files, scanned %d files, deleted %d files" ,
2209+ schemaTableName ,
2210+ processedManifestFilePaths .size (),
2211+ validFileNames .size () - 1 , // excluding version-hint.text
2212+ result .scannedFilesCount (),
2213+ result .deletedFilesCount ());
2214+ return ImmutableMap .of (
2215+ "processed_manifests_count" , (long ) processedManifestFilePaths .size (),
2216+ "active_files_count" , (long ) validFileNames .size () - 1 , // excluding version-hint.text
2217+ "scanned_files_count" , result .scannedFilesCount (),
2218+ "deleted_files_count" , result .deletedFilesCount ());
22092219 }
22102220
22112221 public void executeAddFiles (ConnectorSession session , IcebergTableExecuteHandle executeHandle )
@@ -2240,17 +2250,21 @@ public void executeAddFilesFromTable(ConnectorSession session, IcebergTableExecu
22402250 icebergScanExecutor );
22412251 }
22422252
2243- private void scanAndDeleteInvalidFiles (Table table , ConnectorSession session , SchemaTableName schemaTableName , Instant expiration , Set <String > validFiles , Map <String , String > fileIoProperties )
2253+ private ScanAndDeleteResult scanAndDeleteInvalidFiles (Table table , ConnectorSession session , SchemaTableName schemaTableName , Instant expiration , Set <String > validFiles , Map <String , String > fileIoProperties )
22442254 {
22452255 List <Future <?>> deleteFutures = new ArrayList <>();
2256+ long scannedFilesCount = 0 ;
2257+ long deletedFilesCount = 0 ;
22462258 try {
22472259 List <Location > filesToDelete = new ArrayList <>(DELETE_BATCH_SIZE );
22482260 TrinoFileSystem fileSystem = fileSystemFactory .create (session .getIdentity (), fileIoProperties );
22492261 FileIterator allFiles = fileSystem .listFiles (Location .of (table .location ()));
22502262 while (allFiles .hasNext ()) {
22512263 FileEntry entry = allFiles .next ();
2264+ scannedFilesCount ++;
22522265 if (entry .lastModified ().isBefore (expiration ) && !validFiles .contains (entry .location ().fileName ())) {
22532266 filesToDelete .add (entry .location ());
2267+ deletedFilesCount ++;
22542268 if (filesToDelete .size () >= DELETE_BATCH_SIZE ) {
22552269 List <Location > finalFilesToDelete = filesToDelete ;
22562270 deleteFutures .add (icebergFileDeleteExecutor .submit (() -> deleteFiles (finalFilesToDelete , schemaTableName , fileSystem )));
@@ -2277,8 +2291,11 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc
22772291 // Ensure any futures still running are canceled in case of failure
22782292 deleteFutures .forEach (future -> future .cancel (true ));
22792293 }
2294+ return new ScanAndDeleteResult (scannedFilesCount , deletedFilesCount );
22802295 }
22812296
2297+ private record ScanAndDeleteResult (long scannedFilesCount , long deletedFilesCount ) {}
2298+
22822299 private void deleteFiles (List <Location > files , SchemaTableName schemaTableName , TrinoFileSystem fileSystem )
22832300 {
22842301 log .debug ("Deleting files while removing orphan files for table %s [%s]" , schemaTableName , files );
0 commit comments