Fix corrupted archive with checksum validation, synchronized lock and API enhancement #51

Merged: 8 commits, Dec 6, 2024
PreSeedConfig.java
@@ -31,4 +31,7 @@ public interface PreSeedConfig

@WithName( "not-used-days-cleanup" )
Optional<Long> notUsedDaysCleanup();

@WithName( "thread-multiplier" )
Optional<Integer> threadMultiplier();
}
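
For context, the new thread-multiplier option feeds the executor sizing in ArchiveController.init() further down: the multiplier (defaulting to the previously hard-coded 4 when unset) is multiplied by the available processor count. A minimal, runnable sketch of that sizing rule; the class and method names here are illustrative, not part of the PR:

import java.util.Optional;

// Illustrative sketch only: mirrors how ArchiveController.init() computes pool size.
public class PoolSizing
{
    static int poolSize( Optional<Integer> threadMultiplier )
    {
        // An unset config value falls back to the previous hard-coded multiplier of 4.
        return threadMultiplier.orElse( 4 ) * Runtime.getRuntime().availableProcessors();
    }

    public static void main( String[] args )
    {
        System.out.println( poolSize( Optional.empty() ) ); // default: 4 * cores
        System.out.println( poolSize( Optional.of( 8 ) ) ); // tuned: 8 * cores
    }
}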
ArchiveController.java
@@ -20,6 +20,7 @@
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.CookieStore;
import org.apache.http.client.config.RequestConfig;
@@ -33,6 +34,7 @@
import org.commonjava.indy.service.archive.config.PreSeedConfig;
import org.commonjava.indy.service.archive.model.ArchiveStatus;
import org.commonjava.indy.service.archive.model.dto.HistoricalContentDTO;
import org.commonjava.indy.service.archive.model.dto.HistoricalEntryDTO;
import org.commonjava.indy.service.archive.util.HistoricalContentListReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,11 +49,17 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@@ -64,21 +72,31 @@
@ApplicationScoped
public class ArchiveController
{
private final Logger logger = LoggerFactory.getLogger( getClass() );

public final static String EVENT_GENERATE_ARCHIVE = "generate-archive";

public final static String CONTENT_DIR = "/content";

public final static String ARCHIVE_DIR = "/archive";

private final Logger logger = LoggerFactory.getLogger( getClass() );

private final String ARCHIVE_SUFFIX = ".zip";

private final String PART_SUFFIX = ".part";

private final String PART_ARCHIVE_SUFFIX = PART_SUFFIX + ARCHIVE_SUFFIX;

private static final Set<String> CHECKSUMS = Collections.unmodifiableSet( new HashSet<String>()
{
{
add( ".sha1" );
add( ".sha256" );
add( ".md5" );
}
} );

private static final Map<String, Object> buildConfigLocks = new ConcurrentHashMap<>();

@Inject
HistoricalContentListReader reader;

@@ -88,7 +106,9 @@ public class ArchiveController
@Inject
ObjectMapper objectMapper;

private ExecutorService executorService;
private ExecutorService downloadExecutor;

private ExecutorService generateExecutor;

private CloseableHttpClient client;

@@ -102,17 +122,22 @@ public class ArchiveController
public void init()
throws IOException
{
int threads = 4 * Runtime.getRuntime().availableProcessors();
executorService = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
int threads = preSeedConfig.threadMultiplier().orElse( 4 ) * Runtime.getRuntime().availableProcessors();
downloadExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Download-" + t.getName() );
t.setDaemon( true );
return t;
} );
generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );

final PoolingHttpClientConnectionManager ccm = new PoolingHttpClientConnectionManager();
ccm.setMaxTotal( 500 );

RequestConfig rc = RequestConfig.custom().build();
client = HttpClients.custom().setConnectionManager( ccm ).setDefaultRequestConfig( rc ).build();

@@ -130,27 +155,57 @@ public void destroy()

public void generate( HistoricalContentDTO content )
{
int threads = 4 * Runtime.getRuntime().availableProcessors();
ExecutorService generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );
generateExecutor.execute( () -> doGenerate( content ) );
String buildConfigId = content.getBuildConfigId();
Object lock = buildConfigLocks.computeIfAbsent( buildConfigId, k -> new Object() );
synchronized ( lock )
{
while ( isInProgress( buildConfigId ) )
{
logger.info( "There is already generation process in progress for buildConfigId {}, try lock wait.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just throw warning to user that there is already process in progress, please try later ? sorry if I miss some requirements. ; -)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sswguo You mean don't let the coming process wait, just warn to user and return? Not sure whether PNC care the response of archive generation since this should be at the end of build, if that's the point I will comment to confirm.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can confirm that which case will send two same archive requests in a short time.

Copy link
Member Author

@yma96 yma96 Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sswguo probably large build with large archive file, since PNC UI doesn't allow concurrent builds with the same config ID, the previous build might be already done but generation with download was still in progress, then another build was in progress and used the same workspace to process download or generation, so we need to know if they want to handle the generation response themselves or just let archive go with the right thing, comment on JIRA and discuss on meeting.

buildConfigId );
try
{
lock.wait();
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
return;
}
}
recordInProgress( content.getBuildConfigId() );
generateExecutor.execute( () -> {
try
{
doGenerate( content );
}
finally
{
synchronized ( lock )
{
recordCompleted( content.getBuildConfigId() );
buildConfigLocks.remove( buildConfigId );
lock.notifyAll();
logger.info( "lock released, buildConfigId {}", buildConfigId );
}
}
} );
}
}
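
The synchronized block above implements a per-buildConfigId lock: computeIfAbsent hands every caller for the same build the same monitor, waiters block while a generation is in progress, and the finally block marks completion, drops the lock entry, and wakes waiters. Below is a self-contained reduction of that pattern; the names are illustrative, and the work runs inline here for brevity where the PR hands it off to generateExecutor:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative demo of the per-key lock pattern used in generate().
public class PerKeyLockDemo
{
    private final Map<String, Object> locks = new ConcurrentHashMap<>();
    private final Map<String, String> status = new ConcurrentHashMap<>();

    void run( String key, Runnable work )
    {
        Object lock = locks.computeIfAbsent( key, k -> new Object() );
        synchronized ( lock )
        {
            // Block while another thread has an in-progress run for this key.
            while ( "inProgress".equals( status.get( key ) ) )
            {
                try
                {
                    lock.wait();
                }
                catch ( InterruptedException e )
                {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            status.put( key, "inProgress" );
        }
        try
        {
            work.run();
        }
        finally
        {
            synchronized ( lock )
            {
                status.put( key, "completed" );
                locks.remove( key ); // matches the PR: drop the lock entry on completion
                lock.notifyAll();
            }
        }
    }

    public static void main( String[] args ) throws InterruptedException
    {
        PerKeyLockDemo demo = new PerKeyLockDemo();
        Runnable job = () -> demo.run( "bc-1", () -> System.out.println(
                Thread.currentThread().getName() + " generated" ) );
        Thread a = new Thread( job );
        Thread b = new Thread( job );
        a.start();
        b.start();
        a.join();
        b.join();
    }
}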

protected Boolean doGenerate( HistoricalContentDTO content )
{
logger.info( "Handle generate event: {}, build config id: {}", EVENT_GENERATE_ARCHIVE,
content.getBuildConfigId() );
recordInProgress( content.getBuildConfigId() );

Map<String, String> downloadPaths = reader.readPaths( content );
Map<String, HistoricalEntryDTO> entryDTOs = reader.readEntries( content );
Map<String, String> downloadPaths = new HashMap<>();
entryDTOs.forEach( ( key, value ) -> downloadPaths.put( key, value.getPath() ) );

Optional<File> archive;
try
{
downloadArtifacts( downloadPaths, content );
downloadArtifacts( entryDTOs, downloadPaths, content );
archive = generateArchive( new ArrayList<>( downloadPaths.values() ), content );
}
catch ( final InterruptedException e )
@@ -177,7 +232,6 @@ protected Boolean doGenerate( HistoricalContentDTO content )
created = renderArchive( archive.get(), content.getBuildConfigId() );
}

recordCompleted( content.getBuildConfigId() );
return created;
}

@@ -203,6 +257,36 @@ public void deleteArchive( final String buildConfigId )
logger.info( "Historical archive for build config id: {} is deleted.", buildConfigId );
}

public void deleteArchiveWithChecksum( final String buildConfigId, final String checksum )
throws IOException
{
logger.info( "Start to delete archive with checksum validation, buildConfigId {}, checksum {}", buildConfigId,
checksum );
File zip = new File( archiveDir, buildConfigId + ARCHIVE_SUFFIX );
if ( !zip.exists() )
{
return;
}

try (FileInputStream fis = new FileInputStream( zip ))
{
String storedChecksum = DigestUtils.sha256Hex( fis );

// only delete the zip once checksum is matched
if ( storedChecksum.equals( checksum ) )
{
zip.delete();
logger.info( "Historical archive for build config id: {} is deleted, checksum {}.", buildConfigId,
storedChecksum );
}
else
{
logger.info( "Don't delete the {} zip, transferred checksum {}, but stored checksum {}.",
buildConfigId, checksum, storedChecksum );
}
}
}
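
Callers of the new checksum-guarded delete must supply the SHA-256 hex digest of the archive they expect to be stored; deletion happens only on a match. Here is a hypothetical caller-side helper showing how that digest could be computed with the same commons-codec call the controller uses; the class name and the CLI wrapper are assumptions, not part of this PR:

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.commons.codec.digest.DigestUtils;

// Hypothetical caller-side helper: compute the SHA-256 hex digest that
// deleteArchiveWithChecksum() compares against the stored archive.
public class ArchiveChecksum
{
    static String sha256Of( Path archive ) throws IOException
    {
        try ( InputStream in = Files.newInputStream( archive ) )
        {
            return DigestUtils.sha256Hex( in ); // same digest call the controller uses
        }
    }

    public static void main( String[] args ) throws IOException
    {
        System.out.println( sha256Of( Paths.get( args[0] ) ) );
    }
}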

public void cleanup()
throws IOException
{
@@ -221,28 +305,46 @@ public boolean statusExists( final String buildConfigId )
return treated.containsKey( buildConfigId );
}

public String getStatus( String buildConfigId )
public String getStatus( final String buildConfigId )
{
return treated.get( buildConfigId );
}

private void downloadArtifacts( final Map<String, String> downloadPaths, final HistoricalContentDTO content )
public boolean isInProgress( final String buildConfigId )
{
return statusExists( buildConfigId ) && getStatus( buildConfigId ).equals(
ArchiveStatus.inProgress.getArchiveStatus() );
}

private void downloadArtifacts( final Map<String, HistoricalEntryDTO> entryDTOs,
final Map<String, String> downloadPaths, final HistoricalContentDTO content )
throws InterruptedException, ExecutionException, IOException
{
BasicCookieStore cookieStore = new BasicCookieStore();
ExecutorCompletionService<Boolean> executor = new ExecutorCompletionService<>( executorService );
ExecutorCompletionService<Boolean> executor = new ExecutorCompletionService<>( downloadExecutor );

String contentBuildDir = String.format( "%s/%s", contentDir, content.getBuildConfigId() );
File dir = new File( contentBuildDir );
dir.delete();

fileTrackedContent( contentBuildDir, content );
unpackHistoricalArchive( contentBuildDir, content.getBuildConfigId() );
HistoricalContentDTO originalTracked = unpackHistoricalArchive( contentBuildDir, content.getBuildConfigId() );
Map<String, List<String>> originalChecksumsMap = new HashMap<>();
if ( originalTracked != null )
{
logger.trace( "originalChecksumsMap generated for {}", content.getBuildConfigId() );
Map<String, HistoricalEntryDTO> originalEntries = reader.readEntries( originalTracked );
originalEntries.forEach( ( key, entry ) -> originalChecksumsMap.put( key, new ArrayList<>(
Arrays.asList( entry.getSha1(), entry.getSha256(), entry.getMd5() ) ) ) );
}

for ( String path : downloadPaths.keySet() )
{
String filePath = downloadPaths.get( path );
executor.submit( download( contentBuildDir, path, filePath, cookieStore ) );
HistoricalEntryDTO entry = entryDTOs.get( path );
List<String> checksums =
new ArrayList<>( Arrays.asList( entry.getSha1(), entry.getSha256(), entry.getMd5() ) );
List<String> originalChecksums = originalChecksumsMap.get( path );
executor.submit( download( contentBuildDir, path, filePath, checksums, originalChecksums, cookieStore ) );
}
int success = 0;
int failed = 0;
@@ -257,6 +359,8 @@ private void downloadArtifacts( final Map<String, String> downloadPaths, final HistoricalContentDTO content )
failed++;
}
}
// file the latest tracked json at the end
fileTrackedContent( contentBuildDir, content );
logger.info( "Artifacts download completed, success:{}, failed:{}", success, failed );
}
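
downloadArtifacts() fans the work out through an ExecutorCompletionService and then drains exactly one result per submitted task, tallying successes and failures. A runnable reduction of that submit/take pattern, with a trivial stand-in task in place of a real download:

import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal demo of the completion-service tally used in downloadArtifacts().
public class CompletionTally
{
    public static void main( String[] args ) throws Exception
    {
        ExecutorService pool = Executors.newFixedThreadPool( 4 );
        ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>( pool );
        int tasks = 10;
        for ( int i = 0; i < tasks; i++ )
        {
            final int n = i;
            ecs.submit( () -> n % 3 != 0 ); // stand-in for one download attempt
        }
        int success = 0;
        int failed = 0;
        for ( int i = 0; i < tasks; i++ )
        {
            // take() blocks until the next task finishes, in completion order.
            if ( Boolean.TRUE.equals( ecs.take().get() ) )
            {
                success++;
            }
            else
            {
                failed++;
            }
        }
        System.out.printf( "success:%d, failed:%d%n", success, failed );
        pool.shutdown();
    }
}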

@@ -369,21 +473,22 @@ private void fileTrackedContent( String contentBuildDir, final HistoricalContentDTO content )
}
}

private void unpackHistoricalArchive( String contentBuildDir, String buildConfigId )
private HistoricalContentDTO unpackHistoricalArchive( String contentBuildDir, String buildConfigId )
throws IOException
{
final File archive = new File( archiveDir, buildConfigId + ARCHIVE_SUFFIX );
if ( !archive.exists() )
{
logger.debug( "Don't find historical archive for buildConfigId: {}.", buildConfigId );
return;
return null;
}

logger.info( "Start unpacking historical archive for buildConfigId: {}.", buildConfigId );
ZipInputStream inputStream = new ZipInputStream( new FileInputStream( archive ) );
ZipEntry entry;
while ( ( entry = inputStream.getNextEntry() ) != null )
{
logger.trace( "entry path:" + entry.getName() );
File outputFile = new File( contentBuildDir, entry.getName() );
outputFile.getParentFile().mkdirs();
try ( FileOutputStream outputStream = new FileOutputStream( outputFile ) )
@@ -393,16 +498,56 @@ private void unpackHistoricalArchive( String contentBuildDir, String buildConfigId )

}
inputStream.close();

File originalTracked = new File( contentBuildDir, buildConfigId );
if ( originalTracked.exists() )
{
return objectMapper.readValue( originalTracked, HistoricalContentDTO.class );
}
else
{
logger.debug( "No tracked json file found after zip unpack for buildConfigId {}", buildConfigId );
return null;
}
}
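
One editorial observation on the loop above: the ZipInputStream is closed manually, so an exception while copying an entry would leave it open. A minimal sketch of the same unpack loop using try-with-resources; this is an alternative shape for illustration, not what the PR merged:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.io.IOUtils;

// Editorial sketch: the unpack loop with try-with-resources, so the zip stream
// is closed even when copying an entry fails.
public class UnpackSketch
{
    static void unpack( File archive, File targetDir ) throws Exception
    {
        try ( ZipInputStream in = new ZipInputStream( new FileInputStream( archive ) ) )
        {
            ZipEntry entry;
            while ( ( entry = in.getNextEntry() ) != null )
            {
                File out = new File( targetDir, entry.getName() );
                out.getParentFile().mkdirs();
                try ( FileOutputStream os = new FileOutputStream( out ) )
                {
                    // Reads until the end of the current entry, then loops.
                    IOUtils.copy( in, os );
                }
            }
        }
    }
}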

private Callable<Boolean> download( String contentBuildDir, final String path, final String filePath,
private boolean validateChecksum( final String filePath, final List<String> current, final List<String> original )
{
if ( CHECKSUMS.stream().anyMatch( suffix -> filePath.toLowerCase().endsWith( suffix ) ) )
{
// skip validation for checksum files themselves
return false;
}
if ( original == null || original.isEmpty() || original.stream().allMatch( Objects::isNull ) )
{
return false;
}
// once sha1 is matched, skip downloading
if ( original.get( 0 ) != null && original.get( 0 ).equals( current.get( 0 ) ) )
{
return true;
}
// once sha256 is matched, skip downloading
if ( original.get( 1 ) != null && original.get( 1 ).equals( current.get( 1 ) ) )
{
return true;
}
// once md5 is matched, skip downloading
return original.get( 2 ) != null && original.get( 2 ).equals( current.get( 2 ) );
}
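
validateChecksum() relies on a positional convention: both lists are ordered [sha1, sha256, md5], and any non-null pairwise match means the cached artifact is current and the download can be skipped. A compact, runnable restatement of that rule, for illustration only:

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Illustrative restatement of the [sha1, sha256, md5] positional match rule.
public class ChecksumMatch
{
    static boolean anyMatch( List<String> current, List<String> original )
    {
        if ( original == null || original.isEmpty() || original.stream().allMatch( Objects::isNull ) )
        {
            return false;
        }
        for ( int i = 0; i < 3; i++ )
        {
            String o = original.get( i );
            if ( o != null && o.equals( current.get( i ) ) )
            {
                return true;
            }
        }
        return false;
    }

    public static void main( String[] args )
    {
        List<String> current = Arrays.asList( "aaa", "bbb", null );
        System.out.println( anyMatch( current, Arrays.asList( null, "bbb", null ) ) ); // true
        System.out.println( anyMatch( current, Arrays.asList( null, null, null ) ) );  // false
    }
}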

private Callable<Boolean> download( final String contentBuildDir, final String path, final String filePath,
final List<String> checksums, final List<String> originalChecksums,
final CookieStore cookieStore )
{
return () -> {
final File target = new File( contentBuildDir, filePath );
if ( target.exists() )

if ( target.exists() && validateChecksum( filePath, checksums, originalChecksums ) )
{
logger.trace( "<<<Already existed in historical archive, skip downloading, path: {}.", path );
logger.debug(
"<<<Already existed in historical archive, and checksum matches, skip downloading, path: {}.",
path );
return true;
}
final File dir = target.getParentFile();
@@ -413,6 +558,11 @@ private Callable<Boolean> download( String contentBuildDir, final String path, final String filePath, final CookieStore cookieStore )
context.setCookieStore( cookieStore );
final HttpGet request = new HttpGet( path );
InputStream input = null;
if ( target.exists() )
{
// remove any obsolete file left over from a previous failed http download
target.delete();
}
try
{
CloseableHttpResponse response = client.execute( request, context );
@@ -465,12 +615,7 @@ private void restoreGenerateStatusFromDisk() throws IOException
List<File> contents = walkAllFiles( archiveDir );
for ( File content : contents )
{
if ( content.getName().endsWith( PART_ARCHIVE_SUFFIX ) )
{
treated.put( content.getName().split( PART_ARCHIVE_SUFFIX )[0],
ArchiveStatus.inProgress.getArchiveStatus() );
}
else if ( content.getName().endsWith( ARCHIVE_SUFFIX ) )
if ( content.getName().endsWith( ARCHIVE_SUFFIX ) )
{
treated.put( content.getName().split( ARCHIVE_SUFFIX )[0], ArchiveStatus.completed.getArchiveStatus() );
}