@@ -1989,8 +1989,7 @@ public Map<String, Long> executeTableExecute(ConnectorSession session, Connector
1989
1989
executeExpireSnapshots (session , executeHandle );
1990
1990
return ImmutableMap .of ();
1991
1991
case REMOVE_ORPHAN_FILES :
1992
- executeRemoveOrphanFiles (session , executeHandle );
1993
- return ImmutableMap .of ();
1992
+ return executeRemoveOrphanFiles (session , executeHandle );
1994
1993
case ADD_FILES :
1995
1994
executeAddFiles (session , executeHandle );
1996
1995
return ImmutableMap .of ();
@@ -2118,7 +2117,7 @@ private static void validateTableExecuteParameters(
2118
2117
sessionMinRetentionParameterName );
2119
2118
}
2120
2119
2121
- public void executeRemoveOrphanFiles (ConnectorSession session , IcebergTableExecuteHandle executeHandle )
2120
+ public Map < String , Long > executeRemoveOrphanFiles (ConnectorSession session , IcebergTableExecuteHandle executeHandle )
2122
2121
{
2123
2122
IcebergRemoveOrphanFilesHandle removeOrphanFilesHandle = (IcebergRemoveOrphanFilesHandle ) executeHandle .procedureHandle ();
2124
2123
@@ -2135,14 +2134,14 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu
2135
2134
2136
2135
if (table .currentSnapshot () == null ) {
2137
2136
log .debug ("Skipping remove_orphan_files procedure for empty table %s" , table );
2138
- return ;
2137
+ return ImmutableMap . of () ;
2139
2138
}
2140
2139
2141
2140
Instant expiration = session .getStart ().minusMillis (retention .toMillis ());
2142
- removeOrphanFiles (table , session , executeHandle .schemaTableName (), expiration , executeHandle .fileIoProperties ());
2141
+ return removeOrphanFiles (table , session , executeHandle .schemaTableName (), expiration , executeHandle .fileIoProperties ());
2143
2142
}
2144
2143
2145
- private void removeOrphanFiles (Table table , ConnectorSession session , SchemaTableName schemaTableName , Instant expiration , Map <String , String > fileIoProperties )
2144
+ private Map < String , Long > removeOrphanFiles (Table table , ConnectorSession session , SchemaTableName schemaTableName , Instant expiration , Map <String , String > fileIoProperties )
2146
2145
{
2147
2146
Set <String > processedManifestFilePaths = new HashSet <>();
2148
2147
// Similarly to issues like https://github.com/trinodb/trino/issues/13759, equivalent paths may have different String
@@ -2205,7 +2204,18 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl
2205
2204
// Ensure any futures still running are canceled in case of failure
2206
2205
manifestScanFutures .forEach (future -> future .cancel (true ));
2207
2206
}
2208
- scanAndDeleteInvalidFiles (table , session , schemaTableName , expiration , validFileNames , fileIoProperties );
2207
+ ScanAndDeleteResult result = scanAndDeleteInvalidFiles (table , session , schemaTableName , expiration , validFileNames , fileIoProperties );
2208
+ log .info ("remove_orphan_files for table %s completed. Processed %d manifest files, found %d valid files, scanned %d files, deleted %d files" ,
2209
+ schemaTableName ,
2210
+ processedManifestFilePaths .size (),
2211
+ validFileNames .size () - 1 , // excluding version-hint.text
2212
+ result .scannedFilesCount (),
2213
+ result .deletedFilesCount ());
2214
+ return ImmutableMap .of (
2215
+ "processed_manifests_count" , (long ) processedManifestFilePaths .size (),
2216
+ "valid_files_count" , (long ) validFileNames .size () - 1 , // excluding version-hint.text
2217
+ "scanned_files_count" , result .scannedFilesCount (),
2218
+ "deleted_files_count" , result .deletedFilesCount ());
2209
2219
}
2210
2220
2211
2221
public void executeAddFiles (ConnectorSession session , IcebergTableExecuteHandle executeHandle )
@@ -2240,17 +2250,21 @@ public void executeAddFilesFromTable(ConnectorSession session, IcebergTableExecu
2240
2250
icebergScanExecutor );
2241
2251
}
2242
2252
2243
- private void scanAndDeleteInvalidFiles (Table table , ConnectorSession session , SchemaTableName schemaTableName , Instant expiration , Set <String > validFiles , Map <String , String > fileIoProperties )
2253
+ private ScanAndDeleteResult scanAndDeleteInvalidFiles (Table table , ConnectorSession session , SchemaTableName schemaTableName , Instant expiration , Set <String > validFiles , Map <String , String > fileIoProperties )
2244
2254
{
2245
2255
List <Future <?>> deleteFutures = new ArrayList <>();
2256
+ long deletedFilesCount = 0 ;
2257
+ long scannedFilesCount = 0 ;
2246
2258
try {
2247
2259
List <Location > filesToDelete = new ArrayList <>(DELETE_BATCH_SIZE );
2248
2260
TrinoFileSystem fileSystem = fileSystemFactory .create (session .getIdentity (), fileIoProperties );
2249
2261
FileIterator allFiles = fileSystem .listFiles (Location .of (table .location ()));
2250
2262
while (allFiles .hasNext ()) {
2251
2263
FileEntry entry = allFiles .next ();
2264
+ scannedFilesCount ++;
2252
2265
if (entry .lastModified ().isBefore (expiration ) && !validFiles .contains (entry .location ().fileName ())) {
2253
2266
filesToDelete .add (entry .location ());
2267
+ deletedFilesCount ++;
2254
2268
if (filesToDelete .size () >= DELETE_BATCH_SIZE ) {
2255
2269
List <Location > finalFilesToDelete = filesToDelete ;
2256
2270
deleteFutures .add (icebergFileDeleteExecutor .submit (() -> deleteFiles (finalFilesToDelete , schemaTableName , fileSystem )));
@@ -2277,8 +2291,11 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc
2277
2291
// Ensure any futures still running are canceled in case of failure
2278
2292
deleteFutures .forEach (future -> future .cancel (true ));
2279
2293
}
2294
+ return new ScanAndDeleteResult (deletedFilesCount , scannedFilesCount );
2280
2295
}
2281
2296
2297
+ private record ScanAndDeleteResult (long deletedFilesCount , long scannedFilesCount ) {} // counters returned by scanAndDeleteInvalidFiles: deletedFilesCount = files queued for deletion, scannedFilesCount = entries listed under the table location
2298
+
2282
2299
private void deleteFiles (List <Location > files , SchemaTableName schemaTableName , TrinoFileSystem fileSystem )
2283
2300
{
2284
2301
log .debug ("Deleting files while removing orphan files for table %s [%s]" , schemaTableName , files );
0 commit comments