Skip to content

Commit

Permalink
fix: avoid setting ignoreNewTopologyRequestsEndTimeNano on initial connection (#1221)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-congo authored Dec 18, 2024
1 parent 929d356 commit b680b4a
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,8 @@ public boolean forceRefreshHostList(final boolean shouldVerifyWriter, final long
return true;
}
} catch (TimeoutException ex) {
// do nothing
// do nothing.
LOGGER.finest(Messages.get("PluginServiceImpl.forceRefreshTimeout", new Object[]{timeoutMs}));
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
protected long highRefreshRateEndTimeNano = 0;
protected final Object topologyUpdated = new Object();
protected final AtomicBoolean requestToUpdateTopology = new AtomicBoolean(false);
protected final AtomicLong ignoreNewTopologyRequestsEndTimeNano = new AtomicLong(0);
protected final AtomicLong ignoreNewTopologyRequestsEndTimeNano = new AtomicLong(-1);
protected final ConcurrentHashMap<String /* host */, Thread> nodeThreads = new ConcurrentHashMap<>();
protected final AtomicBoolean nodeThreadsStop = new AtomicBoolean(false);
protected final AtomicReference<Connection> nodeThreadsWriterConnection = new AtomicReference<>(null);
Expand Down Expand Up @@ -188,6 +188,8 @@ public List<HostSpec> forceRefresh(final boolean shouldVerifyWriter, final long

// Previous failover has just completed. We can use results of it without triggering a new topology update.
List<HostSpec> currentHosts = this.topologyMap.get(this.clusterId);
LOGGER.finest(
Utils.logTopology(currentHosts, Messages.get("ClusterTopologyMonitorImpl.ignoringTopologyRequest")));
if (currentHosts != null) {
return currentHosts;
}
Expand Down Expand Up @@ -229,6 +231,7 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
}

if (timeoutMs == 0) {
LOGGER.finest(Utils.logTopology(currentHosts, Messages.get("ClusterTopologyMonitorImpl.timeoutSetToZero")));
return currentHosts;
}

Expand All @@ -240,6 +243,7 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
this.topologyUpdated.wait(1000);
}
} catch (InterruptedException ex) {
LOGGER.fine(Messages.get("ClusterTopologyMonitorImpl.interrupted"));
Thread.currentThread().interrupt();
return null;
}
Expand Down Expand Up @@ -282,6 +286,7 @@ public void run() {
if (this.isInPanicMode()) {

if (this.nodeThreads.isEmpty()) {
LOGGER.finest(Messages.get("ClusterTopologyMonitorImpl.startingNodeMonitoringThreads"));

// start node threads
this.nodeThreadsStop.set(false);
Expand Down Expand Up @@ -309,19 +314,28 @@ public void run() {
// otherwise let's try it again the next round

} else {

// node threads are running
// check if writer is already detected
final Connection writerConnection = this.nodeThreadsWriterConnection.get();
final HostSpec writerConnectionHostSpec = this.nodeThreadsWriterHostSpec.get();
if (writerConnection != null && writerConnectionHostSpec != null) {
LOGGER.finest(
Messages.get(
"ClusterTopologyMonitorImpl.writerPickedUpFromNodeMonitors",
new Object[]{writerConnectionHostSpec}));

this.closeConnection(this.monitoringConnection.get());
this.monitoringConnection.set(writerConnection);
this.writerHostSpec.set(writerConnectionHostSpec);
this.isVerifiedWriterConnection = true;
this.highRefreshRateEndTimeNano = System.nanoTime() + highRefreshPeriodAfterPanicNano;
this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);

// We verify the writer on initial connection and on failover, but we only want to ignore new topology
// requests after failover. To accomplish this, the first time we verify the writer we set the ignore end
// time to 0. Any future writer verifications will set it to a positive value.
if (!this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(-1, 0)) {
this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
}

this.nodeThreadsStop.set(true);
for (Thread thread : this.nodeThreads.values()) {
Expand Down Expand Up @@ -427,7 +441,7 @@ protected Thread getNodeMonitoringThread(final HostSpec hostSpec, final @Nullabl
}

protected List<HostSpec> openAnyConnectionAndUpdateTopology() {

boolean writerVerifiedByThisThread = false;
if (this.monitoringConnection.get() == null) {

Connection conn;
Expand All @@ -448,14 +462,22 @@ protected List<HostSpec> openAnyConnectionAndUpdateTopology() {
try {
if (!StringUtils.isNullOrEmpty(this.getWriterNodeId(this.monitoringConnection.get()))) {
this.isVerifiedWriterConnection = true;
writerVerifiedByThisThread = true;

if (rdsHelper.isRdsInstance(this.initialHostSpec.getHost())) {
this.writerHostSpec.set(this.initialHostSpec);
LOGGER.finest("writerHostSpec: " + this.writerHostSpec.get().getHost());
LOGGER.finest(
Messages.get(
"ClusterTopologyMonitorImpl.writerMonitoringConnection",
new Object[]{this.writerHostSpec.get().getHost()}));
} else {
final String nodeId = this.getNodeId(this.monitoringConnection.get());
if (!StringUtils.isNullOrEmpty(nodeId)) {
this.writerHostSpec.set(this.createHost(nodeId, true, 0, null));
LOGGER.finest("writerHostSpec: " + this.writerHostSpec.get().getHost());
LOGGER.finest(
Messages.get(
"ClusterTopologyMonitorImpl.writerMonitoringConnection",
new Object[]{this.writerHostSpec.get().getHost()}));
}
}
}
Expand All @@ -471,6 +493,14 @@ protected List<HostSpec> openAnyConnectionAndUpdateTopology() {
}

final List<HostSpec> hosts = this.fetchTopologyAndUpdateCache(this.monitoringConnection.get());
if (writerVerifiedByThisThread) {
// We verify the writer on initial connection and on failover, but we only want to ignore new topology
// requests after failover. To accomplish this, the first time we verify the writer we set the ignore end
// time to 0. Any future writer verifications will set it to a positive value.
if (!this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(-1, 0)) {
this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
}
}

if (hosts == null) {
// can't get topology; it might be something's wrong with a connection
Expand Down Expand Up @@ -550,7 +580,7 @@ protected void delay(boolean useHighRefreshRate) throws InterruptedException {
return hosts;
} catch (SQLException ex) {
// do nothing
LOGGER.log(Level.FINEST, "Error fetching topology:", ex);
LOGGER.finest(Messages.get("ClusterTopologyMonitorImpl.errorFetchingTopology", new Object[]{ex}));
}
return null;
}
Expand Down Expand Up @@ -760,7 +790,7 @@ public void run() {
writerId = this.monitor.getWriterNodeId(connection);

} catch (SQLSyntaxErrorException ex) {
LOGGER.severe(() -> Messages.get("ClusterTopologyMonitorImpl.invalidWriterQuery",
LOGGER.severe(() -> Messages.get("NodeMonitoringThread.invalidWriterQuery",
new Object[] {ex.getMessage()}));
throw new RuntimeException(ex);

Expand All @@ -771,21 +801,21 @@ public void run() {

if (!StringUtils.isNullOrEmpty(writerId)) {
// this prevents closing connection in finally block
if (!this.monitor.nodeThreadsWriterConnection
.compareAndSet(null, connection)) {
if (!this.monitor.nodeThreadsWriterConnection.compareAndSet(null, connection)) {
// writer connection is already setup
this.monitor.closeConnection(connection);

} else {
// writer connection is successfully set to writerConnection
LOGGER.fine(Messages.get("NodeMonitoringThread.detectedWriter", new Object[]{writerId}));
// When nodeThreadsWriterConnection and nodeThreadsWriterHostSpec are both set, the topology monitor may
// set ignoreNewTopologyRequestsEndTimeNano, in which case other threads will use the cached topology
// for the ignore duration, so we need to update the topology before setting nodeThreadsWriterHostSpec.
this.monitor.fetchTopologyAndUpdateCache(connection);
this.monitor.nodeThreadsWriterHostSpec.set(hostSpec);
LOGGER.fine("Detected writer: " + writerId);
this.monitor.nodeThreadsStop.set(true);

this.monitor.fetchTopologyAndUpdateCache(connection);
LOGGER.fine(Utils.logTopology(
this.monitor.topologyMap.get(this.monitor.clusterId)));

}

// Setting the connection to null here prevents the finally block
Expand Down Expand Up @@ -816,7 +846,7 @@ public void run() {
} finally {
this.monitor.closeConnection(connection);
final long end = System.nanoTime();
LOGGER.finest(() -> Messages.get("ClusterTopologyMonitorImpl.nodeThreadCompleted",
LOGGER.finest(() -> Messages.get("NodeMonitoringThread.threadCompleted",
new Object[] {TimeUnit.NANOSECONDS.toMillis(end - start)}));
}
}
Expand Down Expand Up @@ -853,7 +883,7 @@ private void readerThreadFetchTopology(final Connection connection, final @Nulla
// writer node has changed
this.writerChanged = true;

LOGGER.fine(() -> Messages.get("ClusterTopologyMonitorImpl.writerNodeChanged",
LOGGER.fine(() -> Messages.get("NodeMonitoringThread.writerNodeChanged",
new Object[] {writerHostSpec.getHost(), latestWriterHostSpec.getHost()}));

// we can update topology cache and notify all waiting threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public WriterFailoverResult call() {
private boolean isCurrentHostWriter(final List<HostSpec> latestTopology) {
final HostSpec latestWriter = getWriter(latestTopology);
final Set<String> latestWriterAllAliases = latestWriter.asAliases();
final Set<String> currentAliases = this.originalWriterHost.getAliases();
final Set<String> currentAliases = this.originalWriterHost.asAliases();

return (currentAliases != null)
&& (latestWriterAllAliases.stream().anyMatch(currentAliases::contains));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import software.amazon.jdbc.AwsWrapperProperty;
import software.amazon.jdbc.HostListProviderService;
import software.amazon.jdbc.HostRole;
Expand Down Expand Up @@ -218,13 +218,11 @@ public void initHostProvider(
final JdbcCallable<Void, SQLException> initHostProviderFunc)
throws SQLException {
initHostProvider(
initialUrl,
hostListProviderService,
initHostProviderFunc);
}

void initHostProvider(
final String initialUrl,
final HostListProviderService hostListProviderService,
final JdbcCallable<Void, SQLException> initHostProviderFunc)
throws SQLException {
Expand Down Expand Up @@ -305,10 +303,9 @@ protected <E extends Exception> void dealWithIllegalStateException(
* Initiates the failover procedure. This process tries to establish a new connection to an
* instance in the topology.
*
* @param failedHost The host with network errors.
* @throws SQLException if an error occurs
*/
protected void failover(final HostSpec failedHost) throws SQLException {
protected void failover() throws SQLException {

if (this.failoverMode == FailoverMode.STRICT_WRITER) {
failoverWriter();
Expand Down Expand Up @@ -369,7 +366,7 @@ protected void failoverReader() throws SQLException {
try {
readerCandidate =
this.pluginService.getHostSpecByStrategy(
remainingHosts.stream().collect(Collectors.toList()),
new ArrayList<>(remainingHosts),
HostRole.READER,
this.failoverReaderHostSelectorStrategySetting);
} catch (UnsupportedOperationException | SQLException ex) {
Expand Down Expand Up @@ -469,54 +466,57 @@ protected void failoverWriter() throws SQLException {
if (!this.pluginService.forceRefreshHostList(true, this.failoverTimeoutMsSetting)) {
// "Unable to establish SQL connection to writer node"
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
LOGGER.severe(Messages.get("Failover.unableToRefreshHostList"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToRefreshHostList"));
}

final List<HostSpec> updatedHosts = this.pluginService.getAllHosts();
final Properties copyProp = PropertyUtils.copyProperties(this.properties);
copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true");

Connection writerCandidateConn = null;
Connection writerCandidateConn;
final HostSpec writerCandidate = updatedHosts.stream()
.filter(x -> x.getRole() == HostRole.WRITER)
.findFirst()
.orElse(null);

if (writerCandidate == null) {
this.failoverWriterFailedCounter.inc();
String message = Utils.logTopology(updatedHosts, Messages.get("Failover.noWriterHost"));
LOGGER.severe(message);
throw new FailoverFailedSQLException(message);
}

List<HostSpec> allowedHosts = this.pluginService.getHosts();
if (writerCandidate != null && !allowedHosts.contains(writerCandidate)) {
if (!allowedHosts.contains(writerCandidate)) {
this.failoverWriterFailedCounter.inc();
String topologyString = Utils.logTopology(allowedHosts, "");
LOGGER.severe(Messages.get("Failover.newWriterNotAllowed",
new Object[] {writerCandidate.getHost(), Utils.logTopology(allowedHosts, "")}));
new Object[] {writerCandidate.getHost(), topologyString}));
throw new FailoverFailedSQLException(
Messages.get("Failover.newWriterNotAllowed",
new Object[] {writerCandidate.getHost(), Utils.logTopology(allowedHosts, "")}));
}

if (writerCandidate != null) {
try {
writerCandidateConn = this.pluginService.connect(writerCandidate, copyProp);
} catch (SQLException ex) {
// do nothing
}
new Object[] {writerCandidate.getHost(), topologyString}));
}

if (writerCandidateConn == null) {
// "Unable to establish SQL connection to writer node"
try {
writerCandidateConn = this.pluginService.connect(writerCandidate, copyProp);
} catch (SQLException ex) {
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
LOGGER.severe(
Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost(), ex}));
throw new FailoverFailedSQLException(Messages.get("Failover.exceptionConnectingToWriter"));
}

if (this.pluginService.getHostRole(writerCandidateConn) != HostRole.WRITER) {
HostRole role = this.pluginService.getHostRole(writerCandidateConn);
if (role != HostRole.WRITER) {
try {
writerCandidateConn.close();
} catch (SQLException ex) {
// do nothing
}
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
LOGGER.severe(Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role}));
throw new FailoverFailedSQLException(Messages.get("Failover.unexpectedReaderRole"));
}

this.pluginService.setCurrentConnection(writerCandidateConn, writerCandidate);
Expand Down Expand Up @@ -579,7 +579,7 @@ protected void pickNewConnection() throws SQLException {
return;
}

this.failover(this.pluginService.getCurrentHostSpec());
this.failover();
}

protected boolean shouldExceptionTriggerConnectionSwitch(final Throwable t) {
Expand Down Expand Up @@ -674,15 +674,15 @@ public Connection connect(
this.pluginService.setAvailability(hostSpec.asAliases(), HostAvailability.NOT_AVAILABLE);

try {
this.failover(hostSpec);
this.failover();
} catch (FailoverSuccessSQLException failoverSuccessException) {
conn = this.pluginService.getCurrentConnection();
}
}
} else {
try {
this.pluginService.refreshHostList();
this.failover(hostSpec);
this.failover();
} catch (FailoverSuccessSQLException failoverSuccessException) {
conn = this.pluginService.getCurrentConnection();
}
Expand Down
Loading

0 comments on commit b680b4a

Please sign in to comment.