Skip to content

Commit a78bdef

Browse files
authored
Merge branch 'main' into multi-tenancy-index
Signed-off-by: Brian Flores <[email protected]>
2 parents e824173 + b6952b9 commit a78bdef

File tree

12 files changed

+95
-19
lines changed

12 files changed

+95
-19
lines changed

common/src/main/java/org/opensearch/ml/common/settings/MLCommonsSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ private MLCommonsSettings() {}
361361

362362
// Feature flag for enabling telemetry static metric collection job -- MLStatsJobProcessor
363363
public static final Setting<Boolean> ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED = Setting
364-
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Final);
364+
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
365365

366366
// Feature flag for Agentic memory APIs
367367
public static final Setting<Boolean> ML_COMMONS_AGENTIC_MEMORY_ENABLED = Setting

common/src/main/java/org/opensearch/ml/common/settings/MLFeatureEnabledSetting.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
106106
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_AGENTIC_SEARCH_ENABLED, it -> isAgenticSearchEnabled = it);
107107
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_MCP_CONNECTOR_ENABLED, it -> isMcpConnectorEnabled = it);
108108
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_AGENTIC_MEMORY_ENABLED, it -> isAgenticMemoryEnabled = it);
109+
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED, it -> {
110+
isStaticMetricCollectionEnabled = it;
111+
for (SettingsChangeListener listener : listeners) {
112+
listener.onStaticMetricCollectionEnabledChanged(it);
113+
}
114+
});
109115
}
110116

111117
/**

common/src/main/java/org/opensearch/ml/common/settings/SettingsChangeListener.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,20 @@ public interface SettingsChangeListener {
1818
* <li><code>false</code> if multi-tenancy is disabled</li>
1919
* </ul>
2020
*/
21-
void onMultiTenancyEnabledChanged(boolean isEnabled);
21+
default void onMultiTenancyEnabledChanged(boolean isEnabled) {
22+
// do nothing
23+
}
24+
25+
/**
26+
* Callback method that gets triggered when the static metric collection setting changes.
27+
*
28+
* @param isEnabled A boolean value indicating the new state of the static metric collection setting:
29+
* <ul>
30+
* <li><code>true</code> if static metric collection is enabled</li>
31+
* <li><code>false</code> if static metric collection is disabled</li>
32+
* </ul>
33+
*/
34+
default void onStaticMetricCollectionEnabledChanged(boolean isEnabled) {
35+
// do nothing
36+
}
2237
}

