Fix corrupted archive with checksum validation, synchronized lock and API enhancement #51

Merged: 8 commits, Dec 6, 2024
@@ -33,6 +33,7 @@
import org.commonjava.indy.service.archive.config.PreSeedConfig;
import org.commonjava.indy.service.archive.model.ArchiveStatus;
import org.commonjava.indy.service.archive.model.dto.HistoricalContentDTO;
import org.commonjava.indy.service.archive.model.dto.HistoricalEntryDTO;
import org.commonjava.indy.service.archive.util.HistoricalContentListReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,15 +44,25 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@@ -79,6 +90,35 @@ public class ArchiveController

private final String PART_ARCHIVE_SUFFIX = PART_SUFFIX + ARCHIVE_SUFFIX;

private static final int threads = 4 * Runtime.getRuntime().availableProcessors();
[Review thread]
sswguo (Member): maybe we can make it configurable.
yma96 (Member Author): @sswguo sure, done in new commit.


private final ExecutorService generateExecutor =
Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );
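The reviewer's suggestion to make the pool size configurable can be sketched with a system-property fallback. The property name `archive.generate.threads` is purely illustrative (not the service's actual config key); in a Quarkus service this would more likely be a `@ConfigProperty`-injected field.

```java
// Sketch: configurable worker count with a sane default.
// "archive.generate.threads" is a hypothetical property name,
// not the one the service actually uses.
class GenerateThreads
{
    static int resolve()
    {
        int defaultThreads = 4 * Runtime.getRuntime().availableProcessors();
        return Integer.getInteger( "archive.generate.threads", defaultThreads );
    }
}
```

`Integer.getInteger` reads a system property and falls back to the default when the property is unset or unparsable, which keeps the current `4 * availableProcessors()` behavior out of the box.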

private static final Set<String> CHECKSUMS = Set.of( ".md5", ".sha1", ".sha256" );

private static final Map<String, Object> buildConfigLocks = new ConcurrentHashMap<>();

private static final String SHA_256 = "SHA-256";

private static final Long BOLCK_SIZE = 100 * 1024 * 1024L;

private static final String HEX_DIGITS = "0123456789abcdef";

private static final char[] HEX_ARRAY = HEX_DIGITS.toCharArray();

@Inject
HistoricalContentListReader reader;

@@ -130,27 +170,57 @@ public void destroy()

public void generate( HistoricalContentDTO content )
{
int threads = 4 * Runtime.getRuntime().availableProcessors();
ExecutorService generateExecutor = Executors.newFixedThreadPool( threads, ( final Runnable r ) -> {
final Thread t = new Thread( r );
t.setName( "Generate-" + t.getName() );
t.setDaemon( true );
return t;
} );
generateExecutor.execute( () -> doGenerate( content ) );
String buildConfigId = content.getBuildConfigId();
Object lock = buildConfigLocks.computeIfAbsent( buildConfigId, k -> new Object() );
synchronized ( lock )
{
while ( isInProgress( buildConfigId ) )
{
logger.info( "A generation process is already in progress for buildConfigId {}, waiting on its lock.",
[Review thread]
sswguo (Member): can we just throw warning to user that there is already process in progress, please try later ? sorry if I miss some requirements. ; -)
yma96 (Member Author): @sswguo You mean don't let the coming process wait, just warn to user and return? Not sure whether PNC care the response of archive generation since this should be at the end of build, if that's the point I will comment to confirm.
sswguo (Member): We can confirm that which case will send two same archive requests in a short time.
yma96 (Member Author, Dec 3, 2024): @sswguo probably large build with large archive file, since PNC UI doesn't allow concurrent builds with the same config ID, the previous build might be already done but generation with download was still in progress, then another build was in progress and used the same workspace to process download or generation, so we need to know if they want to handle the generation response themselves or just let archive go with the right thing, comment on JIRA and discuss on meeting.

buildConfigId );
try
{
lock.wait();
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
return;
}
}
recordInProgress( content.getBuildConfigId() );
generateExecutor.execute( () -> {
try
{
doGenerate( content );
}
finally
{
synchronized ( lock )
{
recordCompleted( content.getBuildConfigId() );
buildConfigLocks.remove( buildConfigId );
lock.notifyAll();
logger.info( "lock released, buildConfigId {}", buildConfigId );
}
}
} );
}
}
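The per-buildConfigId serialization above reduces to one monitor object per key, created atomically on demand. A minimal standalone sketch of that pattern (class and method names are illustrative, not the service's API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// One monitor object per key: computeIfAbsent guarantees that every
// caller passing the same key synchronizes on the same object.
class KeyedLocks
{
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    Object lockFor( String key )
    {
        return locks.computeIfAbsent( key, k -> new Object() );
    }

    // Remove the lock and wake any threads parked in lock.wait(),
    // mirroring the remove + notifyAll step in the PR.
    void release( String key )
    {
        Object lock = locks.remove( key );
        if ( lock != null )
        {
            synchronized ( lock )
            {
                lock.notifyAll();
            }
        }
    }
}
```

The subtle point is that after `release` a late caller gets a fresh lock object, so a woken waiter must re-check the actual state (here, `isInProgress`) rather than trust the wakeup — which is exactly why the PR wraps `lock.wait()` in a `while` loop.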

protected Boolean doGenerate( HistoricalContentDTO content )
{
logger.info( "Handle generate event: {}, build config id: {}", EVENT_GENERATE_ARCHIVE,
content.getBuildConfigId() );
recordInProgress( content.getBuildConfigId() );

Map<String, String> downloadPaths = reader.readPaths( content );
Map<String, HistoricalEntryDTO> entryDTOs = reader.readEntries( content );
Map<String, String> downloadPaths = new HashMap<>();
entryDTOs.forEach( ( key, value ) -> downloadPaths.put( key, value.getPath() ) );

Optional<File> archive;
try
{
downloadArtifacts( downloadPaths, content );
downloadArtifacts( entryDTOs, downloadPaths, content );
archive = generateArchive( new ArrayList<>( downloadPaths.values() ), content );
}
catch ( final InterruptedException e )
@@ -177,7 +247,6 @@ protected Boolean doGenerate( HistoricalContentDTO content )
created = renderArchive( archive.get(), content.getBuildConfigId() );
}

recordCompleted( content.getBuildConfigId() );
return created;
}

@@ -203,6 +272,64 @@ public void deleteArchive( final String buildConfigId )
logger.info( "Historical archive for build config id: {} is deleted.", buildConfigId );
}

public void deleteArchiveWithChecksum( final String buildConfigId, final String checksum )
throws IOException
{
logger.info( "Start to delete archive with checksum validation, buildConfigId {}, checksum {}", buildConfigId,
checksum );
File zip = new File( archiveDir, buildConfigId + ARCHIVE_SUFFIX );
if ( !zip.exists() )
{
return;
}

try (FileChannel channel = new FileInputStream( zip ).getChannel())
{
MessageDigest digest = MessageDigest.getInstance( SHA_256 );
long position = 0;
long size = channel.size();

while ( position < size )
{
long remaining = size - position;
long currentBlock = Math.min( remaining, BOLCK_SIZE );
MappedByteBuffer buffer = channel.map( FileChannel.MapMode.READ_ONLY, position, currentBlock );
digest.update( buffer );
position += currentBlock;
[Review thread]
sswguo (Member): Is there any 3rd lib for this? like e.g.: commons-codec .
yma96 (Member Author): @sswguo done in new commit.

}

String stored = bytesToHex( digest.digest() );
// only delete the zip once the checksums match
if ( stored.equals( checksum ) )
{
zip.delete();
logger.info( "Historical archive for build config id: {} is deleted, checksum {}.", buildConfigId,
stored );
}
else
{
logger.info( "Skip deleting the {} zip: transferred checksum {} does not match stored checksum {}.",
buildConfigId, checksum, stored );
}
}
catch ( NoSuchAlgorithmException e )
{
logger.error( "No such algorithm SHA-256 Exception", e );
}
}
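A note on the mapped-buffer loop above: `MappedByteBuffer` mappings are only released when the buffer is garbage collected, which can keep large archive files pinned (notably preventing deletion on Windows). A simpler JDK-only alternative streams the file through a `DigestInputStream` in fixed-size chunks. This is a sketch assuming Java 17+ (for `HexFormat`), not what the PR ships:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Constant-memory SHA-256 of a file, no memory mapping involved.
class FileSha256
{
    static String hexDigest( Path file ) throws IOException, NoSuchAlgorithmException
    {
        MessageDigest digest = MessageDigest.getInstance( "SHA-256" );
        try (InputStream in = new DigestInputStream( Files.newInputStream( file ), digest ))
        {
            byte[] buf = new byte[8192];
            // reading through the DigestInputStream feeds every byte to the digest
            while ( in.read( buf ) != -1 )
            {
                // no-op: the side effect happens inside DigestInputStream
            }
        }
        return HexFormat.of().formatHex( digest.digest() );
    }
}
```

Throughput is comparable for sequential whole-file reads, and the channel/stream is released deterministically by try-with-resources.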

private String bytesToHex( byte[] hash )
{
char[] hexChars = new char[hash.length * 2];
for ( int i = 0; i < hash.length; i++ )
{
int v = hash[i] & 0xFF;
hexChars[i * 2] = HEX_ARRAY[v >>> 4];
hexChars[i * 2 + 1] = HEX_ARRAY[v & 0x0F];
}
return new String( hexChars );
}
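On the commons-codec question in the review thread above: since Java 17 the hand-rolled lookup table has a JDK equivalent in `java.util.HexFormat` (commons-codec's `Hex.encodeHexString` is the pre-17 third-party route). A one-method sketch producing the same lowercase output as `bytesToHex`:

```java
import java.util.HexFormat;

class HexDemo
{
    // Lowercase hex, byte-for-byte the same output as the manual table above.
    static String toHex( byte[] bytes )
    {
        return HexFormat.of().formatHex( bytes );
    }
}
```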

public void cleanup()
throws IOException
{
@@ -221,12 +348,19 @@ public boolean statusExists( final String buildConfigId )
return treated.containsKey( buildConfigId );
}

public String getStatus( String buildConfigId )
public String getStatus( final String buildConfigId )
{
return treated.get( buildConfigId );
}

private void downloadArtifacts( final Map<String, String> downloadPaths, final HistoricalContentDTO content )
public boolean isInProgress( final String buildConfigId )
{
return statusExists( buildConfigId ) && getStatus( buildConfigId ).equals(
ArchiveStatus.inProgress.getArchiveStatus() );
}

private void downloadArtifacts( final Map<String, HistoricalEntryDTO> entryDTOs,
final Map<String, String> downloadPaths, final HistoricalContentDTO content )
throws InterruptedException, ExecutionException, IOException
{
BasicCookieStore cookieStore = new BasicCookieStore();
@@ -236,13 +370,23 @@ private void downloadArtifacts( final Map<String, String> downloadPaths, final H
File dir = new File( contentBuildDir );
dir.delete();

fileTrackedContent( contentBuildDir, content );
unpackHistoricalArchive( contentBuildDir, content.getBuildConfigId() );
HistoricalContentDTO originalTracked = unpackHistoricalArchive( contentBuildDir, content.getBuildConfigId() );
Map<String, List<String>> originalChecksumsMap = new HashMap<>();
if ( originalTracked != null )
{
Map<String, HistoricalEntryDTO> originalEntries = reader.readEntries( originalTracked );
originalEntries.forEach( ( key, entry ) -> originalChecksumsMap.put( key, new ArrayList<>(
Arrays.asList( entry.getSha1(), entry.getSha256(), entry.getMd5() ) ) ) );
}

for ( String path : downloadPaths.keySet() )
{
String filePath = downloadPaths.get( path );
executor.submit( download( contentBuildDir, path, filePath, cookieStore ) );
HistoricalEntryDTO entry = entryDTOs.get( path );
List<String> checksums =
new ArrayList<>( Arrays.asList( entry.getSha1(), entry.getSha256(), entry.getMd5() ) );
List<String> originalChecksums = originalChecksumsMap.get( path );
executor.submit( download( contentBuildDir, path, filePath, checksums, originalChecksums, cookieStore ) );
}
int success = 0;
int failed = 0;
Expand All @@ -257,6 +401,8 @@ private void downloadArtifacts( final Map<String, String> downloadPaths, final H
failed++;
}
}
// write the latest tracked json at the end
fileTrackedContent( contentBuildDir, content );
logger.info( "Artifacts download completed, success:{}, failed:{}", success, failed );
}

@@ -369,14 +515,14 @@ private void fileTrackedContent( String contentBuildDir, final HistoricalContent
}
}

private void unpackHistoricalArchive( String contentBuildDir, String buildConfigId )
private HistoricalContentDTO unpackHistoricalArchive( String contentBuildDir, String buildConfigId )
throws IOException
{
final File archive = new File( archiveDir, buildConfigId + ARCHIVE_SUFFIX );
if ( !archive.exists() )
{
logger.debug( "No historical archive found for buildConfigId: {}.", buildConfigId );
return;
return null;
}

logger.info( "Start unpacking historical archive for buildConfigId: {}.", buildConfigId );
@@ -393,16 +539,53 @@ private void unpackHistoricalArchive( String contentBuildDir, String buildConfig

}
inputStream.close();

File originalTracked = new File( contentBuildDir, buildConfigId );
if ( originalTracked.exists() )
{
return objectMapper.readValue( originalTracked, HistoricalContentDTO.class );
}
else
{
logger.debug( "No tracked json file found after zip unpack for buildConfigId {}", buildConfigId );
return null;
}
}

private boolean validateChecksum( final String filePath, final List<String> current, final List<String> original )
{
if ( CHECKSUMS.stream().anyMatch( suffix -> filePath.toLowerCase().endsWith( suffix ) ) )
{
// CHECKSUMS entries already carry the leading dot; skip validation for checksum files themselves
return false;
}
if ( original == null || original.isEmpty() || original.stream().allMatch( Objects::isNull ) )
{
return false;
}
if ( original.get( 0 ) != null && original.get( 0 ).equals( current.get( 0 ) ) )
{
return true;
}
if ( original.get( 1 ) != null && original.get( 1 ).equals( current.get( 1 ) ) )
{
return true;
}
return original.get( 2 ) != null && original.get( 2 ).equals( current.get( 2 ) );
}
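The three pairwise comparisons above (sha1, sha256, md5 at matching positions) can be collapsed into one loop. A compact equivalent sketch, with an illustrative helper name rather than the PR's method:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// True if any non-null checksum in `original` equals the checksum at the
// same position in `current`; positions correspond to (sha1, sha256, md5).
class ChecksumMatch
{
    static boolean anyMatch( List<String> current, List<String> original )
    {
        if ( original == null )
        {
            return false;
        }
        int n = Math.min( current.size(), original.size() );
        return IntStream.range( 0, n )
                        .anyMatch( i -> original.get( i ) != null
                                        && original.get( i ).equals( current.get( i ) ) );
    }
}
```

Collapsing the checks this way also makes it harder to forget one algorithm if a fourth checksum type is ever added.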

private Callable<Boolean> download( String contentBuildDir, final String path, final String filePath,
private Callable<Boolean> download( final String contentBuildDir, final String path, final String filePath,
final List<String> checksums, final List<String> originalChecksums,
final CookieStore cookieStore )
{
return () -> {
final File target = new File( contentBuildDir, filePath );
if ( target.exists() )

if ( target.exists() && validateChecksum( filePath, checksums, originalChecksums ) )
{
logger.trace( "<<<Already existed in historical archive, skip downloading, path: {}.", path );
logger.debug(
"<<<Already existed in historical archive, and checksum equals, skip downloading, path: {}.",
path );
return true;
}
final File dir = target.getParentFile();
@@ -465,12 +648,7 @@ private void restoreGenerateStatusFromDisk() throws IOException
List<File> contents = walkAllFiles( archiveDir );
for ( File content : contents )
{
if ( content.getName().endsWith( PART_ARCHIVE_SUFFIX ) )
{
treated.put( content.getName().split( PART_ARCHIVE_SUFFIX )[0],
ArchiveStatus.inProgress.getArchiveStatus() );
}
else if ( content.getName().endsWith( ARCHIVE_SUFFIX ) )
if ( content.getName().endsWith( ARCHIVE_SUFFIX ) )
{
treated.put( content.getName().split( ARCHIVE_SUFFIX )[0], ArchiveStatus.completed.getArchiveStatus() );
}