diff --git a/addons/path-mapped/common/src/main/java/org/commonjava/indy/pathmapped/cache/PathMappedMavenGACache.java b/addons/path-mapped/common/src/main/java/org/commonjava/indy/pathmapped/cache/PathMappedMavenGACache.java index 434ff86ef7..6348906c3d 100644 --- a/addons/path-mapped/common/src/main/java/org/commonjava/indy/pathmapped/cache/PathMappedMavenGACache.java +++ b/addons/path-mapped/common/src/main/java/org/commonjava/indy/pathmapped/cache/PathMappedMavenGACache.java @@ -18,8 +18,10 @@ import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.collect.Lists; import org.commonjava.indy.action.IndyLifecycleException; import org.commonjava.indy.action.StartupAction; @@ -302,7 +304,7 @@ private void update( String ga, Set set ) BoundStatement bound = preparedStoresIncrement.bind(); bound.setSet( 0, set ); bound.setString( 1, ga ); - session.execute( bound ); + executeSession( bound ); inMemoryCache.remove( ga ); // clear to force reloading } @@ -313,11 +315,11 @@ public void reduce( String ga, Set set, boolean isAsync ) bound.setString( 1, ga ); if ( isAsync ) { - session.executeAsync( bound ); + executeSession ( bound, true, ResultSetFuture.class ); } else { - session.execute( bound ); + executeSession( bound ); } inMemoryCache.remove( ga ); // clear to force reloading } @@ -393,7 +395,7 @@ public Set getStoresContaining( String gaPath ) } // query db BoundStatement bound = preparedQueryByGA.bind( gaPath ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); Row row = result.one(); if ( row != null ) { @@ -426,4 +428,41 @@ public boolean matchAny( List concreteStores ) } return false; } + + private ResultSet executeSession ( BoundStatement bind ) + { + return executeSession ( bind, false, ResultSet.class ); + } + + private T executeSession ( BoundStatement bind, boolean isAsync, Class type ) + { + boolean exception = false; + T trackingRecord = null; + try + { + if ( session == null || session.isClosed() ) + { + cassandraClient.close(); + cassandraClient.init(); + this.init(); + } + trackingRecord = type.cast( isAsync ? session.executeAsync( bind ) : session.execute( bind ) ); + } + catch ( NoHostAvailableException e ) + { + exception = true; + logger.error( "Cannot connect to host, reconnect once more with new session.", e ); + } + finally + { + if ( exception ) + { + cassandraClient.close(); + cassandraClient.init(); + this.init(); + trackingRecord = type.cast( isAsync ? session.executeAsync( bind ) : session.execute( bind ) ); + } + } + return trackingRecord; + } } diff --git a/addons/schedule/common/src/main/java/org/commonjava/indy/schedule/ScheduleDB.java b/addons/schedule/common/src/main/java/org/commonjava/indy/schedule/ScheduleDB.java index 471a4f28a8..5988b07437 100644 --- a/addons/schedule/common/src/main/java/org/commonjava/indy/schedule/ScheduleDB.java +++ b/addons/schedule/common/src/main/java/org/commonjava/indy/schedule/ScheduleDB.java @@ -20,6 +20,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import org.commonjava.indy.conf.IndyConfiguration; @@ -224,7 +225,7 @@ private Date calculateExpirationTime( Date scheduleTime, Long timeout) public DtxSchedule querySchedule( String storeKey, String jobName ) { BoundStatement bound = preparedSingleScheduleQuery.bind( storeKey, jobName ); - ResultSet resultSet = session.execute( bound ); + ResultSet resultSet = executeSession( bound ); Row row = resultSet.one(); @@ -240,7 +241,7 @@ public Collection queryExpirations( Date date ) Collection expirations = new ArrayList<>( ); BoundStatement bound = preparedExpiredQuery.bind( pid ); - ResultSet resultSet = session.execute( bound ); + ResultSet resultSet = executeSession( bound ); resultSet.forEach( row -> { expirations.add( toDtxExpiration( row ) ); } ); @@ -260,7 +261,7 @@ public void queryAndSetExpiredSchedule( Date date ) .equals( expiration.getScheduleUID() ) ) { BoundStatement boundU = preparedExpiredUpdate.bind( schedule.getStoreKey(), schedule.getJobName() ); - session.execute( boundU ); + executeSession( boundU ); logger.debug( "Expired entry: {}", schedule ); eventDispatcher.fire( new ScheduleTriggerEvent( schedule.getJobType(), schedule.getPayload() ) ); @@ -273,7 +274,7 @@ public Collection querySchedulesByJobType( String jobType ) { Collection schedules = new ArrayList<>( ); BoundStatement bound = preparedScheduleByTypeQuery.bind( jobType ); - ResultSet resultSet = session.execute( bound ); + ResultSet resultSet = executeSession( bound ); resultSet.forEach( row -> { schedules.add(toDtxSchedule(row)); } ); @@ -284,7 +285,7 @@ public Collection querySchedulesByStoreKey( String storeKey ) { Collection schedules = new ArrayList<>( ); BoundStatement bound = preparedScheduleByStoreKeyQuery.bind( storeKey ); - ResultSet resultSet = session.execute( bound ); + ResultSet resultSet = executeSession( bound ); resultSet.forEach( row -> { schedules.add(toDtxSchedule(row)); } ); @@ -295,7 +296,7 @@ public Collection querySchedules( String storeKey, String jobType, { Collection schedules = new ArrayList<>( ); BoundStatement bound = preparedScheduleByStoreKeyAndTypeQuery.bind( storeKey, jobType ); - ResultSet resultSet = session.execute( bound ); + ResultSet resultSet = executeSession( bound ); resultSet.forEach( row -> { DtxSchedule schedule = toDtxSchedule( row ); if ( !expired && !schedule.getExpired() ) @@ -342,4 +343,35 @@ private DtxExpiration toDtxExpiration( Row row ) return null; } + private ResultSet executeSession ( BoundStatement bind ) + { + boolean exception = false; + ResultSet trackingRecord = null; + try + { + if ( session == null || session.isClosed() ) + { + client.close(); + client.init(); + this.init(); + } + trackingRecord = session.execute( bind ); + } + catch ( NoHostAvailableException e ) + { + exception = true; + logger.error( "Cannot connect to host, reconnect once more with new session.", e ); + } + finally + { + if ( exception ) + { + client.close(); + client.init(); + this.init(); + trackingRecord = session.execute( bind ); + } + } + return trackingRecord; + } } diff --git a/core/src/main/java/org/commonjava/indy/core/inject/CassandraNotFoundCache.java b/core/src/main/java/org/commonjava/indy/core/inject/CassandraNotFoundCache.java index af9e542f1f..b2558df0f8 100644 --- a/core/src/main/java/org/commonjava/indy/core/inject/CassandraNotFoundCache.java +++ b/core/src/main/java/org/commonjava/indy/core/inject/CassandraNotFoundCache.java @@ -20,6 +20,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import org.commonjava.indy.conf.IndyConfiguration; import org.commonjava.o11yphant.metrics.annotation.Measure; import org.commonjava.indy.model.core.StoreKey; @@ -177,7 +178,7 @@ public void addMissing( final ConcreteResource resource ) BoundStatement bound = preparedInsert.bind( key.toString(), resource.getPath(), curDate, timeoutDate, timeoutInSeconds ); - session.execute( bound ); + executeSession( bound ); inMemoryCache.put( resource, DUMB_CACHE_VALUE, timeoutInSeconds, TimeUnit.SECONDS ); } @@ -191,7 +192,7 @@ public boolean isMissing( final ConcreteResource resource ) } StoreKey key = getResourceKey( resource ); BoundStatement bound = preparedExistQuery.bind( key.toString(), resource.getPath() ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); Row row = result.one(); if ( row == null ) { @@ -217,7 +218,7 @@ public void clearMissing( final Location location ) { StoreKey key = ( (KeyedLocation) location ).getKey(); BoundStatement bound = preparedDeleteByStore.bind( key.toString() ); - session.execute( bound ); + executeSession( bound ); clearInMemoryCache( location ); } @@ -246,7 +247,7 @@ public void clearMissing( final ConcreteResource resource ) { StoreKey key = getResourceKey( resource ); BoundStatement bound = preparedDelete.bind( key.toString(), resource.getPath() ); - session.execute( bound ); + executeSession( bound ); inMemoryCache.remove( resource ); } @@ -272,7 +273,7 @@ public Set getMissing( final Location location ) logger.debug( "[NFC] getMissing for {}", location ); StoreKey key = ( (KeyedLocation) location ).getKey(); BoundStatement bound = preparedQueryByStore.bind( key.toString() ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); int count = 0; Set matches = new HashSet<>(); for ( Row row : result ) @@ -315,7 +316,7 @@ public Set getMissing( Location location, int pageIndex, int pageSize ) public long getSize( StoreKey storeKey ) { BoundStatement bound = preparedCountByStore.bind( storeKey.toString() ); - ResultSet result = session.execute( bound ); + ResultSet result = executeSession( bound ); return result.one().get( 0, Long.class ); } @@ -331,4 +332,36 @@ private StoreKey getResourceKey( ConcreteResource resource ) KeyedLocation location = (KeyedLocation) resource.getLocation(); return location.getKey(); } + + private ResultSet executeSession ( BoundStatement bind ) + { + boolean exception = false; + ResultSet trackingRecord = null; + try + { + if ( session == null || session.isClosed() ) + { + cassandraClient.close(); + cassandraClient.init(); + this.start(); + } + trackingRecord = session.execute( bind ); + } + catch ( NoHostAvailableException e ) + { + exception = true; + logger.error( "Cannot connect to host, reconnect once more with new session.", e ); + } + finally + { + if ( exception ) + { + cassandraClient.close(); + cassandraClient.init(); + this.start(); + trackingRecord = session.execute( bind ); + } + } + return trackingRecord; + } } diff --git a/subsys/cassandra/src/main/java/org/commonjava/indy/subsys/cassandra/CassandraClient.java b/subsys/cassandra/src/main/java/org/commonjava/indy/subsys/cassandra/CassandraClient.java index 8401878313..89fae69efa 100644 --- a/subsys/cassandra/src/main/java/org/commonjava/indy/subsys/cassandra/CassandraClient.java +++ b/subsys/cassandra/src/main/java/org/commonjava/indy/subsys/cassandra/CassandraClient.java @@ -61,7 +61,7 @@ public CassandraClient( CassandraConfig config ) } @PostConstruct - private void init() + public void init() { if ( !config.isEnabled() ) { @@ -116,18 +116,15 @@ public Session getSession( String keyspace ) } ); } - private volatile boolean closed; - public void close() { - if ( !closed && cluster != null && sessions != null ) + if ( cluster != null && sessions != null ) { logger.info( "Close cassandra client" ); sessions.entrySet().forEach( e -> e.getValue().close() ); sessions.clear(); cluster.close(); cluster = null; - closed = true; } }