From b680b4a4b987fae134062c1aef56ce529fd94fa9 Mon Sep 17 00:00:00 2001
From: Aaron <69273634+aaron-congo@users.noreply.github.com>
Date: Wed, 18 Dec 2024 10:42:09 -0800
Subject: [PATCH] fix: avoid setting ignoreNewTopologyRequestsEndTimeNano on
 initial connection (#1221)

---
 .../amazon/jdbc/PluginServiceImpl.java        |  3 +-
 .../ClusterTopologyMonitorImpl.java           | 62 ++++++++++++----
 .../ClusterAwareWriterFailoverHandler.java    |  2 +-
 .../failover2/FailoverConnectionPlugin.java   | 60 +++++++--------
 ..._advanced_jdbc_wrapper_messages.properties | 31 +++++---
 .../container/tests/AuroraFailoverTest.java   | 73 +++++++++++++++++--
 6 files changed, 165 insertions(+), 66 deletions(-)

diff --git a/wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java b/wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
index 4b2775124..326d020ce 100644
--- a/wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
+++ b/wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
@@ -535,7 +535,8 @@ public boolean forceRefreshHostList(final boolean shouldVerifyWriter, final long
         return true;
       }
     } catch (TimeoutException ex) {
-      // do nothing
+      // do nothing.
+      LOGGER.finest(Messages.get("PluginServiceImpl.forceRefreshTimeout", new Object[]{timeoutMs}));
     }
     return false;
   }
diff --git a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java
index dcf95e241..34934e27c 100644
--- a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java
+++ b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java
@@ -97,7 +97,7 @@ public class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
   protected long highRefreshRateEndTimeNano = 0;
   protected final Object topologyUpdated = new Object();
   protected final AtomicBoolean requestToUpdateTopology = new AtomicBoolean(false);
-  protected final AtomicLong ignoreNewTopologyRequestsEndTimeNano = new AtomicLong(0);
+  protected final AtomicLong ignoreNewTopologyRequestsEndTimeNano = new AtomicLong(-1);
   protected final ConcurrentHashMap<String, Thread> nodeThreads = new ConcurrentHashMap<>();
   protected final AtomicBoolean nodeThreadsStop = new AtomicBoolean(false);
   protected final AtomicReference<Connection> nodeThreadsWriterConnection = new AtomicReference<>(null);
@@ -188,6 +188,8 @@ public List<HostSpec> forceRefresh(final boolean shouldVerifyWriter, final long
       // Previous failover has just completed. We can use results of it without triggering a new topology update.
       List<HostSpec> currentHosts = this.topologyMap.get(this.clusterId);
+      LOGGER.finest(
+          Utils.logTopology(currentHosts, Messages.get("ClusterTopologyMonitorImpl.ignoringTopologyRequest")));
       if (currentHosts != null) {
         return currentHosts;
       }
@@ -229,6 +231,7 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
     }

     if (timeoutMs == 0) {
+      LOGGER.finest(Utils.logTopology(currentHosts, Messages.get("ClusterTopologyMonitorImpl.timeoutSetToZero")));
       return currentHosts;
     }

@@ -240,6 +243,7 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
           this.topologyUpdated.wait(1000);
         }
       } catch (InterruptedException ex) {
+        LOGGER.fine(Messages.get("ClusterTopologyMonitorImpl.interrupted"));
        Thread.currentThread().interrupt();
        return null;
      }
