Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect Cassandra client and reinit session when NoHostAvailableException #119

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
Expand Down Expand Up @@ -205,6 +206,7 @@ public void close()
{
if ( cluster != null ) // close only if the session and cluster were built by self
{
logger.info( "Close cassandra client" );
asyncJobExecutor.shutdownAndWaitTermination();
session.close();
cluster.close();
Expand All @@ -229,7 +231,7 @@ public Set<String> getFileSystemContaining( Collection<String> candidates, Strin
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedContainingQuery.bind( candidates, parentPath, filename );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return result.all().stream().map( row -> row.get( 0, String.class ) ).collect( Collectors.toSet() );
}

Expand Down Expand Up @@ -287,7 +289,7 @@ public List<PathMap> list( String fileSystem, String path, boolean recursive, in
private Result<DtxPathMap> boundAndRunListQuery( String fileSystem, String parentPath )
{
BoundStatement bound = preparedListQuery.bind( fileSystem, parentPath );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return pathMapMapper.map( result );
}

Expand Down Expand Up @@ -435,7 +437,7 @@ public FileType exists( String fileSystem, String path )
{
bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename, filename + "/" ) );
}
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
FileType ret = getFileTypeOrNull( result );
if ( ret != null )
{
Expand All @@ -455,7 +457,7 @@ public boolean existsFile( String fileSystem, String path )
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedExistFileQuery.bind( fileSystem, parentPath, filename );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
Row row = result.one();
boolean exists = false;
if ( row != null )
Expand Down Expand Up @@ -590,7 +592,7 @@ public boolean isDirectory( String fileSystem, String path )
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return notNull( result );
}

Expand All @@ -605,7 +607,7 @@ public boolean isFile( String fileSystem, String path )
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return notNull( result );
}

Expand Down Expand Up @@ -685,7 +687,7 @@ private boolean isEmptyDirectory( String fileSystem, String path )
{
path = PathMapUtils.normalizeParentPath( path );
BoundStatement bound = preparedListCheckEmpty.bind( fileSystem, path );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
Row row = result.one();
boolean empty = false;
if ( row != null )
Expand All @@ -705,7 +707,7 @@ private void deleteFromReverseMap( String fileId, String path )
reduction.add( path );
bound.setSet( 0, reduction );
bound.setString( 1, fileId );
session.execute( bound );
executeSession( bound );
}

private void addToReverseMap( String fileId, String path )
Expand All @@ -716,7 +718,7 @@ private void addToReverseMap( String fileId, String path )
increment.add( path );
bound.setSet( 0, increment );
bound.setString( 1, fileId );
session.execute( bound );
executeSession( bound );
}

private void updateFilesystemIncrease(String filesystem, long count, long size)
Expand All @@ -726,7 +728,7 @@ private void updateFilesystemIncrease(String filesystem, long count, long size)
bound.setLong( 0, count );
bound.setLong( 1, size );
bound.setString( 2, filesystem );
session.execute( bound );
executeSession( bound );
}

private void updateFilesystemDecrease(String filesystem, long count, long size)
Expand All @@ -736,7 +738,7 @@ private void updateFilesystemDecrease(String filesystem, long count, long size)
bound.setLong( 0, count );
bound.setLong( 1, size );
bound.setString( 2, filesystem );
session.execute( bound );
executeSession( bound );
}

private void reclaim( String fileId, String fileStorage, String checksum )
Expand Down Expand Up @@ -833,7 +835,7 @@ public void expire(String fileSystem, String path, Date expiration)
bound.setString( 1, fileSystem );
bound.setString( 2, parentPath );
bound.setString( 3, filename );
session.execute( bound );
executeSession( bound );
}

@Override
Expand All @@ -854,7 +856,7 @@ public void makeDirs( String fileSystem, String path )
String filename = PathMapUtils.getFilename( path );

BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
if ( notNull( result ) )
{
logger.debug( "Dir already exists, fileSystem: {}, path: {}", fileSystem, path );
Expand Down Expand Up @@ -930,7 +932,7 @@ public Filesystem getFilesystem(String filesystem)
@Override
public List<? extends Filesystem> getFilesystems()
{
ResultSet result = session.execute( preparedFilesystemList.bind() );
ResultSet result = executeSession( preparedFilesystemList.bind() );
return filesystemMapper.map(result).all();
}

Expand Down Expand Up @@ -961,4 +963,34 @@ public Set<String> getPathsByFileId( String fileId )
}
return emptySet();
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
close();
new CassandraPathDB( config );
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
close();
new CassandraPathDB( config );
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}
Loading