From 8bc1731b773eed9d86f8b483ad92c792a020a6e3 Mon Sep 17 00:00:00 2001 From: Evgenii Stetsenko Date: Mon, 29 Dec 2025 16:19:03 +0300 Subject: [PATCH 1/8] feat: check modify index before composite structure apply --- .../reconciler/BaseCompositeReconciler.java | 14 +++++------ .../client/rest/CompositeClient.java | 4 +++- .../model/CompositeMembersList.java | 6 +++++ .../service/CompositeConsulUpdater.java | 5 ++-- .../service/CompositeConsulUpdaterImpl.java | 14 +++++++---- .../CompositeStructureUpdateNotifier.java | 4 ++-- .../NoopCompositeConsulUpdaterImpl.java | 4 ++-- .../reconciler/CompositeReconcilerTest.java | 23 +++++++++++++++---- .../CompositeConsulUpdaterImplTest.java | 19 ++++++++------- .../CompositeReconcilerConsulTest.java | 11 ++++----- 10 files changed, 67 insertions(+), 37 deletions(-) create mode 100644 service/src/main/java/com/netcracker/core/declarative/model/CompositeMembersList.java diff --git a/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java b/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java index 238229a7..28323413 100644 --- a/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java +++ b/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java @@ -1,13 +1,10 @@ package com.netcracker.core.declarative.client.reconciler; import com.fasterxml.jackson.databind.ObjectMapper; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import com.netcracker.core.declarative.client.rest.Condition; import com.netcracker.core.declarative.client.rest.ProcessStatus; import com.netcracker.core.declarative.exception.NoopConsulException; +import com.netcracker.core.declarative.model.CompositeMembersList; import com.netcracker.core.declarative.resources.base.CoreCondition; import com.netcracker.core.declarative.resources.base.CoreResource; import com.netcracker.core.declarative.resources.base.Phase; @@ -15,9 +12,12 @@ import com.netcracker.core.declarative.service.CompositeConsulUpdater; import com.netcracker.core.declarative.service.CompositeSpec; import com.netcracker.core.declarative.service.CompositeStructureUpdateNotifier; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import java.util.List; -import java.util.Set; import java.util.function.Function; import static com.netcracker.core.declarative.client.constants.Constants.VALIDATED_STEP_NAME; @@ -77,7 +77,7 @@ public UpdateControl reconcileInternal(T composite) throws Exception { } } - Set namespaceList = compositeConsulUpdater.getCompositeMembers(compositeSpec.getCompositeId()); + CompositeMembersList compositeMembersList = compositeConsulUpdater.getCompositeMembers(compositeSpec.getCompositeId()); log.info("Update XaaSes..."); for (CompositeStructureUpdateNotifier step : compositeStructureUpdateNotifiers) { @@ -88,7 +88,7 @@ public UpdateControl reconcileInternal(T composite) throws Exception { try { log.info("Send composite structure to {}", step.getXaasName()); - step.notify(compositeSpec.getCompositeId(), namespaceList); + step.notify(compositeSpec.getCompositeId(), compositeMembersList.members(), compositeMembersList.index()); completeStep(composite, stepId); } catch (Exception e) { log.error("Notification failed for {}", step.getXaasName(), e); diff --git a/service/src/main/java/com/netcracker/core/declarative/client/rest/CompositeClient.java b/service/src/main/java/com/netcracker/core/declarative/client/rest/CompositeClient.java index ff376fd7..cb92fd26 100644 --- a/service/src/main/java/com/netcracker/core/declarative/client/rest/CompositeClient.java +++ b/service/src/main/java/com/netcracker/core/declarative/client/rest/CompositeClient.java @@ -17,7 +17,9 @@ public interface CompositeClient { record Request( String id, - Set namespaces) { + Set namespaces, + long index + ) { } } \ No newline at end of file diff --git a/service/src/main/java/com/netcracker/core/declarative/model/CompositeMembersList.java b/service/src/main/java/com/netcracker/core/declarative/model/CompositeMembersList.java new file mode 100644 index 00000000..33fbac79 --- /dev/null +++ b/service/src/main/java/com/netcracker/core/declarative/model/CompositeMembersList.java @@ -0,0 +1,6 @@ +package com.netcracker.core.declarative.model; + +import java.util.Set; + +public record CompositeMembersList(Long index, Set members) { +} diff --git a/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdater.java b/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdater.java index bb4cb2a9..8f48d14a 100644 --- a/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdater.java +++ b/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdater.java @@ -1,6 +1,7 @@ package com.netcracker.core.declarative.service; -import java.util.Set; +import com.netcracker.core.declarative.model.CompositeMembersList; + import java.util.concurrent.ExecutionException; public interface CompositeConsulUpdater { @@ -8,5 +9,5 @@ public interface CompositeConsulUpdater { void updateCompositeStructureInConsul(String namespace, CompositeSpec compositeSpec) throws ExecutionException, InterruptedException; - Set getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException; + CompositeMembersList getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException; } diff --git a/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java b/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java index b3fbafb9..9da6e03f 100644 --- a/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java +++ b/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java @@ -1,10 +1,11 @@ package com.netcracker.core.declarative.service; +import com.netcracker.cloud.consul.provider.common.TokenStorage; +import com.netcracker.core.declarative.model.CompositeMembersList; import io.vertx.ext.consul.*; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import com.netcracker.cloud.consul.provider.common.TokenStorage; import java.nio.file.Paths; import java.util.Collections; @@ -86,18 +87,21 @@ public void updateCompositeStructureInConsul(String namespace, CompositeSpec com } @Override - public Set getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException { + public CompositeMembersList getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException { String compositeDefinitionRoot = COMPOSITE_STRUCTURE_BASE_PATH_TEMPLATE.formatted(compositeId); log.info("Get updated composite structure from consul by path: {}", compositeDefinitionRoot); ConsulClient consulClient = consulClientFactory.create(consulTokenStorage.get()); try { - return consulClient.getKeys(compositeDefinitionRoot) + KeyValueList keyValueList = consulClient.getValues(compositeDefinitionRoot) .toCompletionStage() .toCompletableFuture() - .get() + .get(); + Set members = keyValueList + .getList() .stream() - .map(s -> Paths.get(compositeDefinitionRoot).relativize(Paths.get(s)).getParent().toString()) + .map(s -> Paths.get(compositeDefinitionRoot).relativize(Paths.get(s.getKey())).getParent().toString()) .collect(Collectors.toSet()); + return new CompositeMembersList(keyValueList.getIndex(), members); } finally { consulClient.close(); } diff --git a/service/src/main/java/com/netcracker/core/declarative/service/CompositeStructureUpdateNotifier.java b/service/src/main/java/com/netcracker/core/declarative/service/CompositeStructureUpdateNotifier.java index 9eebd5a2..617c22ec 100644 --- a/service/src/main/java/com/netcracker/core/declarative/service/CompositeStructureUpdateNotifier.java +++ b/service/src/main/java/com/netcracker/core/declarative/service/CompositeStructureUpdateNotifier.java @@ -23,8 +23,8 @@ public class CompositeStructureUpdateNotifier { private final String xaasName; private final CompositeClient compositeClient; - public void notify(String compositeId, Set compositeMembers) { - CompositeClient.Request compositeStructure = new CompositeClient.Request(compositeId, compositeMembers); + public void notify(String compositeId, Set compositeMembers, long index) { + CompositeClient.Request compositeStructure = new CompositeClient.Request(compositeId, compositeMembers, index); log.info("Send request to {} with composite structure {}", xaasName, compositeStructure); try (jakarta.ws.rs.core.Response response = compositeClient.structures(compositeStructure)) { if (response.getStatusInfo().getStatusCode() == SC_NO_CONTENT) { diff --git a/service/src/main/java/com/netcracker/core/declarative/service/NoopCompositeConsulUpdaterImpl.java b/service/src/main/java/com/netcracker/core/declarative/service/NoopCompositeConsulUpdaterImpl.java index b0695c62..9292885d 100644 --- a/service/src/main/java/com/netcracker/core/declarative/service/NoopCompositeConsulUpdaterImpl.java +++ b/service/src/main/java/com/netcracker/core/declarative/service/NoopCompositeConsulUpdaterImpl.java @@ -1,8 +1,8 @@ package com.netcracker.core.declarative.service; import com.netcracker.core.declarative.exception.NoopConsulException; +import com.netcracker.core.declarative.model.CompositeMembersList; -import java.util.Set; import java.util.concurrent.ExecutionException; public class NoopCompositeConsulUpdaterImpl implements CompositeConsulUpdater { @@ -18,7 +18,7 @@ public void updateCompositeStructureInConsul(String namespace, CompositeSpec com } @Override - public Set getCompositeMembers(String compositeId) { + public CompositeMembersList getCompositeMembers(String compositeId) { throw new NoopConsulException(); } diff --git a/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java b/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java index 8968dd84..9bc6993b 100644 --- a/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java +++ b/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java @@ -1,6 +1,7 @@ package com.netcracker.core.declarative.client.reconciler; import com.netcracker.core.declarative.client.rest.CompositeClient; +import com.netcracker.core.declarative.model.CompositeMembersList; import com.netcracker.core.declarative.resources.base.CoreCondition; import com.netcracker.core.declarative.resources.base.CoreResource; import com.netcracker.core.declarative.resources.composite.Composite; @@ -17,6 +18,7 @@ import java.util.List; import java.util.Map; +import java.util.Set; import static com.netcracker.core.declarative.client.reconciler.CompositeReconciler.DBAAS_NAME; import static com.netcracker.core.declarative.client.reconciler.CompositeReconciler.MAAS_NAME; @@ -36,9 +38,13 @@ class CompositeReconcilerTest { void reconcileInternal() throws Exception { CompositeClient compositeClient = mock(CompositeClient.class); when(compositeClient.structures(any())).thenReturn(Response.noContent().build()); + + CompositeConsulUpdater compositeConsulUpdater = mock(CompositeConsulUpdater.class); + when(compositeConsulUpdater.getCompositeMembers(any())).thenReturn(new CompositeMembersList(123L, Set.of())); + CompositeReconciler compositeReconciler = new CompositeReconciler( mock(KubernetesClient.class), - mock(CompositeConsulUpdater.class), + compositeConsulUpdater, List.of( new CompositeStructureUpdateNotifier(MAAS_NAME, compositeClient), new CompositeStructureUpdateNotifier(DBAAS_NAME, compositeClient) @@ -155,9 +161,12 @@ void reconcileInternal_fail_MaaSUpdate() throws Exception { when(namespaceableResource.updateStatus()).thenReturn(mock(CoreResource.class)); when(kubernetesClient.resource(any(HasMetadata.class))).thenReturn(namespaceableResource); + CompositeConsulUpdater compositeConsulUpdater = mock(CompositeConsulUpdater.class); + when(compositeConsulUpdater.getCompositeMembers(any())).thenReturn(new CompositeMembersList(123L, Set.of())); + CompositeReconciler compositeReconciler = new CompositeReconciler( kubernetesClient, - mock(CompositeConsulUpdater.class), + compositeConsulUpdater, List.of(new CompositeStructureUpdateNotifier(MAAS_NAME, compositeClient)) ); @@ -192,9 +201,12 @@ void reconcileInternal_MaaSUpdate_fail_response() throws Exception { when(namespaceableResource.updateStatus()).thenReturn(mock(CoreResource.class)); when(kubernetesClient.resource(any(HasMetadata.class))).thenReturn(namespaceableResource); + CompositeConsulUpdater compositeConsulUpdater = mock(CompositeConsulUpdater.class); + when(compositeConsulUpdater.getCompositeMembers(any())).thenReturn(new CompositeMembersList(123L, Set.of())); + CompositeReconciler compositeReconciler = new CompositeReconciler( kubernetesClient, - mock(CompositeConsulUpdater.class), + compositeConsulUpdater, List.of(new CompositeStructureUpdateNotifier(MAAS_NAME, compositeClient)) ); @@ -232,9 +244,12 @@ void reconcileInternal_MaaSUpdate_fail_tmf_response() throws Exception { when(namespaceableResource.updateStatus()).thenReturn(mock(CoreResource.class)); when(kubernetesClient.resource(any(HasMetadata.class))).thenReturn(namespaceableResource); + CompositeConsulUpdater compositeConsulUpdater = mock(CompositeConsulUpdater.class); + when(compositeConsulUpdater.getCompositeMembers(any())).thenReturn(new CompositeMembersList(123L, Set.of())); + CompositeReconciler compositeReconciler = new CompositeReconciler( kubernetesClient, - mock(CompositeConsulUpdater.class), + compositeConsulUpdater, List.of(new CompositeStructureUpdateNotifier(MAAS_NAME, compositeClient)) ); diff --git a/service/src/test/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImplTest.java b/service/src/test/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImplTest.java index 545ca94c..8621f60b 100644 --- a/service/src/test/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImplTest.java +++ b/service/src/test/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImplTest.java @@ -1,6 +1,7 @@ package com.netcracker.core.declarative.service; import com.netcracker.core.declarative.client.rest.CompositeClient; +import com.netcracker.core.declarative.model.CompositeMembersList; import io.vertx.core.Future; import io.vertx.ext.consul.*; import org.junit.jupiter.api.BeforeEach; @@ -241,19 +242,21 @@ void compositeStructureUpdateStep_BO_BG_SO_BG() throws InterruptedException, Exe void getCompositeMembers() throws ExecutionException, InterruptedException { String basePath = "composite/first/structure"; - when(consulClient.getKeys(basePath)) - .thenReturn(Future.succeededFuture(List.of( - basePath + "/first" + "/compositeRole", - basePath + "/second" + "/compositeRole", - basePath + "/third" + "/compositeRole" - ))); + when(consulClient.getValues(basePath)) + .thenReturn(Future.succeededFuture(new KeyValueList() + .setIndex(123L) + .setList(List.of( + new KeyValue().setKey(basePath + "/first" + "/compositeRole").setValue("val1"), + new KeyValue().setKey(basePath + "/second" + "/compositeRole").setValue("val2"), + new KeyValue().setKey(basePath + "/third" + "/compositeRole").setValue("val3") + )))); CompositeClient compositeClient = mock(CompositeClient.class); when(compositeClient.structures(any())).thenReturn(jakarta.ws.rs.core.Response.noContent().build()); CompositeConsulUpdater compositeConsulUpdater = new CompositeConsulUpdaterImpl("first", consulClientFactory, mock(TokenStorage.class)); - Set compositeMembers = compositeConsulUpdater.getCompositeMembers("first"); - assertEquals(Set.of("first", "second", "third"), compositeMembers); + CompositeMembersList compositeMembersList = compositeConsulUpdater.getCompositeMembers("first"); + assertEquals(Set.of("first", "second", "third"), compositeMembersList.members()); } private void verifyConsulValueSetTxn(List ops, List kvs) { diff --git a/service/src/test/java/com/netcracker/core/declarative/service/CompositeReconcilerConsulTest.java b/service/src/test/java/com/netcracker/core/declarative/service/CompositeReconcilerConsulTest.java index 90da4c6a..8aafbfc6 100644 --- a/service/src/test/java/com/netcracker/core/declarative/service/CompositeReconcilerConsulTest.java +++ b/service/src/test/java/com/netcracker/core/declarative/service/CompositeReconcilerConsulTest.java @@ -1,13 +1,12 @@ package com.netcracker.core.declarative.service; import com.netcracker.core.declarative.ConsulTestResource; +import com.netcracker.core.declarative.model.CompositeMembersList; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import jakarta.inject.Inject; import org.junit.jupiter.api.Test; -import java.util.Set; - import static org.junit.jupiter.api.Assertions.assertTrue; @QuarkusTest @@ -20,15 +19,15 @@ public class CompositeReconcilerConsulTest { @Test public void deleteSubTree() throws Exception { compositeConsulUpdater.updateCompositeStructureInConsul("test-namespace-baseline", new CompositeSpec("", "test-namespace-baseline", "", null)); - Set compositeMembers = compositeConsulUpdater.getCompositeMembers("test-namespace-baseline"); - assertTrue(compositeMembers.contains("test-namespace-baseline")); + CompositeMembersList compositeMembers = compositeConsulUpdater.getCompositeMembers("test-namespace-baseline"); + assertTrue(compositeMembers.members().contains("test-namespace-baseline")); compositeConsulUpdater.updateCompositeStructureInConsul("test-namespace", new CompositeSpec("", "test-namespace", "", new CompositeSpec.CompositeSpecBaseline("", "test-namespace-baseline", ""))); compositeMembers = compositeConsulUpdater.getCompositeMembers("test-namespace-baseline"); - assertTrue(compositeMembers.contains("test-namespace-baseline")); + assertTrue(compositeMembers.members().contains("test-namespace-baseline")); compositeConsulUpdater.updateCompositeStructureInConsul("test-namespace", new CompositeSpec("", "test-namespace", "", new CompositeSpec.CompositeSpecBaseline("", "test-namespace-baseline", ""))); compositeMembers = compositeConsulUpdater.getCompositeMembers("test-namespace-baseline"); - assertTrue(compositeMembers.contains("test-namespace-baseline")); + assertTrue(compositeMembers.members().contains("test-namespace-baseline")); } } \ No newline at end of file From e7af9bddf3edafb3842668f4ced90eb18aac7460 Mon Sep 17 00:00:00 2001 From: Evgenii Stetsenko Date: Tue, 13 Jan 2026 18:06:35 +0300 Subject: [PATCH 2/8] feat: check modify index before composite structure apply --- .../reconciler/CompositeReconcilerTest.java | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java b/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java index 9bc6993b..0d38b5be 100644 --- a/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java +++ b/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java @@ -28,9 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; class CompositeReconcilerTest { @@ -69,6 +67,31 @@ void reconcileInternal() throws Exception { } @Test + void reconcileInternal_modify_index() throws Exception { + CompositeClient compositeClient = mock(CompositeClient.class); + when(compositeClient.structures(any())).thenReturn(Response.noContent().build()); + + CompositeConsulUpdater compositeConsulUpdater = mock(CompositeConsulUpdater.class); + when(compositeConsulUpdater.getCompositeMembers(any())).thenReturn(new CompositeMembersList(123L, Set.of("ns-1", "ns-2"))); + + CompositeReconciler compositeReconciler = new CompositeReconciler( + mock(KubernetesClient.class), + compositeConsulUpdater, + List.of( + new CompositeStructureUpdateNotifier(MAAS_NAME, compositeClient), + new CompositeStructureUpdateNotifier(DBAAS_NAME, compositeClient) + ) + ); + + Composite composite = new Composite(); + composite.setSpec(new RawExtension(new CompositeSpec("", "ns-1", "", null))); + compositeReconciler.reconcileInternal(composite); + + verify(compositeClient, times(2)).structures(new CompositeClient.Request("ns-1", Set.of("ns-1", "ns-2"), 123L)); + + } + + @Test void reconcileInternal_no_consul() throws Exception { KubernetesClient kubernetesClient = mock(KubernetesClient.class); NamespaceableResource namespaceableResource = mock(NamespaceableResource.class); From adcc8400ea5fadbe7e7e404829feb7af10ddc03f Mon Sep 17 00:00:00 2001 From: Evgenii Stetsenko Date: Tue, 13 Jan 2026 22:07:00 +0300 Subject: [PATCH 3/8] feat: check modify index before composite structure apply --- .../client/reconciler/BaseCompositeReconciler.java | 2 +- .../core/declarative/client/rest/CompositeClient.java | 2 +- .../core/declarative/model/CompositeMembersList.java | 2 +- .../service/CompositeStructureUpdateNotifier.java | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java b/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java index 28323413..2cd0c1e2 100644 --- a/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java +++ b/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java @@ -88,7 +88,7 @@ public UpdateControl reconcileInternal(T composite) throws Exception { try { log.info("Send composite structure to {}", step.getXaasName()); - step.notify(compositeSpec.getCompositeId(), compositeMembersList.members(), compositeMembersList.index()); + step.notify(compositeSpec.getCompositeId(), compositeMembersList.members(), compositeMembersList.modifyIndex()); completeStep(composite, stepId); } catch (Exception e) { log.error("Notification failed for {}", step.getXaasName(), e); diff --git a/service/src/main/java/com/netcracker/core/declarative/client/rest/CompositeClient.java b/service/src/main/java/com/netcracker/core/declarative/client/rest/CompositeClient.java index cb92fd26..ff4d46df 100644 --- a/service/src/main/java/com/netcracker/core/declarative/client/rest/CompositeClient.java +++ b/service/src/main/java/com/netcracker/core/declarative/client/rest/CompositeClient.java @@ -18,7 +18,7 @@ public interface CompositeClient { record Request( String id, Set namespaces, - long index + long modifyIndex ) { } diff --git a/service/src/main/java/com/netcracker/core/declarative/model/CompositeMembersList.java b/service/src/main/java/com/netcracker/core/declarative/model/CompositeMembersList.java index 33fbac79..4c0e33ce 100644 --- a/service/src/main/java/com/netcracker/core/declarative/model/CompositeMembersList.java +++ b/service/src/main/java/com/netcracker/core/declarative/model/CompositeMembersList.java @@ -2,5 +2,5 @@ import java.util.Set; -public record CompositeMembersList(Long index, Set members) { +public record CompositeMembersList(long modifyIndex, Set members) { } diff --git a/service/src/main/java/com/netcracker/core/declarative/service/CompositeStructureUpdateNotifier.java b/service/src/main/java/com/netcracker/core/declarative/service/CompositeStructureUpdateNotifier.java index 617c22ec..0aad1d7b 100644 --- a/service/src/main/java/com/netcracker/core/declarative/service/CompositeStructureUpdateNotifier.java +++ b/service/src/main/java/com/netcracker/core/declarative/service/CompositeStructureUpdateNotifier.java @@ -2,11 +2,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.netcracker.cloud.core.error.rest.tmf.DefaultTmfErrorResponseConverter; +import com.netcracker.cloud.core.error.rest.tmf.TmfErrorResponse; import com.netcracker.core.declarative.client.rest.CompositeClient; import lombok.AllArgsConstructor; import lombok.Getter; -import com.netcracker.cloud.core.error.rest.tmf.DefaultTmfErrorResponseConverter; -import com.netcracker.cloud.core.error.rest.tmf.TmfErrorResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,8 +23,8 @@ public class CompositeStructureUpdateNotifier { private final String xaasName; private final CompositeClient compositeClient; - public void notify(String compositeId, Set compositeMembers, long index) { - CompositeClient.Request compositeStructure = new CompositeClient.Request(compositeId, compositeMembers, index); + public void notify(String compositeId, Set compositeMembers, long modifyIndex) { + CompositeClient.Request compositeStructure = new CompositeClient.Request(compositeId, compositeMembers, modifyIndex); log.info("Send request to {} with composite structure {}", xaasName, compositeStructure); try (jakarta.ws.rs.core.Response response = compositeClient.structures(compositeStructure)) { if (response.getStatusInfo().getStatusCode() == SC_NO_CONTENT) { From 7682693af905cedf23d146a6cf38f7edbed88534 Mon Sep 17 00:00:00 2001 From: Evgenii Stetsenko Date: Thu, 15 Jan 2026 11:30:55 +0300 Subject: [PATCH 4/8] feat: check modify index before composite structure apply --- .../declarative/service/CompositeConsulUpdaterImpl.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java b/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java index 9da6e03f..953abb77 100644 --- a/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java +++ b/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java @@ -32,7 +32,6 @@ public class CompositeConsulUpdaterImpl implements CompositeConsulUpdater { private final String namespace; private final ConsulClientFactory consulClientFactory; - private final TokenStorage consulTokenStorage; @Override public void updateCompositeStructureInConsul(CompositeSpec compositeSpec) throws ExecutionException, InterruptedException { @@ -73,7 +72,7 @@ public void updateCompositeStructureInConsul(String namespace, CompositeSpec com } log.info("Update composite structure in consul by path: {}", compositeDefinitionRoot); - ConsulClient consulClient = consulClientFactory.create(consulTokenStorage.get()); + ConsulClient consulClient = consulClientFactory.create(""); try { TxnResponse result = consulClient.transaction(request).toCompletionStage().toCompletableFuture().get(); if (!result.getErrors().isEmpty()) { @@ -90,7 +89,7 @@ public void updateCompositeStructureInConsul(String namespace, CompositeSpec com public CompositeMembersList getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException { String compositeDefinitionRoot = COMPOSITE_STRUCTURE_BASE_PATH_TEMPLATE.formatted(compositeId); log.info("Get updated composite structure from consul by path: {}", compositeDefinitionRoot); - ConsulClient consulClient = consulClientFactory.create(consulTokenStorage.get()); + ConsulClient consulClient = consulClientFactory.create(""); try { KeyValueList keyValueList = consulClient.getValues(compositeDefinitionRoot) .toCompletionStage() @@ -108,7 +107,7 @@ public CompositeMembersList getCompositeMembers(String compositeId) throws Execu } private Set cleanUp(String compositeId, String originNamespace) throws ExecutionException, InterruptedException { - ConsulClient consulClient = consulClientFactory.create(consulTokenStorage.get()); + ConsulClient consulClient = consulClientFactory.create(""); try { List struct = consulClient .getValues(COMPOSITE_STRUCTURE_BASE_PATH_TEMPLATE.formatted(compositeId)).toCompletionStage().toCompletableFuture().get().getList(); From 508051d9753f0b153d38de0f8dcceb9d9fd8dd5f Mon Sep 17 00:00:00 2001 From: Evgenii Stetsenko Date: Thu, 15 Jan 2026 11:34:54 +0300 Subject: [PATCH 5/8] feat: check modify index before composite structure apply --- .../declarative/service/CompositeConsulUpdaterImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java b/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java index 953abb77..9da6e03f 100644 --- a/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java +++ b/service/src/main/java/com/netcracker/core/declarative/service/CompositeConsulUpdaterImpl.java @@ -32,6 +32,7 @@ public class CompositeConsulUpdaterImpl implements CompositeConsulUpdater { private final String namespace; private final ConsulClientFactory consulClientFactory; + private final TokenStorage consulTokenStorage; @Override public void updateCompositeStructureInConsul(CompositeSpec compositeSpec) throws ExecutionException, InterruptedException { @@ -72,7 +73,7 @@ public void updateCompositeStructureInConsul(String namespace, CompositeSpec com } log.info("Update composite structure in consul by path: {}", compositeDefinitionRoot); - ConsulClient consulClient = consulClientFactory.create(""); + ConsulClient consulClient = consulClientFactory.create(consulTokenStorage.get()); try { TxnResponse result = consulClient.transaction(request).toCompletionStage().toCompletableFuture().get(); if (!result.getErrors().isEmpty()) { @@ -89,7 +90,7 @@ public void updateCompositeStructureInConsul(String namespace, CompositeSpec com public CompositeMembersList getCompositeMembers(String compositeId) throws ExecutionException, InterruptedException { String compositeDefinitionRoot = COMPOSITE_STRUCTURE_BASE_PATH_TEMPLATE.formatted(compositeId); log.info("Get updated composite structure from consul by path: {}", compositeDefinitionRoot); - ConsulClient consulClient = consulClientFactory.create(""); + ConsulClient consulClient = consulClientFactory.create(consulTokenStorage.get()); try { KeyValueList keyValueList = consulClient.getValues(compositeDefinitionRoot) .toCompletionStage() @@ -107,7 +108,7 @@ public CompositeMembersList getCompositeMembers(String compositeId) throws Execu } private Set cleanUp(String compositeId, String originNamespace) throws ExecutionException, InterruptedException { - ConsulClient consulClient = consulClientFactory.create(""); + ConsulClient consulClient = consulClientFactory.create(consulTokenStorage.get()); try { List struct = consulClient .getValues(COMPOSITE_STRUCTURE_BASE_PATH_TEMPLATE.formatted(compositeId)).toCompletionStage().toCompletableFuture().get().getList(); From 1592c52c286ba3aa95a68dbdd90eb15885cfc5b1 Mon Sep 17 00:00:00 2001 From: Evgenii Stetsenko Date: Thu, 15 Jan 2026 12:21:32 +0300 Subject: [PATCH 6/8] feat: check modify index before composite structure apply --- service/src/main/resources/application.properties | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties index dbb2f64a..5b85aecd 100644 --- a/service/src/main/resources/application.properties +++ b/service/src/main/resources/application.properties @@ -14,11 +14,11 @@ quarkus.log.console.format=[%d{yyyy-MM-dd'T'HH:mm:ss.SSS}] [%-5p] [request_id=%X quarkus.http.limits.max-header-size=${http.buffer.header.max.size:10240} quarkus.rest-client.mesh-client-v3.url=http://control-plane:8080 -%dev.quarkus.rest-client.mesh-client-v3.url=http://127.0.0.1:8083 +%dev.quarkus.rest-client.mesh-client-v3.url=http://control-plane:8080 quarkus.rest-client.maas-client.url=http://maas-agent:8080 -%dev.quarkus.rest-client.maas-client.url=http://127.0.0.1:8084 +%dev.quarkus.rest-client.maas-client.url=http://maas-agent:8080 quarkus.rest-client.dbaas-client.url=http://dbaas-agent:8080 -%dev.quarkus.rest-client.dbaas-client.url=http://127.0.0.1:8085 +%dev.quarkus.rest-client.dbaas-client.url=http://dbaas-agent:8080 quarkus.rest-client.idp-extensions-client.url=http://idp-extensions:8080 %dev.quarkus.rest-client.idp-extensions-client.url=http://127.0.0.1:8087 quarkus.rest-client.key-manager-client.url=http://key-manager:8080 @@ -35,3 +35,5 @@ cloud.composite.structure.xaas.receivers=${COMPOSITE_STRUCTURE_XAAS_RECEIVERS:db cloud.composite.structure.xaas.read-timeout=5000 cloud.composite.structure.xaas.connect-timeout=1000 cloud.composite.structure.consul.update-timeout=5000 + +quarkus.consul-source-config.m2m.enabled=false \ No newline at end of file From 0584473009f77118e8dff2e254d4dfb04769355a Mon Sep 17 00:00:00 2001 From: Evgenii Stetsenko Date: Wed, 4 Feb 2026 15:59:49 +0300 Subject: [PATCH 7/8] feat: check modify index before composite structure apply --- .../reconciler/BaseCompositeReconciler.java | 11 +++------- .../reconciler/CompositeReconcilerTest.java | 21 +++++++------------ 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java b/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java index 7256f0dc..04c0d001 100644 --- a/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java +++ b/service/src/main/java/com/netcracker/core/declarative/client/reconciler/BaseCompositeReconciler.java @@ -1,11 +1,6 @@ package com.netcracker.core.declarative.client.reconciler; import com.fasterxml.jackson.databind.ObjectMapper; -import com.netcracker.core.declarative.service.composite.CompositeStructureWatcher; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import com.netcracker.core.declarative.client.rest.Condition; import com.netcracker.core.declarative.client.rest.ProcessStatus; import com.netcracker.core.declarative.exception.NoopConsulException; @@ -14,10 +9,11 @@ import com.netcracker.core.declarative.resources.base.CoreResource; import com.netcracker.core.declarative.resources.base.Phase; import com.netcracker.core.declarative.resources.composite.Composite; +import com.netcracker.core.declarative.service.CompositeCRHolder; import com.netcracker.core.declarative.service.CompositeConsulUpdater; import com.netcracker.core.declarative.service.CompositeSpec; -import com.netcracker.core.declarative.service.CompositeCRHolder; import com.netcracker.core.declarative.service.CompositeStructureUpdateNotifier; +import com.netcracker.core.declarative.service.composite.CompositeStructureWatcher; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import lombok.extern.slf4j.Slf4j; @@ -122,8 +118,7 @@ protected UpdateControl onReconciliationCompleted(T composite) { log.info("CompositeStructure updated -> start CompositeStructure watcher for compositeId = {}", compositeSpec.getCompositeId()); compositeCRHolder.set(composite); compositeStructureWatcher.start(compositeSpec.getCompositeId()); - } - catch (Exception e) { + } catch (Exception e) { log.error("Cannot start compositeStructureWatcher", e); } diff --git a/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java b/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java index bae097df..5de57186 100644 --- a/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java +++ b/service/src/test/java/com/netcracker/core/declarative/client/reconciler/CompositeReconcilerTest.java @@ -5,11 +5,7 @@ import com.netcracker.core.declarative.resources.base.CoreCondition; import com.netcracker.core.declarative.resources.base.CoreResource; import com.netcracker.core.declarative.resources.composite.Composite; -import com.netcracker.core.declarative.service.CompositeConsulUpdater; -import com.netcracker.core.declarative.service.CompositeSpec; -import com.netcracker.core.declarative.service.CompositeCRHolder; -import com.netcracker.core.declarative.service.CompositeStructureUpdateNotifier; -import com.netcracker.core.declarative.service.NoopCompositeConsulUpdaterImpl; +import com.netcracker.core.declarative.service.*; import com.netcracker.core.declarative.service.composite.CompositeStructureWatcher; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.runtime.RawExtension; @@ -22,13 +18,8 @@ import java.util.Map; import java.util.Set; -import static com.netcracker.core.declarative.client.reconciler.CompositeReconciler.DBAAS_NAME; -import static com.netcracker.core.declarative.client.reconciler.CompositeReconciler.MAAS_NAME; -import static com.netcracker.core.declarative.client.reconciler.CompositeReconciler.XAAS_UPDATED_STEP_NAME; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static com.netcracker.core.declarative.client.reconciler.CompositeReconciler.*; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -84,7 +75,9 @@ void reconcileInternal_modify_index() throws Exception { List.of( new CompositeStructureUpdateNotifier(MAAS_NAME, compositeClient), new CompositeStructureUpdateNotifier(DBAAS_NAME, compositeClient) - ) + ), + mock(CompositeStructureWatcher.class), + new CompositeCRHolder() ); Composite composite = new Composite(); @@ -95,7 +88,7 @@ void reconcileInternal_modify_index() throws Exception { } - @Test + @Test void reconcileInternal_no_consul() throws Exception { KubernetesClient kubernetesClient = mock(KubernetesClient.class); NamespaceableResource namespaceableResource = mock(NamespaceableResource.class); From 8ef41aa2b3e6a3eca06d1ab19da374216a64239c Mon Sep 17 00:00:00 2001 From: Evgenii Stetsenko Date: Wed, 4 Feb 2026 16:37:58 +0300 Subject: [PATCH 8/8] feat: check modify index before composite structure apply --- docs/composite-structure.md | 98 +++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 docs/composite-structure.md diff --git a/docs/composite-structure.md b/docs/composite-structure.md new file mode 100644 index 00000000..2b484b0e --- /dev/null +++ b/docs/composite-structure.md @@ -0,0 +1,98 @@ +# Composite Structure + +## Overview + +A **composite structure** represents a logical grouping of namespaces or services that are treated as a single dynamic unit. + +The composite structure defines **membership only** — it does not describe behavior, configuration, or lifecycle of individual members. Its purpose is to provide a consistent and up-to-date view of which entities belong to a composite at any given time. + +--- + +## Composite Identifier + +Each composite is identified by a unique **composite ID**. + +The composite ID: + +* is used as a key prefix in Consul, +* is included in all composite structure notifications, +* uniquely identifies a composite across the platform. + +--- + +## Composite Members + +A **composite member** is a namespace (or logical unit) that belongs to a composite. + +Characteristics: + +* Membership is **dynamic** — members can be added at runtime. +* The member list represents the **current desired state** of the composite. +* Ordering of members is not significant; membership is treated as a set. + +--- + +## Storage Model + +### Source of Truth + +The composite structure is stored in **Consul**, which acts as the single source of truth. + +For each composite: + +* Members are stored as a set of keys under a composite-specific prefix. +* The presence or absence of a key determines membership. + +--- + +### Modify Index + +When reading composite membership from Consul, the response includes a **modify index**. + +The modify index: + +* is a monotonically increasing value maintained by Consul, +* changes whenever any member is added or removed, +* represents the **version of the composite structure**. + +If the composite membership does not change, the modify index remains the same. + +--- + +## Composite Structure Model + +Conceptually, a composite structure consists of: + +* **Composite ID** +* **Member set** +* **Modify index** + +This can be represented as: + +```text +CompositeStructure + ├─ compositeId + ├─ members (set) + └─ modifyIndex +``` + +The modify index applies to the structure as a whole, not to individual members. + +--- + +## Propagation and Consumers + +The composite structure is propagated to external systems as a **snapshot**, not as a delta. + +Each propagation includes: + +* the full member set, +* the current modify index. + +Consumers may use the modify index to: + +* detect whether the structure has changed since the last update, +* ignore duplicate updates, +* enforce ordering or freshness constraints. + +The interpretation of the modify index is the responsibility of the consumer.