Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docker/application.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Application Configuration

cluster:
name: default-cluster

Expand All @@ -18,6 +19,8 @@ coordinator_goal_state:
controller:
# Controller ID - REQUIRED: reads from NODE_NAME environment variable
id: ${NODE_NAME}
# Runtime environment for multi-cluster path isolation (staging, production, etc.)
runtime_env: ${RUNTIME_ENV:staging}
ttl:
seconds: 60
keepalive:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
import io.clustercontroller.templates.TemplateManager;
import io.clustercontroller.store.MetadataStore;
import io.clustercontroller.store.EtcdMetadataStore;
import io.clustercontroller.store.EtcdPathResolver;
import io.etcd.jetcd.Client;

import io.clustercontroller.util.EnvironmentUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -59,11 +62,16 @@ public ClusterControllerConfig config() {

/**
* MetadataStore bean - cluster-agnostic, uses etcd endpoints only
* Depends on EtcdPathResolver to ensure it's initialized first (reads its own config via @Value)
*/
@Bean
public MetadataStore metadataStore(ClusterControllerConfig config) {
public MetadataStore metadataStore(
ClusterControllerConfig config,
EnvironmentUtils envUtils,
EtcdPathResolver pathResolver) {
log.info("Initializing cluster-agnostic MetadataStore connection to etcd");
try {

EtcdMetadataStore store = EtcdMetadataStore.getInstance(config.getEtcdEndpoints());
// Configure coordinator goal state path from YAML config
store.setCoordinatorGoalStateLocation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public Set<String> listClusters() {
Set<String> clusters = new HashSet<>();
for (KeyValue kv : response.getKvs()) {
String path = kv.getKey().toString(UTF_8);
// Extract cluster ID from path: /multi-cluster/clusters/{id}/metadata
// Extract cluster ID from path: /multi-cluster/<env>/clusters/{id}/metadata
// Only include clusters that have metadata key (ignore assignment keys)
if (path.endsWith("/metadata")) {
String[] parts = path.split("/");
if (parts.length >= 4) {
clusters.add(parts[3]);
if (parts.length >= 5) {
clusters.add(parts[4]);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ public Set<String> listActiveControllers() {
Set<String> controllers = new HashSet<>();
for (KeyValue kv : response.getKvs()) {
String path = kv.getKey().toString(UTF_8);
// Extract controller ID from path: /multi-cluster/controllers/{id}/heartbeat
// Extract controller ID from path: /multi-cluster/<env>/controllers/{id}/heartbeat
// Only include paths that end with /heartbeat
if (path.endsWith(HEARTBEAT_SUFFIX)) {
String[] parts = path.split("/");
if (parts.length >= 4) {
controllers.add(parts[3]);
if (parts.length >= 5) {
controllers.add(parts[4]);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class EtcdMetadataStore implements MetadataStore {
*/
private EtcdMetadataStore(String[] etcdEndpoints) throws Exception {
this.etcdEndpoints = etcdEndpoints;
this.nodeId = EnvironmentUtils.getRequiredEnv("NODE_NAME");
this.nodeId = EnvironmentUtils.get("controller.id");

// Initialize Jackson ObjectMapper
this.objectMapper = new ObjectMapper()
Expand Down
65 changes: 43 additions & 22 deletions src/main/java/io/clustercontroller/store/EtcdPathResolver.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,52 @@
package io.clustercontroller.store;

import java.nio.file.Paths;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import io.clustercontroller.util.EnvironmentUtils;

import static io.clustercontroller.config.Constants.*;

/**
* Centralized etcd path resolver for all metadata keys with multi-cluster support.
* Provides consistent path structure for tasks, search units, indices, and other cluster metadata.
* All methods accept dynamic cluster names to support multi-cluster operations.
* Stateless singleton - no cluster-specific state stored.
*
* Multi-cluster coordination paths are isolated by runtime environment (staging, production, etc.)
* to prevent controllers from different environments from interfering with each other.
*/
@Slf4j
@Component
@DependsOn("environmentUtils")
public class EtcdPathResolver {

private static final String PATH_DELIMITER = "/";

// Singleton instance - stateless
private static final EtcdPathResolver INSTANCE = new EtcdPathResolver();
// Singleton instance
private static EtcdPathResolver INSTANCE;

private EtcdPathResolver() {
// Private constructor for singleton
public EtcdPathResolver() {
INSTANCE = this;
log.info("EtcdPathResolver initialized");
}

public static EtcdPathResolver getInstance() {
if (INSTANCE == null) {
throw new IllegalStateException("EtcdPathResolver not initialized. Ensure Spring context is loaded.");
}
return INSTANCE;
}

/**
* Get the current runtime environment from config.
* Package-private for testing.
*/
String getRuntimeEnv() {
return EnvironmentUtils.get("controller.runtime_env");
}

// =================================================================
// CONTROLLER TASKS PATHS
// =================================================================
Expand Down Expand Up @@ -243,69 +263,70 @@ public String getClusterRoot(String clusterName) {

// =================================================================
// MULTI-CLUSTER COORDINATION PATHS
// All paths include runtime_env to isolate different environments
// =================================================================

/**
* Get multi-cluster root path
* Pattern: /multi-cluster
* Get multi-cluster root path with environment isolation
* Pattern: /multi-cluster/<runtime_env>
*/
public String getMultiClusterRoot() {
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER).toString();
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, getRuntimeEnv()).toString();
}

/**
* Get controller heartbeat path
* Pattern: /multi-cluster/controllers/<controller-id>/heartbeat
* Pattern: /multi-cluster/<runtime_env>/controllers/<controller-id>/heartbeat
*/
public String getControllerHeartbeatPath(String controllerId) {
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, PATH_CONTROLLERS, controllerId, PATH_HEARTBEAT).toString();
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, getRuntimeEnv(), PATH_CONTROLLERS, controllerId, PATH_HEARTBEAT).toString();
}

/**
* Get controller assignment path
* Pattern: /multi-cluster/controllers/<controller-id>/assigned/<cluster-id>
* Pattern: /multi-cluster/<runtime_env>/controllers/<controller-id>/assigned/<cluster-id>
*/
public String getControllerAssignmentPath(String controllerId, String clusterId) {
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, PATH_CONTROLLERS, controllerId, PATH_ASSIGNED, clusterId).toString();
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, getRuntimeEnv(), PATH_CONTROLLERS, controllerId, PATH_ASSIGNED, clusterId).toString();
}

/**
* Get cluster lock path
* Pattern: /multi-cluster/locks/clusters/<cluster-id>
* Pattern: /multi-cluster/<runtime_env>/locks/clusters/<cluster-id>
*/
public String getClusterLockPath(String clusterId) {
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, PATH_LOCKS, PATH_CLUSTERS, clusterId).toString();
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, getRuntimeEnv(), PATH_LOCKS, PATH_CLUSTERS, clusterId).toString();
}

/**
* Get cluster registry path
* Pattern: /multi-cluster/clusters/<cluster-id>/metadata
* Pattern: /multi-cluster/<runtime_env>/clusters/<cluster-id>/metadata
*/
public String getClusterRegistryPath(String clusterId) {
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, PATH_CLUSTERS, clusterId, PATH_METADATA).toString();
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, getRuntimeEnv(), PATH_CLUSTERS, clusterId, PATH_METADATA).toString();
}

/**
* Get cluster's assigned controller path (for observability at cluster level)
* Pattern: /multi-cluster/clusters/<cluster-id>/assigned-to
* Pattern: /multi-cluster/<runtime_env>/clusters/<cluster-id>/assigned-to
*/
public String getClusterAssignedControllerPath(String clusterId) {
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, PATH_CLUSTERS, clusterId, "assigned-to").toString();
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, getRuntimeEnv(), PATH_CLUSTERS, clusterId, "assigned-to").toString();
}

/**
* Get controllers prefix for listing
* Pattern: /multi-cluster/controllers/
* Pattern: /multi-cluster/<runtime_env>/controllers/
*/
public String getControllersPrefix() {
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, PATH_CONTROLLERS, "").toString();
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, getRuntimeEnv(), PATH_CONTROLLERS, "").toString();
}

/**
* Get clusters prefix for listing
* Pattern: /multi-cluster/clusters/
* Pattern: /multi-cluster/<runtime_env>/clusters/
*/
public String getClustersPrefix() {
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, PATH_CLUSTERS, "").toString();
return Paths.get(PATH_DELIMITER, PATH_MULTI_CLUSTER, getRuntimeEnv(), PATH_CLUSTERS, "").toString();
}
}
72 changes: 53 additions & 19 deletions src/main/java/io/clustercontroller/util/EnvironmentUtils.java
Original file line number Diff line number Diff line change
@@ -1,38 +1,72 @@
package io.clustercontroller.util;

import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Utility class for environment variable operations
* Simple utility for accessing config from application.yml via Spring Environment.
* Callers pass the key they need (e.g., "controller.id", "controller.runtime_env").
*/
public final class EnvironmentUtils {
@Component
public class EnvironmentUtils {

private static Environment springEnvironment;

// Test overrides - takes precedence over Spring Environment
private static final Map<String, String> testOverrides = new ConcurrentHashMap<>();

private EnvironmentUtils() {
// Utility class - prevent instantiation
private final Environment environment;

public EnvironmentUtils(Environment environment) {
this.environment = environment;
}

@PostConstruct
public void init() {
springEnvironment = this.environment;
}

/**
* Get required environment variable - throws exception if not set
* Get a config value by key from application.yml.
*
* @param name the environment variable name
* @return the trimmed environment variable value
* @throws IllegalStateException if the environment variable is not set or is empty
* @param key the property key (e.g., "controller.id", "controller.runtime_env")
* @return the property value
* @throws IllegalStateException if not initialized or property not set
*/
public static String getRequiredEnv(String name) {
String value = System.getenv(name);
public static String get(String key) {
// Check test overrides first
String override = testOverrides.get(key);
if (override != null) {
return override;
}

if (springEnvironment == null) {
throw new IllegalStateException("EnvironmentUtils not initialized yet");
}
String value = springEnvironment.getProperty(key);
if (value == null || value.trim().isEmpty()) {
throw new IllegalStateException("Required environment variable '" + name + "' is not set or is empty");
throw new IllegalStateException("Property '" + key + "' is not set in application.yml");
}
return value.trim();
}

// ========== Test Support ==========

/**
* Get environment variable with default value
*
* @param name the environment variable name
* @param defaultValue the default value to return if not set
* @return the environment variable value or default if not set
* For testing only - override a config value.
*/
public static void setForTesting(String key, String value) {
testOverrides.put(key, value);
}

/**
* For testing only - clear all overrides.
*/
public static String getEnv(String name, String defaultValue) {
String value = System.getenv(name);
return value != null ? value.trim() : defaultValue;
public static void clearTestOverrides() {
testOverrides.clear();
}
}
3 changes: 3 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Application Configuration

etcd:
endpoints:
- http://localhost:2379
Expand All @@ -15,6 +16,8 @@ coordinator_goal_state:
controller:
# Controller ID - REQUIRED: reads from NODE_NAME environment variable
id: ${NODE_NAME}
# Runtime environment for multi-cluster path isolation (staging, production, etc.)
runtime_env: ${RUNTIME_ENV:staging}
ttl:
seconds: 60
keepalive:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.clustercontroller;

import io.clustercontroller.store.EtcdMetadataStore;
import io.clustercontroller.store.EtcdPathResolver;
import io.clustercontroller.store.MetadataStore;
import io.clustercontroller.util.EnvironmentUtils;
import io.etcd.jetcd.*;
import io.etcd.jetcd.election.CampaignResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
Expand Down Expand Up @@ -47,6 +49,10 @@ class ClusterControllerApplicationTest {

@BeforeEach
void setUp() {
// Initialize EtcdPathResolver for tests
EnvironmentUtils.setForTesting("controller.runtime_env", "staging");
new EtcdPathResolver();

// Reset singleton instance before each test
EtcdMetadataStore.resetInstance();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import io.clustercontroller.models.IndexMetadata;
import io.clustercontroller.models.IndexSettings;
import io.clustercontroller.models.TypeMapping;
import io.clustercontroller.store.EtcdPathResolver;
import io.clustercontroller.store.MetadataStore;
import io.clustercontroller.templates.TemplateManager;
import io.clustercontroller.util.EnvironmentUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -38,6 +40,10 @@ class IndexManagerTest {

@BeforeEach
void setUp() throws Exception {
// Initialize EtcdPathResolver for tests
EnvironmentUtils.setForTesting("controller.runtime_env", "staging");
new EtcdPathResolver();

// Mock template manager to return empty list by default (no matching templates)
// Use lenient() to avoid UnnecessaryStubbingException in tests that don't create indices
lenient().when(templateManager.findMatchingTemplates(any(), any())).thenReturn(new ArrayList<>());
Expand Down
Loading