Skip to content

Commit

Permalink
Fix generation timeout executor shutdown handling
Browse files Browse the repository at this point in the history
  • Loading branch information
yma96 committed Jan 6, 2025
1 parent 0324103 commit 4cd7608
Showing 1 changed file with 56 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.execchain.RequestAbortedException;
import org.commonjava.indy.service.archive.config.PreSeedConfig;
import org.commonjava.indy.service.archive.model.ArchiveStatus;
import org.commonjava.indy.service.archive.model.dto.HistoricalContentDTO;
Expand Down Expand Up @@ -124,20 +125,8 @@ public class ArchiveController
public void init()
throws IOException
{
int threads = preSeedConfig.threadMultiplier().orElse( 4 ) * Runtime.getRuntime().availableProcessors();
downloadExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Download-" + t.getName() );
t.setDaemon( true );
return t;
} );
generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );

newGenerateExecutor();
newDownloadExecutor();
final PoolingHttpClientConnectionManager ccm = new PoolingHttpClientConnectionManager();
ccm.setMaxTotal( 500 );
RequestConfig rc = RequestConfig.custom().build();
Expand All @@ -163,13 +152,24 @@ public void generate( HistoricalContentDTO content )
{
if ( isInProgress( buildConfigId ) )
{
logger.info( "There is already generation process in progress for buildConfigId {}, this request will skip.",
buildConfigId );
logger.info(
"There is already generation process in progress for buildConfigId {}, this request will skip.",
buildConfigId );
// Conflicted generation, just return immediately
return;
}

recordInProgress( buildConfigId );
if ( generateExecutor.isShutdown() || generateExecutor.isTerminated() )
{
logger.info( "new generateExecutor" );
newGenerateExecutor();
}
if ( downloadExecutor.isShutdown() || downloadExecutor.isTerminated() )
{
logger.info( "new downloadExecutor" );
newDownloadExecutor();
}
CompletableFuture<Void> future = CompletableFuture.runAsync( () -> {
try
{
Expand All @@ -195,7 +195,13 @@ public void generate( HistoricalContentDTO content )
// If timeout happens on generation, cancel and remove the status to make sure following generation
removeStatus( buildConfigId );
cleanupBCWorkspace( buildConfigId );
logger.error( "Generation timeout for buildConfigId {}", buildConfigId );
logger.warn( "Generation timeout for buildConfigId {}, try to shut down the generation executor",
buildConfigId );
generateExecutor.shutdownNow();
downloadExecutor.shutdownNow();

buildConfigLocks.remove( buildConfigId );
logger.info( "<<<Lock released for buildConfigId {}", buildConfigId );
return null;
} );
}
Expand All @@ -218,17 +224,18 @@ protected Boolean doGenerate( HistoricalContentDTO content )
}
catch ( final InterruptedException e )
{
logger.error( "Artifacts downloading is interrupted, build config id: " + buildConfigId, e );
logger.error( "Artifacts downloading is interrupted, build config id: {}.", buildConfigId, e );
return false;
}
catch ( final ExecutionException e )
{
logger.error( "Artifacts download execution manager failed, build config id: " + buildConfigId, e );
logger.error( "Artifacts download execution manager failed, build config id: {}.", buildConfigId, e );
return false;
}
catch ( final IOException e )
{
logger.error( "Failed to generate historical archive from content, build config id: " + buildConfigId, e );
logger.error( "Failed to generate historical archive from content, build config id: {}.", buildConfigId,
e );
return false;
}

Expand Down Expand Up @@ -342,6 +349,28 @@ public boolean isInProgress( final String buildConfigId )
ArchiveStatus.inProgress.getArchiveStatus() );
}

private void newGenerateExecutor()
{
int threads = preSeedConfig.threadMultiplier().orElse( 4 ) * Runtime.getRuntime().availableProcessors();
generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );
}

private void newDownloadExecutor()
{
int threads = preSeedConfig.threadMultiplier().orElse( 4 ) * Runtime.getRuntime().availableProcessors();
downloadExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Download-" + t.getName() );
t.setDaemon( true );
return t;
} );
}

private void downloadArtifacts( final Map<String, HistoricalEntryDTO> entryDTOs,
final Map<String, String> downloadPaths, final HistoricalContentDTO content )
throws InterruptedException, ExecutionException, IOException
Expand Down Expand Up @@ -497,7 +526,7 @@ private void fileTrackedContent( String contentBuildDir, final HistoricalContent
}
catch ( final IOException e )
{
logger.error( "Failed to file tracked content, path: " + tracked.getPath(), e );
logger.error( "Failed to file tracked content, path: {}", tracked.getPath(), e );
}
finally
{
Expand Down Expand Up @@ -623,9 +652,14 @@ else if ( statusCode == 404 )
return false;
}
}
catch ( final RequestAbortedException e )
{
logger.warn( "<<<Request aborted: Download failed for path: {}", path );
return false;
}
catch ( final Exception e )
{
logger.error( "Download failed for path: " + path, e );
logger.error( "Download failed for path: {}", path, e );
}
finally
{
Expand Down

0 comments on commit 4cd7608

Please sign in to comment.