diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 393872e9f2..d454fb2cbe 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -72,6 +72,14 @@ default void updateStatus(@Nonnull PipelineExecution execution) { PipelineExecution retrieve(@Nonnull ExecutionType type, @Nonnull String id) throws ExecutionNotFoundException; + @Nonnull + default PipelineExecution retrieve( + @Nonnull ExecutionType type, @Nonnull String id, boolean includeNestedExecutions) + throws ExecutionNotFoundException { + // Default behavior: ignore `includeNestedExecutions` + return retrieve(type, id); + } + void delete(@Nonnull ExecutionType type, @Nonnull String id); void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelete); @@ -92,6 +100,15 @@ Observable retrieve( Observable retrievePipelinesForPipelineConfigId( @Nonnull String pipelineConfigId, @Nonnull ExecutionCriteria criteria); + @Nonnull + default Observable retrievePipelinesForPipelineConfigId( + @Nonnull String pipelineConfigId, + @Nonnull ExecutionCriteria criteria, + Boolean includeNestedExecutions) { + // Default behavior: ignore `includeNestedExecutions` + return retrievePipelinesForPipelineConfigId(pipelineConfigId, criteria); + } + @Nonnull Collection retrievePipelineConfigIdsForApplication(@Nonnull String application); diff --git a/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueuePipelineRefIntegrationTest.kt b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueuePipelineRefIntegrationTest.kt new file mode 100644 index 0000000000..3a50500d08 --- /dev/null +++ b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/SqlQueuePipelineRefIntegrationTest.kt @@ -0,0 +1,211 @@ +/* + * Copyright 2025 Harness, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.q.sql + +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.module.kotlin.KotlinModule +import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.config.ExecutionCompressionProperties +import com.netflix.spinnaker.config.ObjectMapperSubtypeProperties +import com.netflix.spinnaker.config.OrcaSqlProperties +import com.netflix.spinnaker.config.SpringObjectMapperConfigurer +import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService +import com.netflix.spinnaker.kork.jedis.RedisClientDelegate +import com.netflix.spinnaker.kork.jedis.RedisClientSelector +import com.netflix.spinnaker.kork.sql.config.RetryProperties +import com.netflix.spinnaker.kork.sql.config.SqlProperties +import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties +import com.netflix.spinnaker.kork.sql.test.SqlTestUtil +import com.netflix.spinnaker.orca.TaskResolver +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType +import com.netflix.spinnaker.orca.config.JedisConfiguration +import com.netflix.spinnaker.orca.config.RedisConfiguration +import com.netflix.spinnaker.orca.pipeline.model.support.CustomTriggerDeserializerSupplier +import com.netflix.spinnaker.orca.pipeline.model.support.TriggerDeserializer +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.PipelineRefQueueIntegrationTest +import com.netflix.spinnaker.orca.q.TestConfig +import com.netflix.spinnaker.orca.q.migration.ExecutionTypeDeserializer +import com.netflix.spinnaker.orca.q.migration.TaskTypeDeserializer +import com.netflix.spinnaker.orca.q.sql.pending.SqlPendingExecutionService +import com.netflix.spinnaker.orca.sql.PipelineRefTriggerDeserializerSupplier +import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository +import com.netflix.spinnaker.orca.test.redis.EmbeddedRedisConfiguration +import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.q.metrics.EventPublisher +import com.netflix.spinnaker.q.metrics.MonitorableQueue +import com.netflix.spinnaker.q.sql.SqlQueue +import de.huxhorn.sulky.ulid.ULID +import org.jooq.DSLContext +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.test.context.junit.jupiter.SpringExtension +import java.time.Clock +import java.time.Duration +import java.util.Optional +import javax.sql.DataSource + +@Configuration +class SqlPipelineRefTestConfig { + @Bean + fun jooq(): DSLContext { + val testDatabase = SqlTestUtil.initTcMysqlDatabase() + return testDatabase.context + } + + @Bean + fun sqlQueueObjectMapper( + mapper: ObjectMapper, + objectMapperSubtypeProperties: ObjectMapperSubtypeProperties, + taskResolver: TaskResolver + ): ObjectMapper { + return mapper.apply { + registerModule(KotlinModule.Builder().build()) + registerModule( + SimpleModule() + .addDeserializer(ExecutionType::class.java, ExecutionTypeDeserializer()) + .addDeserializer(Class::class.java, TaskTypeDeserializer(taskResolver)) + ) + disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + + SpringObjectMapperConfigurer( + objectMapperSubtypeProperties.apply { + messagePackages = messagePackages + listOf("com.netflix.spinnaker.orca.q") + attributePackages = attributePackages + listOf("com.netflix.spinnaker.orca.q") + } + ).registerSubtypes(this) + } + } + + @Bean + fun queue( + jooq: DSLContext, + clock: Clock, + mapper: ObjectMapper, + publisher: EventPublisher + ): MonitorableQueue = + SqlQueue( + "test", + 1, + jooq, + clock, + 1, + mapper, Optional.empty(), + Duration.ofSeconds(1), + emptyList(), + true, + publisher, + SqlRetryProperties(), + ULID() + ) + + @Bean + fun sqlExecutionRepository( + dsl: DSLContext, + mapper: ObjectMapper, + registry: Registry, + properties: SqlProperties, + orcaSqlProperties: OrcaSqlProperties, + compressionProperties: ExecutionCompressionProperties, + dataSource: DataSource + ) = SqlExecutionRepository( + orcaSqlProperties.partitionName, + dsl, + mapper, + properties.retries.transactions, + orcaSqlProperties.batchReadSize, + orcaSqlProperties.stageReadSize, + interlink = null, + compressionProperties = compressionProperties, + pipelineRefEnabled = true, + dataSource = dataSource + ) + + @Bean + fun pipelineRefTriggerDeserializer(): CustomTriggerDeserializerSupplier { + val customTrigger = PipelineRefTriggerDeserializerSupplier(pipelineRefEnabled = true) + TriggerDeserializer.customTriggerSuppliers.add(customTrigger) + return customTrigger + } + + @Bean + fun pendingExecutionService( + jooq: DSLContext, + queue: Queue, + repository: ExecutionRepository, + mapper: ObjectMapper, + clock: Clock, + registry: Registry + ) = + SqlPendingExecutionService( + "test", + jooq, + queue, + repository, + mapper, + clock, + registry, + RetryProperties(), + 5 + ) + + @Bean + fun orcaSqlProperties(): OrcaSqlProperties { + return OrcaSqlProperties() + } + + // TODO: remove this once Redis is no longer needed for distributed locking + @Bean + fun redisClientSelector(redisClientDelegates: List) = + RedisClientSelector(redisClientDelegates) +} + +@ExtendWith(SpringExtension::class) +@SpringBootTest( + classes = [ + SqlTestConfig::class, + SqlProperties::class, + ExecutionCompressionProperties::class, + TestConfig::class, + DynamicConfigService.NoopDynamicConfig::class, + EmbeddedRedisConfiguration::class, + JedisConfiguration::class, + RedisConfiguration::class + ], + properties = [ + "queue.retry.delay.ms=10", + "logging.level.root=ERROR", + "logging.level.org.springframework.test=ERROR", + "logging.level.com.netflix.spinnaker=FATAL", + "execution-repository.sql.enabled=true", + "execution-repository.redis.enabled=false", + "keiko.queue.redis.enabled=false", + "keiko.queue.sql.enabled=true", + "keiko.queue.fillExecutorEachCycle=false", + "sql.enabled=true", + "spring.application.name=orcaTest" + ] +) +class SqlQueuePipelineRefIntegrationTest: PipelineRefQueueIntegrationTest() { + @MockBean + var dataSource: DataSource? = null +} diff --git a/orca-queue-tck/orca-queue-tck.gradle b/orca-queue-tck/orca-queue-tck.gradle index 817660734a..092d580bbf 100644 --- a/orca-queue-tck/orca-queue-tck.gradle +++ b/orca-queue-tck/orca-queue-tck.gradle @@ -31,6 +31,7 @@ dependencies { implementation(project(":keiko-spring")) implementation("junit:junit") implementation("org.springframework:spring-test") + implementation project(":orca-sql") implementation("org.jetbrains.spek:spek-api") implementation("org.jetbrains.spek:spek-subject-extension") diff --git a/orca-queue-tck/src/main/kotlin/com/netflix/spinnaker/orca/q/PipelineRefIntegrationTest.kt b/orca-queue-tck/src/main/kotlin/com/netflix/spinnaker/orca/q/PipelineRefIntegrationTest.kt new file mode 100644 index 0000000000..ba16a3f74c --- /dev/null +++ b/orca-queue-tck/src/main/kotlin/com/netflix/spinnaker/orca/q/PipelineRefIntegrationTest.kt @@ -0,0 +1,365 @@ +/* + * Copyright 2025 Harness, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.q + +import com.netflix.spectator.api.NoopRegistry +import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.config.OrcaQueueConfiguration +import com.netflix.spinnaker.config.QueueConfiguration +import com.netflix.spinnaker.kork.discovery.DiscoveryStatusChangeEvent +import com.netflix.spinnaker.kork.discovery.InstanceStatus +import com.netflix.spinnaker.kork.discovery.RemoteStatusChangedEvent +import com.netflix.spinnaker.orca.api.pipeline.CancellableStage +import com.netflix.spinnaker.orca.api.pipeline.SkippableTask +import com.netflix.spinnaker.orca.api.pipeline.SyntheticStageOwner +import com.netflix.spinnaker.orca.api.pipeline.TaskResult +import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder +import com.netflix.spinnaker.orca.api.pipeline.graph.StageGraphBuilder +import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType +import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution +import com.netflix.spinnaker.orca.api.test.pipeline +import com.netflix.spinnaker.orca.api.test.stage +import com.netflix.spinnaker.orca.config.OrcaConfiguration +import com.netflix.spinnaker.orca.exceptions.DefaultExceptionHandler +import com.netflix.spinnaker.orca.ext.withTask +import com.netflix.spinnaker.orca.listeners.DelegatingApplicationEventMulticaster +import com.netflix.spinnaker.orca.pipeline.RestrictExecutionDuringTimeWindow +import com.netflix.spinnaker.orca.pipeline.StageExecutionFactory +import com.netflix.spinnaker.orca.pipeline.model.DefaultTrigger +import com.netflix.spinnaker.orca.pipeline.model.PipelineTrigger +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor +import com.netflix.spinnaker.orca.pipeline.util.StageNavigator +import com.netflix.spinnaker.orca.sql.pipeline.persistence.PipelineRefTrigger +import com.netflix.spinnaker.q.DeadMessageCallback +import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.q.memory.InMemoryQueue +import com.netflix.spinnaker.q.metrics.EventPublisher +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.reset +import com.nhaarman.mockito_kotlin.whenever +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import +import org.springframework.context.event.ApplicationEventMulticaster +import org.springframework.context.event.SimpleApplicationEventMulticaster +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor +import org.springframework.test.context.junit.jupiter.SpringExtension +import java.time.Clock +import java.time.Duration +import java.time.ZoneId + +@SpringBootTest( + classes = [TestConfig::class], + properties = ["queue.retry.delay.ms=10"] +) +@ExtendWith(SpringExtension::class) +abstract class PipelineRefQueueIntegrationTest { + @Autowired + lateinit var queue: Queue + @Autowired + lateinit var runner: QueueExecutionRunner + @Autowired + lateinit var repository: ExecutionRepository + @Autowired + lateinit var dummyTask: DummyTask + @Autowired + lateinit var context: ConfigurableApplicationContext + + @Value("\${tasks.execution-window.timezone:America/Los_Angeles}") + lateinit var timeZoneId: String + private val timeZone by lazy { ZoneId.of(timeZoneId) } + + @BeforeEach + fun discoveryUp() { + context.publishEvent(RemoteStatusChangedEvent(DiscoveryStatusChangeEvent(InstanceStatus.STARTING, InstanceStatus.UP))) + } + + @AfterEach + fun discoveryDown() { + context.publishEvent(RemoteStatusChangedEvent(DiscoveryStatusChangeEvent(InstanceStatus.UP, InstanceStatus.OUT_OF_SERVICE))) + } + + @AfterEach + fun resetMocks() { + reset(dummyTask) + whenever(dummyTask.extensionClass) doReturn dummyTask::class.java + whenever(dummyTask.getDynamicTimeout(any())) doReturn 2000L + whenever(dummyTask.isEnabledPropertyName) doReturn SkippableTask.isEnabledPropertyName(dummyTask.javaClass.simpleName) + } + + @Test + fun `can run a simple pipeline with pipelineRef and includedNestedExecution flag works as expected`() { + val parentPipeline = pipeline { + application = "spinnaker" + stage { + refId = "1" + type = "dummy" + } + } + val childPipeline = pipeline { + application = "spinnaker" + trigger = PipelineRefTrigger(parentExecutionId = parentPipeline.id) + stage { + refId = "1" + type = "dummy" + } + } + + repository.store(parentPipeline) + repository.store(childPipeline) + + whenever(dummyTask.execute(any())) doReturn TaskResult.SUCCEEDED + + context.runToCompletion(parentPipeline, runner::start, repository) + + assertThat(repository.retrieve(ExecutionType.PIPELINE, parentPipeline.id, true).status) + .isEqualTo(ExecutionStatus.SUCCEEDED) + + context.runToCompletion(childPipeline, runner::start, repository) + + val childExecutionWithNestedExecutions = repository.retrieve(ExecutionType.PIPELINE, childPipeline.id, true) + assertThat(childExecutionWithNestedExecutions.status).isEqualTo(ExecutionStatus.SUCCEEDED) + assertThat(childExecutionWithNestedExecutions.trigger) + .isInstanceOf(PipelineTrigger::class.java) + .extracting("parentExecution.id") + .isEqualTo(parentPipeline.id) + + val childExecutionWithoutNestedExecutions = repository.retrieve(ExecutionType.PIPELINE, childPipeline.id, false) + assertThat(childExecutionWithoutNestedExecutions.status).isEqualTo(ExecutionStatus.SUCCEEDED) + assertThat(childExecutionWithoutNestedExecutions.trigger) + .isInstanceOf(PipelineRefTrigger::class.java) + .extracting("parentExecutionId") + .isEqualTo(parentPipeline.id) + } + + @Test + fun `can run a simple pipeline with pipelineTrigger and it is transformed to pipelineRefTrigger`() { + val parentPipeline = pipeline { + application = "spinnaker" + stage { + refId = "1" + type = "dummy" + } + } + val childPipeline = pipeline { + application = "spinnaker" + trigger = PipelineTrigger(parentExecution = parentPipeline) + stage { + refId = "1" + type = "dummy" + } + } + + repository.store(parentPipeline) + repository.store(childPipeline) + + whenever(dummyTask.execute(any())) doReturn TaskResult.SUCCEEDED + + context.runToCompletion(parentPipeline, runner::start, repository) + + assertThat(repository.retrieve(ExecutionType.PIPELINE, parentPipeline.id, true).status) + .isEqualTo(ExecutionStatus.SUCCEEDED) + + context.runToCompletion(childPipeline, runner::start, repository) + + val childExecutionWithNestedExecutions = repository.retrieve(ExecutionType.PIPELINE, childPipeline.id, true) + assertThat(childExecutionWithNestedExecutions.status).isEqualTo(ExecutionStatus.SUCCEEDED) + assertThat(childExecutionWithNestedExecutions.trigger) + .isInstanceOf(PipelineTrigger::class.java) + .extracting("parentExecution.id") + .isEqualTo(parentPipeline.id) + + val childExecutionWithoutNestedExecutions = repository.retrieve(ExecutionType.PIPELINE, childPipeline.id, false) + assertThat(childExecutionWithoutNestedExecutions.status).isEqualTo(ExecutionStatus.SUCCEEDED) + assertThat(childExecutionWithoutNestedExecutions.trigger) + .isInstanceOf(PipelineRefTrigger::class.java) + .extracting("parentExecutionId") + .isEqualTo(parentPipeline.id) + } + + @Test + fun `can run a simple pipeline with DefaultTrigger and includeNestedExecutions does not make any iteration`() { + val parentPipeline = pipeline { + application = "spinnaker" + stage { + refId = "1" + type = "dummy" + } + } + val childPipeline = pipeline { + application = "spinnaker" + stage { + refId = "1" + type = "dummy" + } + } + + repository.store(parentPipeline) + repository.store(childPipeline) + + whenever(dummyTask.execute(any())) doReturn TaskResult.SUCCEEDED + + context.runToCompletion(parentPipeline, runner::start, repository) + + assertThat(repository.retrieve(ExecutionType.PIPELINE, parentPipeline.id, false).status) + .isEqualTo(ExecutionStatus.SUCCEEDED) + + context.runToCompletion(childPipeline, runner::start, repository) + + val childExecutionWithNestedExecutions = repository.retrieve(ExecutionType.PIPELINE, childPipeline.id, true) + assertThat(childExecutionWithNestedExecutions.status).isEqualTo(ExecutionStatus.SUCCEEDED) + assertThat(childExecutionWithNestedExecutions.trigger).isInstanceOf(DefaultTrigger::class.java) + + val childExecutionWithoutNestedExecutions = repository.retrieve(ExecutionType.PIPELINE, childPipeline.id, false) + assertThat(childExecutionWithoutNestedExecutions.status).isEqualTo(ExecutionStatus.SUCCEEDED) + assertThat(childExecutionWithoutNestedExecutions.trigger).isInstanceOf(DefaultTrigger::class.java) + } +} + +@Configuration +@Import( + PropertyPlaceholderAutoConfiguration::class, + OrcaConfiguration::class, + QueueConfiguration::class, + StageNavigator::class, + RestrictExecutionDuringTimeWindow::class, + OrcaQueueConfiguration::class +) +class PipelineRefTestConfig { + + @Bean + fun registry(): Registry = NoopRegistry() + + @Bean + fun dummyTask(): DummyTask = mock { + on { extensionClass } doReturn DummyTask::class.java + on { getDynamicTimeout(any()) } doReturn Duration.ofMinutes(2).toMillis() + on { isEnabledPropertyName } doReturn SkippableTask.isEnabledPropertyName(DummyTask::class.java.simpleName) + } + + @Bean + fun dummyStage() = object : StageDefinitionBuilder { + override fun taskGraph(stage: StageExecution, builder: TaskNode.Builder) { + builder.withTask("dummy") + } + + override fun getType() = "dummy" + } + + @Bean + fun parallelStage() = object : StageDefinitionBuilder { + override fun beforeStages(parent: StageExecution, graph: StageGraphBuilder) { + listOf("us-east-1", "us-west-2", "eu-west-1") + .map { region -> + StageExecutionFactory.newStage(parent.execution, "dummy", "dummy $region", parent.context + mapOf("region" to region), parent, SyntheticStageOwner.STAGE_BEFORE) + } + .forEach { graph.add(it) } + } + + override fun getType() = "parallel" + } + + @Bean + fun syntheticFailureStage() = object : StageDefinitionBuilder { + override fun getType() = "syntheticFailure" + + override fun taskGraph(stage: StageExecution, builder: TaskNode.Builder) { + builder.withTask("dummy") + } + + override fun onFailureStages(stage: StageExecution, graph: StageGraphBuilder) { + graph.add { + it.type = "dummy" + it.name = "onFailure1" + it.context = stage.context + } + + graph.add { + it.type = "dummy" + it.name = "onFailure2" + it.context = stage.context + } + } + } + + @Bean + fun pipelineStage(@Autowired repository: ExecutionRepository): StageDefinitionBuilder = + object : CancellableStage, StageDefinitionBuilder { + override fun taskGraph(stage: StageExecution, builder: TaskNode.Builder) { + builder.withTask("dummy") + } + + override fun getType() = "pipeline" + + override fun cancel(stage: StageExecution?): CancellableStage.Result { + repository.cancel(ExecutionType.PIPELINE, stage!!.context["executionId"] as String) + return CancellableStage.Result(stage, mapOf("foo" to "bar")) + } + } + + @Bean + fun currentInstanceId() = "localhost" + + @Bean + fun contextParameterProcessor() = ContextParameterProcessor() + + @Bean + fun defaultExceptionHandler() = DefaultExceptionHandler() + + @Bean + fun deadMessageHandler(): DeadMessageCallback = { _, _ -> } + + @Bean + @ConditionalOnMissingBean(Queue::class) + fun queue( + clock: Clock, + deadMessageHandler: DeadMessageCallback, + publisher: EventPublisher + ) = + InMemoryQueue( + clock = clock, + deadMessageHandlers = listOf(deadMessageHandler), + publisher = publisher + ) + + @Bean + fun applicationEventMulticaster(@Qualifier("applicationEventTaskExecutor") taskExecutor: ThreadPoolTaskExecutor): ApplicationEventMulticaster { + // TODO rz - Add error handlers + val async = SimpleApplicationEventMulticaster() + async.setTaskExecutor(taskExecutor) + val sync = SimpleApplicationEventMulticaster() + + return DelegatingApplicationEventMulticaster(sync, async) + } +} diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandler.kt index ea12dac042..7d755d1a73 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandler.kt @@ -221,7 +221,7 @@ class CompleteStageHandler( } if (hasPlannedStages) { - this.setExecution(repository.retrieve(this.execution.type, this.execution.id)) + this.setExecution(repository.retrieve(this.execution.type, this.execution.id, true)) } } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt index b91a83b9f4..50bbb90bcf 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/OrcaMessageHandler.kt @@ -92,7 +92,7 @@ internal interface OrcaMessageHandler : MessageHandler { fun ExecutionLevel.withExecution(block: (PipelineExecution) -> Unit) = try { - val execution = repository.retrieve(executionType, executionId) + val execution = repository.retrieve(executionType, executionId, true) block.invoke(execution) } catch (e: ExecutionNotFoundException) { queue.push(InvalidExecutionId(this)) diff --git a/orca-queue/src/test/java/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerExceptionHandlerTest.java b/orca-queue/src/test/java/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerExceptionHandlerTest.java index 4c22409bbf..edb18fe28b 100644 --- a/orca-queue/src/test/java/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerExceptionHandlerTest.java +++ b/orca-queue/src/test/java/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerExceptionHandlerTest.java @@ -242,7 +242,7 @@ void setup() { taskId, DummyTask.class); - when(executionRepository.retrieve(PIPELINE, runTaskMessage.getExecutionId())) + when(executionRepository.retrieve(PIPELINE, runTaskMessage.getExecutionId(), true)) .thenReturn(pipeline); } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/AbortStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/AbortStageHandlerTest.kt index 05e35e8290..627f0a4536 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/AbortStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/AbortStageHandlerTest.kt @@ -34,6 +34,7 @@ import com.netflix.spinnaker.time.fixedClock import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.never import com.nhaarman.mockito_kotlin.reset @@ -78,7 +79,7 @@ object AbortStageHandlerTest : SubjectSpek({ val message = AbortStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -107,7 +108,7 @@ object AbortStageHandlerTest : SubjectSpek({ val message = AbortStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -161,7 +162,7 @@ object AbortStageHandlerTest : SubjectSpek({ val message = AbortStage(pipeline.stageByRef("1<1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CancelExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CancelExecutionHandlerTest.kt index 2ed126a297..f92173fee3 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CancelExecutionHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CancelExecutionHandlerTest.kt @@ -28,8 +28,10 @@ import com.netflix.spinnaker.orca.q.CancelExecution import com.netflix.spinnaker.orca.q.RescheduleExecution import com.netflix.spinnaker.orca.q.ResumeStage import com.netflix.spinnaker.q.Queue +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -69,7 +71,7 @@ object CancelExecutionHandlerTest : SubjectSpek({ val message = CancelExecution(pipeline, "fzlem@netflix.com", "because") beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -113,7 +115,7 @@ object CancelExecutionHandlerTest : SubjectSpek({ val message = CancelExecution(pipeline, "fzlem@netflix.com", "because") beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CancelStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CancelStageHandlerTest.kt index f772aab459..c1ac0b11c7 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CancelStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CancelStageHandlerTest.kt @@ -38,6 +38,7 @@ import com.netflix.spinnaker.orca.q.singleTaskStage import com.netflix.spinnaker.q.Queue import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.never import com.nhaarman.mockito_kotlin.reset @@ -117,7 +118,7 @@ object CancelStageHandlerTest : SubjectSpek({ beforeGroup { whenever(cancellableStage.type) doReturn "cancellable" - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -146,7 +147,7 @@ object CancelStageHandlerTest : SubjectSpek({ beforeGroup { whenever(cancellableStage.type) doReturn "cancellable" - whenever(repository.retrieve(pipeline.type, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteExecutionHandlerTest.kt index dedd05574a..797df1df80 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteExecutionHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteExecutionHandlerTest.kt @@ -83,7 +83,7 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message = CompleteExecution(pipeline) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -125,7 +125,7 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message = CompleteExecution(pipeline) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -159,7 +159,7 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message = CompleteExecution(pipeline) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -203,7 +203,7 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message = CompleteExecution(pipeline) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -255,7 +255,7 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message = CompleteExecution(pipeline) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -304,7 +304,7 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message = CompleteExecution(pipeline) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -359,7 +359,7 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message = CompleteExecution(pipeline) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -415,8 +415,8 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message2 = CompleteExecution(waitingPipeline) beforeGroup { - whenever(repository.retrieve(PIPELINE, message1.executionId)) doReturn runningPipeline - whenever(repository.retrieve(PIPELINE, message2.executionId)) doReturn waitingPipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message1.executionId), any())) doReturn runningPipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message2.executionId), any())) doReturn waitingPipeline } afterGroup(::resetMocks) @@ -466,8 +466,8 @@ object CompleteExecutionHandlerTest : SubjectSpek({ val message2 = CompleteExecution(runningPipeline2) beforeGroup { - whenever(repository.retrieve(PIPELINE, message1.executionId)) doReturn runningPipeline1 - whenever(repository.retrieve(PIPELINE, message2.executionId)) doReturn runningPipeline2 + whenever(repository.retrieve(eq(PIPELINE), eq(message1.executionId), any())) doReturn runningPipeline1 + whenever(repository.retrieve(eq(PIPELINE), eq(message2.executionId), any())) doReturn runningPipeline2 } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandlerTest.kt index 58e38e52e1..6a5c5f59e5 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandlerTest.kt @@ -75,6 +75,7 @@ import com.nhaarman.mockito_kotlin.argumentCaptor import com.nhaarman.mockito_kotlin.atLeastOnce import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.isA import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.never @@ -203,7 +204,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -232,7 +233,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -288,7 +289,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -345,7 +346,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -380,7 +381,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -413,7 +414,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -443,7 +444,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -484,7 +485,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline assertThat(pipeline.stages.map { it.type }).isEqualTo(listOf(stageThatBlowsUpPlanningAfterStages.type)) } @@ -529,7 +530,7 @@ object CompleteStageHandlerTest : SubjectSpek({ beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -613,7 +614,7 @@ object CompleteStageHandlerTest : SubjectSpek({ ) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -663,7 +664,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -707,7 +708,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -776,7 +777,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -840,7 +841,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -877,7 +878,7 @@ object CompleteStageHandlerTest : SubjectSpek({ .filter { it.syntheticStageOwner == STAGE_BEFORE } .forEach { it.status = SKIPPED } pipeline.stageById(message.stageId).tasks.forEach { it.status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -909,7 +910,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -967,7 +968,7 @@ object CompleteStageHandlerTest : SubjectSpek({ .stages .first { it.syntheticStageOwner == syntheticType } .status = failureStatus - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1009,7 +1010,7 @@ object CompleteStageHandlerTest : SubjectSpek({ .first { it.syntheticStageOwner == syntheticType } .status = FAILED_CONTINUE pipeline.stageById(message.stageId).tasks.forEach { it.status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1051,7 +1052,7 @@ object CompleteStageHandlerTest : SubjectSpek({ .filter { it.syntheticStageOwner == STAGE_AFTER } .forEach { it.status = SUCCEEDED } pipeline.stageById(message.stageId).tasks.forEach { it.status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1107,7 +1108,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("2=1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1144,7 +1145,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1172,7 +1173,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1214,7 +1215,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1245,7 +1246,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1285,7 +1286,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = FAILED_CONTINUE } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } on("receiving the message") { @@ -1320,7 +1321,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = FAILED_CONTINUE } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } on("receiving the message") { @@ -1354,7 +1355,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = taskStatus } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } on("receiving the message") { @@ -1392,7 +1393,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = TERMINAL } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } on("receiving the message") { @@ -1432,7 +1433,7 @@ object CompleteStageHandlerTest : SubjectSpek({ beforeGroup { pipeline.stageById(message.stageId).status = RUNNING - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1469,7 +1470,7 @@ object CompleteStageHandlerTest : SubjectSpek({ pipeline.stageByRef("1<2").status = SUCCEEDED pipeline.stageByRef("1<3").status = SUCCEEDED - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1513,7 +1514,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1549,7 +1550,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1586,7 +1587,7 @@ object CompleteStageHandlerTest : SubjectSpek({ val message = CompleteStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1619,7 +1620,7 @@ object CompleteStageHandlerTest : SubjectSpek({ tasks.first().status = TERMINAL } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1660,7 +1661,7 @@ object CompleteStageHandlerTest : SubjectSpek({ it.status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1699,7 +1700,7 @@ object CompleteStageHandlerTest : SubjectSpek({ status = RUNNING } - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1727,7 +1728,7 @@ object CompleteStageHandlerTest : SubjectSpek({ it.status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteTaskHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteTaskHandlerTest.kt index d43048445c..6909560d99 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteTaskHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteTaskHandlerTest.kt @@ -53,6 +53,7 @@ import com.netflix.spinnaker.time.fixedClock import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.never import com.nhaarman.mockito_kotlin.reset @@ -101,7 +102,7 @@ object CompleteTaskHandlerTest : SubjectSpek({ ) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -165,7 +166,7 @@ object CompleteTaskHandlerTest : SubjectSpek({ ) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -225,7 +226,7 @@ object CompleteTaskHandlerTest : SubjectSpek({ tasks[2].status = SUCCEEDED } - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -277,7 +278,7 @@ object CompleteTaskHandlerTest : SubjectSpek({ ) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -366,7 +367,7 @@ object CompleteTaskHandlerTest : SubjectSpek({ ) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -406,7 +407,7 @@ object CompleteTaskHandlerTest : SubjectSpek({ ) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ContinueParentStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ContinueParentStageHandlerTest.kt index 538149b296..e1b879523d 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ContinueParentStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ContinueParentStageHandlerTest.kt @@ -39,7 +39,9 @@ import com.netflix.spinnaker.orca.q.stageWithSyntheticBefore import com.netflix.spinnaker.orca.q.stageWithSyntheticBeforeAndNoTasks import com.netflix.spinnaker.q.Queue import com.netflix.spinnaker.spek.and +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -82,7 +84,7 @@ object ContinueParentStageHandlerTest : SubjectSpek( beforeGroup { pipeline.stageByRef("1<1").status = status pipeline.stageByRef("1<2").status = RUNNING - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -111,7 +113,7 @@ object ContinueParentStageHandlerTest : SubjectSpek( beforeGroup { pipeline.stageByRef("1<1").status = status pipeline.stageByRef("1<2").status = TERMINAL - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -143,7 +145,7 @@ object ContinueParentStageHandlerTest : SubjectSpek( and("they have not started yet") { beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -160,7 +162,7 @@ object ContinueParentStageHandlerTest : SubjectSpek( and("they have already started") { beforeGroup { pipeline.stageByRef("1").tasks.first().status = RUNNING - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -189,7 +191,7 @@ object ContinueParentStageHandlerTest : SubjectSpek( beforeGroup { pipeline.stageByRef("1").beforeStages().forEach { it.status = status } - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -222,7 +224,7 @@ object ContinueParentStageHandlerTest : SubjectSpek( beforeGroup { pipeline.stageByRef("1>1").status = status pipeline.stageByRef("1>2").status = RUNNING - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -251,7 +253,7 @@ object ContinueParentStageHandlerTest : SubjectSpek( beforeGroup { pipeline.stageByRef("1>1").status = status pipeline.stageByRef("1>2").status = TERMINAL - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -280,7 +282,7 @@ object ContinueParentStageHandlerTest : SubjectSpek( beforeGroup { pipeline.stageByRef("1>1").status = status pipeline.stageByRef("1>2").status = SUCCEEDED - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/PauseStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/PauseStageHandlerTest.kt index fc09e8fc28..985707c082 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/PauseStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/PauseStageHandlerTest.kt @@ -26,8 +26,10 @@ import com.netflix.spinnaker.orca.q.buildBeforeStages import com.netflix.spinnaker.orca.q.singleTaskStage import com.netflix.spinnaker.orca.q.stageWithSyntheticBefore import com.netflix.spinnaker.q.Queue +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -67,7 +69,7 @@ object PauseStageHandlerTest : SubjectSpek({ val message = PauseStage(PIPELINE, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -102,7 +104,7 @@ object PauseStageHandlerTest : SubjectSpek({ val message = PauseStage(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1<1").id) beforeGroup { - whenever(repository.retrieve(pipeline.type, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(pipeline.type), eq(message.executionId), any())) doReturn pipeline } action("the handler receives a message") { diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/PauseTaskHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/PauseTaskHandlerTest.kt index 1b186ddea7..f9a53f26da 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/PauseTaskHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/PauseTaskHandlerTest.kt @@ -27,8 +27,10 @@ import com.netflix.spinnaker.orca.q.PauseTask import com.netflix.spinnaker.orca.q.buildTasks import com.netflix.spinnaker.orca.q.multiTaskStage import com.netflix.spinnaker.q.Queue +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -61,7 +63,7 @@ object PauseTaskHandlerTest : SubjectSpek({ val message = PauseTask(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id, "1") beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandlerTest.kt index 6e97c55b7f..cf86b8fc03 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RescheduleExecutionHandlerTest.kt @@ -27,7 +27,9 @@ import com.netflix.spinnaker.orca.q.RescheduleExecution import com.netflix.spinnaker.orca.q.RunTask import com.netflix.spinnaker.orca.q.TasksProvider import com.netflix.spinnaker.q.Queue +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -86,7 +88,7 @@ object RescheduleExecutionHandlerTest : SubjectSpek( val message = RescheduleExecution(pipeline.type, pipeline.id, pipeline.application) beforeGroup { - whenever(repository.retrieve(pipeline.type, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(pipeline.type), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RestartStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RestartStageHandlerTest.kt index 1fe66b7924..ef41c9da43 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RestartStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RestartStageHandlerTest.kt @@ -123,7 +123,7 @@ object RestartStageHandlerTest : SubjectSpek({ val message = RestartStage(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "fzlem@netflix.com") beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(message.executionType, message.executionId, true)) doReturn pipeline } afterGroup(::resetMocks) @@ -174,7 +174,7 @@ object RestartStageHandlerTest : SubjectSpek({ beforeGroup { stageWithSyntheticBefore.plan(pipeline.stageByRef("2>1")) - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -303,7 +303,7 @@ object RestartStageHandlerTest : SubjectSpek({ val message = RestartStage(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "fzlem@netflix.com") beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -376,7 +376,7 @@ object RestartStageHandlerTest : SubjectSpek({ val message = RestartStage(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "fzlem@netflix.com") beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -426,7 +426,7 @@ object RestartStageHandlerTest : SubjectSpek({ val message = RestartStage(pipeline.type, pipeline.id, "foo", syntheticStage.id, "fzlem@netflix.com") beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -477,7 +477,7 @@ object RestartStageHandlerTest : SubjectSpek({ application = "foo" status = RUNNING } - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline whenever(repository.retrievePipelinesForPipelineConfigId( pipeline.pipelineConfigId, ExecutionRepository.ExecutionCriteria().setPageSize(2).setStatuses(RUNNING))) doReturn Observable.from(listOf(runningPipeline)) @@ -524,7 +524,7 @@ object RestartStageHandlerTest : SubjectSpek({ application = "foo" status = RUNNING } - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline whenever(repository.retrievePipelinesForPipelineConfigId( pipeline.pipelineConfigId, ExecutionRepository.ExecutionCriteria().setPageSize(2).setStatuses(RUNNING))) doReturn Observable.from(listOf(runningPipeline)) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeExecutionHandlerTest.kt index c0179dac43..45b9917a90 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeExecutionHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeExecutionHandlerTest.kt @@ -25,7 +25,9 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.q.ResumeExecution import com.netflix.spinnaker.orca.q.ResumeStage import com.netflix.spinnaker.q.Queue +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -74,7 +76,7 @@ object ResumeExecutionHandlerTest : SubjectSpek({ val message = ResumeExecution(pipeline.type, pipeline.id, pipeline.application) beforeGroup { - whenever(repository.retrieve(pipeline.type, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(pipeline.type), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeStageHandlerTest.kt index c31c9978c6..a30b645695 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeStageHandlerTest.kt @@ -28,8 +28,10 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.q.ResumeStage import com.netflix.spinnaker.orca.q.ResumeTask import com.netflix.spinnaker.q.Queue +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -76,7 +78,7 @@ object ResumeStageHandlerTest : SubjectSpek({ val message = ResumeStage(pipeline.type, pipeline.id, pipeline.application, pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeTaskHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeTaskHandlerTest.kt index edc8491d97..d456436105 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeTaskHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/ResumeTaskHandlerTest.kt @@ -29,8 +29,10 @@ import com.netflix.spinnaker.orca.q.ResumeTask import com.netflix.spinnaker.orca.q.RunTask import com.netflix.spinnaker.orca.q.TasksProvider import com.netflix.spinnaker.q.Queue +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -70,7 +72,7 @@ object ResumeTaskHandlerTest : SubjectSpek({ val message = ResumeTask(pipeline.type, pipeline.id, pipeline.application, pipeline.stages.first().id, "1") beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(pipeline.id), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt index b4a70647ec..bbc93695fd 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandlerTest.kt @@ -133,7 +133,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -169,7 +169,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -197,7 +197,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -228,7 +228,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -272,7 +272,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -308,7 +308,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -336,7 +336,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -367,7 +367,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -413,7 +413,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(task.execute(any())) doReturn taskResult whenever(task.getDynamicBackoffPeriod(any(), any())) doReturn taskBackoffMs whenever(dynamicConfigService.getConfig(eq(Long::class.java), eq("tasks.global.backOffPeriod"), any())) doReturn 0L - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -450,7 +450,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -477,7 +477,7 @@ object RunTaskHandlerTest : SubjectSpek({ } tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -505,7 +505,7 @@ object RunTaskHandlerTest : SubjectSpek({ } tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -556,7 +556,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doThrow RuntimeException("o noes") } whenever(task.execute(any())) doThrow RuntimeException("o noes") - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails setupRetriableLock(true, retriableLock) @@ -602,7 +602,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(task.execute(any())) doThrow RuntimeException("o noes") - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails setupRetriableLock(true, retriableLock) @@ -633,7 +633,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(task.execute(any())) doThrow RuntimeException("o noes") - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails setupRetriableLock(true, retriableLock) @@ -669,7 +669,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } whenever(task.getDynamicBackoffPeriod(any(), any())) doReturn taskBackoffMs whenever(task.execute(any())) doThrow RuntimeException("o noes") - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails setupRetriableLock(true, retriableLock) @@ -704,7 +704,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -753,7 +753,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -800,7 +800,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -840,7 +840,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -880,7 +880,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -915,7 +915,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -957,7 +957,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -995,7 +995,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1037,7 +1037,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1087,7 +1087,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1131,7 +1131,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1171,7 +1171,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() whenever(task.onTimeout(any())) doReturn TaskResult.ofStatus(FAILED_CONTINUE) setupRetriableLock(true, retriableLock) @@ -1196,7 +1196,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() whenever(task.onTimeout(any())) doReturn TaskResult.ofStatus(TERMINAL) setupRetriableLock(true, retriableLock) @@ -1220,7 +1220,7 @@ object RunTaskHandlerTest : SubjectSpek({ given("the task returns succeeded") { beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() whenever(task.onTimeout(any())) doReturn TaskResult.ofStatus(SUCCEEDED) setupRetriableLock(true, retriableLock) @@ -1270,7 +1270,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1308,7 +1308,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1353,7 +1353,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1390,7 +1390,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1431,7 +1431,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(task.getDynamicTimeout(any())) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1474,7 +1474,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(timeoutOverrideTask, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(timeoutOverrideTask, stage, taskResult)) doReturn taskResult } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(timeoutOverrideTask.timeout) doReturn timeout.toMillis() setupRetriableLock(true, retriableLock) } @@ -1519,7 +1519,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(timeoutOverrideTask.execute(any())) doReturn taskResult taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(timeoutOverrideTask, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(timeoutOverrideTask, stage, taskResult)) doReturn taskResult } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -1570,7 +1570,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -1618,7 +1618,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -1675,7 +1675,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -1718,7 +1718,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -1763,7 +1763,7 @@ object RunTaskHandlerTest : SubjectSpek({ taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } whenever(task.execute(any())) doReturn taskResult - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -1797,7 +1797,7 @@ object RunTaskHandlerTest : SubjectSpek({ beforeGroup { tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -1838,7 +1838,7 @@ object RunTaskHandlerTest : SubjectSpek({ tasks.forEach { whenever(it.extensionClass) doReturn it::class.java } taskExecutionInterceptors.forEach { whenever(it.beforeTaskExecution(task, stage)) doReturn stage } taskExecutionInterceptors.forEach { whenever(it.afterTaskExecution(task, stage, taskResult)) doReturn taskResult } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline setupRetriableLock(true, retriableLock) } @@ -1897,7 +1897,7 @@ object RunTaskHandlerTest : SubjectSpek({ whenever(cloudProviderAwareTask.getCredentials(any())) doReturn "someAccount" whenever(dynamicConfigService.getConfig(eq(Long::class.java), eq("tasks.aws.someAccount.backOffPeriod"), any())) doReturn backOff.accountBackOffMs setupRetriableLock(true, retriableLock) - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/SkipStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/SkipStageHandlerTest.kt index 07bf74f824..c8f366aebb 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/SkipStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/SkipStageHandlerTest.kt @@ -37,6 +37,7 @@ import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.argumentCaptor import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.never import com.nhaarman.mockito_kotlin.reset @@ -80,7 +81,7 @@ object SkipStageHandlerTest : SubjectSpek({ val message = SkipStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -108,7 +109,7 @@ object SkipStageHandlerTest : SubjectSpek({ val message = SkipStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -163,7 +164,7 @@ object SkipStageHandlerTest : SubjectSpek({ val message = SkipStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -219,7 +220,7 @@ object SkipStageHandlerTest : SubjectSpek({ val message = SkipStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -253,7 +254,7 @@ object SkipStageHandlerTest : SubjectSpek({ val message = SkipStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -293,7 +294,7 @@ object SkipStageHandlerTest : SubjectSpek({ val message = SkipStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -331,7 +332,7 @@ object SkipStageHandlerTest : SubjectSpek({ val message = SkipStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt index 1cc2da8d7b..a5cb03e051 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt @@ -83,7 +83,7 @@ object StartExecutionHandlerTest : SubjectSpek({ val message = StartExecution(pipeline) beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -123,7 +123,7 @@ object StartExecutionHandlerTest : SubjectSpek({ val message = StartExecution(pipeline) beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -158,7 +158,7 @@ object StartExecutionHandlerTest : SubjectSpek({ val message = StartExecution(pipeline) beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -195,7 +195,7 @@ object StartExecutionHandlerTest : SubjectSpek({ val message = StartExecution(pipeline) beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -221,7 +221,7 @@ object StartExecutionHandlerTest : SubjectSpek({ val message = StartExecution(pipeline) beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -254,7 +254,7 @@ object StartExecutionHandlerTest : SubjectSpek({ val message = StartExecution(pipeline) beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -289,7 +289,7 @@ object StartExecutionHandlerTest : SubjectSpek({ val message = StartExecution(pipeline) beforeGroup { - whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(message.executionType), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -339,7 +339,7 @@ object StartExecutionHandlerTest : SubjectSpek({ ) doReturn just(runningPipeline) whenever( - repository.retrieve(message.executionType, message.executionId) + repository.retrieve(eq(message.executionType), eq(message.executionId), any()) ) doReturn pipeline } @@ -374,7 +374,7 @@ object StartExecutionHandlerTest : SubjectSpek({ ) doReturn just(runningPipeline) whenever( - repository.retrieve(message.executionType, message.executionId) + repository.retrieve(eq(message.executionType), eq(message.executionId), any()) ) doReturn pipeline } @@ -402,7 +402,7 @@ object StartExecutionHandlerTest : SubjectSpek({ ) doReturn just(pipeline) whenever( - repository.retrieve(message.executionType, message.executionId) + repository.retrieve(eq(message.executionType), eq(message.executionId), any()) ) doReturn pipeline } @@ -473,7 +473,7 @@ object StartExecutionHandlerTest : SubjectSpek({ ) doReturn just(pipeline) whenever( - repository.retrieve(message.executionType, message.executionId) + repository.retrieve(eq(message.executionType), eq(message.executionId), any()) ) doReturn pipeline } @@ -499,7 +499,7 @@ object StartExecutionHandlerTest : SubjectSpek({ ) doReturn just(runningPipeline1) whenever( - repository.retrieve(message.executionType, message.executionId) + repository.retrieve(eq(message.executionType), eq(message.executionId), any()) ) doReturn pipeline } @@ -525,7 +525,7 @@ object StartExecutionHandlerTest : SubjectSpek({ ) doReturn just(runningPipeline1,runningPipeline2) whenever( - repository.retrieve(message.executionType, message.executionId) + repository.retrieve(eq(message.executionType), eq(message.executionId), any()) ) doReturn pipeline } @@ -551,7 +551,7 @@ object StartExecutionHandlerTest : SubjectSpek({ ) doReturn just(runningPipeline1,runningPipeline2,runningPipeline3) whenever( - repository.retrieve(message.executionType, message.executionId) + repository.retrieve(eq(message.executionType), eq(message.executionId), any()) ) doReturn pipeline } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandlerTest.kt index 592385fe8b..e5af5e85db 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandlerTest.kt @@ -157,7 +157,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -217,7 +217,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -262,7 +262,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.stageByRef("1")) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -307,7 +307,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } on("receiving a message") { @@ -369,7 +369,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } on("receiving a message") { @@ -409,7 +409,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -459,7 +459,7 @@ object StartStageHandlerTest : SubjectSpek({ beforeGroup { pipeline.stageByRef("1").status = SUCCEEDED pipeline.stageByRef("2").status = RUNNING - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -489,7 +489,7 @@ object StartStageHandlerTest : SubjectSpek({ beforeGroup { pipeline.stageByRef("1").status = SUCCEEDED pipeline.stageByRef("2").status = SUCCEEDED - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -511,7 +511,7 @@ object StartStageHandlerTest : SubjectSpek({ beforeGroup { pipeline.stageByRef("1").status = SUCCEEDED pipeline.stageByRef("2").status = TERMINAL - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -535,7 +535,7 @@ object StartStageHandlerTest : SubjectSpek({ pipeline.stageByRef("1").status = SUCCEEDED pipeline.stageByRef("2").status = SUCCEEDED pipeline.stageByRef("3").status = RUNNING - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -572,7 +572,7 @@ object StartStageHandlerTest : SubjectSpek({ } beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -598,7 +598,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -644,7 +644,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -679,7 +679,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -728,7 +728,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -775,7 +775,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -813,7 +813,7 @@ object StartStageHandlerTest : SubjectSpek({ and("the pipeline should fail") { beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails } @@ -853,7 +853,7 @@ object StartStageHandlerTest : SubjectSpek({ context["beforeStagePlanningFailed"] = null } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails } @@ -901,7 +901,7 @@ object StartStageHandlerTest : SubjectSpek({ context["beforeStagePlanningFailed"] = null } - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails } @@ -951,7 +951,7 @@ object StartStageHandlerTest : SubjectSpek({ ) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(exceptionHandler.handles(any())) doReturn true whenever(exceptionHandler.handle(anyOrNull(), any())) doReturn exceptionDetails } @@ -979,7 +979,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id) beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1051,7 +1051,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1105,7 +1105,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stages[0].id) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1141,7 +1141,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1192,7 +1192,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.stages.first()) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1221,7 +1221,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.stages.first()) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1265,7 +1265,7 @@ object StartStageHandlerTest : SubjectSpek({ and("the stage should be run") { beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1299,7 +1299,7 @@ object StartStageHandlerTest : SubjectSpek({ val message = StartStage(pipeline.stageByRef("2")) beforeGroup { - whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) @@ -1321,7 +1321,7 @@ object StartStageHandlerTest : SubjectSpek({ given("no such execution") { beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doThrow ExecutionNotFoundException("No Pipeline found for ${message.executionId}") + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doThrow ExecutionNotFoundException("No Pipeline found for ${message.executionId}") } afterGroup(::resetMocks) @@ -1342,7 +1342,7 @@ object StartStageHandlerTest : SubjectSpek({ } beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline } afterGroup(::resetMocks) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartTaskHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartTaskHandlerTest.kt index 69dca9cdd5..f77b8711ce 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartTaskHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartTaskHandlerTest.kt @@ -40,10 +40,12 @@ import com.netflix.spinnaker.orca.q.buildTasks import com.netflix.spinnaker.orca.q.singleTaskStage import com.netflix.spinnaker.q.Queue import com.netflix.spinnaker.time.fixedClock +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.argumentCaptor import com.nhaarman.mockito_kotlin.check import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.doThrow +import com.nhaarman.mockito_kotlin.eq import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify @@ -89,7 +91,7 @@ object StartTaskHandlerTest : SubjectSpek({ val message = StartTask(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id, "1") beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(environment.getProperty("tasks.dummyTask.enabled", Boolean::class.java, true)) doReturn true } @@ -146,7 +148,7 @@ object StartTaskHandlerTest : SubjectSpek({ val message = StartTask(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id, "1") beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doReturn pipeline whenever(environment.getProperty("tasks.dummyTask.enabled", Boolean::class.java, true)) doReturn false } @@ -199,7 +201,7 @@ object StartTaskHandlerTest : SubjectSpek({ val message = StartTask(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id, "1") beforeGroup { - whenever(repository.retrieve(PIPELINE, message.executionId)) doThrow NullPointerException() + whenever(repository.retrieve(eq(PIPELINE), eq(message.executionId), any())) doThrow NullPointerException() } afterGroup(::resetMocks) diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt index d98ed79a09..23fcf536e8 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt @@ -69,7 +69,7 @@ class ExecutionMapper( } } - fun map(rs: ResultSet, context: DSLContext): Collection { + fun map(rs: ResultSet, context: DSLContext, includeNestedExecutions: Boolean): Collection { val results = mutableListOf() val executionMap = mutableMapOf() val legacyMap = mutableMapOf() @@ -80,7 +80,7 @@ class ExecutionMapper( mapper.readValue(body) .also { execution -> - convertPipelineRefTrigger(execution, context) + convertPipelineRefTrigger(execution, context, includeNestedExecutions) execution.setSize(body.length.toLong()) results.add(execution) execution.partition = rs.getString("partition") @@ -138,9 +138,9 @@ class ExecutionMapper( } @VisibleForTesting - fun convertPipelineRefTrigger(execution: PipelineExecution, context: DSLContext) { + fun convertPipelineRefTrigger(execution: PipelineExecution, context: DSLContext, includeNestedExecutions: Boolean) { val trigger = execution.trigger - if (trigger is PipelineRefTrigger) { + if (trigger is PipelineRefTrigger && (!pipelineRefEnabled || includeNestedExecutions)) { val parentExecution = fetchParentExecution(execution.type, trigger, context) if (parentExecution == null) { diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index c0a8bf18f6..042e386f70 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -408,6 +408,10 @@ class SqlExecutionRepository( selectExecution(jooq, type, id) ?: throw ExecutionNotFoundException("No $type found for $id") + override fun retrieve(type: ExecutionType, id: String, includeNestedExecutions: Boolean) = + selectExecution(jooq, type, id, includeNestedExecutions = includeNestedExecutions) + ?: throw ExecutionNotFoundException("No $type found for $id") + override fun retrieve(type: ExecutionType): Observable = Observable.from( fetchExecutions { pageSize, cursor -> @@ -419,8 +423,8 @@ class SqlExecutionRepository( return retrieve(type, criteria, null) } - private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?): Observable { - withPool(readPoolName) { + private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?, includeNestedExecutions: Boolean = false): Observable { + withPool(poolName) { val select = jooq.selectExecutions( type, fields = selectExecutionFields(compressionProperties) + field("status"), @@ -444,7 +448,7 @@ class SqlExecutionRepository( } ) - return Observable.from(select.fetchExecutions()) + return Observable.from(select.fetchExecutions(includeNestedExecutions)) } } @@ -605,13 +609,21 @@ class SqlExecutionRepository( .fetch() log.debug("getting stage information for all the executions found so far") - return ExecutionMapper(mapper, stageReadSize,compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq) + return ExecutionMapper(mapper, stageReadSize,compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq, false) } } override fun retrievePipelinesForPipelineConfigId( pipelineConfigId: String, criteria: ExecutionCriteria + ): Observable { + return retrievePipelinesForPipelineConfigId(pipelineConfigId, criteria, false) + } + + override fun retrievePipelinesForPipelineConfigId( + pipelineConfigId: String, + criteria: ExecutionCriteria, + includeNestedExecutions: Boolean ): Observable { // When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which // fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands @@ -644,7 +656,7 @@ class SqlExecutionRepository( ) } - return Observable.from(select.fetchExecutions()) + return Observable.from(select.fetchExecutions(includeNestedExecutions)) } } @@ -687,7 +699,7 @@ class SqlExecutionRepository( ordered.offset((criteria.page - 1) * criteria.pageSize).limit(criteria.pageSize) } - ).fetchExecutions().toMutableList() + ).fetchExecutions(false).toMutableList() } } @@ -710,7 +722,7 @@ class SqlExecutionRepository( ) as Any ) ) - .fetchExecution() + .fetchExecution(false) if (execution == null) { throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId") @@ -744,7 +756,7 @@ class SqlExecutionRepository( ) as Any ) ) - .fetchExecution() + .fetchExecution(false) if (execution == null) { throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId") @@ -770,8 +782,8 @@ class SqlExecutionRepository( .setStatuses(BUFFERED) .let { criteria -> rx.Observable.merge( - retrieve(ORCHESTRATION, criteria, partitionName), - retrieve(PIPELINE, criteria, partitionName) + retrieve(ORCHESTRATION, criteria, partitionName, false), + retrieve(PIPELINE, criteria, partitionName, false) ).toList().toBlocking().single() } @@ -902,7 +914,7 @@ class SqlExecutionRepository( else -> field("id").asc() } ) - return select.fetchExecutions().toList() + return select.fetchExecutions(false).toList() } } @@ -1302,11 +1314,16 @@ class SqlExecutionRepository( private fun selectExecution( ctx: DSLContext, type: ExecutionType, - id: String + id: String, + forUpdate: Boolean = false, + includeNestedExecutions: Boolean = false ): PipelineExecution? { withPool(poolName) { val select = ctx.selectExecution(type, compressionProperties).where(id.toWhereCondition()) - return select.fetchExecution() + if (forUpdate) { + select.forUpdate() + } + return select.fetchExecution(includeNestedExecutions) } } @@ -1314,6 +1331,7 @@ class SqlExecutionRepository( type: ExecutionType, limit: Int, cursor: String?, + includeNestedExecutions: Boolean = false, where: ((SelectJoinStep) -> SelectConditionStep)? = null ): Collection { withPool(readPoolName) { @@ -1340,7 +1358,7 @@ class SqlExecutionRepository( } ) - return select.fetchExecutions() + return select.fetchExecutions(includeNestedExecutions) } } @@ -1393,11 +1411,11 @@ class SqlExecutionRepository( .let { seek(it) } } - private fun SelectForUpdateStep.fetchExecutions() = - ExecutionMapper(mapper, stageReadSize, compressionProperties, pipelineRefEnabled).map(fetch().intoResultSet(), jooq) + private fun SelectForUpdateStep.fetchExecutions(fetchNestedExecution: Boolean) = + ExecutionMapper(mapper, stageReadSize, compressionProperties, pipelineRefEnabled).map(fetch().intoResultSet(), jooq, fetchNestedExecution) - private fun SelectForUpdateStep.fetchExecution() = - fetchExecutions().firstOrNull() + private fun SelectForUpdateStep.fetchExecution(includeNestedExecutions: Boolean) = + fetchExecutions(includeNestedExecutions).firstOrNull() private fun fetchExecutions(nextPage: (Int, String?) -> Iterable) = object : Iterable { diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/jooq.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/jooq.kt index aa6eecb618..4e35304560 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/jooq.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/jooq.kt @@ -142,7 +142,7 @@ internal fun SelectForUpdateStep.fetchExecutions( jooq: DSLContext, pipelineRefEnabled: Boolean ) = - ExecutionMapper(mapper, stageReadSize, compressionProperties, pipelineRefEnabled).map(fetch().intoResultSet(), jooq) + ExecutionMapper(mapper, stageReadSize, compressionProperties, pipelineRefEnabled).map(fetch().intoResultSet(), jooq, pipelineRefEnabled) private fun selectStageFields(compressionProperties: ExecutionCompressionProperties): List> { diff --git a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapperTest.kt b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapperTest.kt index 4f8b35478a..c54791059b 100644 --- a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapperTest.kt +++ b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapperTest.kt @@ -90,7 +90,7 @@ class ExecutionMapperTest : JUnit5Minutests { val spyMapper = Mockito.spy(mapper) doReturn(DefaultTrigger(type = "default")).`when`(mockedExecution).trigger - spyMapper.convertPipelineRefTrigger(mockedExecution, database) + spyMapper.convertPipelineRefTrigger(mockedExecution, database, false) verify(mockedExecution, times(1)).trigger verify(spyMapper, times(0)).fetchParentExecution(any(), any(), any()) } @@ -103,11 +103,24 @@ class ExecutionMapperTest : JUnit5Minutests { doReturn(PipelineRefTrigger(parentExecutionId = "test-parent-id")).`when`(mockedExecution).trigger doReturn(ExecutionType.PIPELINE).`when`(mockedExecution).type doReturn(null).`when`(spyMapper).fetchParentExecution(any(), any(), any()) - spyMapper.convertPipelineRefTrigger(mockedExecution, database) + spyMapper.convertPipelineRefTrigger(mockedExecution, database, true) verify(mockedExecution, times(1)).trigger verify(spyMapper, times(1)).fetchParentExecution(any(), any(), any()) } + test("conversion ignored when trigger is PipelineRef but includeNestedExecutions is false") { + val mockedExecution = mock() + doReturn(PipelineRefTrigger(parentExecutionId = "test-parent-id")).`when`(mockedExecution).trigger + doReturn(ExecutionType.PIPELINE).`when`(mockedExecution).type + + val mapper = ExecutionMapper(mapper = ObjectMapper(), stageBatchSize = 200, compressionProperties = compressionProperties, true) + val spyMapper = Mockito.spy(mapper) + + spyMapper.convertPipelineRefTrigger(mockedExecution, database, false) + verify(mockedExecution, times(1)).trigger + verify(spyMapper, times(0)).fetchParentExecution(any(), any(), any()) + } + test("conversion is processed when trigger is PipelineRef") { val correlationId = "test-correlation" val parentExecutionId = "test-execution" @@ -134,7 +147,51 @@ class ExecutionMapperTest : JUnit5Minutests { doReturn(mockedParentExecution).`when`(spyMapper).fetchParentExecution(any(), any(), any()) - spyMapper.convertPipelineRefTrigger(execution, database) + spyMapper.convertPipelineRefTrigger(execution, database, true) + + expectThat(execution.trigger) { + isA() + get { this.correlationId }.isEqualTo(correlationId) + get { this.parameters }.isEqualTo(parameters) + get { this.artifacts }.isEqualTo(artifacts) + get { this.resolvedExpectedArtifacts }.isEqualTo(resolvedExpectedArtifact) + get { this.other }.isEqualTo(otherTest) + get { this.notifications }.isEmpty() + } + + expectThat(execution.trigger as PipelineTrigger) + .get(PipelineTrigger::parentExecution).isEqualTo(mockedParentExecution) + + verify(spyMapper, times(1)).fetchParentExecution(any(), any(), any()) + } + + test("conversion is processed when trigger is PipelineRef and feature flag is disabled") { + val correlationId = "test-correlation" + val parentExecutionId = "test-execution" + val parameters = mutableMapOf("test-parameter" to "test-body") + val artifacts = mutableListOf(Artifact.builder().build()) + val resolvedExpectedArtifact = mutableListOf(ExpectedArtifact.builder().boundArtifact(Artifact.builder().build()).build()) + val otherTest = mutableMapOf("test-other" to "other-body") + + val execution = PipelineExecutionImpl(ExecutionType.PIPELINE, "test-app").apply { + trigger = PipelineRefTrigger( + correlationId = correlationId, + parentExecutionId = parentExecutionId, + parameters = parameters, + artifacts = artifacts + ).apply { + resolvedExpectedArtifacts = resolvedExpectedArtifact + other = otherTest + } + } + + val mockedParentExecution = mock() + val mapper = ExecutionMapper(mapper = ObjectMapper(), stageBatchSize = 200, compressionProperties = compressionProperties, false) + val spyMapper = Mockito.spy(mapper) + + doReturn(mockedParentExecution).`when`(spyMapper).fetchParentExecution(any(), any(), any()) + + spyMapper.convertPipelineRefTrigger(execution, database, true) expectThat(execution.trigger) { isA() diff --git a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt index 1d0632282a..7f7e6cb8d9 100644 --- a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt +++ b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt @@ -633,6 +633,100 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { } } + + context("Execution repository can include or skip nested executions when PipelineRef is enabled") { + val testType = ExecutionType.PIPELINE + val testApplication = "test-application" + val parentExecutionPipeline = PipelineExecutionImpl(testType, testApplication) + + val pipelineExecutionWithPipelineTrigger = PipelineExecutionImpl(testType, testApplication).also { + it.trigger = PipelineTrigger(parentExecution = parentExecutionPipeline) + }.apply { stage {} } + val pipelineIdWithPipelineTrigger = pipelineExecutionWithPipelineTrigger.id + + before { + this.addCustomDeserializerWithFeatureFlagEnabled() + sqlExecutionRepositoryWithPipelineRefOnly.store(parentExecutionPipeline) + } + + test("retrieve execution contains PipelineTrigger if includeNestedExecutions=true") { + sqlExecutionRepositoryWithPipelineRefOnly.store(pipelineExecutionWithPipelineTrigger) + pipelineExecutionWithPipelineTrigger.trigger = PipelineTrigger(parentExecution = parentExecutionPipeline) + + val actualPipelineExecution = sqlExecutionRepositoryWithPipelineRefOnly.retrieve(testType, pipelineIdWithPipelineTrigger, true) + assertThat(actualPipelineExecution).isEqualTo(pipelineExecutionWithPipelineTrigger) + assertThat(actualPipelineExecution.trigger).isInstanceOf(PipelineTrigger::class.java) + } + + test("retrieve execution contains PipelineRefTrigger if includeNestedExecutions=false") { + sqlExecutionRepositoryWithPipelineRefOnly.store(pipelineExecutionWithPipelineTrigger) + pipelineExecutionWithPipelineTrigger.trigger = PipelineTrigger(parentExecution = parentExecutionPipeline) + + val actualPipelineExecution = sqlExecutionRepositoryWithPipelineRefOnly.retrieve(testType, pipelineIdWithPipelineTrigger, false) + assertThat(actualPipelineExecution).isEqualTo(pipelineExecutionWithPipelineTrigger) + assertThat(actualPipelineExecution.trigger).isInstanceOf(PipelineRefTrigger::class.java) + } + } + + context("Execution repository can include or skip nested executions when PipelineRef is disabled") { + val testType = ExecutionType.PIPELINE + val testApplication = "test-application" + val parentExecutionPipeline = PipelineExecutionImpl(testType, testApplication) + + val pipelineExecutionWithPipelineTrigger = PipelineExecutionImpl(testType, testApplication).also { + it.trigger = PipelineTrigger(parentExecution = parentExecutionPipeline) + }.apply { stage {} } + val pipelineIdWithPipelineTrigger = pipelineExecutionWithPipelineTrigger.id + + before { + this.addCustomDeserializerWithFeatureFlagDisabled() + sqlExecutionRepositoryNoCompression.store(parentExecutionPipeline) + } + + test("retrieve execution contains PipelineTrigger if includeNestedExecutions=true") { + sqlExecutionRepositoryNoCompression.store(pipelineExecutionWithPipelineTrigger) + pipelineExecutionWithPipelineTrigger.trigger = PipelineTrigger(parentExecution = parentExecutionPipeline) + + val actualPipelineExecution = sqlExecutionRepositoryNoCompression.retrieve(testType, pipelineIdWithPipelineTrigger, true) + assertThat(actualPipelineExecution).isEqualTo(pipelineExecutionWithPipelineTrigger) + assertThat(actualPipelineExecution.trigger).isInstanceOf(PipelineTrigger::class.java) + } + + test("retrieve execution contains PipelineTrigger if includeNestedExecutions=false") { + sqlExecutionRepositoryNoCompression.store(pipelineExecutionWithPipelineTrigger) + pipelineExecutionWithPipelineTrigger.trigger = PipelineTrigger(parentExecution = parentExecutionPipeline) + + val actualPipelineExecution = sqlExecutionRepositoryNoCompression.retrieve(testType, pipelineIdWithPipelineTrigger, false) + assertThat(actualPipelineExecution).isEqualTo(pipelineExecutionWithPipelineTrigger) + assertThat(actualPipelineExecution.trigger).isInstanceOf(PipelineTrigger::class.java) + } + + test("retrieve execution contains PipelineTrigger if includeNestedExecutions=false and execution in repository has a PipelineRefTrigger") { + //simulate a pipeline with pipelineTrigger is stored as pipelineRef + this.addCustomDeserializerWithFeatureFlagEnabled() + sqlExecutionRepositoryNoCompression.store(pipelineExecutionWithPipelineTrigger) + pipelineExecutionWithPipelineTrigger.trigger = PipelineTrigger(parentExecution = parentExecutionPipeline) + + //disabling the flag simulate the flag was disabled but we still have an execution with PipelineRefTrigger + this.addCustomDeserializerWithFeatureFlagDisabled() + val actualPipelineExecution = sqlExecutionRepositoryNoCompression.retrieve(testType, pipelineIdWithPipelineTrigger, false) + assertThat(actualPipelineExecution).isEqualTo(pipelineExecutionWithPipelineTrigger) + assertThat(actualPipelineExecution.trigger).isInstanceOf(PipelineTrigger::class.java) + } + + test("retrieve execution contains PipelineTrigger if includeNestedExecutions=true and execution in repository has a PipelineRefTrigger") { + //simulate a pipeline with pipelineTrigger is stored as pipelineRef + this.addCustomDeserializerWithFeatureFlagEnabled() + sqlExecutionRepositoryNoCompression.store(pipelineExecutionWithPipelineTrigger) + pipelineExecutionWithPipelineTrigger.trigger = PipelineTrigger(parentExecution = parentExecutionPipeline) + + //disabling the flag simulate the flag was disabled but we still have an execution with PipelineRefTrigger + this.addCustomDeserializerWithFeatureFlagDisabled() + val actualPipelineExecution = sqlExecutionRepositoryNoCompression.retrieve(testType, pipelineIdWithPipelineTrigger, true) + assertThat(actualPipelineExecution).isEqualTo(pipelineExecutionWithPipelineTrigger) + assertThat(actualPipelineExecution.trigger).isInstanceOf(PipelineTrigger::class.java) + } + } } private inner class Fixture { diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index 3f4c4b9f79..9598e1d116 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -243,7 +243,7 @@ class TaskController { List ids = pipelineConfigIds.split(',') List allPipelines = rx.Observable.merge(ids.collect { - executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) + executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria,true) }).subscribeOn(Schedulers.io()).toList().toBlocking().single().sort(startTimeOrId) if (!expand) { @@ -516,7 +516,7 @@ class TaskController { Map evaluateExpressionForExecution(@PathVariable("id") String id, @RequestParam("expression") String expression) { - def execution = executionRepository.retrieve(PIPELINE, id) + def execution = executionRepository.retrieve(PIPELINE, id, true) def context = [ execution: execution, trigger : mapper.convertValue(execution.trigger, Map.class) @@ -535,7 +535,7 @@ class TaskController { Map evaluateExpressionForExecutionAtStage(@PathVariable("id") String id, @PathVariable("stageId") String stageId, @RequestParam("expression") String expression) { - def execution = executionRepository.retrieve(PIPELINE, id) + def execution = executionRepository.retrieve(PIPELINE, id,true) def stage = execution.stages.find { it.id == stageId } if (stage == null) { diff --git a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy index 7f03042fe5..e4f139fc4d 100644 --- a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy +++ b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy @@ -270,7 +270,7 @@ class TaskControllerSpec extends Specification { void '/applications/{application}/evaluateExpressions precomputes values'() { given: - executionRepository.retrieve(ExecutionType.PIPELINE, "1") >> { + executionRepository.retrieve(ExecutionType.PIPELINE, "1", true) >> { pipeline { id = "1" application = "doesn't matter" @@ -307,7 +307,7 @@ class TaskControllerSpec extends Specification { [pipelineConfigId: "3", id: "started-5", application: "covfefe", startTime: clock.instant().minus(daysOfExecutionHistory, DAYS).minus(2, HOURS).toEpochMilli(), id: 'old-3'] ] - executionRepository.retrievePipelinesForPipelineConfigId("1", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("1", _, true) >> rx.Observable.from(pipelines.findAll { it.pipelineConfigId == "1" }.collect { config -> pipeline { @@ -317,7 +317,7 @@ class TaskControllerSpec extends Specification { pipelineConfigId = config.pipelineConfigId } }) - executionRepository.retrievePipelinesForPipelineConfigId("2", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("2", _, true) >> rx.Observable.from(pipelines.findAll { it.pipelineConfigId == "2" }.collect { config -> pipeline { @@ -327,7 +327,7 @@ class TaskControllerSpec extends Specification { pipelineConfigId = config.pipelineConfigId } }) - executionRepository.retrievePipelinesForPipelineConfigId("3", _) >> rx.Observable.from(pipelines.findAll { + executionRepository.retrievePipelinesForPipelineConfigId("3", _, true) >> rx.Observable.from(pipelines.findAll { it.pipelineConfigId == "3" }.collect { config -> pipeline { diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt index 6a5a2b3e23..0520d9ea6b 100644 --- a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt +++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt @@ -33,8 +33,6 @@ import dev.minutest.rootContext import org.jooq.exception.DataAccessException import org.jooq.impl.DSL.field import org.jooq.impl.DSL.table -import org.junit.Assert.assertThrows -import org.junit.jupiter.api.assertThrows import org.mockito.Mockito import org.springframework.test.web.servlet.MockMvc import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get