Skip to content
Open
98 changes: 98 additions & 0 deletions docs/composite-structure.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Composite Structure

## Overview

A **composite structure** represents a logical grouping of namespaces or services that are treated as a single dynamic unit.

The composite structure defines **membership only** — it does not describe behavior, configuration, or lifecycle of individual members. Its purpose is to provide a consistent and up-to-date view of which entities belong to a composite at any given time.

---

## Composite Identifier

Each composite is identified by a unique **composite ID**.

The composite ID:

* is used as a key prefix in Consul,
* is included in all composite structure notifications,
* uniquely identifies a composite across the platform.

---

## Composite Members

A **composite member** is a namespace (or logical unit) that belongs to a composite.

Characteristics:

* Membership is **dynamic** — members can be added at runtime.
* The member list represents the **current desired state** of the composite.
* Ordering of members is not significant; membership is treated as a set.

---

## Storage Model

### Source of Truth

The composite structure is stored in **Consul**, which acts as the single source of truth.

For each composite:

* Members are stored as a set of keys under a composite-specific prefix.
* The presence or absence of a key determines membership.

---

### Modify Index

When reading composite membership from Consul, the response includes a **modify index**.

The modify index:

* is a monotonically increasing value maintained by Consul,
* changes whenever any member is added or removed,
* represents the **version of the composite structure**.

If the composite membership does not change, the modify index remains the same.

---

## Composite Structure Model

Conceptually, a composite structure consists of:

* **Composite ID**
* **Member set**
* **Modify index**

This can be represented as:

```text
CompositeStructure
├─ compositeId
├─ members (set)
└─ modifyIndex
```

The modify index applies to the structure as a whole, not to individual members.

---

## Propagation and Consumers

The composite structure is propagated to external systems as a **snapshot**, not as a delta.

Each propagation includes:

* the full member set,
* the current modify index.

Consumers may use the modify index to:

* detect whether the structure has changed since the last update,
* ignore duplicate updates,
* enforce ordering or freshness constraints.

The interpretation of the modify index is the responsibility of the consumer.
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package com.netcracker.core.declarative.client.reconciler;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netcracker.core.declarative.service.composite.CompositeStructureWatcher;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import com.netcracker.core.declarative.client.rest.Condition;
import com.netcracker.core.declarative.client.rest.ProcessStatus;
import com.netcracker.core.declarative.exception.NoopConsulException;
import com.netcracker.core.declarative.model.CompositeMembersList;
import com.netcracker.core.declarative.resources.base.CoreCondition;
import com.netcracker.core.declarative.resources.base.CoreResource;
import com.netcracker.core.declarative.resources.base.Phase;
import com.netcracker.core.declarative.resources.composite.Composite;
import com.netcracker.core.declarative.service.CompositeCRHolder;
import com.netcracker.core.declarative.service.CompositeConsulUpdater;
import com.netcracker.core.declarative.service.CompositeSpec;
import com.netcracker.core.declarative.service.CompositeCRHolder;
import com.netcracker.core.declarative.service.CompositeStructureUpdateNotifier;
import com.netcracker.core.declarative.service.composite.CompositeStructureWatcher;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.Set;
import java.util.function.Function;

import static com.netcracker.core.declarative.client.constants.Constants.VALIDATED_STEP_NAME;
Expand Down Expand Up @@ -86,7 +86,7 @@ public UpdateControl<T> reconcileInternal(T composite) throws Exception {
}
}

Set<String> namespaceList = compositeConsulUpdater.getCompositeMembers(compositeSpec.getCompositeId());
CompositeMembersList compositeMembersList = compositeConsulUpdater.getCompositeMembers(compositeSpec.getCompositeId());

