diff --git a/core/pom.xml b/core/pom.xml index b8d7d5c2d3b..2167b1cabc0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -116,6 +116,11 @@ org.reactivestreams reactive-streams + + com.github.seancfoley + ipaddress + true + com.github.stephenc.jcip jcip-annotations diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 6ffd51d86ef..4e45bf7b117 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -994,7 +994,48 @@ public enum DefaultDriverOption implements DriverOption { * *

Value-type: boolean */ - SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"); + SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"), + /** + * An address to always translate all node addresses to that same proxy hostname no matter what IP + * address a node has, but still using its native transport port. + * + *

Value-Type: {@link String} + */ + ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME("advanced.address-translator.advertised-hostname"), + /** + * A map of Cassandra node subnets (CIDR notations) to target addresses, for example (note quoted + * keys): + * + *

+   * advanced.address-translator.subnet-addresses {
+   *   "100.64.0.0/15" = "cassandra.datacenter1.com:9042"
+   *   "100.66.0.0/15" = "cassandra.datacenter2.com:9042"
+   *   # IPv6 example:
+   *   # "::ffff:6440:0/111" = "cassandra.datacenter1.com:9042"
+   *   # "::ffff:6442:0/111" = "cassandra.datacenter2.com:9042"
+   * }
+   * 
+ * + * Note: subnets must be represented as prefix blocks, see {@link + * inet.ipaddr.Address#isPrefixBlock()}. + * + *

Value type: {@link java.util.Map Map}<{@link String},{@link String}> + */ + ADDRESS_TRANSLATOR_SUBNET_ADDRESSES("advanced.address-translator.subnet-addresses"), + /** + * A default address to fallback to if Cassandra node IP isn't contained in any of the configured + * subnets. + * + *

Value-Type: {@link String} + */ + ADDRESS_TRANSLATOR_DEFAULT_ADDRESS("advanced.address-translator.default-address"), + /** + * Whether to resolve the addresses on initialization (if true) or on each node (re-)connection + * (if false). Defaults to false. + * + *

Value-Type: boolean + */ + ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES("advanced.address-translator.resolve-addresses"); private final String path; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java index 1ed2a1cebf3..bb65661b72f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ContactPoints.java @@ -19,14 +19,11 @@ import com.datastax.oss.driver.api.core.metadata.EndPoint; import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint; +import com.datastax.oss.driver.internal.core.util.AddressUtils; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.driver.shaded.guava.common.collect.Sets; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import org.slf4j.Logger; @@ -41,7 +38,22 @@ public static Set merge( Set result = Sets.newHashSet(programmaticContactPoints); for (String spec : configContactPoints) { - for (InetSocketAddress address : extract(spec, resolve)) { + + Set addresses = Collections.emptySet(); + try { + addresses = AddressUtils.extract(spec, resolve); + } catch (RuntimeException e) { + LOG.warn("Ignoring invalid contact point {} ({})", spec, e.getMessage(), e); + } + + if (addresses.size() > 1) { + LOG.info( + "Contact point {} resolves to multiple addresses, will use them all ({})", + spec, + addresses); + } + + for (InetSocketAddress address : addresses) { DefaultEndPoint endPoint = new DefaultEndPoint(address); boolean wasNew = result.add(endPoint); if (!wasNew) { @@ -51,43 +63,4 @@ public static Set merge( } return ImmutableSet.copyOf(result); } - - private static Set extract(String spec, boolean resolve) { - int separator = spec.lastIndexOf(':'); - if (separator < 0) { - LOG.warn("Ignoring invalid contact point {} (expecting host:port)", spec); - return Collections.emptySet(); - } - - String host = spec.substring(0, separator); - String portSpec = spec.substring(separator + 1); - int port; - try { - port = Integer.parseInt(portSpec); - } catch (NumberFormatException e) { - LOG.warn("Ignoring invalid contact point {} (expecting a number, got {})", spec, portSpec); - return Collections.emptySet(); - } - if (!resolve) { - return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port)); - } else { - try { - InetAddress[] inetAddresses = InetAddress.getAllByName(host); - if (inetAddresses.length > 1) { - LOG.info( - "Contact point {} resolves to multiple addresses, will use them all ({})", - spec, - Arrays.deepToString(inetAddresses)); - } - Set result = new HashSet<>(); - for (InetAddress inetAddress : inetAddresses) { - result.add(new InetSocketAddress(inetAddress, port)); - } - return result; - } catch (UnknownHostException e) { - LOG.warn("Ignoring invalid contact point {} (unknown host {})", spec, host); - return Collections.emptySet(); - } - } - } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslator.java b/core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslator.java index 4fb9782f566..5cc6c2518fb 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslator.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslator.java @@ -17,8 +17,9 @@ */ package com.datastax.oss.driver.internal.core.addresstranslation; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME; + import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator; -import com.datastax.oss.driver.api.core.config.DriverOption; import com.datastax.oss.driver.api.core.context.DriverContext; import edu.umd.cs.findbugs.annotations.NonNull; import java.net.InetSocketAddress; @@ -37,28 +38,13 @@ public class FixedHostNameAddressTranslator implements AddressTranslator { private static final Logger LOG = LoggerFactory.getLogger(FixedHostNameAddressTranslator.class); - public static final String ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME = - "advanced.address-translator.advertised-hostname"; - - public static DriverOption ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION = - new DriverOption() { - @NonNull - @Override - public String getPath() { - return ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME; - } - }; - private final String advertisedHostname; private final String logPrefix; public FixedHostNameAddressTranslator(@NonNull DriverContext context) { logPrefix = context.getSessionName(); advertisedHostname = - context - .getConfig() - .getDefaultProfile() - .getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION); + context.getConfig().getDefaultProfile().getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME); } @NonNull diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/SubnetAddressTranslator.java b/core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/SubnetAddressTranslator.java new file mode 100644 index 00000000000..2a2798a7117 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/addresstranslation/SubnetAddressTranslator.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.addresstranslation; + +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_DEFAULT_ADDRESS; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_SUBNET_ADDRESSES; + +import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator; +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.internal.core.util.AddressUtils; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import inet.ipaddr.IPAddress; +import inet.ipaddr.IPAddressString; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This translator returns the proxy address of the private subnet containing the Cassandra node IP, + * or default address if no matching subnets, or passes through the original node address if no + * default configured. + * + *

The translator can be used for scenarios when all nodes are behind some kind of proxy, and + * that proxy is different for nodes located in different subnets (eg. when Cassandra is deployed in + * multiple datacenters/regions). One can use this, for example, for Cassandra on Kubernetes with + * different Cassandra datacenters deployed to different Kubernetes clusters. + */ +public class SubnetAddressTranslator implements AddressTranslator { + private static final Logger LOG = LoggerFactory.getLogger(SubnetAddressTranslator.class); + + private final List subnetAddresses; + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private final Optional defaultAddress; + + private final String logPrefix; + + public SubnetAddressTranslator(@NonNull DriverContext context) { + logPrefix = context.getSessionName(); + boolean resolveAddresses = + context + .getConfig() + .getDefaultProfile() + .getBoolean(ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES, false); + this.subnetAddresses = + context.getConfig().getDefaultProfile().getStringMap(ADDRESS_TRANSLATOR_SUBNET_ADDRESSES) + .entrySet().stream() + .map( + e -> { + // Quoted and/or containing forward slashes map keys in reference.conf are read to + // strings with additional quotes, eg. 100.64.0.0/15 -> '100.64.0."0/15"' or + // "100.64.0.0/15" -> '"100.64.0.0/15"' + String subnetCIDR = e.getKey().replaceAll("\"", ""); + String address = e.getValue(); + return new SubnetAddress(subnetCIDR, parseAddress(address, resolveAddresses)); + }) + .collect(Collectors.toList()); + this.defaultAddress = + Optional.ofNullable( + context + .getConfig() + .getDefaultProfile() + .getString(ADDRESS_TRANSLATOR_DEFAULT_ADDRESS, null)) + .map(address -> parseAddress(address, resolveAddresses)); + + validateSubnetsAreNotOverlapping(this.subnetAddresses); + } + + @NonNull + @Override + public InetSocketAddress translate(@NonNull InetSocketAddress address) { + InetSocketAddress translatedAddress = null; + for (SubnetAddress subnetAddress : subnetAddresses) { + if (subnetAddress.contains(address)) { + translatedAddress = subnetAddress.address; + } + } + if (translatedAddress == null && defaultAddress.isPresent()) { + translatedAddress = defaultAddress.get(); + } + if (translatedAddress == null) { + translatedAddress = address; + } + LOG.debug("[{}] Translated {} to {}", logPrefix, address, translatedAddress); + return translatedAddress; + } + + @Override + public void close() {} + + @Nullable + private InetSocketAddress parseAddress(String address, boolean resolve) { + try { + InetSocketAddress parsedAddress = AddressUtils.extract(address, resolve).iterator().next(); + LOG.debug("[{}] Parsed {} to {}", logPrefix, address, parsedAddress); + return parsedAddress; + } catch (RuntimeException e) { + throw new IllegalArgumentException( + String.format("Invalid address %s (%s)", address, e.getMessage()), e); + } + } + + private static void validateSubnetsAreNotOverlapping(List subnetAddresses) { + for (int i = 0; i < subnetAddresses.size() - 1; i++) { + for (int j = i + 1; j < subnetAddresses.size(); j++) { + SubnetAddress subnetAddress1 = subnetAddresses.get(i); + SubnetAddress subnetAddress2 = subnetAddresses.get(j); + if (subnetAddress1.isOverlapping(subnetAddress2)) { + throw new IllegalArgumentException( + String.format( + "Configured subnets are overlapping: %s, %s", + subnetAddress1.subnet, subnetAddress2.subnet)); + } + } + } + } + + private static class SubnetAddress { + private final IPAddress subnet; + private final InetSocketAddress address; + + private SubnetAddress(String subnetCIDR, InetSocketAddress address) { + this.subnet = parseSubnet(subnetCIDR); + this.address = address; + } + + private static IPAddress parseSubnet(String subnetCIDR) { + IPAddress subnet = new IPAddressString(subnetCIDR).getAddress(); + if (subnet == null) { + throw new IllegalArgumentException("Invalid subnet: " + subnetCIDR); + } + if (!subnet.isPrefixBlock()) { + throw new IllegalArgumentException( + String.format( + "Subnet %s must be represented as a network prefix block %s", + subnet, subnet.toPrefixBlock())); + } + return subnet; + } + + private boolean isOverlapping(SubnetAddress other) { + IPAddress thisSubnet = this.subnet; + IPAddress otherSubnet = other.subnet; + return thisSubnet.contains(otherSubnet.getLower()) + || thisSubnet.contains(otherSubnet.getUpper()) + || otherSubnet.contains(thisSubnet.getLower()) + || otherSubnet.contains(thisSubnet.getUpper()); + } + + private boolean contains(InetSocketAddress address) { + IPAddress ipAddress = new IPAddressString(address.getAddress().getHostAddress()).getAddress(); + if (subnet.isIPv4() && ipAddress.isIPv4Convertible()) { + ipAddress = ipAddress.toIPv4(); + } else if (subnet.isIPv6() && ipAddress.isIPv6Convertible()) { + ipAddress = ipAddress.toIPv6(); + } + return subnet.contains(ipAddress); + } + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/AddressUtils.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/AddressUtils.java new file mode 100644 index 00000000000..8905edb9192 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/AddressUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.util; + +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashSet; +import java.util.Set; + +public class AddressUtils { + + public static Set extract(String address, boolean resolve) { + int separator = address.lastIndexOf(':'); + if (separator < 0) { + throw new IllegalArgumentException("expecting format host:port"); + } + + String host = address.substring(0, separator); + String portString = address.substring(separator + 1); + int port; + try { + port = Integer.parseInt(portString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("expecting port to be a number, got " + portString, e); + } + if (!resolve) { + return ImmutableSet.of(InetSocketAddress.createUnresolved(host, port)); + } else { + InetAddress[] inetAddresses; + try { + inetAddresses = InetAddress.getAllByName(host); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + Set result = new HashSet<>(); + for (InetAddress inetAddress : inetAddresses) { + result.add(new InetSocketAddress(inetAddress, port)); + } + return result; + } + } +} diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index f09ffd18a10..3c6851a48ee 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -1026,8 +1026,9 @@ datastax-java-driver { # the package com.datastax.oss.driver.internal.core.addresstranslation. # # The driver provides the following implementations out of the box: - # - PassThroughAddressTranslator: returns all addresses unchanged + # - PassThroughAddressTranslator: returns all addresses unchanged. # - FixedHostNameAddressTranslator: translates all addresses to a specific hostname. + # - SubnetAddressTranslator: translates addresses to hostname based on the subnet match. # - Ec2MultiRegionAddressTranslator: suitable for an Amazon multi-region EC2 deployment where # clients are also deployed in EC2. It optimizes network costs by favoring private IPs over # public ones whenever possible. @@ -1035,8 +1036,23 @@ datastax-java-driver { # You can also specify a custom class that implements AddressTranslator and has a public # constructor with a DriverContext argument. class = PassThroughAddressTranslator + # # This property has to be set only in case you use FixedHostNameAddressTranslator. # advertised-hostname = mycustomhostname + # + # These properties are only applicable in case you use SubnetAddressTranslator. + # subnet-addresses { + # "100.64.0.0/15" = "cassandra.datacenter1.com:9042" + # "100.66.0.0/15" = "cassandra.datacenter2.com:9042" + # # IPv6 example: + # # "::ffff:6440:0/111" = "cassandra.datacenter1.com:9042" + # # "::ffff:6442:0/111" = "cassandra.datacenter2.com:9042" + # } + # Optional. When configured, addresses not matching the configured subnets are translated to this address. + # default-address = "cassandra.datacenter1.com:9042" + # Whether to resolve the addresses once on initialization (if true) or on each node (re-)connection (if false). + # If not configured, defaults to false. + # resolve-addresses = false } # Whether to resolve the addresses passed to `basic.contact-points`. diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java index 9e0d8737619..72b875b8602 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/ContactPointsTest.java @@ -121,7 +121,7 @@ public void should_ignore_malformed_host_and_port_and_warn() { ContactPoints.merge(Collections.emptySet(), ImmutableList.of("foobar"), true); assertThat(endPoints).isEmpty(); - assertLog(Level.WARN, "Ignoring invalid contact point foobar (expecting host:port)"); + assertLog(Level.WARN, "Ignoring invalid contact point foobar (expecting format host:port)"); } @Test @@ -132,7 +132,7 @@ public void should_ignore_malformed_port_and_warn() { assertThat(endPoints).isEmpty(); assertLog( Level.WARN, - "Ignoring invalid contact point 127.0.0.1:foobar (expecting a number, got foobar)"); + "Ignoring invalid contact point 127.0.0.1:foobar (expecting port to be a number, got foobar)"); } @Test diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslatorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslatorTest.java index c5e864b4bae..92800998056 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslatorTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/FixedHostNameAddressTranslatorTest.java @@ -17,6 +17,7 @@ */ package com.datastax.oss.driver.internal.core.addresstranslation; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,9 +34,7 @@ public class FixedHostNameAddressTranslatorTest { @Test public void should_translate_address() { DriverExecutionProfile defaultProfile = mock(DriverExecutionProfile.class); - when(defaultProfile.getString( - FixedHostNameAddressTranslator.ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME_OPTION)) - .thenReturn("myaddress"); + when(defaultProfile.getString(ADDRESS_TRANSLATOR_ADVERTISED_HOSTNAME)).thenReturn("myaddress"); DefaultDriverContext defaultDriverContext = MockedDriverContextFactory.defaultDriverContext(Optional.of(defaultProfile)); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/SubnetAddressTranslatorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/SubnetAddressTranslatorTest.java new file mode 100644 index 00000000000..b8c2f6422cf --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/addresstranslation/SubnetAddressTranslatorTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.addresstranslation; + +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_DEFAULT_ADDRESS; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.ADDRESS_TRANSLATOR_SUBNET_ADDRESSES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.internal.core.context.DefaultDriverContext; +import com.datastax.oss.driver.internal.core.context.MockedDriverContextFactory; +import com.google.common.collect.ImmutableMap; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Optional; +import org.junit.Test; + +@SuppressWarnings("resource") +public class SubnetAddressTranslatorTest { + + @Test + public void should_translate_to_correct_subnet_address_ipv4() { + Map subnetAddresses = + ImmutableMap.of( + "\"100.64.0.0/15\"", "cassandra.datacenter1.com:19042", + "100.66.0.\"0/15\"", "cassandra.datacenter2.com:19042"); + DefaultDriverContext context = context(subnetAddresses); + SubnetAddressTranslator translator = new SubnetAddressTranslator(context); + InetSocketAddress address = new InetSocketAddress("100.64.0.1", 9042); + assertThat(translator.translate(address)) + .isEqualTo(InetSocketAddress.createUnresolved("cassandra.datacenter1.com", 19042)); + } + + @Test + public void should_translate_to_correct_subnet_address_ipv6() { + Map subnetAddresses = + ImmutableMap.of( + "\"::ffff:6440:0/111\"", "cassandra.datacenter1.com:19042", + "\"::ffff:6442:0/111\"", "cassandra.datacenter2.com:19042"); + DefaultDriverContext context = context(subnetAddresses); + SubnetAddressTranslator translator = new SubnetAddressTranslator(context); + InetSocketAddress address = new InetSocketAddress("::ffff:6440:1", 9042); + assertThat(translator.translate(address)) + .isEqualTo(InetSocketAddress.createUnresolved("cassandra.datacenter1.com", 19042)); + } + + @Test + public void should_translate_to_default_address() { + DefaultDriverContext context = context(ImmutableMap.of()); + when(context + .getConfig() + .getDefaultProfile() + .getString(ADDRESS_TRANSLATOR_DEFAULT_ADDRESS, null)) + .thenReturn("cassandra.com:19042"); + SubnetAddressTranslator translator = new SubnetAddressTranslator(context); + InetSocketAddress address = new InetSocketAddress("100.68.0.1", 9042); + assertThat(translator.translate(address)) + .isEqualTo(InetSocketAddress.createUnresolved("cassandra.com", 19042)); + } + + @Test + public void should_pass_through_not_matched_address() { + DefaultDriverContext context = context(ImmutableMap.of()); + SubnetAddressTranslator translator = new SubnetAddressTranslator(context); + InetSocketAddress address = new InetSocketAddress("100.68.0.1", 9042); + assertThat(translator.translate(address)).isEqualTo(address); + } + + @Test + public void should_fail_on_intersecting_subnets_ipv4() { + Map subnetAddresses = + ImmutableMap.of( + "\"100.64.0.0/15\"", "cassandra.datacenter1.com:19042", + "100.65.0.\"0/16\"", "cassandra.datacenter2.com:19042"); + DefaultDriverContext context = context(subnetAddresses); + assertThatIllegalArgumentException() + .isThrownBy(() -> new SubnetAddressTranslator(context)) + .withMessage("Configured subnets are overlapping: 100.64.0.0/15, 100.65.0.0/16"); + } + + @Test + public void should_fail_on_intersecting_subnets_ipv6() { + Map subnetAddresses = + ImmutableMap.of( + "\"::ffff:6440:0/111\"", "cassandra.datacenter1.com:19042", + "\"::ffff:6441:0/112\"", "cassandra.datacenter2.com:19042"); + DefaultDriverContext context = context(subnetAddresses); + assertThatIllegalArgumentException() + .isThrownBy(() -> new SubnetAddressTranslator(context)) + .withMessage("Configured subnets are overlapping: ::ffff:6440:0/111, ::ffff:6441:0/112"); + } + + @Test + public void should_fail_on_not_prefix_block_subnet_ipv4() { + Map subnetAddresses = + ImmutableMap.of("\"100.65.0.0/15\"", "cassandra.datacenter1.com:19042"); + DefaultDriverContext context = context(subnetAddresses); + assertThatIllegalArgumentException() + .isThrownBy(() -> new SubnetAddressTranslator(context)) + .withMessage( + "Subnet 100.65.0.0/15 must be represented as a network prefix block 100.64.0.0/15"); + } + + @Test + public void should_fail_on_not_prefix_block_subnet_ipv6() { + Map subnetAddresses = + ImmutableMap.of("\"::ffff:6441:0/111\"", "cassandra.datacenter1.com:19042"); + DefaultDriverContext context = context(subnetAddresses); + assertThatIllegalArgumentException() + .isThrownBy(() -> new SubnetAddressTranslator(context)) + .withMessage( + "Subnet ::ffff:6441:0/111 must be represented as a network prefix block ::ffff:6440:0/111"); + } + + @Test + public void should_fail_on_subnet_address_without_port() { + Map subnetAddresses = + ImmutableMap.of("\"100.64.0.0/15\"", "cassandra.datacenter1.com"); + DefaultDriverContext context = context(subnetAddresses); + assertThatIllegalArgumentException() + .isThrownBy(() -> new SubnetAddressTranslator(context)) + .withMessage("Invalid address cassandra.datacenter1.com (expecting format host:port)"); + } + + @Test + public void should_fail_on_default_address_without_port() { + DefaultDriverContext context = context(ImmutableMap.of()); + when(context + .getConfig() + .getDefaultProfile() + .getString(ADDRESS_TRANSLATOR_DEFAULT_ADDRESS, null)) + .thenReturn("cassandra.com"); + assertThatIllegalArgumentException() + .isThrownBy(() -> new SubnetAddressTranslator(context)) + .withMessage("Invalid address cassandra.com (expecting format host:port)"); + } + + private static DefaultDriverContext context(Map subnetAddresses) { + DriverExecutionProfile profile = mock(DriverExecutionProfile.class); + when(profile.getStringMap(ADDRESS_TRANSLATOR_SUBNET_ADDRESSES)).thenReturn(subnetAddresses); + return MockedDriverContextFactory.defaultDriverContext(Optional.of(profile)); + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/config/MockOptions.java b/core/src/test/java/com/datastax/oss/driver/internal/core/config/MockOptions.java index 25c1e8b26fd..cee57abbfdf 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/config/MockOptions.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/config/MockOptions.java @@ -24,6 +24,7 @@ public enum MockOptions implements DriverOption { INT1("int1"), INT2("int2"), AUTH_PROVIDER("auth_provider"), + SUBNET_ADDRESSES("subnet_addresses"), ; private final String path; diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/config/typesafe/TypesafeDriverConfigTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/config/typesafe/TypesafeDriverConfigTest.java index 16ccb73da9f..4a78c3ccb03 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/config/typesafe/TypesafeDriverConfigTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/config/typesafe/TypesafeDriverConfigTest.java @@ -101,7 +101,6 @@ public void should_fetch_string_map() { parse( "int1 = 42 \n auth_provider { auth_thing_one= one \n auth_thing_two = two \n auth_thing_three = three}"); DriverExecutionProfile base = config.getDefaultProfile(); - base.getStringMap(MockOptions.AUTH_PROVIDER); Map map = base.getStringMap(MockOptions.AUTH_PROVIDER); assertThat(map.entrySet().size()).isEqualTo(3); assertThat(map.get("auth_thing_one")).isEqualTo("one"); @@ -109,6 +108,19 @@ public void should_fetch_string_map() { assertThat(map.get("auth_thing_three")).isEqualTo("three"); } + @Test + public void should_fetch_string_map_with_forward_slash_in_keys() { + TypesafeDriverConfig config = + parse( + "subnet_addresses { 100.64.0.0/15 = \"cassandra.datacenter1.com:9042\" \n \"100.66.0.0/15\" = \"cassandra.datacenter2.com\" \n \"::ffff:6440:0/111\" = \"cassandra.datacenter3.com:19042\" }"); + DriverExecutionProfile base = config.getDefaultProfile(); + Map map = base.getStringMap(MockOptions.SUBNET_ADDRESSES); + assertThat(map.entrySet().size()).isEqualTo(3); + assertThat(map.get("100.64.0.\"0/15\"")).isEqualTo("cassandra.datacenter1.com:9042"); + assertThat(map.get("\"100.66.0.0/15\"")).isEqualTo("cassandra.datacenter2.com"); + assertThat(map.get("\"::ffff:6440:0/111\"")).isEqualTo("cassandra.datacenter3.com:19042"); + } + @Test public void should_create_derived_profile_with_string_map() { TypesafeDriverConfig config = parse("int1 = 42"); diff --git a/manual/core/address_resolution/README.md b/manual/core/address_resolution/README.md index 84efb4a796c..867b1d02735 100644 --- a/manual/core/address_resolution/README.md +++ b/manual/core/address_resolution/README.md @@ -118,6 +118,59 @@ datastax-java-driver.advanced.address-translator.class = com.mycompany.MyAddress Note: the contact points provided while creating the `CqlSession` are not translated, only addresses retrieved from or sent by Cassandra nodes are. +### Fixed proxy hostname + +If your client applications access Cassandra through some kind of proxy (eg. with AWS PrivateLink when all Cassandra +nodes are exposed via one hostname pointing to AWS Endpoint), you can configure driver with +`FixedHostNameAddressTranslator` to always translate all node addresses to that same proxy hostname, no matter what IP +address a node has but still using its native transport port. + +To use it, specify the following in the [configuration](../configuration): + +``` +datastax-java-driver.advanced.address-translator.class = FixedHostNameAddressTranslator +advertised-hostname = proxyhostname +``` + +### Fixed proxy hostname per subnet + +When running Cassandra in a private network and accessing it from outside of that private network via some kind of +proxy, we have an option to use `FixedHostNameAddressTranslator`. But for multi-datacenter Cassandra deployments, we +want to have more control over routing queries to a specific datacenter (eg. for optimizing latencies), which requires +setting up a separate proxy per datacenter. + +Normally, each Cassandra datacenter nodes are deployed to a different subnet to support internode communications in the +cluster and avoid IP address collisions. So when Cassandra broadcasts its nodes IP addresses, we can determine which +datacenter that node belongs to by checking its IP address against the given datacenter subnet. + +For such scenarios you can use `SubnetAddressTranslator` to translate node IPs to the datacenter proxy address +associated with it. + +To use it, specify the following in the [configuration](../configuration): +``` +datastax-java-driver.advanced.address-translator { + class = SubnetAddressTranslator + subnet-addresses { + "100.64.0.0/15" = "cassandra.datacenter1.com:9042" + "100.66.0.0/15" = "cassandra.datacenter2.com:9042" + # IPv6 example: + # "::ffff:6440:0/111" = "cassandra.datacenter1.com:9042" + # "::ffff:6442:0/111" = "cassandra.datacenter2.com:9042" + } + # Optional. When configured, addresses not matching the configured subnets are translated to this address. + default-address = "cassandra.datacenter1.com:9042" + # Whether to resolve the addresses once on initialization (if true) or on each node (re-)connection (if false). + # If not configured, defaults to false. + resolve-addresses = false +} +``` + +Such setup is common for running Cassandra on Kubernetes with [k8ssandra](https://docs.k8ssandra.io/). + +Note: this address translator has optional dependency +on [IPAddress](https://mvnrepository.com/artifact/com.github.seancfoley/ipaddress) which must be explicitly set in your +project to use this address translator. + ### EC2 multi-region If you deploy both Cassandra and client applications on Amazon EC2, and your cluster spans multiple regions, you'll have diff --git a/pom.xml b/pom.xml index 3fd2d1347c2..1b8cf3a5a74 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ 1.1.10.1 1.7.1 + 5.5.1 3.19.0 1.3 @@ -157,6 +158,11 @@ HdrHistogram ${hdrhistogram.version} + + com.github.seancfoley + ipaddress + ${ipaddress.version} + com.esri.geometry esri-geometry-api