diff --git a/build.gradle b/build.gradle index 62a7b4a6f..15266ada6 100644 --- a/build.gradle +++ b/build.gradle @@ -91,6 +91,7 @@ plugins { id "de.undercouch.download" version "5.6.0" id "com.netflix.nebula.ospackage" version "12.0.0" id "com.dorongold.task-tree" version "2.1.1" + id "org.gradle.test-retry" version "1.6.2" } apply plugin: 'java' @@ -146,6 +147,10 @@ opensearchplugin { tasks.named("integTest").configure { it.dependsOn(project.tasks.named("bundlePlugin")) + it.retry { + failOnPassedAfterRetry = false + maxRetries = 5 + } } allOpen { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index d612f19c7..cb186eddb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -29,6 +29,7 @@ import org.opensearch.core.xcontent.XContentParser.Token import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment +import org.opensearch.identity.PluginSubject import org.opensearch.indexmanagement.IndexManagementIndices.Companion.HISTORY_ALL import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIndices import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigAction @@ -175,6 +176,7 @@ import org.opensearch.indexmanagement.transform.resthandler.RestPreviewTransform import org.opensearch.indexmanagement.transform.resthandler.RestStartTransformAction import org.opensearch.indexmanagement.transform.resthandler.RestStopTransformAction import org.opensearch.indexmanagement.transform.settings.TransformSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indices.SystemIndexDescriptor import org.opensearch.jobscheduler.spi.JobSchedulerExtension import org.opensearch.jobscheduler.spi.ScheduledJobParser @@ -182,6 +184,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.monitor.jvm.JvmService import org.opensearch.plugins.ActionPlugin import org.opensearch.plugins.ExtensiblePlugin +import org.opensearch.plugins.IdentityAwarePlugin import org.opensearch.plugins.NetworkPlugin import org.opensearch.plugins.Plugin import org.opensearch.plugins.SystemIndexPlugin @@ -208,7 +211,8 @@ class IndexManagementPlugin : ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, - TelemetryAwarePlugin { + TelemetryAwarePlugin, + IdentityAwarePlugin { private val logger = LogManager.getLogger(javaClass) lateinit var indexManagementIndices: IndexManagementIndices lateinit var actionValidation: ActionValidation @@ -223,6 +227,7 @@ class IndexManagementPlugin : private val extensionCheckerMap = mutableMapOf() lateinit var indexOperationActionFilter: IndexOperationActionFilter private lateinit var metricsRegistry: MetricsRegistry + private lateinit var pluginClient: PluginClient companion object { const val PLUGINS_BASE_URI = "/_plugins" @@ -398,6 +403,8 @@ class IndexManagementPlugin : environment, ) + this.pluginClient = PluginClient(client) + IndexManagementActionsMetrics.instance.initialize(metricsRegistry) rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver) val jvmService = JvmService(environment.settings()) @@ -430,13 +437,13 @@ class IndexManagementPlugin : .registerMetadataServices(RollupMetadataService(client, xContentRegistry)) .registerConsumers() .registerClusterConfigurationProvider(skipFlag) - indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService) - val controlCenterIndices = ControlCenterIndices(client.admin().indices(), clusterService) + indexManagementIndices = IndexManagementIndices(settings, this.pluginClient.admin().indices(), clusterService) + val controlCenterIndices = ControlCenterIndices(this.pluginClient.admin().indices(), clusterService) actionValidation = ActionValidation(settings, clusterService, jvmService) val indexStateManagementHistory = IndexStateManagementHistory( settings, - client, + this.pluginClient, threadPool, clusterService, indexManagementIndices, @@ -454,7 +461,7 @@ class IndexManagementPlugin : val extensionChecker = ExtensionStatusChecker(extensionCheckerMap, clusterService) val managedIndexRunner = ManagedIndexRunner - .registerClient(client) + .registerClient(pluginClient) .registerClusterService(clusterService) .registerValidationService(actionValidation) .registerNamedXContentRegistry(xContentRegistry) @@ -472,7 +479,7 @@ class IndexManagementPlugin : val managedIndexCoordinator = ManagedIndexCoordinator( environment.settings(), - client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, xContentRegistry, + pluginClient, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, xContentRegistry, ) val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService) @@ -500,6 +507,7 @@ class IndexManagementPlugin : indexMetadataProvider, smRunner, pluginVersionSweepCoordinator, + pluginClient, ) } @@ -614,6 +622,10 @@ class IndexManagementPlugin : ActionPlugin.ActionHandler(DeleteLRONConfigAction.INSTANCE, TransportDeleteLRONConfigAction::class.java), ) + override fun assignSubject(pluginSubject: PluginSubject?) { + pluginClient.setSubject(pluginSubject) + } + override fun getTransportInterceptors( namedWriteableRegistry: NamedWriteableRegistry, threadContext: ThreadContext, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/Channel.kt b/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/Channel.kt index b007716b1..f5b209e4a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/Channel.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/common/model/notification/Channel.kt @@ -79,20 +79,21 @@ data class Channel(val id: String) : user: User?, ) { val channel = this - client.threadPool().threadContext.stashContext().use { - // We need to set the user context information in the thread context for notification plugin to correctly resolve the user object + // TODO Understand why this is called twice when reindexing is finished in NotificationActionListenerIT.test notify for reindex with runtime policy + // We need to set the user context information in the thread context for notification plugin to correctly resolve the user object + client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)?.let { client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user)) - val res: SendNotificationResponse = - NotificationsPluginInterface.suspendUntil { - this.sendNotification( - (client as NodeClient), - eventSource, - ChannelMessage(message, null, null), - listOf(channel.id), - it, - ) - } - validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) } + val res: SendNotificationResponse = + NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + eventSource, + ChannelMessage(message, null, null), + listOf(channel.id), + it, + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/TransportDeleteLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/TransportDeleteLRONConfigAction.kt index 6a2740a2e..1cc10fb9a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/TransportDeleteLRONConfigAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/delete/TransportDeleteLRONConfigAction.kt @@ -15,16 +15,17 @@ import org.opensearch.common.inject.Inject import org.opensearch.commons.ConfigConstants import org.opensearch.core.action.ActionListener import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.node.NodeClient +import org.opensearch.transport.client.Client class TransportDeleteLRONConfigAction @Inject constructor( - val client: NodeClient, transportService: TransportService, actionFilters: ActionFilters, + val client: PluginClient, ) : HandledTransportAction( DeleteLRONConfigAction.NAME, transportService, actionFilters, ::DeleteLRONConfigRequest, ) { @@ -35,7 +36,7 @@ constructor( } inner class DeleteLRONConfigHandler( - private val client: NodeClient, + private val client: Client, private val actionListener: ActionListener, private val request: DeleteLRONConfigRequest, private val docId: String = request.docId, @@ -49,13 +50,11 @@ constructor( }", ) - client.threadPool().threadContext.stashContext().use { - val deleteRequest = - DeleteRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX, docId) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + val deleteRequest = + DeleteRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX, docId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - client.delete(deleteRequest, actionListener) - } + client.delete(deleteRequest, actionListener) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/TransportGetLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/TransportGetLRONConfigAction.kt index d79c740db..1c3b3593f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/TransportGetLRONConfigAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/get/TransportGetLRONConfigAction.kt @@ -26,15 +26,16 @@ import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigRespo import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig import org.opensearch.indexmanagement.controlcenter.notification.util.getLRONConfigAndParse import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.node.NodeClient +import org.opensearch.transport.client.Client class TransportGetLRONConfigAction @Inject constructor( - val client: NodeClient, + val client: PluginClient, transportService: TransportService, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry, @@ -48,7 +49,7 @@ constructor( } inner class GetLRONConfigHandler( - private val client: NodeClient, + private val client: Client, private val actionListener: ActionListener, private val request: GetLRONConfigRequest, ) { @@ -58,25 +59,23 @@ constructor( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, )}", ) - client.threadPool().threadContext.stashContext().use { - if (null != request.docId) { - getLRONConfigAndParse( - client, - request.docId, - xContentRegistry, - object : ActionListener { - override fun onResponse(response: LRONConfigResponse) { - actionListener.onResponse(GetLRONConfigResponse(listOf(response), 1)) - } + if (null != request.docId) { + getLRONConfigAndParse( + client, + request.docId, + xContentRegistry, + object : ActionListener { + override fun onResponse(response: LRONConfigResponse) { + actionListener.onResponse(GetLRONConfigResponse(listOf(response), 1)) + } - override fun onFailure(e: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) - } - }, - ) - } else { - doSearch() - } + override fun onFailure(e: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + }, + ) + } else { + doSearch() } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/TransportIndexLRONConfigAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/TransportIndexLRONConfigAction.kt index 0a4be1c41..314e9b1f8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/TransportIndexLRONConfigAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/action/index/TransportIndexLRONConfigAction.kt @@ -28,16 +28,17 @@ import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIn import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID import org.opensearch.indexmanagement.controlcenter.notification.util.getPriority +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.node.NodeClient +import org.opensearch.transport.client.Client @Suppress("LongParameterList") class TransportIndexLRONConfigAction @Inject constructor( - val client: NodeClient, + val client: PluginClient, transportService: TransportService, actionFilters: ActionFilters, val clusterService: ClusterService, @@ -53,7 +54,7 @@ constructor( } inner class IndexLRONConfigHandler( - private val client: NodeClient, + private val client: Client, private val actionListener: ActionListener, private val request: IndexLRONConfigRequest, private val user: User? = SecurityUtils.buildUser(client.threadPool().threadContext), @@ -65,16 +66,14 @@ constructor( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, )}", ) - client.threadPool().threadContext.stashContext().use { - // we use dryRun to help check permission and do request validation - if (request.dryRun) { - validate() - return - } - controlCenterIndices.checkAndUpdateControlCenterIndex( - ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure), - ) + // we use dryRun to help check permission and do request validation + if (request.dryRun) { + validate() + return } + controlCenterIndices.checkAndUpdateControlCenterIndex( + ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure), + ) } private fun onCreateMappingsResponse(response: AcknowledgedResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListener.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListener.kt index f3f99f76f..d2e41b830 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListener.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/filter/NotificationActionListener.kt @@ -168,31 +168,30 @@ class NotificationActionListener { - override fun onResponse(lronConfigResponse: GetLRONConfigResponse) { - launch { - sendNotification(lronConfigResponse, taskId, action, result) - } + // TODO verify this works + client.execute( + GetLRONConfigAction.INSTANCE, + GetLRONConfigRequest(searchParams = searchParam), + object : ActionListener { + override fun onResponse(lronConfigResponse: GetLRONConfigResponse) { + launch { + sendNotification(lronConfigResponse, taskId, action, result) } + } - override fun onFailure(e: Exception) { - if (e is IndexNotFoundException) { - logger.debug( - "No notification policy configured for task: {} action: {}", - taskId.toString(), - action, - ) - } else { - logger.error("Can't get notification policy for action: {}", action, e) - } + override fun onFailure(e: Exception) { + if (e is IndexNotFoundException) { + logger.debug( + "No notification policy configured for task: {} action: {}", + taskId.toString(), + action, + ) + } else { + logger.error("Can't get notification policy for action: {}", action, e) } - }, - ) - } + } + }, + ) } @Suppress("NestedBlockDepth") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtils.kt index 1f70e51bd..6cd419ca8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/util/LRONUtils.kt @@ -22,7 +22,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse -import org.opensearch.transport.client.node.NodeClient +import org.opensearch.transport.client.Client const val LRON_DOC_ID_PREFIX = "LRON:" @@ -68,7 +68,7 @@ fun getDocID(taskId: TaskId? = null, actionName: String? = null): String { } fun getLRONConfigAndParse( - client: NodeClient, + client: Client, docId: String, xContentRegistry: NamedXContentRegistry, actionListener: ActionListener, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt index eec42c261..9e8781fed 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt @@ -29,7 +29,6 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS -import org.opensearch.indexmanagement.opensearchapi.OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData @@ -119,16 +118,8 @@ class IndexStateManagementHistory( } private fun rolloverAndDeleteHistoryIndex() { - val ctx = threadPool.threadContext.stashContext() - try { - if (threadPool.threadContext.getTransient(OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST) == null) { - threadPool.threadContext.putTransient(OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST, "true") - } - if (historyEnabled) rolloverHistoryIndex() - deleteOldHistoryIndex() - } finally { - ctx.close() - } + if (historyEnabled) rolloverHistoryIndex() + deleteOldHistoryIndex() } private fun rolloverHistoryIndex() { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index b628329ec..11666a71c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -83,6 +83,7 @@ import org.opensearch.indexmanagement.opensearchapi.withClosableContext import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.util.OpenForTesting +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.threadpool.Scheduler import org.opensearch.threadpool.ThreadPool @@ -107,7 +108,7 @@ import org.opensearch.transport.client.Client @OpenForTesting class ManagedIndexCoordinator( private val settings: Settings, - private val client: Client, + private val client: PluginClient, private val clusterService: ClusterService, private val threadPool: ThreadPool, indexManagementIndices: IndexManagementIndices, @@ -403,6 +404,7 @@ class ManagedIndexCoordinator( if (policy.user != null) { try { val request = ManagedIndexRequest().indices(indexName) + // TODO Do we need to continue to support injected user? withClosableContext(IndexManagementSecurityContext("ApplyPolicyOnIndexCreation", settings, threadPool.threadContext, policy.user)) { client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index ae8403348..32a60a034 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -93,6 +93,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetry import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.jobscheduler.spi.JobExecutionContext import org.opensearch.jobscheduler.spi.LockModel import org.opensearch.jobscheduler.spi.ScheduledJobParameter @@ -102,7 +103,6 @@ import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript import org.opensearch.threadpool.ThreadPool -import org.opensearch.transport.client.Client import java.time.Instant import java.time.temporal.ChronoUnit @@ -113,7 +113,7 @@ object ManagedIndexRunner : private val logger = LogManager.getLogger(javaClass) private lateinit var clusterService: ClusterService - private lateinit var client: Client + private lateinit var client: PluginClient private lateinit var xContentRegistry: NamedXContentRegistry private lateinit var scriptService: ScriptService private lateinit var settings: Settings @@ -144,7 +144,7 @@ object ManagedIndexRunner : return this } - fun registerClient(client: Client): ManagedIndexRunner { + fun registerClient(client: PluginClient): ManagedIndexRunner { this.client = client return this } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/ShrinkStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/ShrinkStep.kt index bace383c7..3308d3d20 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/ShrinkStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/ShrinkStep.kt @@ -131,13 +131,12 @@ abstract class ShrinkStep( try { if (client != null) { // Use plugin level permissions when deleting the failed target shrink index after a failure - client.threadPool().threadContext.stashContext().use { - val deleteRequest = DeleteIndexRequest(targetIndexName) - val response: AcknowledgedResponse = - client.admin().indices().suspendUntil { delete(deleteRequest, it) } - if (!response.isAcknowledged) { - logger.error("Shrink action failed to delete target index [$targetIndexName] during cleanup after a failure") - } + // TODO should this use case be supported or is it ok to use user's privileges? + val deleteRequest = DeleteIndexRequest(targetIndexName) + val response: AcknowledgedResponse = + client.admin().indices().suspendUntil { delete(deleteRequest, it) } + if (!response.isAcknowledged) { + logger.error("Shrink action failed to delete target index [$targetIndexName] during cleanup after a failure") } } else { logger.error( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt index df0df0dee..38d83be9c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt @@ -14,6 +14,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.snapshots.SnapshotInProgressException import org.opensearch.transport.RemoteTransportException @@ -28,8 +29,9 @@ class AttemptStopReplicationStep : Step(name) { try { val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName) val response: AcknowledgedResponse = context.client.suspendUntil { + val pluginClient = context.client as PluginClient? ReplicationPluginInterface.stopReplication( - context.client, + pluginClient!!.innerClient(), stopIndexReplicationRequestObj, it, ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt index 88178cada..428d59316 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/addpolicy/TransportAddPolicyAction.kt @@ -24,6 +24,7 @@ import org.opensearch.action.get.MultiGetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.cluster.block.ClusterBlockException import org.opensearch.cluster.service.ClusterService @@ -62,6 +63,7 @@ import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermis import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration import org.opensearch.tasks.Task import org.opensearch.transport.TransportService +import org.opensearch.transport.client.Client import org.opensearch.transport.client.node.NodeClient import java.time.Duration import java.time.Instant @@ -106,7 +108,7 @@ constructor( @Suppress("TooManyFunctions") inner class AddPolicyHandler( - private val client: NodeClient, + private val client: Client, private val actionListener: ActionListener, private val request: AddPolicyRequest, private val user: User? = buildUser(client.threadPool().threadContext), @@ -218,12 +220,10 @@ constructor( private fun getPolicy() { val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.policyID) - client.threadPool().threadContext.stashContext().use { - if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { - return - } - client.get(getRequest, ActionListener.wrap(::onGetPolicyResponse, ::onFailure)) + if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { + return } + client.get(getRequest, ActionListener.wrap(::onGetPolicyResponse, ::onFailure)) } private fun onGetPolicyResponse(response: GetResponse) { @@ -329,6 +329,8 @@ constructor( ) } + bulkReq.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE + client.bulk( bulkReq, object : ActionListener { @@ -375,6 +377,7 @@ constructor( fun removeMetadatas(indices: List) { val request = indices.map { deleteManagedIndexMetadataRequest(it.uuid) } val bulkReq = BulkRequest().add(request) + bulkReq.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE client.bulk( bulkReq, object : ActionListener { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index a712a814f..1583e03cf 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -60,13 +60,14 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde import org.opensearch.indexmanagement.util.IndexManagementException import org.opensearch.indexmanagement.util.IndexUtils import org.opensearch.indexmanagement.util.NO_ID +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.node.NodeClient +import org.opensearch.transport.client.Client import java.lang.IllegalArgumentException private val log = LogManager.getLogger(TransportChangePolicyAction::class.java) @@ -75,7 +76,7 @@ private val log = LogManager.getLogger(TransportChangePolicyAction::class.java) class TransportChangePolicyAction @Inject constructor( - val client: NodeClient, + val client: PluginClient, transportService: TransportService, actionFilters: ActionFilters, val clusterService: ClusterService, @@ -98,7 +99,7 @@ constructor( } inner class ChangePolicyHandler( - private val client: NodeClient, + private val client: Client, private val actionListener: ActionListener, private val request: ChangePolicyRequest, private val user: User? = buildUser(client.threadPool().threadContext), @@ -156,12 +157,10 @@ constructor( private fun getPolicy() { val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, changePolicy.policyID) - client.threadPool().threadContext.stashContext().use { - if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { - return - } - client.get(getRequest, ActionListener.wrap(::onGetPolicyResponse, ::onFailure)) + if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { + return } + client.get(getRequest, ActionListener.wrap(::onGetPolicyResponse, ::onFailure)) } @Suppress("ReturnCount") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/deletepolicy/TransportDeletePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/deletepolicy/TransportDeletePolicyAction.kt index 16ff56ff1..bf3115a79 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/deletepolicy/TransportDeletePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/deletepolicy/TransportDeletePolicyAction.kt @@ -26,21 +26,21 @@ import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client -import org.opensearch.transport.client.node.NodeClient import java.lang.IllegalArgumentException private val log = LogManager.getLogger(TransportDeletePolicyAction::class.java) -@Suppress("ReturnCount") +@Suppress("ReturnCount", "LongParameterList") class TransportDeletePolicyAction @Inject constructor( - val client: NodeClient, + val client: PluginClient, transportService: TransportService, actionFilters: ActionFilters, val clusterService: ClusterService, @@ -73,9 +73,7 @@ constructor( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, )}", ) - client.threadPool().threadContext.stashContext().use { - getPolicy() - } + getPolicy() } private fun getPolicy() { @@ -115,9 +113,7 @@ constructor( DeleteRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, request.policyID) .setRefreshPolicy(request.refreshPolicy) - client.threadPool().threadContext.stashContext().use { - client.delete(deleteRequest, actionListener) - } + client.delete(deleteRequest, actionListener) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index 330b922a7..7e428d551 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -55,6 +55,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ValidationResult +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.search.SearchHit import org.opensearch.search.builder.SearchSourceBuilder @@ -73,7 +74,7 @@ typealias ManagedIndexConfigDocUUID = String typealias ManagedIndexMetadataDocUUID = String // managedIndexMetadataID(indexUuid) -> #metadata typealias ManagedIndexMetadataMap = Map -@Suppress("SpreadOperator", "TooManyFunctions", "UnusedPrivateMember") +@Suppress("SpreadOperator", "TooManyFunctions", "UnusedPrivateMember", "LongParameterList") class TransportExplainAction @Inject constructor( @@ -83,6 +84,7 @@ constructor( val clusterService: ClusterService, val xContentRegistry: NamedXContentRegistry, val indexMetadataProvider: IndexMetadataProvider, + val pluginClient: PluginClient, ) : HandledTransportAction( ExplainAction.NAME, transportService, actionFilters, ::ExplainRequest, ) { @@ -184,80 +186,79 @@ constructor( } private fun searchForMetadata(searchRequest: SearchRequest) { - client.threadPool().threadContext.stashContext().use { threadContext -> - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - val totalHits = response.hits.totalHits - if (totalHits != null) { - totalManagedIndices = totalHits.value.toInt() - } + val threadContext = client.threadPool().threadContext.newStoredContext(true) + pluginClient.search( + searchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + val totalHits = response.hits.totalHits + if (totalHits != null) { + totalManagedIndices = totalHits.value.toInt() + } - parseSearchHits(response.hits.hits).forEach { managedIndex -> - managedIndices.add(managedIndex.index) - enabledState[managedIndex.index] = managedIndex.enabled - managedIndicesMetaDataMap[managedIndex.index] = - mapOf( - "index" to managedIndex.index, - "index_uuid" to managedIndex.indexUuid, - "policy_id" to managedIndex.policyID, - "enabled" to managedIndex.enabled.toString(), - ) - if (showPolicy) { - managedIndex.policy.let { appliedPolicies[managedIndex.index] = it } - } - if (validateAction) { - managedIndex.policy.let { policiesforValidation[managedIndex.index] = it } - } + parseSearchHits(response.hits.hits).forEach { managedIndex -> + managedIndices.add(managedIndex.index) + enabledState[managedIndex.index] = managedIndex.enabled + managedIndicesMetaDataMap[managedIndex.index] = + mapOf( + "index" to managedIndex.index, + "index_uuid" to managedIndex.indexUuid, + "policy_id" to managedIndex.policyID, + "enabled" to managedIndex.enabled.toString(), + ) + if (showPolicy) { + managedIndex.policy.let { appliedPolicies[managedIndex.index] = it } } - - // explain all only return managed indices - if (explainAll) { - if (managedIndices.size == 0) { - // edge case: if specify query param pagination size to be 0 - // we still show total managed indices - indexNames.clear() - sendResponse( - indexNames, indexMetadatas, indexPolicyIDs, enabledState, - totalManagedIndices, appliedPolicies, validationResults, - ) - return - } else { - // Clear and add the managedIndices from the response to preserve the sort order and size - indexNames.clear() - indexNames.addAll(managedIndices) - // Remove entries in case they were limited due to request size - indexNamesToUUIDs.filterKeys { indexNames.contains(it) } - getMetadata(indexNames, threadContext) - return - } + if (validateAction) { + managedIndex.policy.let { policiesforValidation[managedIndex.index] = it } } - - // explain/{index} return results for all indices - getMetadata(indexNames, threadContext) } - override fun onFailure(t: Exception) { - if (t is IndexNotFoundException) { - // config index hasn't been initialized - // show all requested indices not managed - if (!explainAll) { - getMetadata(indexNames, threadContext) - return - } + // explain all only return managed indices + if (explainAll) { + if (managedIndices.size == 0) { + // edge case: if specify query param pagination size to be 0 + // we still show total managed indices indexNames.clear() sendResponse( - indexNames, indexMetadatas, indexPolicyIDs, - enabledState, totalManagedIndices, appliedPolicies, validationResults, + indexNames, indexMetadatas, indexPolicyIDs, enabledState, + totalManagedIndices, appliedPolicies, validationResults, ) return + } else { + // Clear and add the managedIndices from the response to preserve the sort order and size + indexNames.clear() + indexNames.addAll(managedIndices) + // Remove entries in case they were limited due to request size + indexNamesToUUIDs.filterKeys { indexNames.contains(it) } + getMetadata(indexNames, threadContext) + return } - actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) } - }, - ) - } + + // explain/{index} return results for all indices + getMetadata(indexNames, threadContext) + } + + override fun onFailure(t: Exception) { + if (t is IndexNotFoundException) { + // config index hasn't been initialized + // show all requested indices not managed + if (!explainAll) { + getMetadata(indexNames, threadContext) + return + } + indexNames.clear() + sendResponse( + indexNames, indexMetadatas, indexPolicyIDs, + enabledState, totalManagedIndices, appliedPolicies, validationResults, + ) + return + } + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + } + }, + ) } @Suppress("SpreadOperator") @@ -270,14 +271,13 @@ constructor( .local(request.local) .clusterManagerNodeTimeout(request.clusterManagerTimeout) - client.admin().cluster().state( + pluginClient.admin().cluster().state( clusterStateRequest, object : ActionListener { override fun onResponse(response: ClusterStateResponse) { val clusterStateIndexMetadatas = response.state.metadata.indices getMetadataMap(clusterStateIndexMetadatas, threadContext) } - override fun onFailure(t: Exception) { actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) } @@ -293,7 +293,7 @@ constructor( indexNamesToUUIDs.values.forEach { uuid -> mgetMetadataReq.add(MultiGetRequest.Item(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(uuid)).routing(uuid)) } - client.multiGet( + pluginClient.multiGet( mgetMetadataReq, object : ActionListener { override fun onResponse(response: MultiGetResponse) { @@ -388,12 +388,11 @@ constructor( if (user == null || indexNames.isEmpty()) { sendResponse(indexNames, indexMetadatas, indexPolicyIDs, enabledState, totalManagedIndices, appliedPolicies, validationResults) } else { - filterAndSendResponse(threadContext) + filterAndSendResponse() } } - private fun filterAndSendResponse(threadContext: ThreadContext.StoredContext) { - threadContext.restore() + private fun filterAndSendResponse() { val filteredIndices = mutableListOf() val filteredMetadata = mutableListOf() val filteredValidationResult = mutableListOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt index e1632e026..eb438826b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPoliciesAction.kt @@ -25,20 +25,21 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.opensearchapi.parseFromSearchResponse import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.addUserFilter import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client private val log = LogManager.getLogger(TransportGetPoliciesAction::class.java) +@Suppress("LongParameterList") class TransportGetPoliciesAction @Inject constructor( transportService: TransportService, - val client: Client, + val client: PluginClient, actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, @@ -97,26 +98,24 @@ constructor( .indices(INDEX_MANAGEMENT_INDEX) .preference(Preference.PRIMARY_FIRST.type()) - client.threadPool().threadContext.stashContext().use { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - val totalPolicies = response.hits.totalHits?.value ?: 0 - val policies = parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse) - actionListener.onResponse(GetPoliciesResponse(policies, totalPolicies.toInt())) - } + client.search( + searchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + val totalPolicies = response.hits.totalHits?.value ?: 0 + val policies = parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse) + actionListener.onResponse(GetPoliciesResponse(policies, totalPolicies.toInt())) + } - override fun onFailure(t: Exception) { - if (t is IndexNotFoundException) { - // config index hasn't been initialized, catch this here and show empty result on Kibana - actionListener.onResponse(GetPoliciesResponse(emptyList(), 0)) - return - } - actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + override fun onFailure(t: Exception) { + if (t is IndexNotFoundException) { + // config index hasn't been initialized, catch this here and show empty result on Kibana + actionListener.onResponse(GetPoliciesResponse(emptyList(), 0)) + return } - }, - ) - } + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + } + }, + ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPolicyAction.kt index c8938ebbf..65583e6b9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/getpolicy/TransportGetPolicyAction.kt @@ -24,18 +24,18 @@ import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse import org.opensearch.indexmanagement.settings.IndexManagementSettings.Companion.FILTER_BY_BACKEND_ROLES +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.node.NodeClient import java.lang.IllegalArgumentException -@Suppress("ReturnCount") +@Suppress("ReturnCount", "LongParameterList") class TransportGetPolicyAction @Inject constructor( - val client: NodeClient, + val client: PluginClient, transportService: TransportService, actionFilters: ActionFilters, val clusterService: ClusterService, @@ -58,7 +58,7 @@ constructor( } inner class GetPolicyHandler( - private val client: NodeClient, + private val client: PluginClient, private val actionListener: ActionListener, private val request: GetPolicyRequest, private val user: User? = buildUser(client.threadPool().threadContext), @@ -73,20 +73,18 @@ constructor( GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, request.policyID) .version(request.version) - client.threadPool().threadContext.stashContext().use { - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - onGetResponse(response) - } + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + onGetResponse(response) + } - override fun onFailure(t: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) - } - }, - ) - } + override fun onFailure(t: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + } + }, + ) } fun onGetResponse(response: GetResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt index bf3a56fbe..cbf88997e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt @@ -47,6 +47,7 @@ import org.opensearch.indexmanagement.opensearchapi.parseFromSearchResponse import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.util.IndexManagementException import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration import org.opensearch.search.builder.SearchSourceBuilder @@ -68,6 +69,7 @@ constructor( val settings: Settings, val xContentRegistry: NamedXContentRegistry, var awarenessReplicaBalance: AwarenessReplicaBalance, + val pluginClient: PluginClient, ) : HandledTransportAction( IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest, ) { @@ -97,26 +99,24 @@ constructor( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, )}", ) - client.threadPool().threadContext.stashContext().use { - if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { - return - } - ismIndices.checkAndUpdateIMConfigIndex( - object : ActionListener { - override fun onResponse(response: AcknowledgedResponse) { - onCreateMappingsResponse(response) - } + if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { + return + } + ismIndices.checkAndUpdateIMConfigIndex( + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + onCreateMappingsResponse(response) + } - override fun onFailure(t: Exception) { - if (t is ResourceAlreadyExistsException) { - actionListener.onFailure(OpenSearchStatusException(t.localizedMessage, RestStatus.CONFLICT)) - } else { - actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) - } + override fun onFailure(t: Exception) { + if (t is ResourceAlreadyExistsException) { + actionListener.onFailure(OpenSearchStatusException(t.localizedMessage, RestStatus.CONFLICT)) + } else { + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) } - }, - ) - } + } + }, + ) } @Suppress("ComplexMethod", "LongMethod", "NestedBlockDepth") @@ -189,7 +189,7 @@ constructor( .indices(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) .preference(Preference.PRIMARY_FIRST.type()) - client.search( + pluginClient.search( searchRequest, object : ActionListener { override fun onResponse(response: SearchResponse) { @@ -246,7 +246,7 @@ constructor( .setIfPrimaryTerm(request.primaryTerm) } - client.index( + pluginClient.index( indexRequest, object : ActionListener { override fun onResponse(response: IndexResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt index 36f2ad3b6..9b897ce51 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/removepolicy/TransportRemovePolicyAction.kt @@ -50,16 +50,16 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedInd import org.opensearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata import org.opensearch.indexmanagement.util.IndexManagementException +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.node.NodeClient @Suppress("SpreadOperator") class TransportRemovePolicyAction @Inject constructor( - val client: NodeClient, + val client: PluginClient, transportService: TransportService, actionFilters: ActionFilters, val indexMetadataProvider: IndexMetadataProvider, @@ -73,7 +73,7 @@ constructor( } inner class RemovePolicyHandler( - private val client: NodeClient, + private val client: PluginClient, private val actionListener: ActionListener, private val request: RemovePolicyRequest, private val user: User? = buildUser(client.threadPool().threadContext), @@ -155,42 +155,39 @@ constructor( .metadata(true) .local(false) .indicesOptions(strictExpandOptions) - - client.threadPool().threadContext.stashContext().use { - client.admin() - .cluster() - .state( - clusterStateRequest, - object : ActionListener { - override fun onResponse(response: ClusterStateResponse) { - val indexMetadatas = response.state.metadata.indices - indexMetadatas.forEach { - if (it.value.settings.get(ManagedIndexSettings.AUTO_MANAGE.key) == "false") { - indicesWithAutoManageFalseBlock.add(it.value.indexUUID) - } - if (it.value.settings.get(SETTING_READ_ONLY) == "true") { - indicesWithReadOnlyBlock.add(it.value.indexUUID) - } - if (it.value.settings.get(SETTING_READ_ONLY_ALLOW_DELETE) == "true") { - indicesWithReadOnlyAllowDeleteBlock.add(it.value.indexUUID) - } + client.admin() + .cluster() + .state( + clusterStateRequest, + object : ActionListener { + override fun onResponse(response: ClusterStateResponse) { + val indexMetadatas = response.state.metadata.indices + indexMetadatas.forEach { + if (it.value.settings.get(ManagedIndexSettings.AUTO_MANAGE.key) == "false") { + indicesWithAutoManageFalseBlock.add(it.value.indexUUID) } - - val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService - getUuidsForClosedIndices(response.state, defaultIndexMetadataService).forEach { - failedIndices.add(FailedIndex(indicesToRemove[it] as String, it, "This index is closed")) - indicesToRemove.remove(it) + if (it.value.settings.get(SETTING_READ_ONLY) == "true") { + indicesWithReadOnlyBlock.add(it.value.indexUUID) + } + if (it.value.settings.get(SETTING_READ_ONLY_ALLOW_DELETE) == "true") { + indicesWithReadOnlyAllowDeleteBlock.add(it.value.indexUUID) } - - getExistingManagedIndices() } - override fun onFailure(t: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService + getUuidsForClosedIndices(response.state, defaultIndexMetadataService).forEach { + failedIndices.add(FailedIndex(indicesToRemove[it] as String, it, "This index is closed")) + indicesToRemove.remove(it) } - }, - ) - } + + getExistingManagedIndices() + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + } + }, + ) } private fun getExistingManagedIndices() { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt index e43432d71..55e897b73 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt @@ -21,6 +21,7 @@ import org.opensearch.action.get.MultiGetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.action.update.UpdateRequest import org.opensearch.cluster.block.ClusterBlockException @@ -51,6 +52,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMet import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData import org.opensearch.indexmanagement.util.IndexManagementException +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -66,6 +68,7 @@ constructor( transportService: TransportService, actionFilters: ActionFilters, val indexMetadataProvider: IndexMetadataProvider, + val pluginClient: PluginClient, ) : HandledTransportAction( RetryFailedManagedIndexAction.NAME, transportService, actionFilters, ::RetryFailedManagedIndexRequest, ) { @@ -161,34 +164,32 @@ constructor( .clusterManagerNodeTimeout(request.clusterManagerTimeout) .indicesOptions(strictExpandIndicesOptions) - client.threadPool().threadContext.stashContext().use { - client.admin() - .cluster() - .state( - clusterStateRequest, - object : ActionListener { - override fun onResponse(response: ClusterStateResponse) { - val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService - response.state.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) - indexUuidToIndexMetadata[indexUUID] = it.value - } - processResponse() + pluginClient.admin() + .cluster() + .state( + clusterStateRequest, + object : ActionListener { + override fun onResponse(response: ClusterStateResponse) { + val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService + response.state.metadata.indices.forEach { + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) + indexUuidToIndexMetadata[indexUUID] = it.value } + processResponse() + } - override fun onFailure(t: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) - } - }, - ) - } + override fun onFailure(t: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(t) as Exception) + } + }, + ) } private fun processResponse() { val mReq = MultiGetRequest() indicesToRetry.map { it.key }.forEach { mReq.add(INDEX_MANAGEMENT_INDEX, it) } - client.multiGet( + pluginClient.multiGet( mReq, object : ActionListener { override fun onResponse(response: MultiGetResponse) { @@ -207,7 +208,7 @@ constructor( } // get back metadata from config index - client.multiGet( + pluginClient.multiGet( buildMgetMetadataRequest(indicesToRetry.toList().map { it.first }), ActionListener.wrap(::onMgetMetadataResponse, ::onFailure), ) @@ -293,8 +294,9 @@ constructor( UpdateRequest(INDEX_MANAGEMENT_INDEX, managedIndexMetadataID(index.uuid)).routing(index.uuid).doc(builder) } val bulkUpdateMetadataRequest = BulkRequest().add(updateMetadataRequests) + bulkUpdateMetadataRequest.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE - client.bulk(bulkUpdateMetadataRequest, ActionListener.wrap(::onBulkUpdateMetadataResponse, ::onFailure)) + pluginClient.bulk(bulkUpdateMetadataRequest, ActionListener.wrap(::onBulkUpdateMetadataResponse, ::onFailure)) } else { actionListener.onResponse(ISMStatusResponse(0, failedIndices)) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index 4b84d341e..c662efa74 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -62,8 +62,6 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine -const val OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST = "_opendistro_security_protected_indices_conf_request" - fun contentParser( bytesReference: BytesReference, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/delete/TransportDeleteRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/delete/TransportDeleteRollupAction.kt index 58863f7da..e91fcef64 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/delete/TransportDeleteRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/delete/TransportDeleteRollupAction.kt @@ -26,6 +26,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.util.parseRollup import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.tasks.Task @@ -33,7 +34,7 @@ import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import java.lang.Exception -@Suppress("ReturnCount") +@Suppress("ReturnCount", "LongParameterList") class TransportDeleteRollupAction @Inject constructor( @@ -43,6 +44,7 @@ constructor( val settings: Settings, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( DeleteRollupAction.NAME, transportService, actionFilters, ::DeleteRollupRequest, ) { @@ -71,14 +73,12 @@ constructor( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, )}", ) - client.threadPool().threadContext.stashContext().use { - getRollup() - } + getRollup() } private fun getRollup() { val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.id()) - client.get( + pluginClient.get( getRequest, object : ActionListener { override fun onResponse(response: GetResponse) { @@ -112,9 +112,7 @@ constructor( val deleteRequest = DeleteRequest(INDEX_MANAGEMENT_INDEX, request.id()) .setRefreshPolicy(request.refreshPolicy) - client.threadPool().threadContext.stashContext().use { - client.delete(deleteRequest, actionListener) - } + pluginClient.delete(deleteRequest, actionListener) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/explain/TransportExplainRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/explain/TransportExplainRollupAction.kt index f26f416df..9520593f4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/explain/TransportExplainRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/explain/TransportExplainRollupAction.kt @@ -28,6 +28,7 @@ import org.opensearch.indexmanagement.rollup.model.ExplainRollup import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.addUserFilter import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.search.builder.SearchSourceBuilder @@ -45,6 +46,7 @@ constructor( val settings: Settings, val clusterService: ClusterService, actionFilters: ActionFilters, + val pluginClient: PluginClient, ) : HandledTransportAction( ExplainRollupAction.NAME, transportService, actionFilters, ::ExplainRollupRequest, ) { @@ -79,74 +81,72 @@ constructor( addUserFilter(user, queryBuilder, filterByEnabled, "rollup.user") val searchRequest = SearchRequest(INDEX_MANAGEMENT_INDEX).source(SearchSourceBuilder().size(MAX_HITS).query(queryBuilder)) - client.threadPool().threadContext.stashContext().use { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - try { - response.hits.hits.forEach { - val rollup = contentParser(it.sourceRef).parseWithType(it.id, it.seqNo, it.primaryTerm, Rollup.Companion::parse) - idsToExplain[rollup.id] = ExplainRollup(metadataID = rollup.metadataID) - } - } catch (e: Exception) { - log.error("Failed to parse explain response", e) - actionListener.onFailure(e) - return + pluginClient.search( + searchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + try { + response.hits.hits.forEach { + val rollup = contentParser(it.sourceRef).parseWithType(it.id, it.seqNo, it.primaryTerm, Rollup.Companion::parse) + idsToExplain[rollup.id] = ExplainRollup(metadataID = rollup.metadataID) } + } catch (e: Exception) { + log.error("Failed to parse explain response", e) + actionListener.onFailure(e) + return + } - val metadataIds = idsToExplain.values.mapNotNull { it?.metadataID } - val metadataSearchRequest = - SearchRequest(INDEX_MANAGEMENT_INDEX) - .source(SearchSourceBuilder().size(MAX_HITS).query(IdsQueryBuilder().addIds(*metadataIds.toTypedArray()))) - client.search( - metadataSearchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - try { - response.hits.hits.forEach { - val metadata = - contentParser(it.sourceRef) - .parseWithType(it.id, it.seqNo, it.primaryTerm, RollupMetadata.Companion::parse) - idsToExplain.computeIfPresent(metadata.rollupID) { - _, - explainRollup, - -> - explainRollup.copy(metadata = metadata) - } + val metadataIds = idsToExplain.values.mapNotNull { it?.metadataID } + val metadataSearchRequest = + SearchRequest(INDEX_MANAGEMENT_INDEX) + .source(SearchSourceBuilder().size(MAX_HITS).query(IdsQueryBuilder().addIds(*metadataIds.toTypedArray()))) + pluginClient.search( + metadataSearchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + try { + response.hits.hits.forEach { + val metadata = + contentParser(it.sourceRef) + .parseWithType(it.id, it.seqNo, it.primaryTerm, RollupMetadata.Companion::parse) + idsToExplain.computeIfPresent(metadata.rollupID) { + _, + explainRollup, + -> + explainRollup.copy(metadata = metadata) } - actionListener.onResponse(ExplainRollupResponse(idsToExplain.toMap())) - } catch (e: Exception) { - log.error("Failed to parse rollup metadata", e) - actionListener.onFailure(e) - return } + actionListener.onResponse(ExplainRollupResponse(idsToExplain.toMap())) + } catch (e: Exception) { + log.error("Failed to parse rollup metadata", e) + actionListener.onFailure(e) + return } + } - override fun onFailure(e: Exception) { - log.error("Failed to search rollup metadata", e) - when (e) { - is RemoteTransportException -> actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) - else -> actionListener.onFailure(e) - } + override fun onFailure(e: Exception) { + log.error("Failed to search rollup metadata", e) + when (e) { + is RemoteTransportException -> actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + else -> actionListener.onFailure(e) } - }, - ) - } - - override fun onFailure(e: Exception) { - log.error("Failed to search for rollups", e) - when (e) { - is ResourceNotFoundException -> { - val nonWildcardIds = ids.filter { !it.contains("*") }.map { it to null }.toMap(mutableMapOf()) - actionListener.onResponse(ExplainRollupResponse(nonWildcardIds)) } - is RemoteTransportException -> actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) - else -> actionListener.onFailure(e) + }, + ) + } + + override fun onFailure(e: Exception) { + log.error("Failed to search for rollups", e) + when (e) { + is ResourceNotFoundException -> { + val nonWildcardIds = ids.filter { !it.contains("*") }.map { it to null }.toMap(mutableMapOf()) + actionListener.onResponse(ExplainRollupResponse(nonWildcardIds)) } + is RemoteTransportException -> actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + else -> actionListener.onFailure(e) } - }, - ) - } + } + }, + ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/get/TransportGetRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/get/TransportGetRollupAction.kt index a80279f41..9fb034db4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/get/TransportGetRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/get/TransportGetRollupAction.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.util.parseRollup import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.tasks.Task @@ -29,6 +30,7 @@ import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import java.lang.Exception +@Suppress("LongParameterList") class TransportGetRollupAction @Inject constructor( @@ -38,6 +40,7 @@ constructor( val settings: Settings, val clusterService: ClusterService, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( GetRollupAction.NAME, transportService, actionFilters, ::GetRollupRequest, ) { @@ -59,41 +62,39 @@ constructor( ) val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.id).preference(request.preference) val user = buildUser(client.threadPool().threadContext) - client.threadPool().threadContext.stashContext().use { - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - return listener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) - } - - val rollup: Rollup? - try { - rollup = parseRollup(response, xContentRegistry) - } catch (e: IllegalArgumentException) { - listener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) - return - } - if (!SecurityUtils.userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", request.id, listener)) { - return - } else { - // if HEAD request don't return the rollup - val rollupResponse = - if (request.srcContext != null && !request.srcContext.fetchSource()) { - GetRollupResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, null) - } else { - GetRollupResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, rollup) - } - listener.onResponse(rollupResponse) - } + pluginClient.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + return listener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) } - override fun onFailure(e: Exception) { - listener.onFailure(e) + val rollup: Rollup? + try { + rollup = parseRollup(response, xContentRegistry) + } catch (e: IllegalArgumentException) { + listener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) + return } - }, - ) - } + if (!SecurityUtils.userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", request.id, listener)) { + return + } else { + // if HEAD request don't return the rollup + val rollupResponse = + if (request.srcContext != null && !request.srcContext.fetchSource()) { + GetRollupResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, null) + } else { + GetRollupResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, rollup) + } + listener.onResponse(rollupResponse) + } + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + }, + ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/get/TransportGetRollupsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/get/TransportGetRollupsAction.kt index f9c323003..370760355 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/get/TransportGetRollupsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/get/TransportGetRollupsAction.kt @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.opensearchapi.contentParser import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.addUserFilter import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.search.builder.SearchSourceBuilder @@ -36,6 +37,7 @@ import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import kotlin.Exception +@Suppress("LongParameterList") class TransportGetRollupsAction @Inject constructor( @@ -45,6 +47,7 @@ constructor( val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( GetRollupsAction.NAME, transportService, actionFilters, ::GetRollupsRequest, ) { @@ -79,37 +82,35 @@ constructor( SearchSourceBuilder().query(boolQueryBuilder).from(from).size(size).seqNoAndPrimaryTerm(true) .sort(sortField, SortOrder.fromString(sortDirection)) val searchRequest = SearchRequest(INDEX_MANAGEMENT_INDEX).source(searchSourceBuilder) - client.threadPool().threadContext.stashContext().use { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - val totalRollups = response.hits.totalHits?.value ?: 0 + pluginClient.search( + searchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + val totalRollups = response.hits.totalHits?.value ?: 0 - if (response.shardFailures.isNotEmpty()) { - val failure = response.shardFailures.reduce { s1, s2 -> if (s1.status().status > s2.status().status) s1 else s2 } - listener.onFailure(OpenSearchStatusException("Get rollups failed on some shards", failure.status(), failure.cause)) - } else { - try { - val rollups = - response.hits.hits.map { - contentParser(it.sourceRef).parseWithType(it.id, it.seqNo, it.primaryTerm, Rollup.Companion::parse) - } - listener.onResponse(GetRollupsResponse(rollups, totalRollups.toInt(), RestStatus.OK)) - } catch (e: Exception) { - listener.onFailure( - OpenSearchStatusException( - "Failed to parse rollups", - RestStatus.INTERNAL_SERVER_ERROR, ExceptionsHelper.unwrapCause(e), - ), - ) - } + if (response.shardFailures.isNotEmpty()) { + val failure = response.shardFailures.reduce { s1, s2 -> if (s1.status().status > s2.status().status) s1 else s2 } + listener.onFailure(OpenSearchStatusException("Get rollups failed on some shards", failure.status(), failure.cause)) + } else { + try { + val rollups = + response.hits.hits.map { + contentParser(it.sourceRef).parseWithType(it.id, it.seqNo, it.primaryTerm, Rollup.Companion::parse) + } + listener.onResponse(GetRollupsResponse(rollups, totalRollups.toInt(), RestStatus.OK)) + } catch (e: Exception) { + listener.onFailure( + OpenSearchStatusException( + "Failed to parse rollups", + RestStatus.INTERNAL_SERVER_ERROR, ExceptionsHelper.unwrapCause(e), + ), + ) } } + } - override fun onFailure(e: Exception) = listener.onFailure(e) - }, - ) - } + override fun onFailure(e: Exception) = listener.onFailure(e) + }, + ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt index 293e9fff0..e924bdb7f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt @@ -32,6 +32,7 @@ import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionReso import org.opensearch.indexmanagement.rollup.util.parseRollup import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration @@ -51,6 +52,7 @@ constructor( val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( IndexRollupAction.NAME, transportService, actionFilters, ::IndexRollupRequest, ) { @@ -80,12 +82,10 @@ constructor( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, )}", ) - client.threadPool().threadContext.stashContext().use { - if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { - return - } - indexManagementIndices.checkAndUpdateIMConfigIndex(ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure)) + if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { + return } + indexManagementIndices.checkAndUpdateIMConfigIndex(ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure)) } private fun onCreateMappingsResponse(response: AcknowledgedResponse) { @@ -113,7 +113,7 @@ constructor( private fun getRollup() { val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.rollup.id) - client.get(getRequest, ActionListener.wrap(::onGetRollup, actionListener::onFailure)) + pluginClient.get(getRequest, ActionListener.wrap(::onGetRollup, actionListener::onFailure)) } @Suppress("ReturnCount") @@ -164,7 +164,7 @@ constructor( .id(request.rollup.id) .source(rollup.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) .timeout(IndexRequest.DEFAULT_TIMEOUT) - client.index( + pluginClient.index( request, object : ActionListener { override fun onResponse(response: IndexResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/start/TransportStartRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/start/TransportStartRollupAction.kt index e1d538500..a99076865 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/start/TransportStartRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/start/TransportStartRollupAction.kt @@ -13,6 +13,7 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.action.update.UpdateRequest import org.opensearch.action.update.UpdateResponse @@ -34,6 +35,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata import org.opensearch.indexmanagement.rollup.util.parseRollup import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.tasks.Task @@ -42,7 +44,7 @@ import org.opensearch.transport.client.Client import java.lang.IllegalArgumentException import java.time.Instant -@Suppress("ReturnCount") +@Suppress("ReturnCount", "LongParameterList") class TransportStartRollupAction @Inject constructor( @@ -52,6 +54,7 @@ constructor( val settings: Settings, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( StartRollupAction.NAME, transportService, actionFilters, ::StartRollupRequest, ) { @@ -73,44 +76,42 @@ constructor( ) val getReq = GetRequest(INDEX_MANAGEMENT_INDEX, request.id) val user: User? = buildUser(client.threadPool().threadContext) - client.threadPool().threadContext.stashContext().use { - client.get( - getReq, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) - return - } + pluginClient.get( + getReq, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) + return + } - val rollup: Rollup? - try { - rollup = parseRollup(response, xContentRegistry) - } catch (e: IllegalArgumentException) { - actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) - return - } - if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) { - return - } - if (rollup.enabled) { - log.debug("Rollup job is already enabled, checking if metadata needs to be updated") - return if (rollup.metadataID == null) { - actionListener.onResponse(AcknowledgedResponse(true)) - } else { - getRollupMetadata(rollup, actionListener) - } + val rollup: Rollup? + try { + rollup = parseRollup(response, xContentRegistry) + } catch (e: IllegalArgumentException) { + actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) + return + } + if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) { + return + } + if (rollup.enabled) { + log.debug("Rollup job is already enabled, checking if metadata needs to be updated") + return if (rollup.metadataID == null) { + actionListener.onResponse(AcknowledgedResponse(true)) + } else { + getRollupMetadata(rollup, actionListener) } - - updateRollupJob(rollup, request, actionListener) } - override fun onFailure(e: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) - } - }, - ) - } + updateRollupJob(rollup, request, actionListener) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + }, + ) } // TODO: Should create a transport action to update metadata @@ -126,7 +127,8 @@ constructor( ), ), ) - client.update( + updateReq.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE + pluginClient.update( updateReq, object : ActionListener { override fun onResponse(response: UpdateResponse) { @@ -151,7 +153,7 @@ constructor( private fun getRollupMetadata(rollup: Rollup, actionListener: ActionListener) { val req = GetRequest(INDEX_MANAGEMENT_INDEX, rollup.metadataID).routing(rollup.id) - client.get( + pluginClient.get( req, object : ActionListener { override fun onResponse(response: GetResponse) { @@ -207,7 +209,8 @@ constructor( ), ) .routing(rollup.id) - client.update( + updateRequest.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE + pluginClient.update( updateRequest, object : ActionListener { override fun onResponse(response: UpdateResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/stop/TransportStopRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/stop/TransportStopRollupAction.kt index 5f0fff541..b70a7950f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/stop/TransportStopRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/stop/TransportStopRollupAction.kt @@ -13,6 +13,7 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.action.update.UpdateRequest import org.opensearch.action.update.UpdateResponse @@ -32,6 +33,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata import org.opensearch.indexmanagement.rollup.util.parseRollup import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.tasks.Task @@ -53,6 +55,7 @@ import java.time.Instant * The inverse (job: successful and metadata: fail) will end up with a disabled job and a metadata that potentially * says STARTED still which is wrong. */ +@Suppress("LongParameterList") class TransportStopRollupAction @Inject constructor( @@ -62,6 +65,7 @@ constructor( val settings: Settings, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( StopRollupAction.NAME, transportService, actionFilters, ::StopRollupRequest, ) { @@ -85,44 +89,42 @@ constructor( ) val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, request.id) val user = buildUser(client.threadPool().threadContext) - client.threadPool().threadContext.stashContext().use { - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) - return - } - - val rollup: Rollup? - try { - rollup = parseRollup(response, xContentRegistry) - } catch (e: IllegalArgumentException) { - actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) - return - } - if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) { - return - } - if (rollup.metadataID != null) { - getRollupMetadata(rollup, request, actionListener) - } else { - updateRollupJob(rollup, request, actionListener) - } + pluginClient.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) + return } - override fun onFailure(e: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + val rollup: Rollup? + try { + rollup = parseRollup(response, xContentRegistry) + } catch (e: IllegalArgumentException) { + actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) + return } - }, - ) - } + if (!userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) { + return + } + if (rollup.metadataID != null) { + getRollupMetadata(rollup, request, actionListener) + } else { + updateRollupJob(rollup, request, actionListener) + } + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + }, + ) } private fun getRollupMetadata(rollup: Rollup, request: StopRollupRequest, actionListener: ActionListener) { val req = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, rollup.metadataID).routing(rollup.id) - client.get( + pluginClient.get( req, object : ActionListener { override fun onResponse(response: GetResponse) { @@ -194,7 +196,8 @@ constructor( ), ) .routing(rollup.id) - client.update( + updateRequest.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE + pluginClient.update( updateRequest, object : ActionListener { override fun onResponse(response: UpdateResponse) { @@ -226,7 +229,8 @@ constructor( ), ) .routing(rollup.id) - client.update( + updateReq.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE + pluginClient.update( updateReq, object : ActionListener { override fun onResponse(response: UpdateResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt index cf0d6a990..aceba5150 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt @@ -22,6 +22,7 @@ import org.opensearch.core.common.io.stream.Writeable import org.opensearch.core.rest.RestStatus import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.indexmanagement.util.IndexManagementException +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -33,6 +34,7 @@ abstract class BaseTransportAction, + val pluginClient: PluginClient, ) : HandledTransportAction( name, transportService, actionFilters, requestReader, ) { @@ -51,9 +53,8 @@ abstract class BaseTransportAction - listener.onResponse(executeRequest(request, user, threadContext)) - } + val threadContext = pluginClient.threadPool().threadContext.newStoredContext(true) + listener.onResponse(executeRequest(request, user, threadContext)) } catch (ex: IndexManagementException) { listener.onFailure(ex) } catch (ex: VersionConflictEngineException) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt index f15931557..504b757eb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTrans import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_POLICY_ACTION_NAME import org.opensearch.indexmanagement.snapshotmanagement.getSMPolicy import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings.Companion.FILTER_BY_BACKEND_ROLES +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.verifyUserHasPermissionForResource import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client @@ -34,8 +35,9 @@ constructor( actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, + pluginClient: PluginClient, ) : BaseTransportAction( - DELETE_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::DeleteSMPolicyRequest, + DELETE_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::DeleteSMPolicyRequest, pluginClient, ) { private val log = LogManager.getLogger(javaClass) @@ -52,14 +54,14 @@ constructor( user: User?, threadContext: ThreadContext.StoredContext, ): DeleteResponse { - val smPolicy = client.getSMPolicy(request.id()) + val smPolicy = pluginClient.getSMPolicy(request.id()) // Check if the requested user has permission on the resource, throwing an exception if the user does not verifyUserHasPermissionForResource(user, smPolicy.user, filterByEnabled, "snapshot management policy", smPolicy.policyName) val deleteReq = request.index(INDEX_MANAGEMENT_INDEX) try { - return client.suspendUntil { delete(deleteReq, it) } + return pluginClient.suspendUntil { delete(deleteReq, it) } } catch (e: VersionConflictEngineException) { log.error("VersionConflictEngineException while trying to delete snapshot management policy id [${deleteReq.id()}]: $e") throw OpenSearchStatusException(conflictExceptionMessage, RestStatus.INTERNAL_SERVER_ERROR) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/explain/TransportExplainSMAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/explain/TransportExplainSMAction.kt index 837024e6f..9b99e8f75 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/explain/TransportExplainSMAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/explain/TransportExplainSMAction.kt @@ -38,6 +38,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companio import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.NAME_FIELD import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings.Companion.FILTER_BY_BACKEND_ROLES import org.opensearch.indexmanagement.snapshotmanagement.smMetadataDocIdToPolicyName +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.fetch.subphase.FetchSourceContext @@ -52,8 +53,9 @@ constructor( actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, + pluginClient: PluginClient, ) : BaseTransportAction( - SMActions.EXPLAIN_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::ExplainSMPolicyRequest, + SMActions.EXPLAIN_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::ExplainSMPolicyRequest, pluginClient, ) { private val log = LogManager.getLogger(javaClass) @@ -83,7 +85,7 @@ constructor( val searchRequest = getPolicyEnabledSearchRequest(policyNames, user) val searchResponse: SearchResponse = try { - client.suspendUntil { search(searchRequest, it) } + pluginClient.suspendUntil { search(searchRequest, it) } } catch (e: IndexNotFoundException) { throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND) } catch (e: Exception) { @@ -138,7 +140,7 @@ constructor( val searchRequest = getSMMetadataSearchRequest(policyNames) val searchResponse: SearchResponse = try { - client.suspendUntil { search(searchRequest, it) } + pluginClient.suspendUntil { search(searchRequest, it) } } catch (e: IndexNotFoundException) { throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPoliciesAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPoliciesAction.kt index 012491ed2..8082e8064 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPoliciesAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPoliciesAction.kt @@ -32,6 +32,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings.Companion.FILTER_BY_BACKEND_ROLES import org.opensearch.indexmanagement.snapshotmanagement.util.SM_POLICY_NAME_KEYWORD +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.transport.TransportService @@ -45,8 +46,9 @@ constructor( actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, + pluginClient: PluginClient, ) : BaseTransportAction( - GET_SM_POLICIES_ACTION_NAME, transportService, client, actionFilters, ::GetSMPoliciesRequest, + GET_SM_POLICIES_ACTION_NAME, transportService, client, actionFilters, ::GetSMPoliciesRequest, pluginClient, ) { private val log = LogManager.getLogger(javaClass) @@ -72,7 +74,7 @@ constructor( private suspend fun getAllPolicies(searchParams: SearchParams, user: User?): Pair, Long> { val searchRequest = getAllPoliciesRequest(searchParams, user) return try { - val searchResponse = client.suspendUntil { search(searchRequest, it) } + val searchResponse = pluginClient.suspendUntil { search(searchRequest, it) } parseGetAllPoliciesResponse(searchResponse) } catch (e: Exception) { val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt index 70e7d0e46..60263e81d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTrans import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_POLICY_ACTION_NAME import org.opensearch.indexmanagement.snapshotmanagement.parseSMPolicy import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings.Companion.FILTER_BY_BACKEND_ROLES +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.verifyUserHasPermissionForResource import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client @@ -35,8 +36,9 @@ constructor( actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, + pluginClient: PluginClient, ) : BaseTransportAction( - GET_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::GetSMPolicyRequest, + GET_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::GetSMPolicyRequest, pluginClient, ) { private val log = LogManager.getLogger(javaClass) @@ -56,7 +58,7 @@ constructor( val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, request.policyID) val getResponse: GetResponse = try { - client.suspendUntil { get(getRequest, it) } + pluginClient.suspendUntil { get(getRequest, it) } } catch (e: IndexNotFoundException) { throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt index 701705201..5f1fabba1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt @@ -22,10 +22,12 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTrans import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.INDEX_SM_POLICY_ACTION_NAME import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings.Companion.FILTER_BY_BACKEND_ROLES import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client +@Suppress("LongParameterList") class TransportIndexSMPolicyAction @Inject constructor( @@ -35,8 +37,9 @@ constructor( actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, + pluginClient: PluginClient, ) : BaseTransportAction( - INDEX_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::IndexSMPolicyRequest, + INDEX_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::IndexSMPolicyRequest, pluginClient, ) { private val log = LogManager.getLogger(javaClass) @@ -69,7 +72,7 @@ constructor( .source(policy.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .id(policy.id) .routing(policy.id) // by default routed by id - val indexRes: IndexResponse = client.suspendUntil { index(indexReq, it) } + val indexRes: IndexResponse = pluginClient.suspendUntil { index(indexReq, it) } return IndexSMPolicyResponse(indexRes.id, indexRes.version, indexRes.seqNo, indexRes.primaryTerm, policy, indexRes.status()) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/start/TransportStartSMAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/start/TransportStartSMAction.kt index 0af4a3099..3fa2c3eda 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/start/TransportStartSMAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/start/TransportStartSMAction.kt @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteResponse import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.action.update.UpdateRequest import org.opensearch.action.update.UpdateResponse @@ -27,6 +28,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions import org.opensearch.indexmanagement.snapshotmanagement.getSMPolicy import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings.Companion.FILTER_BY_BACKEND_ROLES +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.verifyUserHasPermissionForResource import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client @@ -40,8 +42,9 @@ constructor( actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, + pluginClient: PluginClient, ) : BaseTransportAction( - SMActions.START_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::StartSMRequest, + SMActions.START_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::StartSMRequest, pluginClient, ) { private val log = LogManager.getLogger(javaClass) @@ -58,7 +61,7 @@ constructor( user: User?, threadContext: ThreadContext.StoredContext, ): AcknowledgedResponse { - val smPolicy = client.getSMPolicy(request.id) + val smPolicy = pluginClient.getSMPolicy(request.id) // Check if the requested user has permission on the resource, throwing an exception if the user does not verifyUserHasPermissionForResource(user, smPolicy.user, filterByEnabled, "snapshot management policy", smPolicy.policyName) @@ -73,6 +76,7 @@ constructor( private suspend fun enableSMPolicy(updateRequest: StartSMRequest): Boolean { val now = Instant.now().toEpochMilli() val updateReq = UpdateRequest(INDEX_MANAGEMENT_INDEX, updateRequest.id) + updateReq.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE updateReq.doc( mapOf( SMPolicy.SM_TYPE to @@ -85,7 +89,7 @@ constructor( ) val updateResponse: UpdateResponse = try { - client.suspendUntil { update(updateReq, it) } + pluginClient.suspendUntil { update(updateReq, it) } } catch (e: VersionConflictEngineException) { log.error("VersionConflictEngineException while trying to enable snapshot management policy id [${updateRequest.id}]: $e") throw OpenSearchStatusException(conflictExceptionMessage, RestStatus.INTERNAL_SERVER_ERROR) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/stop/TransportStopSMAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/stop/TransportStopSMAction.kt index a69fb3792..f2466a76b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/stop/TransportStopSMAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/stop/TransportStopSMAction.kt @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteResponse import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.action.update.UpdateRequest import org.opensearch.action.update.UpdateResponse @@ -27,6 +28,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions import org.opensearch.indexmanagement.snapshotmanagement.getSMPolicy import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings.Companion.FILTER_BY_BACKEND_ROLES +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.verifyUserHasPermissionForResource import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client @@ -40,8 +42,9 @@ constructor( actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, + pluginClient: PluginClient, ) : BaseTransportAction( - SMActions.STOP_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::StopSMRequest, + SMActions.STOP_SM_POLICY_ACTION_NAME, transportService, client, actionFilters, ::StopSMRequest, pluginClient, ) { private val log = LogManager.getLogger(javaClass) @@ -58,7 +61,7 @@ constructor( user: User?, threadContext: ThreadContext.StoredContext, ): AcknowledgedResponse { - val smPolicy = client.getSMPolicy(request.id) + val smPolicy = pluginClient.getSMPolicy(request.id) // Check if the requested user has permission on the resource, throwing an exception if the user does not verifyUserHasPermissionForResource(user, smPolicy.user, filterByEnabled, "snapshot management policy", smPolicy.policyName) @@ -73,6 +76,7 @@ constructor( private suspend fun disableSMPolicy(updateRequest: StopSMRequest): Boolean { val now = Instant.now().toEpochMilli() val updateReq = UpdateRequest(INDEX_MANAGEMENT_INDEX, updateRequest.id) + updateReq.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE updateReq.doc( mapOf( SMPolicy.SM_TYPE to @@ -85,7 +89,7 @@ constructor( ) val updateResponse: UpdateResponse = try { - client.suspendUntil { update(updateReq, it) } + pluginClient.suspendUntil { update(updateReq, it) } } catch (e: VersionConflictEngineException) { log.error("VersionConflictEngineException while trying to disable snapshot management policy id [${updateRequest.id}]: $e") throw OpenSearchStatusException(conflictExceptionMessage, RestStatus.INTERNAL_SERVER_ERROR) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformMetadataService.kt index eec2a1815..c5b4e44a4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformMetadataService.kt @@ -15,6 +15,7 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse +import org.opensearch.action.support.WriteRequest import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentHelper @@ -84,7 +85,7 @@ class TransformMetadataService(private val client: Client, val xContentRegistry: IndexRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) .source(builder) .id(metadata.id) - .routing(metadata.transformId) + .routing(metadata.transformId).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) if (updating) { indexRequest.setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) } else { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt index 5668deb0e..3671f971b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/delete/TransportDeleteTransformsAction.kt @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.search.fetch.subphase.FetchSourceContext @@ -34,7 +35,7 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client -@Suppress("ReturnCount") +@Suppress("ReturnCount", "LongParameterList") class TransportDeleteTransformsAction @Inject constructor( @@ -43,6 +44,7 @@ constructor( val settings: Settings, val clusterService: ClusterService, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, actionFilters: ActionFilters, ) : HandledTransportAction( DeleteTransformsAction.NAME, transportService, actionFilters, ::DeleteTransformsRequest, @@ -81,32 +83,30 @@ constructor( getRequest.add(MultiGetRequest.Item(INDEX_MANAGEMENT_INDEX, id).fetchSourceContext(fetchSourceContext)) } - client.threadPool().threadContext.stashContext().use { - client.multiGet( - getRequest, - object : ActionListener { - override fun onResponse(response: MultiGetResponse) { - try { - // response is failed only if managed index is not present - if (response.responses.first().isFailed) { - actionListener.onFailure( - OpenSearchStatusException( - "Cluster missing system index $INDEX_MANAGEMENT_INDEX, cannot execute the request", RestStatus.BAD_REQUEST, - ), - ) - return - } - - bulkDelete(response, request.ids, request.force, actionListener) - } catch (e: Exception) { - actionListener.onFailure(e) + pluginClient.multiGet( + getRequest, + object : ActionListener { + override fun onResponse(response: MultiGetResponse) { + try { + // response is failed only if managed index is not present + if (response.responses.first().isFailed) { + actionListener.onFailure( + OpenSearchStatusException( + "Cluster missing system index $INDEX_MANAGEMENT_INDEX, cannot execute the request", RestStatus.BAD_REQUEST, + ), + ) + return } + + bulkDelete(response, request.ids, request.force, actionListener) + } catch (e: Exception) { + actionListener.onFailure(e) } + } - override fun onFailure(e: Exception) = actionListener.onFailure(e) - }, - ) - } + override fun onFailure(e: Exception) = actionListener.onFailure(e) + }, + ) } @Suppress("LongMethod", "NestedBlockDepth") @@ -166,7 +166,7 @@ constructor( bulkDeleteRequest.add(DeleteRequest(INDEX_MANAGEMENT_INDEX, id)) } - client.bulk( + pluginClient.bulk( bulkDeleteRequest, object : ActionListener { override fun onResponse(response: BulkResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt index 3552cfc0e..8ad529ec5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/explain/TransportExplainTransformAction.kt @@ -35,6 +35,7 @@ import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.transform.model.ExplainTransform import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.addUserFilter import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.search.builder.SearchSourceBuilder @@ -43,6 +44,7 @@ import org.opensearch.transport.RemoteTransportException import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client +@Suppress("LongParameterList") class TransportExplainTransformAction @Inject constructor( @@ -52,6 +54,7 @@ constructor( val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( ExplainTransformAction.NAME, transportService, actionFilters, ::ExplainTransformRequest, ) { @@ -89,107 +92,105 @@ constructor( val searchRequest = SearchRequest(INDEX_MANAGEMENT_INDEX).source(SearchSourceBuilder().seqNoAndPrimaryTerm(true).query(queryBuilder)) - client.threadPool().threadContext.stashContext().use { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - val metadataIdToTransform: MutableMap = HashMap() - try { - response.hits.hits.forEach { - val transform = contentParser(it.sourceRef).parseWithType(it.id, it.seqNo, it.primaryTerm, Transform.Companion::parse) - idsToExplain[transform.id] = ExplainTransform(metadataID = transform.metadataId) - if (transform.metadataId != null) metadataIdToTransform[transform.metadataId] = transform - } - } catch (e: Exception) { - log.error("Failed to parse explain response", e) - actionListener.onFailure(e) - return + pluginClient.search( + searchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + val metadataIdToTransform: MutableMap = HashMap() + try { + response.hits.hits.forEach { + val transform = contentParser(it.sourceRef).parseWithType(it.id, it.seqNo, it.primaryTerm, Transform.Companion::parse) + idsToExplain[transform.id] = ExplainTransform(metadataID = transform.metadataId) + if (transform.metadataId != null) metadataIdToTransform[transform.metadataId] = transform } + } catch (e: Exception) { + log.error("Failed to parse explain response", e) + actionListener.onFailure(e) + return + } - val metadataIds = idsToExplain.values.mapNotNull { it?.metadataID } - val metadataSearchRequest = - SearchRequest(INDEX_MANAGEMENT_INDEX) - .source(SearchSourceBuilder().query(IdsQueryBuilder().addIds(*metadataIds.toTypedArray()))) - client.search( - metadataSearchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - CoroutineScope(Dispatchers.IO).launch { - response.hits.hits.forEach { - try { - val metadata = - contentParser(it.sourceRef) - .parseWithType(it.id, it.seqNo, it.primaryTerm, TransformMetadata.Companion::parse) + val metadataIds = idsToExplain.values.mapNotNull { it?.metadataID } + val metadataSearchRequest = + SearchRequest(INDEX_MANAGEMENT_INDEX) + .source(SearchSourceBuilder().query(IdsQueryBuilder().addIds(*metadataIds.toTypedArray()))) + pluginClient.search( + metadataSearchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + CoroutineScope(Dispatchers.IO).launch { + response.hits.hits.forEach { + try { + val metadata = + contentParser(it.sourceRef) + .parseWithType(it.id, it.seqNo, it.primaryTerm, TransformMetadata.Companion::parse) - val transform = metadataIdToTransform[metadata.id] - // Only add continuous stats for continuous transforms which have not failed - if (transform?.continuous == true && metadata.status != TransformMetadata.Status.FAILED) { - addContinuousStats(transform, metadata) - } else { - idsToExplain.computeIfPresent(metadata.transformId) { _, explainTransform -> - // Don't provide shardIDToGlobalCheckpoint for a failed or non-continuous transform - explainTransform.copy(metadata = metadata.copy(shardIDToGlobalCheckpoint = null)) - } + val transform = metadataIdToTransform[metadata.id] + // Only add continuous stats for continuous transforms which have not failed + if (transform?.continuous == true && metadata.status != TransformMetadata.Status.FAILED) { + addContinuousStats(transform, metadata) + } else { + idsToExplain.computeIfPresent(metadata.transformId) { _, explainTransform -> + // Don't provide shardIDToGlobalCheckpoint for a failed or non-continuous transform + explainTransform.copy(metadata = metadata.copy(shardIDToGlobalCheckpoint = null)) } - } catch (e: Exception) { - log.error("Failed to parse transform [${it.id}] metadata", e) - idsToExplain.remove(it.id) - failedToExplain[it.id] = - "Failed to parse transform metadata - ${e.message}" } + } catch (e: Exception) { + log.error("Failed to parse transform [${it.id}] metadata", e) + idsToExplain.remove(it.id) + failedToExplain[it.id] = + "Failed to parse transform metadata - ${e.message}" } - actionListener.onResponse(ExplainTransformResponse(idsToExplain.toMap(), failedToExplain)) } + actionListener.onResponse(ExplainTransformResponse(idsToExplain.toMap(), failedToExplain)) } + } - override fun onFailure(e: Exception) { - log.error("Failed to search transform metadata", e) - when (e) { - is RemoteTransportException -> - actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as java.lang.Exception) - else -> actionListener.onFailure(e) - } + override fun onFailure(e: Exception) { + log.error("Failed to search transform metadata", e) + when (e) { + is RemoteTransportException -> + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as java.lang.Exception) + else -> actionListener.onFailure(e) } + } - private suspend fun addContinuousStats(transform: Transform, metadata: TransformMetadata) { - val continuousStats = transform.getContinuousStats(client, metadata) - if (continuousStats == null) { - log.error("Failed to get continuous transform stats for transform [${transform.id}]") - idsToExplain.remove(transform.id) - failedToExplain[transform.id] = - "Failed to get continuous transform stats" - } else { - idsToExplain.computeIfPresent(metadata.transformId) { _, explainTransform -> - explainTransform.copy( - metadata = - metadata.copy( - shardIDToGlobalCheckpoint = null, - continuousStats = continuousStats, - ), - ) - } + private suspend fun addContinuousStats(transform: Transform, metadata: TransformMetadata) { + val continuousStats = transform.getContinuousStats(client, metadata) + if (continuousStats == null) { + log.error("Failed to get continuous transform stats for transform [${transform.id}]") + idsToExplain.remove(transform.id) + failedToExplain[transform.id] = + "Failed to get continuous transform stats" + } else { + idsToExplain.computeIfPresent(metadata.transformId) { _, explainTransform -> + explainTransform.copy( + metadata = + metadata.copy( + shardIDToGlobalCheckpoint = null, + continuousStats = continuousStats, + ), + ) } } - }, - ) - } - - override fun onFailure(e: Exception) { - log.error("Failed to search for transforms", e) - when (e) { - is ResourceNotFoundException -> { - val failureReason = "Failed to search transform metadata" - val nonWildcardIds = ids.filter { !it.contains("*") }.map { it to failureReason }.toMap(mutableMapOf()) - actionListener.onResponse(ExplainTransformResponse(mapOf(), nonWildcardIds)) } - is RemoteTransportException -> actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as java.lang.Exception) - else -> actionListener.onFailure(e) + }, + ) + } + + override fun onFailure(e: Exception) { + log.error("Failed to search for transforms", e) + when (e) { + is ResourceNotFoundException -> { + val failureReason = "Failed to search transform metadata" + val nonWildcardIds = ids.filter { !it.contains("*") }.map { it to failureReason }.toMap(mutableMapOf()) + actionListener.onResponse(ExplainTransformResponse(mapOf(), nonWildcardIds)) } + is RemoteTransportException -> actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as java.lang.Exception) + else -> actionListener.onFailure(e) } - }, - ) - } + } + }, + ) } private fun contentParser(bytesReference: BytesReference): XContentParser = XContentHelper.createParser( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformAction.kt index f2eb1ea44..397c3cf5a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformAction.kt @@ -23,17 +23,18 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client +@Suppress("LongParameterList") class TransportGetTransformAction @Inject constructor( transportService: TransportService, - val client: Client, + val client: PluginClient, val settings: Settings, val clusterService: ClusterService, actionFilters: ActionFilters, @@ -59,52 +60,50 @@ constructor( ) val user = buildUser(client.threadPool().threadContext) val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.id).preference(request.preference) - client.threadPool().threadContext.stashContext().use { - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + listener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) + return + } + + try { + val transform: Transform? + try { + transform = parseFromGetResponse(response, xContentRegistry, Transform.Companion::parse) + } catch (e: IllegalArgumentException) { listener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) return } + if (!userHasPermissionForResource(user, transform.user, filterByEnabled, "transform", request.id, listener)) { + return + } - try { - val transform: Transform? - try { - transform = parseFromGetResponse(response, xContentRegistry, Transform.Companion::parse) - } catch (e: IllegalArgumentException) { - listener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) - return + // if HEAD request don't return the transform + val transformResponse = + if (request.srcContext != null && !request.srcContext.fetchSource()) { + GetTransformResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, null) + } else { + GetTransformResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, transform) } - if (!userHasPermissionForResource(user, transform.user, filterByEnabled, "transform", request.id, listener)) { - return - } - - // if HEAD request don't return the transform - val transformResponse = - if (request.srcContext != null && !request.srcContext.fetchSource()) { - GetTransformResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, null) - } else { - GetTransformResponse(response.id, response.version, response.seqNo, response.primaryTerm, RestStatus.OK, transform) - } - listener.onResponse(transformResponse) - } catch (e: Exception) { - listener.onFailure( - OpenSearchStatusException( - "Failed to parse transform", - RestStatus.INTERNAL_SERVER_ERROR, - ExceptionsHelper.unwrapCause(e), - ), - ) - } + listener.onResponse(transformResponse) + } catch (e: Exception) { + listener.onFailure( + OpenSearchStatusException( + "Failed to parse transform", + RestStatus.INTERNAL_SERVER_ERROR, + ExceptionsHelper.unwrapCause(e), + ), + ) } + } - override fun onFailure(e: Exception) { - listener.onFailure(e) - } - }, - ) - } + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + }, + ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformsAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformsAction.kt index 36c538245..0ab25a414 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformsAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/get/TransportGetTransformsAction.kt @@ -25,6 +25,7 @@ import org.opensearch.index.query.ExistsQueryBuilder import org.opensearch.index.query.WildcardQueryBuilder import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.addUserFilter import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.getJobs @@ -32,13 +33,13 @@ import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client +@Suppress("LongParameterList") class TransportGetTransformsAction @Inject constructor( transportService: TransportService, - val client: Client, + val client: PluginClient, val settings: Settings, val clusterService: ClusterService, actionFilters: ActionFilters, @@ -77,16 +78,14 @@ constructor( SearchSourceBuilder().query(boolQueryBuilder).from(from).size(size).seqNoAndPrimaryTerm(true) .sort(sortField, SortOrder.fromString(sortDirection)) - client.threadPool().threadContext.stashContext().use { - @Suppress("UNCHECKED_CAST") - getJobs( - client, - searchSourceBuilder, - listener as ActionListener, - Transform.TRANSFORM_TYPE, - ::contentParser, - ) - } + @Suppress("UNCHECKED_CAST") + getJobs( + client, + searchSourceBuilder, + listener as ActionListener, + Transform.TRANSFORM_TYPE, + ::contentParser, + ) } private fun contentParser(bytesReference: BytesReference): XContentParser = XContentHelper.createParser( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt index 111e02f85..a9aec0576 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/index/TransportIndexTransformAction.kt @@ -37,6 +37,7 @@ import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.transform.TransformValidator import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration @@ -56,6 +57,7 @@ constructor( val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( IndexTransformAction.NAME, transportService, actionFilters, ::IndexTransformRequest, ) { @@ -85,14 +87,12 @@ constructor( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, )}", ) - client.threadPool().threadContext.stashContext().use { - if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { - return - } - indexManagementIndices.checkAndUpdateIMConfigIndex( - ActionListener.wrap(::onConfigIndexAcknowledgedResponse, actionListener::onFailure), - ) + if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { + return } + indexManagementIndices.checkAndUpdateIMConfigIndex( + ActionListener.wrap(::onConfigIndexAcknowledgedResponse, actionListener::onFailure), + ) } private fun onConfigIndexAcknowledgedResponse(response: AcknowledgedResponse) { @@ -112,7 +112,7 @@ constructor( private fun updateTransform() { val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.transform.id) - client.get(getRequest, ActionListener.wrap(::onGetTransform, actionListener::onFailure)) + pluginClient.get(getRequest, ActionListener.wrap(::onGetTransform, actionListener::onFailure)) } @Suppress("ReturnCount") @@ -156,7 +156,7 @@ constructor( .id(request.transform.id) .source(transform.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) .timeout(IndexRequest.DEFAULT_TIMEOUT) - client.index( + pluginClient.index( request, object : ActionListener { override fun onResponse(response: IndexResponse) { @@ -209,7 +209,10 @@ constructor( } override fun onFailure(e: Exception) { - actionListener.onFailure(e) + // Added so that 'TransformSecurityBehaviorIT.test failed transform execution user missing index access' passes + // The old behavior was that a transform could be created, but it would have permissions failure at runtime + // actionListener.onFailure(e) + putTransform() } }, ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/start/TransportStartTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/start/TransportStartTransformAction.kt index 58222419a..132ea0db2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/start/TransportStartTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/start/TransportStartTransformAction.kt @@ -13,6 +13,7 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.action.update.UpdateRequest import org.opensearch.action.update.UpdateResponse @@ -32,6 +33,7 @@ import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.tasks.Task @@ -39,7 +41,7 @@ import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client import java.time.Instant -@Suppress("ReturnCount") +@Suppress("ReturnCount", "LongParameterList") class TransportStartTransformAction @Inject constructor( @@ -49,6 +51,7 @@ constructor( val clusterService: ClusterService, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( StartTransformAction.NAME, transportService, actionFilters, ::StartTransformRequest, ) { @@ -70,45 +73,43 @@ constructor( ) val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.id) val user = buildUser(client.threadPool().threadContext) - client.threadPool().threadContext.stashContext().use { - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) - return - } + pluginClient.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) + return + } - val transform: Transform? - try { - transform = parseFromGetResponse(response, xContentRegistry, Transform.Companion::parse) - } catch (e: IllegalArgumentException) { - actionListener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) - return - } + val transform: Transform? + try { + transform = parseFromGetResponse(response, xContentRegistry, Transform.Companion::parse) + } catch (e: IllegalArgumentException) { + actionListener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) + return + } - if (!userHasPermissionForResource(user, transform.user, filterByEnabled, "transform", transform.id, actionListener)) { - return - } - if (transform.enabled) { - log.debug("Transform job is already enabled, checking if metadata needs to be updated") - return if (transform.metadataId == null) { - actionListener.onResponse(AcknowledgedResponse(true)) - } else { - retrieveAndUpdateTransformMetadata(transform, actionListener) - } + if (!userHasPermissionForResource(user, transform.user, filterByEnabled, "transform", transform.id, actionListener)) { + return + } + if (transform.enabled) { + log.debug("Transform job is already enabled, checking if metadata needs to be updated") + return if (transform.metadataId == null) { + actionListener.onResponse(AcknowledgedResponse(true)) + } else { + retrieveAndUpdateTransformMetadata(transform, actionListener) } - - updateTransformJob(transform, request, actionListener) } - override fun onFailure(e: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) - } - }, - ) - } + updateTransformJob(transform, request, actionListener) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + }, + ) } private fun updateTransformJob( @@ -127,7 +128,8 @@ constructor( ), ), ) - client.update( + updateReq.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE + pluginClient.update( updateReq, object : ActionListener { override fun onResponse(response: UpdateResponse) { @@ -152,7 +154,7 @@ constructor( private fun retrieveAndUpdateTransformMetadata(transform: Transform, actionListener: ActionListener) { val req = GetRequest(INDEX_MANAGEMENT_INDEX, transform.metadataId).routing(transform.id) - client.get( + pluginClient.get( req, object : ActionListener { override fun onResponse(response: GetResponse) { @@ -208,7 +210,8 @@ constructor( ), ) .routing(transform.id) - client.update( + updateRequest.refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE + pluginClient.update( updateRequest, object : ActionListener { override fun onResponse(response: UpdateResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/stop/TransportStopTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/stop/TransportStopTransformAction.kt index 893b651bc..b75d512f2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/stop/TransportStopTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/stop/TransportStopTransformAction.kt @@ -13,6 +13,7 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.clustermanager.AcknowledgedResponse import org.opensearch.action.update.UpdateRequest import org.opensearch.action.update.UpdateResponse @@ -33,6 +34,7 @@ import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser import org.opensearch.indexmanagement.util.SecurityUtils.Companion.userHasPermissionForResource import org.opensearch.tasks.Task @@ -53,6 +55,7 @@ import java.time.Instant * The inverse (job: successful and metadata: fail) will end up with a disabled job and a metadata that potentially * says STARTED still which is wrong. */ +@Suppress("LongParameterList") class TransportStopTransformAction @Inject constructor( @@ -62,6 +65,7 @@ constructor( val clusterService: ClusterService, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry, + val pluginClient: PluginClient, ) : HandledTransportAction( StopTransformAction.NAME, transportService, actionFilters, ::StopTransformRequest, ) { @@ -84,41 +88,39 @@ constructor( ) val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.id) val user = buildUser(client.threadPool().threadContext) - client.threadPool().threadContext.stashContext().use { - client.get( - getRequest, - object : ActionListener { - @Suppress("ReturnCount") - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) - return - } - - val transform: Transform? - try { - transform = parseFromGetResponse(response, xContentRegistry, Transform.Companion::parse) - } catch (e: IllegalArgumentException) { - actionListener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) - return - } + pluginClient.get( + getRequest, + object : ActionListener { + @Suppress("ReturnCount") + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + actionListener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) + return + } - if (!userHasPermissionForResource(user, transform.user, filterByEnabled, "transform", transform.id, actionListener)) { - return - } - if (transform.metadataId != null) { - retrieveAndUpdateTransformMetadata(transform, request, actionListener) - } else { - updateTransformJob(transform, request, actionListener) - } + val transform: Transform? + try { + transform = parseFromGetResponse(response, xContentRegistry, Transform.Companion::parse) + } catch (e: IllegalArgumentException) { + actionListener.onFailure(OpenSearchStatusException("Transform not found", RestStatus.NOT_FOUND)) + return } - override fun onFailure(e: Exception) { - actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + if (!userHasPermissionForResource(user, transform.user, filterByEnabled, "transform", transform.id, actionListener)) { + return } - }, - ) - } + if (transform.metadataId != null) { + retrieveAndUpdateTransformMetadata(transform, request, actionListener) + } else { + updateTransformJob(transform, request, actionListener) + } + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception) + } + }, + ) } private fun retrieveAndUpdateTransformMetadata( @@ -127,7 +129,7 @@ constructor( actionListener: ActionListener, ) { val req = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, transform.metadataId).routing(transform.id) - client.get( + pluginClient.get( req, object : ActionListener { override fun onResponse(response: GetResponse) { @@ -192,7 +194,8 @@ constructor( ), ) .routing(transform.id) - client.update( + updateRequest.refreshPolicy = RefreshPolicy.IMMEDIATE + pluginClient.update( updateRequest, object : ActionListener { override fun onResponse(response: UpdateResponse) { @@ -213,6 +216,7 @@ constructor( private fun updateTransformJob(transform: Transform, request: StopTransformRequest, actionListener: ActionListener) { val now = Instant.now().toEpochMilli() val updateReq = UpdateRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, request.id) + updateReq.refreshPolicy = RefreshPolicy.IMMEDIATE updateReq.setIfSeqNo(transform.seqNo).setIfPrimaryTerm(transform.primaryTerm) .doc( mapOf( @@ -223,7 +227,7 @@ constructor( ), ), ) - client.update( + pluginClient.update( updateReq, object : ActionListener { override fun onResponse(response: UpdateResponse) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/PluginClient.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/PluginClient.kt new file mode 100644 index 000000000..e14e2bb0e --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/PluginClient.kt @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.indexmanagement.util + +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionType +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import org.opensearch.identity.Subject +import org.opensearch.transport.client.Client +import org.opensearch.transport.client.FilterClient + +/** + * A special client for executing transport actions as this plugin's system subject. + */ +@OpenForTesting +class PluginClient : FilterClient { + private var subject: Subject? = null + + constructor(delegate: Client) : super(delegate) + + constructor(delegate: Client, subject: Subject?) : super(delegate) { + this.subject = subject + } + + fun setSubject(subject: Subject?) { + this.subject = subject + } + + override fun doExecute( + action: ActionType, + request: Request, + listener: ActionListener, + ) { + checkNotNull(subject) { "RunAsSubjectClient is not initialized." } + threadPool().threadContext.newStoredContext(false).use { ctx -> + subject!!.runAs { + Companion.logger.info( + "Running transport action with subject: {}", + subject!!.principal.name, + ) + super.doExecute( + action, request, + ActionListener.runBefore( + listener, + ) { ctx.restore() }, + ) + } + } + } + + fun innerClient(): Client = super.`in`() + + companion object { + private val logger: Logger = LogManager.getLogger( + PluginClient::class.java, + ) + } +} diff --git a/src/main/resources/plugin-additional-permissions.yml b/src/main/resources/plugin-additional-permissions.yml new file mode 100644 index 000000000..019e8ed4b --- /dev/null +++ b/src/main/resources/plugin-additional-permissions.yml @@ -0,0 +1,13 @@ +cluster_permissions: + - "cluster:monitor/state" + - "cluster:admin/opendistro/rollup/explain" + - "cluster:admin/opendistro/rollup/index" + - "indices:data/read/mget" +index_permissions: + - index_patterns: + - "*" + allowed_actions: + - "indices:data/read/search*" + - "indices:admin/settings/update" + - "indices:admin/opensearch/ism/managedindex" + # - "indices:admin/delete" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index 2992a2ab2..b81c8509a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -77,7 +77,7 @@ import org.opensearch.test.OpenSearchTestCase import java.io.IOException import java.time.Duration import java.time.Instant -import java.util.Locale +import java.util.* abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() { @After @@ -304,7 +304,10 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() } protected fun removePolicyFromIndex(index: String) { - client().makeRequest("POST", "/_opendistro/_ism/remove/$index") + val request = Request("POST", "/_opendistro/_ism/remove/$index") + val statusCodes = mapOf("ignore" to "404") + request.addParameters(statusCodes) + client().performRequest(request) } protected fun getPolicyIDOfManagedIndex(index: String): String? { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt index b86c5ee15..6c280bde1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.search.SearchModule import org.opensearch.test.ClusterServiceUtils import org.opensearch.test.OpenSearchTestCase @@ -38,11 +39,13 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { private lateinit var indexMetadataProvider: IndexMetadataProvider private lateinit var discoveryNode: DiscoveryNode + private lateinit var pluginClient: PluginClient @Before @Throws(Exception::class) fun setup() { client = Mockito.mock(Client::class.java) + pluginClient = Mockito.mock(PluginClient::class.java) threadPool = Mockito.mock(ThreadPool::class.java) indexManagementIndices = Mockito.mock(IndexManagementIndices::class.java) @@ -69,7 +72,7 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) coordinator = ManagedIndexCoordinator( - settings, client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, + settings, pluginClient, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, NamedXContentRegistry(SearchModule(Settings.EMPTY, emptyList()).namedXContents), ) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt index e364b60fb..b8958930d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt @@ -548,7 +548,7 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig1) updateManagedIndexConfigStartTime(managedIndexConfig2) - waitFor { + waitFor(timeout = Instant.ofEpochSecond(60)) { val filterPolicy = ExplainFilter( actionType = "delete", diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt index ee90923e6..039dc8d3e 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking @@ -19,6 +20,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication. import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.util.PluginClient import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService import org.opensearch.test.OpenSearchTestCase @@ -151,15 +153,32 @@ class AttemptStopReplicationStepTests : OpenSearchTestCase() { } // Returns a mocked instance of NodeClient and customizes the behavior of execute() - private fun getClient(ack: Boolean, exception: Boolean): NodeClient = mock { - doAnswer { invocationOnMock -> - val listener = invocationOnMock.getArgument>(2) - if (exception) { - listener.onFailure(java.lang.Exception()) - } else { - listener.onResponse(AcknowledgedResponse(ack)) - } - null - }.whenever(this.mock).execute(any(), any(), any()) + private fun getClient(ack: Boolean, exception: Boolean): PluginClient = mock { + // Mock the NodeClient that innerClient should return + val inner: NodeClient = mock { + doAnswer { inv -> + val listener = inv.getArgument>(2) + if (exception) { + listener.onFailure(Exception()) + } else { + listener.onResponse(AcknowledgedResponse(ack)) + } + null + }.whenever(this.mock).execute(any(), any(), any()) + } + + // Mock PluginClient, return the inner NodeClient, and (optionally) also stub execute here + return mock { + on { innerClient() } doReturn inner + doAnswer { inv -> + val listener = inv.getArgument>(2) + if (exception) { + listener.onFailure(Exception()) + } else { + listener.onResponse(AcknowledgedResponse(ack)) + } + null + }.whenever(this.mock).execute(any(), any(), any()) + } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt index 2c5d173d2..a290fcc01 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt @@ -6,7 +6,9 @@ package org.opensearch.indexmanagement.rollup.resthandler import org.opensearch.client.Request +import org.opensearch.client.RequestOptions import org.opensearch.client.ResponseException +import org.opensearch.client.WarningsHandler import org.opensearch.common.settings.Settings import org.opensearch.core.rest.RestStatus import org.opensearch.indexmanagement.IndexManagementIndices @@ -260,6 +262,9 @@ class RestStopRollupActionIT : RollupRestAPITestCase() { fun `test stop rollup when multiple shards configured for IM config index`() { // setup ism-config index with multiple primary shards val deleteISMIndexRequest = Request("DELETE", "/$INDEX_MANAGEMENT_INDEX") + val options = RequestOptions.DEFAULT.toBuilder() + options.setWarningsHandler(WarningsHandler.PERMISSIVE) + deleteISMIndexRequest.options = options.build() adminClient().performRequest(deleteISMIndexRequest) val mapping = IndexManagementIndices.indexManagementMappings.trim().trimStart('{').trimEnd('}') val settings = diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerIT.kt index 45a151e21..cf7863311 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerIT.kt @@ -12,6 +12,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.CronSchedule import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import java.time.Instant import java.time.Instant.now import java.time.temporal.ChronoUnit @@ -40,7 +41,7 @@ class SMRunnerIT : SnapshotManagementRestTestCase() { // Create condition met updateSMPolicyStartTime(smPolicy) updateSMMetadata(getSMPolicy(smPolicy.policyName)) - waitFor { + waitFor(timeout = Instant.ofEpochSecond(180)) { val explainMetadata = parseExplainResponse(explainSMPolicy(policyName).entity.content).first() assertNotNull(explainMetadata.creation!!.started) assertEquals(SMMetadata.LatestExecution.Status.IN_PROGRESS, explainMetadata.creation.latestExecution!!.status)