Skip to content

Commit

Permalink
Merge pull request #119 from yma96/master
Browse files Browse the repository at this point in the history
Reconnect Cassandra client and reinit session when NoHostAvailableException
  • Loading branch information
yma96 authored Oct 24, 2024
2 parents 6a63cec + 6ed2ca5 commit 7048b13
Showing 1 changed file with 46 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
Expand Down Expand Up @@ -205,6 +206,7 @@ public void close()
{
if ( cluster != null ) // close only if the session and cluster were built by self
{
logger.info( "Close cassandra client" );
asyncJobExecutor.shutdownAndWaitTermination();
session.close();
cluster.close();
Expand All @@ -229,7 +231,7 @@ public Set<String> getFileSystemContaining( Collection<String> candidates, Strin
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedContainingQuery.bind( candidates, parentPath, filename );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return result.all().stream().map( row -> row.get( 0, String.class ) ).collect( Collectors.toSet() );
}

Expand Down Expand Up @@ -287,7 +289,7 @@ public List<PathMap> list( String fileSystem, String path, boolean recursive, in
private Result<DtxPathMap> boundAndRunListQuery( String fileSystem, String parentPath )
{
BoundStatement bound = preparedListQuery.bind( fileSystem, parentPath );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return pathMapMapper.map( result );
}

Expand Down Expand Up @@ -435,7 +437,7 @@ public FileType exists( String fileSystem, String path )
{
bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename, filename + "/" ) );
}
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
FileType ret = getFileTypeOrNull( result );
if ( ret != null )
{
Expand All @@ -455,7 +457,7 @@ public boolean existsFile( String fileSystem, String path )
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedExistFileQuery.bind( fileSystem, parentPath, filename );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
Row row = result.one();
boolean exists = false;
if ( row != null )
Expand Down Expand Up @@ -590,7 +592,7 @@ public boolean isDirectory( String fileSystem, String path )
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return notNull( result );
}

Expand All @@ -605,7 +607,7 @@ public boolean isFile( String fileSystem, String path )
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return notNull( result );
}

Expand Down Expand Up @@ -685,7 +687,7 @@ private boolean isEmptyDirectory( String fileSystem, String path )
{
path = PathMapUtils.normalizeParentPath( path );
BoundStatement bound = preparedListCheckEmpty.bind( fileSystem, path );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
Row row = result.one();
boolean empty = false;
if ( row != null )
Expand All @@ -705,7 +707,7 @@ private void deleteFromReverseMap( String fileId, String path )
reduction.add( path );
bound.setSet( 0, reduction );
bound.setString( 1, fileId );
session.execute( bound );
executeSession( bound );
}

private void addToReverseMap( String fileId, String path )
Expand All @@ -716,7 +718,7 @@ private void addToReverseMap( String fileId, String path )
increment.add( path );
bound.setSet( 0, increment );
bound.setString( 1, fileId );
session.execute( bound );
executeSession( bound );
}

private void updateFilesystemIncrease(String filesystem, long count, long size)
Expand All @@ -726,7 +728,7 @@ private void updateFilesystemIncrease(String filesystem, long count, long size)
bound.setLong( 0, count );
bound.setLong( 1, size );
bound.setString( 2, filesystem );
session.execute( bound );
executeSession( bound );
}

private void updateFilesystemDecrease(String filesystem, long count, long size)
Expand All @@ -736,7 +738,7 @@ private void updateFilesystemDecrease(String filesystem, long count, long size)
bound.setLong( 0, count );
bound.setLong( 1, size );
bound.setString( 2, filesystem );
session.execute( bound );
executeSession( bound );
}

private void reclaim( String fileId, String fileStorage, String checksum )
Expand Down Expand Up @@ -833,7 +835,7 @@ public void expire(String fileSystem, String path, Date expiration)
bound.setString( 1, fileSystem );
bound.setString( 2, parentPath );
bound.setString( 3, filename );
session.execute( bound );
executeSession( bound );
}

@Override
Expand All @@ -854,7 +856,7 @@ public void makeDirs( String fileSystem, String path )
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
if ( notNull( result ) )
{
logger.debug( "Dir already exists, fileSystem: {}, path: {}", fileSystem, path );
Expand Down Expand Up @@ -930,7 +932,7 @@ public Filesystem getFilesystem(String filesystem)
@Override
public List<? extends Filesystem> getFilesystems()
{
ResultSet result = session.execute( preparedFilesystemList.bind() );
ResultSet result = executeSession( preparedFilesystemList.bind() );
return filesystemMapper.map(result).all();
}

Expand Down Expand Up @@ -961,4 +963,34 @@ public Set<String> getPathsByFileId( String fileId )
}
return emptySet();
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
close();
new CassandraPathDB( config );
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
close();
new CassandraPathDB( config );
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}

0 comments on commit 7048b13

Please sign in to comment.