Skip to content

Commit

Permalink
Adapt to latest vertx-core changes
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 29, 2023
1 parent eb6bd59 commit 5e996e9
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

public class ServiceResolverImpl implements ServiceResolver {

private final BiFunction<Vertx, ServiceResolverImpl, AddressResolver<?, ?, ?>> provider;
private final BiFunction<Vertx, ServiceResolverImpl, AddressResolver<?, ?, ?, ?>> provider;

public ServiceResolverImpl(BiFunction<Vertx, ServiceResolverImpl, AddressResolver<?, ?, ?>> provider) {
public ServiceResolverImpl(BiFunction<Vertx, ServiceResolverImpl, AddressResolver<?, ?, ?, ?>> provider) {
this.provider = provider;
}

@Override
public AddressResolver<?, ?, ?> resolver(Vertx vertx) {
public AddressResolver<?, ?, ?, ?> resolver(Vertx vertx) {
return provider.apply(vertx, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.resolver.address.AddressResolver;
import io.vertx.core.spi.resolver.address.Endpoint;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.kube.KubeResolverOptions;

Expand All @@ -28,7 +27,7 @@

import static io.vertx.core.http.HttpMethod.GET;

public class KubeResolverImpl implements AddressResolver<ServiceAddress, SocketAddress, KubeServiceState> {
public class KubeResolverImpl<B> implements AddressResolver<ServiceAddress, SocketAddress, KubeServiceState<B>, B> {

final KubeResolverOptions options;
final String host;
Expand Down Expand Up @@ -60,7 +59,7 @@ public ServiceAddress tryCast(Address address) {
}

@Override
public Future<KubeServiceState> resolve(Function<SocketAddress, Endpoint<SocketAddress>> factory, ServiceAddress address) {
public Future<KubeServiceState<B>> resolve(Function<SocketAddress, B> factory, ServiceAddress address) {
return httpClient
.request(GET, port, host, "/api/v1/namespaces/" + namespace + "/endpoints")
.compose(req -> {
Expand All @@ -84,7 +83,7 @@ public Future<KubeServiceState> resolve(Function<SocketAddress, Endpoint<SocketA
});
}).map(response -> {
String resourceVersion = response.getJsonObject("metadata").getString("resourceVersion");
KubeServiceState state = new KubeServiceState(factory, this, vertx, resourceVersion, address.name());
KubeServiceState<B> state = new KubeServiceState<>(factory, this, vertx, resourceVersion, address.name());
JsonArray items = response.getJsonArray("items");
for (int i = 0;i < items.size();i++) {
JsonObject item = items.getJsonObject(i);
Expand All @@ -93,14 +92,14 @@ public Future<KubeServiceState> resolve(Function<SocketAddress, Endpoint<SocketA
return state;
}).andThen(ar -> {
if (ar.succeeded()) {
KubeServiceState res = ar.result();
KubeServiceState<B> res = ar.result();
res.connectWebSocket();
}
});
}

@Override
public List<Endpoint<SocketAddress>> endpoints(KubeServiceState state) {
public List<B> endpoints(KubeServiceState<B> state) {
return state.endpoints.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.resolver.address.Endpoint;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

class KubeServiceState {
class KubeServiceState<B> {

final String name;
final Vertx vertx;
final KubeResolverImpl resolver;
final Function<SocketAddress, Endpoint<SocketAddress>> endpointFactory;
final Function<SocketAddress, B> endpointFactory;
String lastResourceVersion;
boolean disposed;
WebSocket ws;
AtomicReference<List<Endpoint<SocketAddress>>> endpoints = new AtomicReference<>(Collections.emptyList());
AtomicReference<List<B>> endpoints = new AtomicReference<>(Collections.emptyList());

KubeServiceState(Function<SocketAddress, Endpoint<SocketAddress>> endpointFactory, KubeResolverImpl resolver, Vertx vertx, String lastResourceVersion, String name) {
KubeServiceState(Function<SocketAddress, B> endpointFactory, KubeResolverImpl resolver, Vertx vertx, String lastResourceVersion, String name) {
this.endpointFactory = endpointFactory;
this.name = name;
this.resolver = resolver;
Expand Down Expand Up @@ -102,7 +101,7 @@ void handleEndpoints(JsonObject item) {
if (this.name.equals(name)) {
JsonArray subsets = item.getJsonArray("subsets");
if (subsets != null) {
List<Endpoint<SocketAddress>> endpoints = new ArrayList<>();
List<B> endpoints = new ArrayList<>();
for (int j = 0;j < subsets.size();j++) {
List<String> podIps = new ArrayList<>();
JsonObject subset = subsets.getJsonObject(j);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.resolver.address.AddressResolver;
import io.vertx.core.spi.resolver.address.Endpoint;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.srv.SrvResolverOptions;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SrvResolverImpl implements AddressResolver<ServiceAddress, SrvRecord, SrvServiceState> {
public class SrvResolverImpl<B> implements AddressResolver<ServiceAddress, SrvRecord, SrvServiceState<B>, B> {

Vertx vertx;
DnsClient client;
Expand All @@ -45,19 +44,21 @@ public ServiceAddress tryCast(Address address) {
}

@Override
public Future<SrvServiceState> resolve(Function<SrvRecord, Endpoint<SrvRecord>> factory, ServiceAddress address) {
public Future<SrvServiceState<B>> resolve(Function<SrvRecord, B> factory, ServiceAddress address) {
Future<List<SrvRecord>> fut = client.resolveSRV(address.name());
return fut.map(records -> {
List<Endpoint<SrvRecord>> endpoints = new ArrayList<>();
long ttl = 10_000_000;
List<B> endpoints = new ArrayList<>();
for (SrvRecord record : records) {
endpoints.add(factory.apply(record));
ttl = Math.min(ttl, record.ttl());
}
return new SrvServiceState(System.currentTimeMillis(), endpoints);
return new SrvServiceState<>(endpoints, System.currentTimeMillis() + 1000 * ttl);
});
}

@Override
public List<Endpoint<SrvRecord>> endpoints(SrvServiceState state) {
public List<B> endpoints(SrvServiceState<B> state) {
return state.endpoints;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,19 @@
*/
package io.vertx.serviceresolver.srv.impl;

import io.vertx.core.dns.SrvRecord;
import io.vertx.core.spi.resolver.address.Endpoint;

import java.util.List;

class SrvServiceState {
class SrvServiceState<B> {

final long timestamp;
final List<Endpoint<SrvRecord>> endpoints;
final List<B> endpoints;
final long expirationMs;

public SrvServiceState(long timestamp, List<Endpoint<SrvRecord>> endpoints) {
this.timestamp = timestamp;
public SrvServiceState(List<B> endpoints, long expirationMs) {
this.endpoints = endpoints;
this.expirationMs = expirationMs;
}

boolean isValid() {
long now = System.currentTimeMillis();
for (Endpoint<SrvRecord> endpoint : endpoints) {
if (now > endpoint.get().ttl() * 1000 + timestamp) {
return false;
}
}
return true;
return System.currentTimeMillis() <= expirationMs;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package io.vertx.serviceresolver.srv;

import io.vertx.core.VertxOptions;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.ext.unit.TestContext;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.ServiceResolverTestBase;
import io.vertx.test.fakedns.FakeDNSServer;
import org.apache.directory.server.dns.messages.*;
import org.apache.directory.server.dns.store.DnsAttribute;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.directory.server.dns.store.RecordStore;

Expand Down
9 changes: 4 additions & 5 deletions src/test/java/mock/MockResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.resolver.address.AddressResolver;
import io.vertx.core.spi.resolver.address.Endpoint;
import io.vertx.serviceresolver.impl.*;
import io.vertx.serviceresolver.ServiceAddress;
import io.vertx.serviceresolver.ServiceResolver;
Expand All @@ -14,7 +13,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public class MockResolver implements AddressResolver<ServiceAddress, SocketAddress, MockServiceState> {
public class MockResolver<B> implements AddressResolver<ServiceAddress, SocketAddress, MockServiceState<B>, B> {

public static ServiceResolver create(MockController controller) {
return new ServiceResolverImpl((vertx, lookup) -> {
Expand Down Expand Up @@ -48,12 +47,12 @@ public SocketAddress addressOfEndpoint(SocketAddress endpoint) {
}

@Override
public Future<MockServiceState> resolve(Function<SocketAddress, Endpoint<SocketAddress>> factory, ServiceAddress address) {
public Future<MockServiceState<B>> resolve(Function<SocketAddress, B> factory, ServiceAddress address) {
List<SocketAddress> endpoints = templates.get(address.name());
if (endpoints == null) {
return Future.failedFuture("No addresses for service svc");
}
MockServiceState state = new MockServiceState();
MockServiceState<B> state = new MockServiceState<>();
// state.set(endpoints);
return Future.succeededFuture(state);
}
Expand All @@ -64,7 +63,7 @@ public void dispose(MockServiceState state) {
}

@Override
public List<Endpoint<SocketAddress>> endpoints(MockServiceState state) {
public List<B> endpoints(MockServiceState state) {
throw new UnsupportedOperationException();
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/mock/MockServiceState.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package mock;

public class MockServiceState {
public class MockServiceState<B> {

boolean disposed;

Expand Down

0 comments on commit 5e996e9

Please sign in to comment.