Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix generation timeout executor shutdown handling #56

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.execchain.RequestAbortedException;
import org.commonjava.indy.service.archive.config.PreSeedConfig;
import org.commonjava.indy.service.archive.model.ArchiveStatus;
import org.commonjava.indy.service.archive.model.dto.HistoricalContentDTO;
Expand Down Expand Up @@ -124,20 +125,8 @@ public class ArchiveController
public void init()
throws IOException
{
int threads = preSeedConfig.threadMultiplier().orElse( 4 ) * Runtime.getRuntime().availableProcessors();
downloadExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Download-" + t.getName() );
t.setDaemon( true );
return t;
} );
generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );

newGenerateExecutor();
newDownloadExecutor();
final PoolingHttpClientConnectionManager ccm = new PoolingHttpClientConnectionManager();
ccm.setMaxTotal( 500 );
RequestConfig rc = RequestConfig.custom().build();
Expand All @@ -163,13 +152,24 @@ public void generate( HistoricalContentDTO content )
{
if ( isInProgress( buildConfigId ) )
{
logger.info( "There is already generation process in progress for buildConfigId {}, this request will skip.",
buildConfigId );
logger.info(
"There is already generation process in progress for buildConfigId {}, this request will skip.",
buildConfigId );
// Conflicted generation, just return immediately
return;
}

recordInProgress( buildConfigId );
if ( generateExecutor.isShutdown() || generateExecutor.isTerminated() )
{
logger.info( "new generateExecutor" );
newGenerateExecutor();
}
if ( downloadExecutor.isShutdown() || downloadExecutor.isTerminated() )
{
logger.info( "new downloadExecutor" );
newDownloadExecutor();
}
CompletableFuture<Void> future = CompletableFuture.runAsync( () -> {
try
{
Expand All @@ -195,7 +195,13 @@ public void generate( HistoricalContentDTO content )
// If timeout happens on generation, cancel and remove the status to make sure following generation
removeStatus( buildConfigId );
cleanupBCWorkspace( buildConfigId );
logger.error( "Generation timeout for buildConfigId {}", buildConfigId );
logger.warn( "Generation timeout for buildConfigId {}, try to shut down the generation executor",
buildConfigId );
generateExecutor.shutdownNow();
downloadExecutor.shutdownNow();

buildConfigLocks.remove( buildConfigId );
logger.info( "<<<Lock released for buildConfigId {}", buildConfigId );
return null;
} );
}
Expand All @@ -218,17 +224,18 @@ protected Boolean doGenerate( HistoricalContentDTO content )
}
catch ( final InterruptedException e )
{
logger.error( "Artifacts downloading is interrupted, build config id: " + buildConfigId, e );
logger.error( "Artifacts downloading is interrupted, build config id: {}.", buildConfigId, e );
return false;
}
catch ( final ExecutionException e )
{
logger.error( "Artifacts download execution manager failed, build config id: " + buildConfigId, e );
logger.error( "Artifacts download execution manager failed, build config id: {}.", buildConfigId, e );
return false;
}
catch ( final IOException e )
{
logger.error( "Failed to generate historical archive from content, build config id: " + buildConfigId, e );
logger.error( "Failed to generate historical archive from content, build config id: {}.", buildConfigId,
e );
return false;
}

Expand Down Expand Up @@ -342,6 +349,28 @@ public boolean isInProgress( final String buildConfigId )
ArchiveStatus.inProgress.getArchiveStatus() );
}

private void newGenerateExecutor()
{
int threads = preSeedConfig.threadMultiplier().orElse( 4 ) * Runtime.getRuntime().availableProcessors();
generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );
}

private void newDownloadExecutor()
{
int threads = preSeedConfig.threadMultiplier().orElse( 4 ) * Runtime.getRuntime().availableProcessors();
downloadExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Download-" + t.getName() );
t.setDaemon( true );
return t;
} );
}

private void downloadArtifacts( final Map<String, HistoricalEntryDTO> entryDTOs,
final Map<String, String> downloadPaths, final HistoricalContentDTO content )
throws InterruptedException, ExecutionException, IOException
Expand Down Expand Up @@ -497,7 +526,7 @@ private void fileTrackedContent( String contentBuildDir, final HistoricalContent
}
catch ( final IOException e )
{
logger.error( "Failed to file tracked content, path: " + tracked.getPath(), e );
logger.error( "Failed to file tracked content, path: {}", tracked.getPath(), e );
}
finally
{
Expand Down Expand Up @@ -623,9 +652,14 @@ else if ( statusCode == 404 )
return false;
}
}
catch ( final RequestAbortedException e )
{
logger.warn( "<<<Request aborted: Download failed for path: {}", path );
return false;
}
catch ( final Exception e )
{
logger.error( "Download failed for path: " + path, e );
logger.error( "Download failed for path: {}", path, e );
}
finally
{
Expand Down
Loading