log.info("Update XaaSes...");
for (CompositeStructureUpdateNotifier step : compositeStructureUpdateNotifiers) {
Expand All @@ -97,7 +97,7 @@ public UpdateControl<T> reconcileInternal(T composite) throws Exception {

try {
log.info("Send composite structure to {}", step.getXaasName());
step.notify(compositeSpec.getCompositeId(), namespaceList);
step.notify(compositeSpec.getCompositeId(), compositeMembersList.members(), compositeMembersList.modifyIndex());
completeStep(composite, stepId);
} catch (Exception e) {
log.error("Notification failed for {}", step.getXaasName(), e);
Expand All @@ -118,8 +118,7 @@ protected UpdateControl<T> onReconciliationCompleted(T composite) {
log.info("CompositeStructure updated -> start CompositeStructure watcher for compositeId = {}", compositeSpec.getCompositeId());
compositeCRHolder.set(composite);
compositeStructureWatcher.start(compositeSpec.getCompositeId());
}
catch (Exception e) {
} catch (Exception e) {
log.error("Cannot start compositeStructureWatcher", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ public interface CompositeClient {

record Request(
String id,
Set<String> namespaces) {
Set<String> namespaces,
long modifyIndex
) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.netcracker.core.declarative.model;

import java.util.Set;

public record CompositeMembersList(long modifyIndex, Set<String> members) {
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.netcracker.core.declarative.service;

import java.util.Set;
import com.netcracker.core.declarative.model.CompositeMembersList;

import java.util.concurrent.ExecutionException;

public interface CompositeConsulUpdater {
void updateCompositeStructureInConsul(CompositeSpec compositeSpec) throws ExecutionException, InterruptedException;

void updateCompositeStructureInConsul(String namespace, CompositeSpec compositeSpec) throws ExecutionException, InterruptedException;

Set<String> getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException;
CompositeMembersList getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.netcracker.core.declarative.service;

import com.netcracker.cloud.consul.provider.common.TokenStorage;
import com.netcracker.core.declarative.model.CompositeMembersList;
import io.vertx.ext.consul.*;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import com.netcracker.cloud.consul.provider.common.TokenStorage;

import java.nio.file.Paths;
import java.util.Collections;
Expand Down Expand Up @@ -86,18 +87,21 @@ public void updateCompositeStructureInConsul(String namespace, CompositeSpec com
}

@Override
public Set<String> getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException {
public CompositeMembersList getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException {
String compositeDefinitionRoot = COMPOSITE_STRUCTURE_BASE_PATH_TEMPLATE.formatted(compositeId);
log.info("Get updated composite structure from consul by path: {}", compositeDefinitionRoot);
ConsulClient consulClient = consulClientFactory.create(consulTokenStorage.get());
try {
return consulClient.getKeys(compositeDefinitionRoot)
KeyValueList keyValueList = consulClient.getValues(compositeDefinitionRoot)
.toCompletionStage()
.toCompletableFuture()
.get()
.get();
Set<String> members = keyValueList
.getList()
.stream()
.map(s -> Paths.get(compositeDefinitionRoot).relativize(Paths.get(s)).getParent().toString())
.map(s -> Paths.get(compositeDefinitionRoot).relativize(Paths.get(s.getKey())).getParent().toString())
.collect(Collectors.toSet());
return new CompositeMembersList(keyValueList.getIndex(), members);
} finally {
consulClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netcracker.cloud.core.error.rest.tmf.DefaultTmfErrorResponseConverter;
import com.netcracker.cloud.core.error.rest.tmf.TmfErrorResponse;
import com.netcracker.core.declarative.client.rest.CompositeClient;
import lombok.AllArgsConstructor;
import lombok.Getter;
import com.netcracker.cloud.core.error.rest.tmf.DefaultTmfErrorResponseConverter;
import com.netcracker.cloud.core.error.rest.tmf.TmfErrorResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -23,8 +23,8 @@ public class CompositeStructureUpdateNotifier {
private final String xaasName;
private final CompositeClient compositeClient;

public void notify(String compositeId, Set<String> compositeMembers) {
CompositeClient.Request compositeStructure = new CompositeClient.Request(compositeId, compositeMembers);
public void notify(String compositeId, Set<String> compositeMembers, long modifyIndex) {
CompositeClient.Request compositeStructure = new CompositeClient.Request(compositeId, compositeMembers, modifyIndex);
log.info("Send request to {} with composite structure {}", xaasName, compositeStructure);
try (jakarta.ws.rs.core.Response response = compositeClient.structures(compositeStructure)) {
if (response.getStatusInfo().getStatusCode() == SC_NO_CONTENT) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.netcracker.core.declarative.service;

import com.netcracker.core.declarative.exception.NoopConsulException;
import com.netcracker.core.declarative.model.CompositeMembersList;

import java.util.Set;
import java.util.concurrent.ExecutionException;

public class NoopCompositeConsulUpdaterImpl implements CompositeConsulUpdater {
Expand All @@ -18,7 +18,7 @@ public void updateCompositeStructureInConsul(String namespace, CompositeSpec com
}

@Override
public Set<String> getCompositeMembers(String compositeId) {
public CompositeMembersList getCompositeMembers(String compositeId) {
throw new NoopConsulException();
}

Expand Down
8 changes: 5 additions & 3 deletions service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ quarkus.log.console.format=[%d{yyyy-MM-dd'T'HH:mm:ss.SSS}] [%-5p] [request_id=%X
quarkus.http.limits.max-header-size=${http.buffer.header.max.size:10240}

quarkus.rest-client.mesh-client-v3.url=http://control-plane:8080
%dev.quarkus.rest-client.mesh-client-v3.url=http://127.0.0.1:8083
%dev.quarkus.rest-client.mesh-client-v3.url=http://control-plane:8080
quarkus.rest-client.maas-client.url=http://maas-agent:8080
%dev.quarkus.rest-client.maas-client.url=http://127.0.0.1:8084
%dev.quarkus.rest-client.maas-client.url=http://maas-agent:8080
quarkus.rest-client.dbaas-client.url=http://dbaas-agent:8080
%dev.quarkus.rest-client.dbaas-client.url=http://127.0.0.1:8085
%dev.quarkus.rest-client.dbaas-client.url=http://dbaas-agent:8080
quarkus.rest-client.idp-extensions-client.url=http://idp-extensions:8080
%dev.quarkus.rest-client.idp-extensions-client.url=http://127.0.0.1:8087
quarkus.rest-client.key-manager-client.url=http://key-manager:8080
Expand All @@ -38,6 +38,8 @@ cloud.composite.structure.xaas.read-timeout=5000
cloud.composite.structure.xaas.connect-timeout=1000
cloud.composite.structure.consul.update-timeout=5000

quarkus.consul-source-config.m2m.enabled=false

cloud.composite.structure.sync.enabled=true
cloud.composite.structure.sync.check-interval=300000
cloud.consul-long-poll.retry-time=20000
Expand Down
Loading
Loading