@@ -282,6 +286,7 @@ public void run() {
       if (this.isInPanicMode()) {

         if (this.nodeThreads.isEmpty()) {
+          LOGGER.finest(Messages.get("ClusterTopologyMonitorImpl.startingNodeMonitoringThreads"));
           // start node threads
           this.nodeThreadsStop.set(false);

@@ -309,19 +314,28 @@ public void run() {
           // otherwise let's try it again the next round

         } else {
-          // node threads are running
           // check if writer is already detected
           final Connection writerConnection = this.nodeThreadsWriterConnection.get();
           final HostSpec writerConnectionHostSpec = this.nodeThreadsWriterHostSpec.get();
           if (writerConnection != null && writerConnectionHostSpec != null) {
+            LOGGER.finest(
+                Messages.get(
+                    "ClusterTopologyMonitorImpl.writerPickedUpFromNodeMonitors",
+                    new Object[]{writerConnectionHostSpec}));
             this.closeConnection(this.monitoringConnection.get());
             this.monitoringConnection.set(writerConnection);
             this.writerHostSpec.set(writerConnectionHostSpec);
             this.isVerifiedWriterConnection = true;
             this.highRefreshRateEndTimeNano = System.nanoTime() + highRefreshPeriodAfterPanicNano;
-            this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
+
+            // We verify the writer on initial connection and on failover, but we only want to ignore new topology
+            // requests after failover. To accomplish this, the first time we verify the writer we set the ignore end
+            // time to 0. Any future writer verifications will set it to a positive value.
+            if (!this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(-1, 0)) {
+              this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
+            }

             this.nodeThreadsStop.set(true);
             for (Thread thread : this.nodeThreads.values()) {
@@ -427,7 +441,7 @@ protected Thread getNodeMonitoringThread(final HostSpec hostSpec, final @Nullabl
   }

   protected List<HostSpec> openAnyConnectionAndUpdateTopology() {
-
+    boolean writerVerifiedByThisThread = false;
     if (this.monitoringConnection.get() == null) {
       Connection conn;
@@ -448,14 +462,22 @@ protected List<HostSpec> openAnyConnectionAndUpdateTopology() {
       try {
         if (!StringUtils.isNullOrEmpty(this.getWriterNodeId(this.monitoringConnection.get()))) {
           this.isVerifiedWriterConnection = true;
+          writerVerifiedByThisThread = true;
+
           if (rdsHelper.isRdsInstance(this.initialHostSpec.getHost())) {
             this.writerHostSpec.set(this.initialHostSpec);
-            LOGGER.finest("writerHostSpec: " + this.writerHostSpec.get().getHost());
+            LOGGER.finest(
+                Messages.get(
+                    "ClusterTopologyMonitorImpl.writerMonitoringConnection",
+                    new Object[]{this.writerHostSpec.get().getHost()}));
           } else {
             final String nodeId = this.getNodeId(this.monitoringConnection.get());
             if (!StringUtils.isNullOrEmpty(nodeId)) {
               this.writerHostSpec.set(this.createHost(nodeId, true, 0, null));
-              LOGGER.finest("writerHostSpec: " + this.writerHostSpec.get().getHost());
+              LOGGER.finest(
+                  Messages.get(
+                      "ClusterTopologyMonitorImpl.writerMonitoringConnection",
+                      new Object[]{this.writerHostSpec.get().getHost()}));
             }
           }
         }
@@ -471,6 +493,14 @@ protected List<HostSpec> openAnyConnectionAndUpdateTopology() {
     }

     final List<HostSpec> hosts = this.fetchTopologyAndUpdateCache(this.monitoringConnection.get());
+    if (writerVerifiedByThisThread) {
+      // We verify the writer on initial connection and on failover, but we only want to ignore new topology
+      // requests after failover. To accomplish this, the first time we verify the writer we set the ignore end
+      // time to 0. Any future writer verifications will set it to a positive value.
+      if (!this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(-1, 0)) {
+        this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
+      }
+    }

     if (hosts == null) {
       // can't get topology; it might be something's wrong with a connection
@@ -550,7 +580,7 @@ protected void delay(boolean useHighRefreshRate) throws InterruptedException {
       return hosts;
     } catch (SQLException ex) {
       // do nothing
-      LOGGER.log(Level.FINEST, "Error fetching topology:", ex);
+      LOGGER.finest(Messages.get("ClusterTopologyMonitorImpl.errorFetchingTopology", new Object[]{ex}));
     }
     return null;
   }
@@ -760,7 +790,7 @@ public void run() {
           writerId = this.monitor.getWriterNodeId(connection);

         } catch (SQLSyntaxErrorException ex) {
-          LOGGER.severe(() -> Messages.get("ClusterTopologyMonitorImpl.invalidWriterQuery",
+          LOGGER.severe(() -> Messages.get("NodeMonitoringThread.invalidWriterQuery",
              new Object[] {ex.getMessage()}));
           throw new RuntimeException(ex);

@@ -771,21 +801,21 @@ public void run() {
         if (!StringUtils.isNullOrEmpty(writerId)) {
           // this prevents closing connection in finally block
-          if (!this.monitor.nodeThreadsWriterConnection
-              .compareAndSet(null, connection)) {
+          if (!this.monitor.nodeThreadsWriterConnection.compareAndSet(null, connection)) {
             // writer connection is already setup
             this.monitor.closeConnection(connection);

           } else {
             // writer connection is successfully set to writerConnection
+            LOGGER.fine(Messages.get("NodeMonitoringThread.detectedWriter", new Object[]{writerId}));
+
+            // When nodeThreadsWriterConnection and nodeThreadsWriterHostSpec are both set, the topology monitor may
+            // set ignoreNewTopologyRequestsEndTimeNano, in which case other threads will use the cached topology
+            // for the ignore duration, so we need to update the topology before setting nodeThreadsWriterHostSpec.
+            this.monitor.fetchTopologyAndUpdateCache(connection);
             this.monitor.nodeThreadsWriterHostSpec.set(hostSpec);
-            LOGGER.fine("Detected writer: " + writerId);
             this.monitor.nodeThreadsStop.set(true);
-
-            this.monitor.fetchTopologyAndUpdateCache(connection);
             LOGGER.fine(Utils.logTopology(
                 this.monitor.topologyMap.get(this.monitor.clusterId)));
-
           }

           // Setting the connection to null here prevents the finally block
@@ -816,7 +846,7 @@ public void run() {
       } finally {
         this.monitor.closeConnection(connection);
         final long end = System.nanoTime();
-        LOGGER.finest(() -> Messages.get("ClusterTopologyMonitorImpl.nodeThreadCompleted",
+        LOGGER.finest(() -> Messages.get("NodeMonitoringThread.threadCompleted",
             new Object[] {TimeUnit.NANOSECONDS.toMillis(end - start)}));
       }
     }
@@ -853,7 +883,7 @@ private void readerThreadFetchTopology(final Connection connection, final @Nulla
           // writer node has changed
           this.writerChanged = true;

-          LOGGER.fine(() -> Messages.get("ClusterTopologyMonitorImpl.writerNodeChanged",
+          LOGGER.fine(() -> Messages.get("NodeMonitoringThread.writerNodeChanged",
              new Object[] {writerHostSpec.getHost(), latestWriterHostSpec.getHost()}));

           // we can update topology cache and notify all waiting threads
diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareWriterFailoverHandler.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareWriterFailoverHandler.java
index 825efe267..a942d9dbf 100644
--- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareWriterFailoverHandler.java
+++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/ClusterAwareWriterFailoverHandler.java
@@ -302,7 +302,7 @@ public WriterFailoverResult call() {
   private boolean isCurrentHostWriter(final List<HostSpec> latestTopology) {
     final HostSpec latestWriter = getWriter(latestTopology);
     final Set<String> latestWriterAllAliases = latestWriter.asAliases();
-    final Set<String> currentAliases = this.originalWriterHost.getAliases();
+    final Set<String> currentAliases = this.originalWriterHost.asAliases();

     return (currentAliases != null)
         && (latestWriterAllAliases.stream().anyMatch(currentAliases::contains));
diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java
index f771fa355..fd11ea225 100644
--- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java
+++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java
@@ -18,6 +18,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -25,7 +26,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
-import java.util.stream.Collectors;
 import software.amazon.jdbc.AwsWrapperProperty;
 import software.amazon.jdbc.HostListProviderService;
 import software.amazon.jdbc.HostRole;
@@ -218,13 +218,11 @@ public void initHostProvider(
       final JdbcCallable<Void, SQLException> initHostProviderFunc) throws SQLException {

     initHostProvider(
-        initialUrl,
         hostListProviderService,
         initHostProviderFunc);
   }

   void initHostProvider(
-      final String initialUrl,
       final HostListProviderService hostListProviderService,
       final JdbcCallable<Void, SQLException> initHostProviderFunc) throws SQLException {
@@ -305,10 +303,9 @@ protected void dealWithIllegalStateException(
    * Initiates the failover procedure. This process tries to establish a new connection to an
    * instance in the topology.
    *
-   * @param failedHost The host with network errors.
    * @throws SQLException if an error occurs
    */
-  protected void failover(final HostSpec failedHost) throws SQLException {
+  protected void failover() throws SQLException {

     if (this.failoverMode == FailoverMode.STRICT_WRITER) {
       failoverWriter();
@@ -369,7 +366,7 @@ protected void failoverReader() throws SQLException {
       try {
         readerCandidate = this.pluginService.getHostSpecByStrategy(
-            remainingHosts.stream().collect(Collectors.toList()),
+            new ArrayList<>(remainingHosts),
             HostRole.READER,
             this.failoverReaderHostSelectorStrategySetting);
       } catch (UnsupportedOperationException | SQLException ex) {
@@ -469,54 +466,57 @@ protected void failoverWriter() throws SQLException {
     if (!this.pluginService.forceRefreshHostList(true, this.failoverTimeoutMsSetting)) {
       // "Unable to establish SQL connection to writer node"
       this.failoverWriterFailedCounter.inc();
-      LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
-      throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
+      LOGGER.severe(Messages.get("Failover.unableToRefreshHostList"));
+      throw new FailoverFailedSQLException(Messages.get("Failover.unableToRefreshHostList"));
     }

     final List<HostSpec> updatedHosts = this.pluginService.getAllHosts();

     final Properties copyProp = PropertyUtils.copyProperties(this.properties);
     copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true");

-    Connection writerCandidateConn = null;
+    Connection writerCandidateConn;
     final HostSpec writerCandidate = updatedHosts.stream()
         .filter(x -> x.getRole() == HostRole.WRITER)
         .findFirst()
         .orElse(null);

+    if (writerCandidate == null) {
+      this.failoverWriterFailedCounter.inc();
+      String message = Utils.logTopology(updatedHosts, Messages.get("Failover.noWriterHost"));
+      LOGGER.severe(message);
+      throw new FailoverFailedSQLException(message);
+    }
+
     List<HostSpec> allowedHosts = this.pluginService.getHosts();
-    if (writerCandidate != null && !allowedHosts.contains(writerCandidate)) {
+    if (!allowedHosts.contains(writerCandidate)) {
       this.failoverWriterFailedCounter.inc();
+      String topologyString = Utils.logTopology(allowedHosts, "");
       LOGGER.severe(Messages.get("Failover.newWriterNotAllowed",
-          new Object[] {writerCandidate.getHost(), Utils.logTopology(allowedHosts, "")}));
+          new Object[] {writerCandidate.getHost(), topologyString}));
       throw new FailoverFailedSQLException(
           Messages.get("Failover.newWriterNotAllowed",
-              new Object[] {writerCandidate.getHost(), Utils.logTopology(allowedHosts, "")}));
-    }
-
-    if (writerCandidate != null) {
-      try {
-        writerCandidateConn = this.pluginService.connect(writerCandidate, copyProp);
-      } catch (SQLException ex) {
-        // do nothing
-      }
+              new Object[] {writerCandidate.getHost(), topologyString}));
     }

-    if (writerCandidateConn == null) {
-      // "Unable to establish SQL connection to writer node"
+    try {
+      writerCandidateConn = this.pluginService.connect(writerCandidate, copyProp);
+    } catch (SQLException ex) {
       this.failoverWriterFailedCounter.inc();
-      LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
-      throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
+      LOGGER.severe(
+          Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost(), ex}));
+      throw new FailoverFailedSQLException(Messages.get("Failover.exceptionConnectingToWriter"));
     }

-    if (this.pluginService.getHostRole(writerCandidateConn) != HostRole.WRITER) {
+    HostRole role = this.pluginService.getHostRole(writerCandidateConn);
+    if (role != HostRole.WRITER) {
       try {
         writerCandidateConn.close();
       } catch (SQLException ex) {
         // do nothing
       }
       this.failoverWriterFailedCounter.inc();
-      LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
-      throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
+      LOGGER.severe(Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role}));
+      throw new FailoverFailedSQLException(Messages.get("Failover.unexpectedReaderRole"));
     }

     this.pluginService.setCurrentConnection(writerCandidateConn, writerCandidate);
@@ -579,7 +579,7 @@ protected void pickNewConnection() throws SQLException {
       return;
     }

-    this.failover(this.pluginService.getCurrentHostSpec());
+    this.failover();
   }

   protected boolean shouldExceptionTriggerConnectionSwitch(final Throwable t) {
@@ -674,7 +674,7 @@ public Connection connect(
         this.pluginService.setAvailability(hostSpec.asAliases(), HostAvailability.NOT_AVAILABLE);

         try {
-          this.failover(hostSpec);
+          this.failover();
         } catch (FailoverSuccessSQLException failoverSuccessException) {
           conn = this.pluginService.getCurrentConnection();
         }
@@ -682,7 +682,7 @@ public Connection connect(
       } else {
         try {
           this.pluginService.refreshHostList();
-          this.failover(hostSpec);
+          this.failover();
         } catch (FailoverSuccessSQLException failoverSuccessException) {
           conn = this.pluginService.getCurrentConnection();
         }
diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties
index 392591da3..99eb6f93e 100644
--- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties
+++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties
@@ -171,10 +171,13 @@ ExecutionTimeConnectionPlugin.executionTime=Executed {0} in {1} nanos.
 # Failover Connection Plugin
 Failover.transactionResolutionUnknownError=Transaction resolution unknown. Please re-configure session state if required and try restarting the transaction.
 Failover.connectionChangedError=The active SQL connection has changed due to a connection failure. Please re-configure session state if required.
+Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to the new writer ''{0}''. Exception: {1}
 Failover.parameterValue={0}={1}
 Failover.unableToConnect=Unable to establish a SQL connection due to an unexpected error.
 Failover.unableToConnectToWriter=Unable to establish SQL connection to the writer instance.
 Failover.unableToConnectToReader=Unable to establish SQL connection to the reader instance.
+Failover.unableToRefreshHostList=The request to discover the new topology timed out or was unsuccessful.
+Failover.unexpectedReaderRole=The new writer was identified to be ''{0}'', but querying the instance for its role returned a role of {1}.
 Failover.detectedException=Detected an exception while executing a command: {0}
 Failover.failoverDisabled=Cluster-aware failover is disabled.
 Failover.establishedConnection=Connected to: {0}
@@ -183,6 +186,7 @@ Failover.startReaderFailover=Starting reader failover procedure.
 Failover.invalidNode=Node is no longer available in the topology: {0}
 Failover.newWriterNotAllowed=The failover process identified the new writer but the host is not in the list of allowed hosts. New writer host: ''{0}''. Allowed hosts: {1}
 Failover.noOperationsAfterConnectionClosed=No operations allowed after connection closed.
+Failover.noWriterHost=Unable to find writer in updated host list:
 Failover.readerFailoverElapsed=Reader failover elapsed in {0} ms.
 Failover.writerFailoverElapsed=Writer failover elapsed in {0} ms.

@@ -275,7 +279,11 @@ MonitorImpl.stopMonitoringThread=Stop monitoring thread for {0}.

 # Monitor Service Impl
 MonitorServiceImpl.emptyAliasSet=Empty alias set passed for ''{0}''. Set should not be empty.
-MonitorServiceImpl.errorPopulatingAliases=Error occurred while populating aliases: ''{0}''.
+
+NodeMonitoringThread.detectedWriter=Writer detected by node monitoring thread: ''{0}''.
+NodeMonitoringThread.invalidWriterQuery=The writer topology query is invalid: {0}
+NodeMonitoringThread.threadCompleted=Node monitoring thread completed in {0} ms.
+NodeMonitoringThread.writerNodeChanged=Writer node changed from ''{0}'' to node ''{1}''.

 OktaAuthPlugin.unableToDetermineRegion=Unable to determine connection region. If you are using a non-standard RDS URL, please set the ''{0}'' property.
 OktaAuthPlugin.requiredDependenciesMissing=OktaAuthPlugin requires the 'AWS Java SDK for AWS Secret Token Service' and 'JSoup' dependencies. Both of these dependencies must be registered on the classpath.
@@ -289,8 +297,7 @@ OktaCredentialsProviderFactory.samlRequestFailed=Okta SAML Assertion request fai
 PluginServiceImpl.currentHostNotAllowed=The current host is not in the list of allowed hosts. Current host: ''{0}''. Allowed hosts: {1}
 PluginServiceImpl.hostListEmpty=Current host list is empty.
 PluginServiceImpl.releaseResources=Releasing resources.
-PluginServiceImpl.hostListException=Exception while getting a host list.
-PluginServiceImpl.hostAliasNotFound=Can''t find any host by the following aliases: ''{0}''.
+PluginServiceImpl.forceRefreshTimeout=A timeout exception occurred after waiting {0}ms for refreshed topology.
 PluginServiceImpl.hostsChangelistEmpty=There are no changes in the hosts' availability.
 PluginServiceImpl.failedToRetrieveHostPort=Could not retrieve Host:Port for connection.
 PluginServiceImpl.nonEmptyAliases=fillAliases called when HostSpec already contains the following aliases: ''{0}''.
@@ -378,14 +385,18 @@ NodeResponseTimeMonitor.openingConnection=Opening a Response time connection to
 NodeResponseTimeMonitor.openedConnection=Opened Response time connection: {0}.

 # Monitoring RDS HostList Provider
-ClusterTopologyMonitorImpl.startMonitoringThread=Start cluster topology monitoring thread for {0}.
-ClusterTopologyMonitorImpl.stopMonitoringThread=Stop cluster topology monitoring thread for {0}.
-ClusterTopologyMonitorImpl.exceptionDuringMonitoringStop=Stopping cluster topology monitoring after unhandled exception was thrown in monitoring thread for node {0}.
+ClusterTopologyMonitorImpl.startMonitoringThread=Start cluster topology monitoring thread for ''{0}''.
+ClusterTopologyMonitorImpl.stopMonitoringThread=Stop cluster topology monitoring thread for ''{0}''.
+ClusterTopologyMonitorImpl.exceptionDuringMonitoringStop=Stopping cluster topology monitoring after unhandled exception was thrown in monitoring thread for node ''{0}''.
 ClusterTopologyMonitorImpl.invalidQuery=An error occurred while attempting to obtain the topology because the topology query was invalid. Please ensure you are connecting to an Aurora or RDS Db cluster.
 ClusterTopologyMonitorImpl.errorGettingNetworkTimeout=An error occurred while getting the connection network timeout: {0}
 ClusterTopologyMonitorImpl.invalidTopology=The topology query returned an invalid topology - no writer instance detected.
-ClusterTopologyMonitorImpl.invalidWriterQuery=The writer topology query is invalid.
 ClusterTopologyMonitorImpl.topologyNotUpdated=Topology hasn't been updated after {0} ms.
-ClusterTopologyMonitorImpl.openedMonitoringConnection=Opened monitoring connection to node {0}.
-ClusterTopologyMonitorImpl.nodeThreadCompleted=Thread completed in {0} ms.
-ClusterTopologyMonitorImpl.writerNodeChanged=Writer node changed from {0} to node {1}.
+ClusterTopologyMonitorImpl.openedMonitoringConnection=Opened monitoring connection to node ''{0}''.
+ClusterTopologyMonitorImpl.ignoringTopologyRequest=A topology refresh was requested, but the topology was already updated recently. Returning cached hosts:
+ClusterTopologyMonitorImpl.timeoutSetToZero=A topology refresh was requested, but the given timeout for the request was 0ms. Returning cached hosts:
+ClusterTopologyMonitorImpl.interrupted=The thread was interrupted while waiting for updated topology.
+ClusterTopologyMonitorImpl.startingNodeMonitoringThreads=Starting node monitoring threads.
+ClusterTopologyMonitorImpl.writerPickedUpFromNodeMonitors=The writer host detected by the node monitors was picked up by the topology monitor: ''{0}''.
+ClusterTopologyMonitorImpl.writerMonitoringConnection=The monitoring connection is connected to a writer: ''{0}''.
+ClusterTopologyMonitorImpl.errorFetchingTopology=An error occurred while querying for topology: {0}
diff --git a/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java b/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java
index 4b179111c..49569ebbe 100644
--- a/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java
+++ b/wrapper/src/test/java/integration/container/tests/AuroraFailoverTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;

 import com.mysql.cj.conf.PropertyKey;
 import integration.DatabaseEngine;
@@ -34,8 +35,6 @@ import integration.container.TestDriverProvider;
 import integration.container.TestEnvironment;
 import integration.container.condition.DisableOnTestFeature;
-import integration.container.condition.EnableOnDatabaseEngine;
-import integration.container.condition.EnableOnDatabaseEngineDeployment;
 import integration.container.condition.EnableOnNumOfInstances;
 import integration.container.condition.EnableOnTestDriver;
 import integration.container.condition.EnableOnTestFeature;
@@ -45,23 +44,21 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.SQLSyntaxErrorException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 import software.amazon.jdbc.PropertyDefinition;
-import software.amazon.jdbc.dialect.DialectCodes;
-import software.amazon.jdbc.dialect.DialectManager;
 import software.amazon.jdbc.ds.AwsWrapperDataSource;
 import software.amazon.jdbc.hostlistprovider.AuroraHostListProvider;
 import software.amazon.jdbc.plugin.failover.FailoverSQLException;
@@ -75,7 +72,6 @@ TestEnvironmentFeatures.PERFORMANCE,
     TestEnvironmentFeatures.RUN_HIBERNATE_TESTS_ONLY,
     TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY})
-@EnableOnNumOfInstances(min = 2)
 @MakeSureFirstInstanceWriter
 @Order(14)
 public class AuroraFailoverTest {
@@ -100,6 +96,7 @@ public void setUpEach() {
    * writer. Driver failover occurs when executing a method against the connection
    */
   @TestTemplate
+  @EnableOnNumOfInstances(min = 2)
   public void test_failFromWriterToNewWriter_failOnConnectionInvocation()
       throws SQLException, InterruptedException {

@@ -136,6 +133,7 @@ public void test_failFromWriterToNewWriter_failOnConnectionInvocation()
    * connection (eg a Statement object created by the connection).
    */
   @TestTemplate
+  @EnableOnNumOfInstances(min = 2)
   public void test_failFromWriterToNewWriter_failOnConnectionBoundObjectInvocation()
       throws SQLException, InterruptedException {

@@ -173,7 +171,7 @@ public void test_failFromWriterToNewWriter_failOnConnectionBoundObjectInvocation
    * Current reader dies, no other reader instance, failover to writer.
    */
   @TestTemplate
-  @EnableOnNumOfInstances(max = 2)
+  @EnableOnNumOfInstances(min = 2, max = 2)
   public void test_failFromReaderToWriter() throws SQLException {
     // Connect to the only available reader instance
     final TestInstanceInfo instanceInfo =
@@ -206,6 +204,7 @@ public void test_failFromReaderToWriter() throws SQLException {

   /** Writer fails within a transaction. Open transaction with setAutoCommit(false) */
   @TestTemplate
+  @EnableOnNumOfInstances(min = 2)
   public void test_writerFailWithinTransaction_setAutoCommitFalse()
       throws SQLException, InterruptedException {

@@ -265,6 +264,7 @@ public void test_writerFailWithinTransaction_setAutoCommitFalse()

   /** Writer fails within a transaction. Open transaction with "START TRANSACTION". */
   @TestTemplate
+  @EnableOnNumOfInstances(min = 2)
   public void test_writerFailWithinTransaction_startTransaction()
       throws SQLException, InterruptedException {

@@ -326,6 +326,7 @@ public void test_writerFailWithinTransaction_startTransaction()
   }

   @TestTemplate
+  @EnableOnNumOfInstances(min = 2)
   @EnableOnTestFeature(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED)
   public void testServerFailoverWithIdleConnections() throws SQLException, InterruptedException {
     final List<Connection> idleConnections = new ArrayList<>();
@@ -407,6 +408,7 @@ public void testServerFailoverWithIdleConnections() throws SQLException, Interru
   }

   @TestTemplate
+  @EnableOnNumOfInstances(min = 2)
   public void test_DataSourceWriterConnection_BasicFailover()
       throws SQLException, InterruptedException {

@@ -474,6 +476,7 @@ public void test_DataSourceWriterConnection_BasicFailover()
   }

   @TestTemplate
+  @EnableOnNumOfInstances(min = 2)
   @EnableOnTestDriver(TestDriver.MYSQL)
   public void test_takeOverConnectionProperties() throws SQLException, InterruptedException {
     final Properties props = initDefaultProps();
@@ -520,6 +523,7 @@ public void test_takeOverConnectionProperties() throws SQLException, Interrupted
    * writer. Autocommit is set to false and the keepSessionStateOnFailover property is set to true.
    */
   @TestTemplate
+  @EnableOnNumOfInstances(min = 2)
   public void test_failFromWriterWhereKeepSessionStateOnFailoverIsTrue()
       throws SQLException, InterruptedException {

@@ -583,6 +587,59 @@ public void test_failFromWriterWhereKeepSessionStateOnFailoverIsTrue()
     }
   }

+  @TestTemplate
+  @EnableOnTestFeature(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED)
+  public void test_writerFailover_writerReelected() throws SQLException, InterruptedException {
+    final String initialWriterId = this.currentWriter;
+    TestInstanceInfo initialWriterInstanceInfo =
+        TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstance(initialWriterId);
+
+    final Properties props = initDefaultProxiedProps();
+    PropertyDefinition.SOCKET_TIMEOUT.set(props, "2000");
+
+    try (final Connection conn =
+        DriverManager.getConnection(
+            ConnectionStringHelper.getWrapperUrl(
+                initialWriterInstanceInfo.getHost(),
+                initialWriterInstanceInfo.getPort(),
+                TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()),
+            props)) {
+
+      ExecutorService executor = Executors.newFixedThreadPool(1, r -> {
+        final Thread thread = new Thread(r);
+        thread.setDaemon(true);
+        return thread;
+      });
+
+      try {
+        // Failover usually changes the writer instance, but we want to test re-election of the same writer, so we will
+        // simulate this by temporarily disabling connectivity to the writer.
+        executor.submit(() -> {
+          try {
+            ProxyHelper.disableConnectivity(initialWriterId);
+            TimeUnit.SECONDS.sleep(5);
+            ProxyHelper.enableConnectivity(initialWriterId);
+          } catch (InterruptedException e) {
+            fail("The disable connectivity thread was unexpectedly interrupted.");
+          }
+        });
+
+        // Leave some time for the other thread to start up
+        TimeUnit.MILLISECONDS.sleep(500);
+
+        // Failure occurs on Connection invocation
+        auroraUtil.assertFirstQueryThrows(conn, FailoverSuccessSQLException.class);
+
+        // Assert that we are connected to the new writer after failover happens.
+        final String currentConnectionId = auroraUtil.queryInstanceId(conn);
+        assertTrue(auroraUtil.isDBInstanceWriter(currentConnectionId));
+        assertEquals(currentConnectionId, initialWriterId);
+      } finally {
+        executor.shutdownNow();
+      }
+    }
+  }
+
   // Helper methods below

   protected String getFailoverPlugin() {