Skip to content

Commit 6372439

Browse files
authored
fix: make service creation timeouts in a cluster more robust (CloudNetService#1085)
### Motivation The current handling of service starts has a small issue in the way services are started: ```mermaid flowchart TB A["Head Node"] -->|Start Request| B["Node to start on"] B -- Create Result --> A B -->|Publish| C["All other components"] ``` The head node waits 5 seconds for the target node to start the service, and if nothing happened, it will try to re-create the service or continue with the normal ticking (which might trigger a new service start attempt). However, the remote node is unaware that the service creation timed out and will still register the service locally and publish its info to the cluster, which might lead to duplicate service creations. ### Modification The new system is more aware of delays and handles the creation mistakes much better: ```mermaid flowchart TB A1["Head Node"] -->Z1["Start request with timeout (20 seconds)"] Z1-->|To target node| A2 C1["Head Node Register Try"] C1 -->|"Success (Sent by Head Node to Target Node)"| D2 C1 -->|Failure| G2 Z1 -->|Timeout| G2 C2-.->|"TTL exceeded"| G2 A2["Request received"]-->B2["Service created"]-->C2["Register to waiting services (TTL: 1 Minute)"]-->|Responds with Create Result| C1 D2["Removal from unaccepted services"] D2-->|Success| E2["Register as local Service"] D2-->|TTL exceeded| H2["Unregister from Head Node"] G2["Auto remove"] E2-->|Publish of Service Info| F2["All other components"] ``` This way the head node takes full control over service creations and no longer allows a node to do things independently of the head node. That allows us to ensure that service registrations in the cluster happen once, and only once, without any side effects for later service starts which are requested by the head node. ### Result Services should no longer get registered as "ghosts", but are fully controlled by the head node and removed properly in case a service creation timeout occurs. ##### Other context Fixes CloudNetService#994
1 parent 3fdeee5 commit 6372439

File tree

6 files changed

+173
-22
lines changed

6 files changed

+173
-22
lines changed

node/src/main/java/eu/cloudnetservice/node/network/listener/message/ServiceChannelMessageListener.java

+33
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ public void handleChannelMessage(@NonNull ChannelMessageReceiveEvent event) {
7272
.build()));
7373
}
7474

