From 6ed2ca5d827fd384d5916573ed6ec88d8bdb26fd Mon Sep 17 00:00:00 2001 From: yma Date: Wed, 23 Oct 2024 19:54:07 +0800 Subject: [PATCH] Reconnect Cassandra client and reinit session when NoHostAvailableException --- .../pathdb/datastax/CassandraPathDB.java | 60 ++++++++++++++----- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/pathdb/datastax/src/main/java/org/commonjava/storage/pathmapped/pathdb/datastax/CassandraPathDB.java b/pathdb/datastax/src/main/java/org/commonjava/storage/pathmapped/pathdb/datastax/CassandraPathDB.java index 4784039..f2b70ce 100644 --- a/pathdb/datastax/src/main/java/org/commonjava/storage/pathmapped/pathdb/datastax/CassandraPathDB.java +++ b/pathdb/datastax/src/main/java/org/commonjava/storage/pathmapped/pathdb/datastax/CassandraPathDB.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.policies.ConstantReconnectionPolicy; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; @@ -205,6 +206,7 @@ public void close() { if ( cluster != null ) // close only if the session and cluster were built by self { + logger.info( "Close cassandra client" ); asyncJobExecutor.shutdownAndWaitTermination(); session.close(); cluster.close(); @@ -229,7 +231,7 @@ public Set getFileSystemContaining( Collection candidates, Strin String filename = PathMapUtils.getFilename( path ); BoundStatement bound = preparedContainingQuery.bind( candidates, parentPath, filename ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return result.all().stream().map( row -> row.get( 0, String.class ) ).collect( Collectors.toSet() ); } @@ -287,7 +289,7 @@ public List list( String fileSystem, String path, boolean recursive, in private Result boundAndRunListQuery( String fileSystem, String parentPath ) { BoundStatement bound = preparedListQuery.bind( fileSystem, parentPath ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return pathMapMapper.map( result ); } @@ -435,7 +437,7 @@ public FileType exists( String fileSystem, String path ) { bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename, filename + "/" ) ); } - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); FileType ret = getFileTypeOrNull( result ); if ( ret != null ) { @@ -455,7 +457,7 @@ public boolean existsFile( String fileSystem, String path ) String filename = PathMapUtils.getFilename( path ); BoundStatement bound = preparedExistFileQuery.bind( fileSystem, parentPath, filename ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); Row row = result.one(); boolean exists = false; if ( row != null ) @@ -590,7 +592,7 @@ public boolean isDirectory( String fileSystem, String path ) String filename = PathMapUtils.getFilename( path ); BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return notNull( result ); } @@ -605,7 +607,7 @@ public boolean isFile( String fileSystem, String path ) String filename = PathMapUtils.getFilename( path ); BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return notNull( result ); } @@ -685,7 +687,7 @@ private boolean isEmptyDirectory( String fileSystem, String path ) { path = PathMapUtils.normalizeParentPath( path ); BoundStatement bound = preparedListCheckEmpty.bind( fileSystem, path ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); Row row = result.one(); boolean empty = false; if ( row != null ) @@ -705,7 +707,7 @@ private void deleteFromReverseMap( String fileId, String path ) reduction.add( path ); bound.setSet( 0, reduction ); bound.setString( 1, fileId ); - session.execute( bound ); + executeSession( bound ); } private void addToReverseMap( String fileId, String path ) @@ -716,7 +718,7 @@ private void addToReverseMap( String fileId, String path ) increment.add( path ); bound.setSet( 0, increment ); bound.setString( 1, fileId ); - session.execute( bound ); + executeSession( bound ); } private void updateFilesystemIncrease(String filesystem, long count, long size) @@ -726,7 +728,7 @@ private void updateFilesystemIncrease(String filesystem, long count, long size) bound.setLong( 0, count ); bound.setLong( 1, size ); bound.setString( 2, filesystem ); - session.execute( bound ); + executeSession( bound ); } private void updateFilesystemDecrease(String filesystem, long count, long size) @@ -736,7 +738,7 @@ private void updateFilesystemDecrease(String filesystem, long count, long size) bound.setLong( 0, count ); bound.setLong( 1, size ); bound.setString( 2, filesystem ); - session.execute( bound ); + executeSession( bound ); } private void reclaim( String fileId, String fileStorage, String checksum ) @@ -833,7 +835,7 @@ public void expire(String fileSystem, String path, Date expiration) bound.setString( 1, fileSystem ); bound.setString( 2, parentPath ); bound.setString( 3, filename ); - session.execute( bound ); + executeSession( bound ); } @Override @@ -854,7 +856,7 @@ public void makeDirs( String fileSystem, String path ) String filename = PathMapUtils.getFilename( path ); BoundStatement bound = preparedExistQuery.bind( fileSystem, parentPath, Arrays.asList( filename ) ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); if ( notNull( result ) ) { logger.debug( "Dir already exists, fileSystem: {}, path: {}", fileSystem, path ); @@ -930,7 +932,7 @@ public Filesystem getFilesystem(String filesystem) @Override public List getFilesystems() { - ResultSet result = session.execute( preparedFilesystemList.bind() ); + ResultSet result = executeSession( preparedFilesystemList.bind() ); return filesystemMapper.map(result).all(); } @@ -961,4 +963,34 @@ public Set getPathsByFileId( String fileId ) } return emptySet(); } + + private ResultSet executeSession ( BoundStatement bind ) + { + boolean exception = false; + ResultSet trackingRecord = null; + try + { + if ( session == null || session.isClosed() ) + { + close(); + new CassandraPathDB( config ); + } + trackingRecord = session.execute( bind ); + } + catch ( NoHostAvailableException e ) + { + exception = true; + logger.error( "Cannot connect to host, reconnect once more with new session.", e ); + } + finally + { + if ( exception ) + { + close(); + new CassandraPathDB( config ); + trackingRecord = session.execute( bind ); + } + } + return trackingRecord; + } }