common/src/test/java/org/opensearch/ml/common/settings/MLFeatureEnabledSettingTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,4 +170,19 @@ public void testAgenticMemoryCanBeDisabled() {
170170
MLFeatureEnabledSetting setting = new MLFeatureEnabledSetting(mockClusterService, settings);
171171
assertFalse(setting.isAgenticMemoryEnabled());
172172
}
173+
174+
@Test
175+
public void testStaticMetricCollectionSettingChangeNotifiesListeners() {
176+
Settings settings = Settings.builder().put("plugins.ml_commons.metrics_static_collection_enabled", false).build();
177+
178+
MLFeatureEnabledSetting setting = new MLFeatureEnabledSetting(mockClusterService, settings);
179+
180+
SettingsChangeListener mockListener = mock(SettingsChangeListener.class);
181+
setting.addListener(mockListener);
182+
183+
mockClusterSettings.applySettings(Settings.builder().put("plugins.ml_commons.metrics_static_collection_enabled", true).build());
184+
185+
verify(mockListener).onStaticMetricCollectionEnabledChanged(true);
186+
assertTrue(setting.isStaticMetricCollectionEnabled());
187+
}
173188
}

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void clusterChanged(ClusterChangedEvent event) {
9191
for (DiscoveryNode node : state.nodes()) {
9292
if (node.isDataNode() && Version.V_3_1_0.onOrAfter(node.getVersion())) {
9393
if (mlFeatureEnabledSetting.isMetricCollectionEnabled() && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()) {
94-
mlTaskManager.startStatsCollectorJob();
94+
mlTaskManager.indexStatsCollectorJob(true);
9595
}
9696

9797
if (clusterService.state().getMetadata().hasIndex(TASK_POLLING_JOB_INDEX)) {

plugin/src/main/java/org/opensearch/ml/jobs/MLJobParameter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,14 @@ public class MLJobParameter implements ScheduledJobParameter {
4949

5050
public MLJobParameter() {}
5151

52-
public MLJobParameter(String name, Schedule schedule, Long lockDurationSeconds, Double jitter, MLJobType jobType) {
52+
public MLJobParameter(String name, Schedule schedule, Long lockDurationSeconds, Double jitter, MLJobType jobType, boolean isEnabled) {
5353
this.jobName = name;
5454
this.schedule = schedule;
5555
this.lockDurationSeconds = lockDurationSeconds;
5656
this.jitter = jitter;
5757

5858
Instant now = Instant.now();
59-
this.isEnabled = true;
59+
this.isEnabled = isEnabled;
6060
this.enabledTime = now;
6161
this.lastUpdateTime = now;
6262
this.jobType = jobType;

plugin/src/main/java/org/opensearch/ml/jobs/MLJobRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
9393
throw new IllegalArgumentException("Job parameters is invalid.");
9494
}
9595

96+
if (!jobParameter.isEnabled()) {
97+
throw new IllegalStateException(String.format("Attempted to run disabled job of type: %s", jobParameter.getJobType().name()));
98+
}
99+
96100
switch (jobParameter.getJobType()) {
97101
case STATS_COLLECTOR:
98102
MLStatsJobProcessor

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,8 @@ public Collection<Object> createComponents(
600600
Path dataPath = environment.dataFiles()[0];
601601

602602
mlFeatureEnabledSetting = new MLFeatureEnabledSetting(clusterService, settings);
603+
mlFeatureEnabledSetting.addListener(mlTaskManager);
604+
603605
mlIndicesHandler = new MLIndicesHandler(clusterService, client, mlFeatureEnabledSetting);
604606

605607
SdkClient sdkClient = SdkClientFactory
@@ -666,6 +668,7 @@ public Collection<Object> createComponents(
666668
mlInputDatasetHandler = new MLInputDatasetHandler(client);
667669
modelAccessControlHelper = new ModelAccessControlHelper(clusterService, settings);
668670
connectorAccessControlHelper = new ConnectorAccessControlHelper(clusterService, settings);
671+
669672
mlModelManager = new MLModelManager(
670673
clusterService,
671674
scriptService,

plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.ml.common.exception.MLException;
5252
import org.opensearch.ml.common.exception.MLLimitExceededException;
5353
import org.opensearch.ml.common.exception.MLResourceNotFoundException;
54+
import org.opensearch.ml.common.settings.SettingsChangeListener;
5455
import org.opensearch.ml.engine.indices.MLIndicesHandler;
5556
import org.opensearch.ml.jobs.MLJobParameter;
5657
import org.opensearch.ml.jobs.MLJobType;
@@ -73,7 +74,7 @@
7374
* MLTaskManager is responsible for managing MLTask.
7475
*/
7576
@Log4j2
76-
public class MLTaskManager {
77+
public class MLTaskManager implements SettingsChangeListener {
7778
public static int TASK_SEMAPHORE_TIMEOUT = 5000; // 5 seconds
7879
private final Map<String, MLTaskCache> taskCaches;
7980
private final Client client;
@@ -553,7 +554,8 @@ public void startTaskPollingJob() {
553554
new IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
554555
20L,
555556
null,
556-
MLJobType.BATCH_TASK_UPDATE
557+
MLJobType.BATCH_TASK_UPDATE,
558+
true
557559
);
558560

559561
IndexRequest indexRequest = new IndexRequest()
@@ -562,24 +564,27 @@ public void startTaskPollingJob() {
562564
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
563565
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
564566

565-
startJob(indexRequest, MLJobType.BATCH_TASK_UPDATE, () -> this.taskPollingJobStarted = true);
567+
indexJob(indexRequest, MLJobType.BATCH_TASK_UPDATE, () -> this.taskPollingJobStarted = true);
566568
} catch (IOException e) {
567569
log.error("Failed to index task polling job", e);
568570
}
569571
}
570572

571-
public void startStatsCollectorJob() {
572-
if (statsCollectorJobStarted) {
573-
return;
574-
}
573+
@Override
574+
public void onStaticMetricCollectionEnabledChanged(boolean isEnabled) {
575+
log.info("Static metric collection setting changed to: {}", isEnabled);
576+
indexStatsCollectorJob(isEnabled);
577+
}
575578

579+
public void indexStatsCollectorJob(boolean enabled) {
576580
try {
577581
MLJobParameter jobParameter = new MLJobParameter(
578582
MLJobType.STATS_COLLECTOR.name(),
579583
new IntervalSchedule(Instant.now(), 5, ChronoUnit.MINUTES),
580584
60L,
581585
null,
582-
MLJobType.STATS_COLLECTOR
586+
MLJobType.STATS_COLLECTOR,
587+
enabled
583588
);
584589

585590
IndexRequest indexRequest = new IndexRequest()
@@ -588,7 +593,7 @@ public void startStatsCollectorJob() {
588593
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
589594
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
590595

591-
startJob(indexRequest, MLJobType.STATS_COLLECTOR, () -> this.statsCollectorJobStarted = true);
596+
indexJob(indexRequest, MLJobType.STATS_COLLECTOR, () -> {});
592597
} catch (IOException e) {
593598
log.error("Failed to index stats collection job", e);
594599
}
@@ -601,7 +606,7 @@ public void startStatsCollectorJob() {
601606
* @param jobType the type of job being started
602607
* @param successCallback callback to execute on successful job indexing
603608
*/
604-
private void startJob(IndexRequest indexRequest, MLJobType jobType, Runnable successCallback) {
609+
private void indexJob(IndexRequest indexRequest, MLJobType jobType, Runnable successCallback) {
605610
mlIndicesHandler.initMLJobsIndex(ActionListener.wrap(success -> {
606611
if (success) {
607612
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {

plugin/src/test/java/org/opensearch/ml/jobs/MLJobParameterTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void setUp() {
3535
lockDurationSeconds = 20L;
3636
jitter = 0.5;
3737
jobType = null;
38-
jobParameter = new MLJobParameter(jobName, schedule, lockDurationSeconds, jitter, jobType);
38+
jobParameter = new MLJobParameter(jobName, schedule, lockDurationSeconds, jitter, jobType, true);
3939
}
4040

4141
@Test
@@ -54,7 +54,7 @@ public void testToXContent() throws Exception {
5454
@Test
5555
public void testNullCase() throws IOException {
5656
String newJobName = "test-job";
57-
MLJobParameter nullParameter = new MLJobParameter(newJobName, null, null, null, null);
57+
MLJobParameter nullParameter = new MLJobParameter(newJobName, null, null, null, null, true);
5858
nullParameter.setLastUpdateTime(null);
5959
nullParameter.setEnabledTime(null);
6060

@@ -64,6 +64,7 @@ public void testNullCase() throws IOException {
6464

6565
assertTrue(jsonString.contains(newJobName));
6666
assertEquals(newJobName, nullParameter.getName());
67+
assertTrue(nullParameter.isEnabled());
6768
assertNull(nullParameter.getSchedule());
6869
assertNull(nullParameter.getLockDurationSeconds());
6970
assertNull(nullParameter.getJitter());

0 commit comments

Comments
 (0)