75+
// feedback from a node that a service which should have been moved to accepted
76+
// is no longer registered as unaccepted and not yet moved to registered, which
77+
// means that the cache ttl on the target node exceeded
78+
case "node_to_head_node_unaccepted_service_ttl_exceeded" -> {
79+
var serviceUniqueId = event.content().readUniqueId();
80+
this.serviceManager.forceRemoveRegisteredService(serviceUniqueId);
81+
}
82+
7583
// request to start a service on the local node
7684
case "head_node_to_node_start_service" -> {
7785
var configuration = event.content().readObject(ServiceConfiguration.class);
@@ -80,6 +88,31 @@ public void handleChannelMessage(@NonNull ChannelMessageReceiveEvent event) {
8088
event.binaryResponse(DataBuf.empty().writeObject(ServiceCreateResult.created(service.serviceInfo())));
8189
}
8290

91+
// publish the service info of a created service to the cluster
92+
case "head_node_to_node_finish_service_registration" -> {
93+
var serviceUniqueId = event.content().readUniqueId();
94+
var service = this.serviceManager.takeUnacceptedService(serviceUniqueId);
95+
96+
if (service != null) {
97+
// service is still locally present, finish the registration of it
98+
service.handleServiceRegister();
99+
} else {
100+
// service is no longer locally present as unaccepted
101+
// re-check if the service was already moved to registered
102+
var registeredService = this.serviceManager.localCloudService(serviceUniqueId);
103+
if (registeredService == null) {
104+
// send this as feedback to the head node in order to remove the registered service from there as well
105+
ChannelMessage.builder()
106+
.target(event.sender().toTarget())
107+
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
108+
.message("node_to_head_node_unaccepted_service_ttl_exceeded")
109+
.buffer(DataBuf.empty().writeUniqueId(serviceUniqueId))
110+
.build()
111+
.send();
112+
}
113+
}
114+
}
115+
83116
// update of a service in the network
84117
case "update_service_info" -> {
85118
var snapshot = event.content().readObject(ServiceInfoSnapshot.class);

node/src/main/java/eu/cloudnetservice/node/service/CloudService.java

+3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public interface CloudService extends SpecificCloudServiceProvider {
7171

7272
void publishServiceInfoSnapshot();
7373

74+
@ApiStatus.Internal
75+
void handleServiceRegister();
76+
7477
@ApiStatus.Internal
7578
void updateServiceInfoSnapshot(@NonNull ServiceInfoSnapshot serviceInfoSnapshot);
7679
}

node/src/main/java/eu/cloudnetservice/node/service/CloudServiceManager.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import lombok.NonNull;
3232
import org.jetbrains.annotations.ApiStatus;
3333
import org.jetbrains.annotations.Nullable;
34-
import org.jetbrains.annotations.UnknownNullability;
3534
import org.jetbrains.annotations.Unmodifiable;
3635
import org.jetbrains.annotations.UnmodifiableView;
3736

@@ -95,7 +94,21 @@ void addServicePreparer(
9594
void unregisterLocalService(@NonNull CloudService service);
9695

9796
@ApiStatus.Internal
98-
void handleServiceUpdate(@NonNull ServiceInfoSnapshot snapshot, @UnknownNullability NetworkChannel source);
97+
void registerUnacceptedService(@NonNull CloudService service);
98+
99+
@ApiStatus.Internal
100+
@Nullable CloudService takeUnacceptedService(@NonNull UUID serviceUniqueId);
101+
102+
@ApiStatus.Internal
103+
void forceRemoveRegisteredService(@NonNull UUID uniqueId);
104+
105+
@ApiStatus.Internal
106+
@Nullable SpecificCloudServiceProvider registerService(
107+
@NonNull ServiceInfoSnapshot snapshot,
108+
@NonNull NetworkChannel source);
109+
110+
@ApiStatus.Internal
111+
void handleServiceUpdate(@NonNull ServiceInfoSnapshot snapshot, @Nullable NetworkChannel source);
99112

100113
@ApiStatus.Internal
101114
@NonNull CloudService createLocalCloudService(@NonNull ServiceConfiguration serviceConfiguration);

node/src/main/java/eu/cloudnetservice/node/service/defaults/AbstractService.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,10 @@ protected AbstractService(
153153
-1,
154154
ServiceLifeCycle.PREPARED,
155155
configuration.properties().clone());
156-
this.pushServiceInfoSnapshotUpdate(ServiceLifeCycle.PREPARED);
156+
this.pushServiceInfoSnapshotUpdate(ServiceLifeCycle.PREPARED, false);
157157

158-
manager.registerLocalService(this);
159-
eventManager.callEvent(new CloudServiceCreateEvent(this));
158+
// register the service locally for now
159+
manager.registerUnacceptedService(this);
160160
}
161161

162162
protected static @NonNull Path resolveServicePath(
@@ -531,6 +531,19 @@ public void publishServiceInfoSnapshot() {
531531
.send();
532532
}
533533

534+
@Override
535+
public void handleServiceRegister() {
536+
// just ensure that this service is removed from the cache & moved to a "real" registered local service
537+
this.cloudServiceManager.registerLocalService(this);
538+
this.cloudServiceManager.takeUnacceptedService(this.serviceId().uniqueId());
539+
540+
// publish the initial service info to the cluster
541+
this.pushServiceInfoSnapshotUpdate(ServiceLifeCycle.PREPARED);
542+
543+
// notify the local listeners that this service was created
544+
this.eventManager.callEvent(new CloudServiceCreateEvent(this));
545+
}
546+
534547
@Override
535548
public void updateServiceInfoSnapshot(@NonNull ServiceInfoSnapshot serviceInfoSnapshot) {
536549
this.lastServiceInfo = this.currentServiceInfo;

node/src/main/java/eu/cloudnetservice/node/service/defaults/DefaultCloudServiceManager.java

+52-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package eu.cloudnetservice.node.service.defaults;
1818

19+
import com.github.benmanes.caffeine.cache.Cache;
20+
import com.github.benmanes.caffeine.cache.Caffeine;
1921
import com.google.common.collect.ComparisonChain;
2022
import dev.derklaro.aerogel.PostConstruct;
2123
import dev.derklaro.aerogel.auto.Provides;
@@ -61,6 +63,7 @@
6163
import jakarta.inject.Named;
6264
import jakarta.inject.Singleton;
6365
import java.nio.file.Path;
66+
import java.time.Duration;
6467
import java.util.Arrays;
6568
import java.util.Collection;
6669
import java.util.Collections;
@@ -73,7 +76,6 @@
7376
import java.util.stream.Collectors;
7477
import lombok.NonNull;
7578
import org.jetbrains.annotations.Nullable;
76-
import org.jetbrains.annotations.UnknownNullability;
7779
import org.jetbrains.annotations.Unmodifiable;
7880
import org.jetbrains.annotations.UnmodifiableView;
7981

@@ -96,6 +98,10 @@ public class DefaultCloudServiceManager implements CloudServiceManager {
9698
protected final CloudServiceFactory cloudServiceFactory;
9799

98100
protected final Map<UUID, SpecificCloudServiceProvider> knownServices = new ConcurrentHashMap<>();
101+
protected final Cache<UUID, CloudService> localUnacceptedServices = Caffeine.newBuilder()
102+
.expireAfterWrite(Duration.ofMinutes(1))
103+
.build();
104+
99105
protected final Map<String, LocalCloudServiceFactory> cloudServiceFactories = new ConcurrentHashMap<>();
100106
protected final Map<ServiceEnvironmentType, ServiceConfigurationPreparer> preparers = new ConcurrentHashMap<>();
101107

@@ -441,7 +447,48 @@ public void unregisterLocalService(@NonNull CloudService service) {
441447
}
442448

443449
@Override
444-
public void handleServiceUpdate(@NonNull ServiceInfoSnapshot snapshot, @UnknownNullability NetworkChannel source) {
450+
public void registerUnacceptedService(@NonNull CloudService service) {
451+
this.localUnacceptedServices.put(service.serviceId().uniqueId(), service);
452+
}
453+
454+
@Override
455+
public @Nullable CloudService takeUnacceptedService(@NonNull UUID serviceUniqueId) {
456+
// this is the correct way to invalidate & get the value associated with the id in the cache
457+
// see https://stackoverflow.com/a/67994912/13008679
458+
return this.localUnacceptedServices.asMap().remove(serviceUniqueId);
459+
}
460+
461+
@Override
462+
public void forceRemoveRegisteredService(@NonNull UUID uniqueId) {
463+
this.knownServices.remove(uniqueId);
464+
}
465+
466+
@Override
467+
public @Nullable SpecificCloudServiceProvider registerService(
468+
@NonNull ServiceInfoSnapshot snapshot,
469+
@NonNull NetworkChannel source
470+
) {
471+
// check if the service provider is already registered, return null to indicate that we didn't register the service
472+
var serviceUniqueId = snapshot.serviceId().uniqueId();
473+
if (this.knownServices.containsKey(serviceUniqueId)) {
474+
return null;
475+
}
476+
477+
// build the service provider for the newly added service
478+
var serviceProvider = this.sender.factory().generateRPCChainBasedApi(
479+
this.sender,
480+
"serviceProvider",
481+
SpecificCloudServiceProvider.class,
482+
GenerationContext.forClass(RemoteNodeCloudServiceProvider.class).channelSupplier(() -> source).build()
483+
).newInstance(new Object[]{snapshot}, new Object[]{snapshot.serviceId().uniqueId()});
484+
485+
// register the service and return the new provider, unless some other thread registered the service
486+
var knownProvider = this.knownServices.putIfAbsent(serviceUniqueId, serviceProvider);
487+
return knownProvider == null ? serviceProvider : null;
488+
}
489+
490+
@Override
491+
public void handleServiceUpdate(@NonNull ServiceInfoSnapshot snapshot, @Nullable NetworkChannel source) {
445492
// deleted services were removed on the other node - remove it here too
446493
if (snapshot.lifeCycle() == ServiceLifeCycle.DELETED) {
447494
this.knownServices.remove(snapshot.serviceId().uniqueId());
@@ -450,14 +497,9 @@ public void handleServiceUpdate(@NonNull ServiceInfoSnapshot snapshot, @UnknownN
450497
// register the service if the provider is available
451498
var provider = this.knownServices.get(snapshot.serviceId().uniqueId());
452499
if (provider == null) {
453-
this.knownServices.putIfAbsent(
454-
snapshot.serviceId().uniqueId(),
455-
this.sender.factory().generateRPCChainBasedApi(
456-
this.sender,
457-
"serviceProvider",
458-
SpecificCloudServiceProvider.class,
459-
GenerationContext.forClass(RemoteNodeCloudServiceProvider.class).channelSupplier(() -> source).build()
460-
).newInstance(new Object[]{snapshot}, new Object[]{snapshot.serviceId().uniqueId()}));
500+
// this is the only point where the channel has to be present
501+
Objects.requireNonNull(source, "Node Network Channel has to be present to register service");
502+
this.registerService(snapshot, source);
461503
LOGGER.fine("Registered remote service %s", null, snapshot.serviceId());
462504
} else if (provider instanceof RemoteNodeCloudServiceProvider remoteProvider) {
463505
// update the provider if possible - we need only to handle remote node providers as local providers will update

node/src/main/java/eu/cloudnetservice/node/service/defaults/NodeCloudServiceFactory.java

+54-7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import dev.derklaro.aerogel.PostConstruct;
2020
import dev.derklaro.aerogel.auto.Provides;
21+
import eu.cloudnetservice.common.log.LogManager;
22+
import eu.cloudnetservice.common.log.Logger;
2123
import eu.cloudnetservice.driver.channel.ChannelMessage;
2224
import eu.cloudnetservice.driver.channel.ChannelMessageTarget;
2325
import eu.cloudnetservice.driver.event.EventManager;
@@ -31,6 +33,7 @@
3133
import eu.cloudnetservice.driver.service.ServiceConfiguration;
3234
import eu.cloudnetservice.driver.service.ServiceCreateResult;
3335
import eu.cloudnetservice.driver.service.ServiceCreateRetryConfiguration;
36+
import eu.cloudnetservice.node.cluster.NodeServer;
3437
import eu.cloudnetservice.node.cluster.NodeServerProvider;
3538
import eu.cloudnetservice.node.event.service.CloudServiceNodeSelectEvent;
3639
import eu.cloudnetservice.node.network.listener.message.ServiceChannelMessageListener;
@@ -51,6 +54,8 @@
5154
@Provides(CloudServiceFactory.class)
5255
public class NodeCloudServiceFactory implements CloudServiceFactory {
5356

57+
private static final Logger LOGGER = LogManager.logger(NodeCloudServiceFactory.class);
58+
5459
private final EventManager eventManager;
5560
private final CloudServiceManager serviceManager;
5661
private final NodeServerProvider nodeServerProvider;
@@ -128,10 +133,10 @@ private void registerServiceChannelListener() {
128133
"head_node_to_node_start_service",
129134
nodeServer.info().uniqueId(),
130135
serviceConfiguration);
136+
137+
// process the service creation result and return it if the creation was successful
138+
createResult = this.processServiceStartResponse(createResult, nodeServer);
131139
if (createResult.state() == ServiceCreateResult.State.CREATED) {
132-
// register the service locally in case the registration packet was not sent before a response to this
133-
// packet was received
134-
this.serviceManager.handleServiceUpdate(createResult.serviceInfo(), nodeServer.channel());
135140
return createResult;
136141
}
137142

@@ -140,9 +145,12 @@ private void registerServiceChannelListener() {
140145
maybeServiceConfiguration.retryConfiguration(),
141146
serviceConfiguration);
142147
} else {
143-
// start on the current node
144-
var serviceInfo = this.serviceManager.createLocalCloudService(serviceConfiguration).serviceInfo();
145-
return ServiceCreateResult.created(serviceInfo);
148+
// start on the current node & publish the service snapshot to all components
149+
var createdService = this.serviceManager.createLocalCloudService(serviceConfiguration);
150+
createdService.handleServiceRegister();
151+
152+
// construct the create result
153+
return ServiceCreateResult.created(createdService.serviceInfo());
146154
}
147155
} finally {
148156
this.serviceCreationLock.unlock();
@@ -156,6 +164,45 @@ private void registerServiceChannelListener() {
156164
}
157165
}
158166

167+
protected @NonNull ServiceCreateResult processServiceStartResponse(
168+
@NonNull ServiceCreateResult result,
169+
@NonNull NodeServer associatedNode
170+
) {
171+
// if the service creation failed we don't need to check anything
172+
if (result.state() != ServiceCreateResult.State.CREATED) {
173+
return result;
174+
}
175+
176+
// check if the node is still connected
177+
var nodeChannel = associatedNode.channel();
178+
if (nodeChannel == null || !associatedNode.available()) {
179+
LOGGER.fine(
180+
"Unable to register service on node %s as the node is no longer connected",
181+
null,
182+
associatedNode.info().uniqueId());
183+
return ServiceCreateResult.FAILED;
184+
}
185+
186+
// try to register the created service locally
187+
var serviceUniqueId = result.serviceInfo().serviceId().uniqueId();
188+
var serviceProvider = this.serviceManager.registerService(result.serviceInfo(), nodeChannel);
189+
if (serviceProvider != null) {
190+
// service registered successfully, finish the registration of the service on the other node
191+
ChannelMessage.builder()
192+
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
193+
.message("head_node_to_node_finish_service_registration")
194+
.buffer(DataBuf.empty().writeUniqueId(serviceUniqueId))
195+
.target(ChannelMessageTarget.Type.NODE, associatedNode.info().uniqueId())
196+
.build()
197+
.send();
198+
return result;
199+
} else {
200+
// a service with the id already exists, just let the unaccepted service
201+
// time out on the other node... ¯\_(ツ)_/¯
202+
return ServiceCreateResult.FAILED;
203+
}
204+
}
205+
159206
protected @NonNull ServiceCreateResult sendNodeServerStartRequest(
160207
@NonNull String message,
161208
@NonNull String targetNode,
@@ -169,7 +216,7 @@ private void registerServiceChannelListener() {
169216
.buffer(DataBuf.empty().writeObject(configuration))
170217
.build()
171218
.sendSingleQueryAsync()
172-
.get(5, TimeUnit.SECONDS, null);
219+
.get(20, TimeUnit.SECONDS, null);
173220

174221
// read the result service info from the buffer, if there was no response then we need to fail (only the head
175222
// node should queue start requests)

0 commit comments

Comments
 (0)