Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand All @@ -40,7 +42,6 @@
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.ReplicatorGroup;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.Replicator;
Expand All @@ -54,7 +55,6 @@
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadId;
import com.alipay.sofa.jraft.util.internal.ThrowUtil;

import io.netty.channel.ChannelHandler;
Expand Down Expand Up @@ -313,23 +313,61 @@ public List<Metapb.Member> getMembers() throws ExecutionException, InterruptedEx

public Status changePeerList(String peerList) {
AtomicReference<Status> result = new AtomicReference<>();
Configuration newPeers = new Configuration();
try {
String[] peers = peerList.split(",", -1);
if ((peers.length & 1) != 1) {
throw new PDException(-1, "the number of peer list must be odd.");
}
Configuration newPeers = new Configuration();
newPeers.parse(peerList);
CountDownLatch latch = new CountDownLatch(1);
this.raftNode.changePeers(newPeers, status -> {
result.set(status);
// Use compareAndSet so a late callback does not overwrite a timeout status
result.compareAndSet(null, status);
latch.countDown();
});
latch.await();
// Use configured RPC timeout — bare await() would block forever if
// the callback is never invoked (e.g. node not started / RPC failure)
boolean completed = latch.await(3L * config.getRpcTimeout(),
TimeUnit.MILLISECONDS);
if (!completed && result.get() == null) {
Status timeoutStatus = new Status(RaftError.EINTERNAL,
"changePeerList timed out after %d ms",
3L * config.getRpcTimeout());
if (!result.compareAndSet(null, timeoutStatus)) {
// Callback arrived just before us — keep its result
timeoutStatus = null;
}
if (timeoutStatus != null) {
log.error("changePeerList to {} timed out after {} ms",
peerList, 3L * config.getRpcTimeout());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.set(new Status(RaftError.EINTERNAL, "changePeerList interrupted"));
log.error("changePeerList to {} was interrupted", peerList, e);
} catch (Exception e) {
log.error("failed to changePeerList to {},{}", peerList, e);
result.set(new Status(-1, e.getMessage()));
}

// Refresh IpAuthHandler so newly added peers are not blocked
if (result.get() != null && result.get().isOk()) {
IpAuthHandler handler = IpAuthHandler.getInstance();
if (handler != null) {
Set<String> newIps = newPeers.getPeers()
.stream()
.map(PeerId::getIp)
.collect(Collectors.toSet());
handler.refresh(newIps);
log.info("IpAuthHandler refreshed after peer list change to: {}", peerList);
} else {
log.warn("IpAuthHandler not initialized, skipping refresh for peer list: {}",
peerList);
}
}

return result.get();
}

Expand Down Expand Up @@ -366,7 +404,8 @@ private boolean peerEquals(PeerId p1, PeerId p2) {
if (p1 == null || p2 == null) {
return false;
}
return Objects.equals(p1.getIp(), p2.getIp()) && Objects.equals(p1.getPort(), p2.getPort());
return Objects.equals(p1.getIp(), p2.getIp()) &&
Objects.equals(p1.getPort(), p2.getPort());
}

private Replicator.State getReplicatorState(PeerId peerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.hugegraph.pd.raft.auth;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import io.netty.channel.ChannelDuplexHandler;
Expand All @@ -30,11 +33,11 @@
@ChannelHandler.Sharable
public class IpAuthHandler extends ChannelDuplexHandler {

private final Set<String> allowedIps;
private volatile Set<String> resolvedIps;
private static volatile IpAuthHandler instance;

private IpAuthHandler(Set<String> allowedIps) {
this.allowedIps = Collections.unmodifiableSet(allowedIps);
this.resolvedIps = resolveAll(allowedIps);
}

public static IpAuthHandler getInstance(Set<String> allowedIps) {
Expand All @@ -48,6 +51,25 @@ public static IpAuthHandler getInstance(Set<String> allowedIps) {
return instance;
}

/**
* Returns the existing singleton instance, or null if not yet initialized.
* Should only be called after getInstance(Set) has been called during startup.
*/
public static IpAuthHandler getInstance() {
return instance;
}

/**
* Refreshes the resolved IP allowlist from a new set of hostnames or IPs.
* Should be called when the Raft peer list changes via RaftEngine#changePeerList().
* Note: DNS-only changes (e.g. container restart with new IP, same hostname)
* are not automatically detected and still require a process restart.
*/
public void refresh(Set<String> newAllowedIps) {
this.resolvedIps = resolveAll(newAllowedIps);
log.info("IpAuthHandler allowlist refreshed, resolved {} entries", resolvedIps.size());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String clientIp = getClientIp(ctx);
Expand All @@ -65,7 +87,25 @@ private static String getClientIp(ChannelHandlerContext ctx) {
}

private boolean isIpAllowed(String ip) {
return allowedIps.isEmpty() || allowedIps.contains(ip);
Set<String> resolved = this.resolvedIps;
// Empty allowlist means no restriction is configured — allow all
return resolved.isEmpty() || resolved.contains(ip);
}

private static Set<String> resolveAll(Set<String> entries) {
Set<String> result = new HashSet<>(entries);

for (String entry : entries) {
try {
for (InetAddress addr : InetAddress.getAllByName(entry)) {
result.add(addr.getHostAddress());
}
} catch (UnknownHostException e) {
log.warn("Could not resolve allowlist entry '{}': {}", entry, e.getMessage());
}
}

return Collections.unmodifiableSet(result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public RestApiResponse getMembers() throws InterruptedException, ExecutionExcept
* @return Returns a JSON string containing the modification results
* @throws Exception If an exception occurs during request processing, service invocation, or Peer list modification, it is captured and returned as the JSON representation of the exception
*/
// TODO: this endpoint has no authentication check — any caller with network
// access to the management port can trigger a peer list change.
// Wire authentication here as part of the planned auth refactor.
@PostMapping(value = "/members/change", consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.apache.hugegraph.pd.raft.PeerUtil;
import org.apache.hugegraph.pd.raft.RaftEngine;
import org.apache.hugegraph.pd.raft.RaftStateListener;
import org.apache.hugegraph.pd.raft.auth.IpAuthHandler;
import org.apache.hugegraph.pd.util.grpc.StreamObserverUtil;
import org.apache.hugegraph.pd.watch.PDWatchSubject;
import org.lognet.springboot.grpc.GRpcService;
Expand Down Expand Up @@ -1735,6 +1737,17 @@ public void updatePdRaft(Pdpb.UpdatePdRaftRequest request,
node.changePeers(config, status -> {
if (status.isOk()) {
log.info("updatePdRaft, change peers success");
// Refresh IpAuthHandler so newly added peers are not blocked
IpAuthHandler handler = IpAuthHandler.getInstance();
if (handler != null) {
Set<String> newIps = new HashSet<>();
config.getPeers().forEach(p -> newIps.add(p.getIp()));
config.getLearners().forEach(p -> newIps.add(p.getIp()));
handler.refresh(newIps);
log.info("IpAuthHandler refreshed after updatePdRaft peer change");
} else {
log.warn("IpAuthHandler not initialized, skipping refresh");
}
} else {
log.error("changePeers status: {}, msg:{}, code: {}, raft error:{}",
status, status.getErrorMsg(), status.getCode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ protected <T> T authenticate(String authority, String token, Function<String, T>
}

String name = info.substring(0, delim);
// TODO: password validation is skipped — only service name is checked against
// innerModules. Full credential validation should be added as part of the auth refactor.
//String pwd = info.substring(delim + 1);
if (innerModules.contains(name)) {
return call.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public void configure(ServerBuilder<?> serverBuilder) {
HgExecutorUtil.createExecutor(EXECUTOR_NAME, poolGrpc.getCore(), poolGrpc.getMax(),
poolGrpc.getQueue()));
serverBuilder.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE);
// TODO: GrpcAuthentication is instantiated as a Spring bean but never registered
// here — add serverBuilder.intercept(grpcAuthentication) once auth is refactored.
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.hugegraph.pd.core.meta.MetadataKeyHelperTest;
import org.apache.hugegraph.pd.core.store.HgKVStoreImplTest;
import org.apache.hugegraph.pd.raft.IpAuthHandlerTest;
import org.apache.hugegraph.pd.raft.RaftEngineIpAuthIntegrationTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

Expand All @@ -36,6 +38,8 @@
StoreMonitorDataServiceTest.class,
StoreServiceTest.class,
TaskScheduleServiceTest.class,
IpAuthHandlerTest.class,
RaftEngineIpAuthIntegrationTest.class,
// StoreNodeServiceTest.class,
})
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.pd.raft;

import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.hugegraph.pd.raft.auth.IpAuthHandler;
import org.apache.hugegraph.testutil.Whitebox;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class IpAuthHandlerTest {

@Before
public void setUp() {
// Must reset BEFORE each test — earlier suite classes (e.g. ConfigServiceTest)
// initialize RaftEngine which creates the IpAuthHandler singleton with their
// own peer IPs. Without this reset, our getInstance() calls return the stale
// singleton and ignore the allowlist passed by the test.
Whitebox.setInternalState(IpAuthHandler.class, "instance", null);
}

@After
public void tearDown() {
// Must reset AFTER each test — prevents our test singleton from leaking
// into later suite classes that also depend on IpAuthHandler state.
Whitebox.setInternalState(IpAuthHandler.class, "instance", null);
}

private boolean isIpAllowed(IpAuthHandler handler, String ip) {
return Whitebox.invoke(IpAuthHandler.class,
new Class[]{String.class},
"isIpAllowed", handler, ip);
}

@Test
public void testHostnameResolvesToIp() throws Exception {
// "localhost" should resolve to one or more IPs via InetAddress.getAllByName()
// This verifies the core fix: hostname allowlists match numeric remote addresses
// Using dynamic resolution avoids hardcoding "127.0.0.1" which may not be
// returned on IPv6-only or custom resolver environments
IpAuthHandler handler = IpAuthHandler.getInstance(
Collections.singleton("localhost"));
InetAddress[] addresses = InetAddress.getAllByName("localhost");
// All resolved addresses should be allowed — resolveAll() adds every address
// returned by getAllByName() so none should be blocked
Assert.assertTrue("Expected at least one resolved address",
addresses.length > 0);
for (InetAddress address : addresses) {
Assert.assertTrue(
"Expected " + address.getHostAddress() + " to be allowed",
isIpAllowed(handler, address.getHostAddress()));
}
}

@Test
public void testUnresolvableHostnameDoesNotCrash() {
// Should log a warning and skip — no exception thrown during construction
IpAuthHandler handler = IpAuthHandler.getInstance(
Collections.singleton("nonexistent.invalid.hostname"));
// Handler was still created successfully despite bad hostname
Assert.assertNotNull(handler);
// Unresolvable entry is skipped so no IPs should be allowed
Assert.assertFalse(isIpAllowed(handler, "127.0.0.1"));
Assert.assertFalse(isIpAllowed(handler, "192.168.0.1"));
}

@Test
public void testRefreshUpdatesResolvedIps() {
// Start with 127.0.0.1
IpAuthHandler handler = IpAuthHandler.getInstance(
Collections.singleton("127.0.0.1"));
Assert.assertTrue(isIpAllowed(handler, "127.0.0.1"));

// Refresh with a different IP — verifies refresh() swaps the set correctly
Set<String> newIps = new HashSet<>();
newIps.add("192.168.0.1");
handler.refresh(newIps);

// Old IP should no longer be allowed
Assert.assertFalse(isIpAllowed(handler, "127.0.0.1"));
// New IP should now be allowed
Assert.assertTrue(isIpAllowed(handler, "192.168.0.1"));
}

@Test
public void testEmptyAllowlistAllowsAll() {
// Empty allowlist = no restriction configured = allow all connections
// This is intentional fallback behavior and must be explicitly tested
// because it is a security-relevant boundary
IpAuthHandler handler = IpAuthHandler.getInstance(
Collections.emptySet());
Assert.assertTrue(isIpAllowed(handler, "1.2.3.4"));
Assert.assertTrue(isIpAllowed(handler, "192.168.99.99"));
}

@Test
public void testGetInstanceReturnsSingletonIgnoresNewAllowlist() {
// First call creates the singleton with 127.0.0.1
IpAuthHandler first = IpAuthHandler.getInstance(
Collections.singleton("127.0.0.1"));
// Second call with a different set must return the same instance
// and must NOT reinitialize or override the existing allowlist
IpAuthHandler second = IpAuthHandler.getInstance(
Collections.singleton("192.168.0.1"));
Assert.assertSame(first, second);
// Original allowlist still in effect
Assert.assertTrue(isIpAllowed(second, "127.0.0.1"));
// New set was ignored — 192.168.0.1 should not be allowed
Assert.assertFalse(isIpAllowed(second, "192.168.0.1"));
}
}
Loading
Loading