Skip to content

Commit

Permalink
Change response for synchronized generation and add timeout error han…
Browse files Browse the repository at this point in the history
…dling
  • Loading branch information
yma96 committed Dec 4, 2024
1 parent dac7f70 commit c6a9cd9
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
Expand Down Expand Up @@ -153,44 +156,59 @@ public void destroy()
IOUtils.closeQuietly( client, null );
}

public void generate( HistoricalContentDTO content )
public boolean generate( HistoricalContentDTO content )
{
String buildConfigId = content.getBuildConfigId();
Object lock = buildConfigLocks.computeIfAbsent( buildConfigId, k -> new Object() );
synchronized ( lock )
{
while ( isInProgress( buildConfigId ) )
if ( isInProgress( buildConfigId ) )
{
logger.info( "There is already generation process in progress for buildConfigId {}, try lock wait.",
buildConfigId );
try
{
lock.wait();
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
return;
}
// Conflicted generation, just return with expected http response immediately
return false;
}
recordInProgress( content.getBuildConfigId() );
generateExecutor.execute( () -> {

recordInProgress( buildConfigId );
Future<?> future = generateExecutor.submit( () -> {
try
{
doGenerate( content );
}
catch ( Exception e )
{
// If error happens on generation, remove the status to make sure coming generation
removeStatus( buildConfigId );
logger.error( "Generation failed for buildConfigId {}", buildConfigId, e );
}
finally
{
synchronized ( lock )
{
recordCompleted( content.getBuildConfigId() );
buildConfigLocks.remove( buildConfigId );
lock.notifyAll();
logger.info( "lock released, buildConfigId {}", buildConfigId );
}
recordCompleted( buildConfigId );
buildConfigLocks.remove( buildConfigId );
logger.info( "lock released, buildConfigId {}", buildConfigId );
}
} );
try
{
future.get( 60, TimeUnit.MINUTES );
}
catch ( TimeoutException e )
{
// If timeout happens on generation, cancel and remove the status to make sure coming generation
future.cancel( true );
removeStatus( buildConfigId );
logger.error( "Generation timeout for buildConfigId {}", buildConfigId, e );
}
catch ( InterruptedException | ExecutionException e )
{
// If future task level error happens on generation, cancel and remove the status to make sure coming generation
future.cancel( true );
removeStatus( buildConfigId );
logger.error( "Generation future task level failed for buildConfigId {}", buildConfigId, e );
}
}
return true;
}

protected Boolean doGenerate( HistoricalContentDTO content )
Expand Down Expand Up @@ -622,15 +640,20 @@ private void restoreGenerateStatusFromDisk() throws IOException
}
}

private void recordInProgress( String buildConfigId )
private void recordInProgress( final String buildConfigId )
{
treated.remove( buildConfigId );
treated.put( buildConfigId, ArchiveStatus.inProgress.getArchiveStatus() );
}

private void recordCompleted( String buildConfigId )
private void recordCompleted( final String buildConfigId )
{
treated.remove( buildConfigId );
treated.put( buildConfigId, ArchiveStatus.completed.getArchiveStatus() );
}

private void removeStatus( final String buildConfigId )
{
treated.remove( buildConfigId );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static jakarta.ws.rs.core.MediaType.APPLICATION_OCTET_STREAM;
import static jakarta.ws.rs.core.Response.Status.CONFLICT;
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
import static jakarta.ws.rs.core.Response.accepted;
import static jakarta.ws.rs.core.Response.noContent;
Expand All @@ -74,6 +75,7 @@ public class ArchiveManageResources

@Operation( description = "Generate archive based on tracked content" )
@APIResponse( responseCode = "202", description = "The archive created request is accepted" )
@APIResponse( responseCode = "409", description = "The archive created request is conflicted" )
@RequestBody( description = "The tracked content definition JSON", name = "body", required = true,
content = @Content( mediaType = APPLICATION_JSON,
example = "{" + "\"buildConfigId\": \"XXX\"," + "\"downloads\":" + "[{"
Expand Down Expand Up @@ -104,11 +106,17 @@ public Uni<Response> create( final @Context UriInfo uriInfo, final InputStream b
return fromResponse( message );
}

controller.generate( content );
boolean accepted = controller.generate( content );
return Uni.createFrom()
.item( accepted().type( MediaType.TEXT_PLAIN )
.entity( "Archive created request is accepted." )
.build() );
.item( accepted ?
accepted().type( MediaType.TEXT_PLAIN )
.entity( "Archive created request is accepted." )
.build() :
Response.status( CONFLICT )
.type( MediaType.TEXT_PLAIN )
.entity(
"Another generation with same build config ID was already in progress, request is conflicted." )
.build() );
}

@Operation( description = "Get the status of generating archive based on build config Id" )
Expand Down

0 comments on commit c6a9cd9

Please sign in to comment.