diff --git a/orca-front50/orca-front50.gradle b/orca-front50/orca-front50.gradle index 6fdd7c667a..328f989cf4 100644 --- a/orca-front50/orca-front50.gradle +++ b/orca-front50/orca-front50.gradle @@ -38,6 +38,7 @@ dependencies { testImplementation(project(":orca-test-groovy")) testImplementation(project(":orca-pipelinetemplate")) testImplementation("com.github.ben-manes.caffeine:guava") + testImplementation("org.codehaus.groovy:groovy-json") testRuntimeOnly("net.bytebuddy:byte-buddy") } diff --git a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy index a515c4fa27..e587812024 100644 --- a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy +++ b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/Front50Service.groovy @@ -74,6 +74,9 @@ interface Front50Service { @POST("/pipelines") Response savePipeline(@Body Map pipeline, @Query("staleCheck") boolean staleCheck) + @POST("/pipelines/batchUpdate") + Response savePipelines(@Body List> pipelines, @Query("staleCheck") boolean staleCheck) + @PUT("/pipelines/{pipelineId}") Response updatePipeline(@Path("pipelineId") String pipelineId, @Body Map pipeline) diff --git a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/config/Front50Configuration.groovy b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/config/Front50Configuration.groovy index fb4b3e4a80..00ec8b3a56 100644 --- a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/config/Front50Configuration.groovy +++ b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/config/Front50Configuration.groovy @@ -29,6 +29,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.retrofit.RetrofitConfiguration import com.netflix.spinnaker.orca.retrofit.logging.RetrofitSlf4jLog import groovy.transform.CompileStatic +import okhttp3.OkHttpClient import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression import org.springframework.boot.context.properties.EnableConfigurationProperties @@ -42,6 +43,8 @@ import retrofit.RequestInterceptor import retrofit.RestAdapter import retrofit.converter.JacksonConverter +import java.util.concurrent.TimeUnit + import static retrofit.Endpoints.newFixedEndpoint @Configuration @@ -71,11 +74,17 @@ class Front50Configuration { } @Bean - Front50Service front50Service(Endpoint front50Endpoint, ObjectMapper mapper) { + Front50Service front50Service(Endpoint front50Endpoint, ObjectMapper mapper, Front50ConfigurationProperties front50ConfigurationProperties) { + OkHttpClient okHttpClient = clientProvider.getClient(new DefaultServiceEndpoint("front50", front50Endpoint.getUrl())); + okHttpClient = okHttpClient.newBuilder() + .readTimeout(front50ConfigurationProperties.okhttp.readTimeoutMs, TimeUnit.MILLISECONDS) + .writeTimeout(front50ConfigurationProperties.okhttp.writeTimeoutMs, TimeUnit.MILLISECONDS) + .connectTimeout(front50ConfigurationProperties.okhttp.connectTimeoutMs, TimeUnit.MILLISECONDS) + .build(); new RestAdapter.Builder() .setRequestInterceptor(spinnakerRequestInterceptor) .setEndpoint(front50Endpoint) - .setClient(new Ok3Client(clientProvider.getClient(new DefaultServiceEndpoint("front50", front50Endpoint.getUrl())))) + .setClient(new Ok3Client(okHttpClient)) .setLogLevel(retrofitLogLevel) .setLog(new RetrofitSlf4jLog(Front50Service)) .setConverter(new JacksonConverter(mapper)) diff --git a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/config/Front50ConfigurationProperties.java b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/config/Front50ConfigurationProperties.java index 5105a00b90..14de35840a 100644 --- a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/config/Front50ConfigurationProperties.java +++ b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/config/Front50ConfigurationProperties.java @@ -33,4 +33,15 @@ public class Front50ConfigurationProperties { *

When true: GET /pipelines/triggeredBy/{pipelineId}/{status} When false: GET /pipelines */ boolean useTriggeredByEndpoint = true; + + OkHttpConfigurationProperties okhttp = new OkHttpConfigurationProperties(); + + @Data + public static class OkHttpConfigurationProperties { + int readTimeoutMs = 10000; + + int writeTimeoutMs = 10000; + + int connectTimeoutMs = 10000; + } } diff --git a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/spring/DependentPipelineExecutionListener.groovy b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/spring/DependentPipelineExecutionListener.groovy index da3e70640c..a5493fa1f6 100644 --- a/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/spring/DependentPipelineExecutionListener.groovy +++ b/orca-front50/src/main/groovy/com/netflix/spinnaker/orca/front50/spring/DependentPipelineExecutionListener.groovy @@ -24,7 +24,6 @@ import com.netflix.spinnaker.orca.exceptions.PipelineTemplateValidationException import com.netflix.spinnaker.orca.api.pipeline.ExecutionPreprocessor import com.netflix.spinnaker.orca.front50.DependentPipelineStarter import com.netflix.spinnaker.orca.front50.Front50Service -import com.netflix.spinnaker.orca.front50.config.Front50Configuration import com.netflix.spinnaker.orca.front50.config.Front50ConfigurationProperties import com.netflix.spinnaker.orca.listeners.ExecutionListener import com.netflix.spinnaker.orca.listeners.Persister diff --git a/orca-front50/src/main/java/com/netflix/spinnaker/orca/front50/tasks/SavePipelineTask.java b/orca-front50/src/main/java/com/netflix/spinnaker/orca/front50/tasks/SavePipelineTask.java index 18c00fa26d..77047e270d 100644 --- a/orca-front50/src/main/java/com/netflix/spinnaker/orca/front50/tasks/SavePipelineTask.java +++ b/orca-front50/src/main/java/com/netflix/spinnaker/orca/front50/tasks/SavePipelineTask.java @@ -15,7 +15,10 @@ */ package com.netflix.spinnaker.orca.front50.tasks; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; + import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.spinnaker.kork.retrofit.exceptions.SpinnakerHttpException; import com.netflix.spinnaker.orca.api.pipeline.RetryableTask; import com.netflix.spinnaker.orca.api.pipeline.TaskResult; import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus; @@ -25,6 +28,7 @@ import com.netflix.spinnaker.orca.front50.pipeline.SavePipelineStage; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +40,7 @@ @Component public class SavePipelineTask implements RetryableTask { - private Logger log = LoggerFactory.getLogger(getClass()); + private final Logger log = LoggerFactory.getLogger(getClass()); @Autowired SavePipelineTask( @@ -60,62 +64,90 @@ public TaskResult execute(StageExecution stage) { "Front50 is not enabled, no way to save pipeline. Fix this by setting front50.enabled: true"); } - if (!stage.getContext().containsKey("pipeline")) { - throw new IllegalArgumentException("pipeline context must be provided"); - } + Map pipeline = new HashMap<>(); + List> pipelines = new ArrayList<>(); + + boolean isSavingMultiplePipelines = + (boolean) stage.getContext().getOrDefault("isSavingMultiplePipelines", false); + + boolean isBulkSavingPipelines = + (boolean) stage.getContext().getOrDefault("isBulkSavingPipelines", false); - Map pipeline; - if (!(stage.getContext().get("pipeline") instanceof String)) { - pipeline = (Map) stage.getContext().get("pipeline"); + boolean staleCheck = + (Boolean) Optional.ofNullable(stage.getContext().get("staleCheck")).orElse(false); + + if (isBulkSavingPipelines) { + if (!stage.getContext().containsKey("pipelines")) { + throw new IllegalArgumentException( + "pipelines context must be provided when saving multiple pipelines"); + } + pipelines = (List>) stage.decodeBase64("/pipelines", List.class); + log.info( + "Bulk saving the following pipelines: {}", + pipelines.stream().map(p -> p.get("name")).collect(Collectors.toList())); } else { - pipeline = (Map) stage.decodeBase64("/pipeline", Map.class); + if (!stage.getContext().containsKey("pipeline")) { + throw new IllegalArgumentException( + "pipeline context must be provided when saving a single pipeline"); + } + if (!(stage.getContext().get("pipeline") instanceof String)) { + pipeline = (Map) stage.getContext().get("pipeline"); + } else { + pipeline = (Map) stage.decodeBase64("/pipeline", Map.class); + } + pipelines.add(pipeline); + log.info("Saving single pipeline {}", pipeline.get("name")); } - if (!pipeline.containsKey("index")) { - Map existingPipeline = fetchExistingPipeline(pipeline); - if (existingPipeline != null) { - pipeline.put("index", existingPipeline.get("index")); + // Preprocess pipelines before saving + for (Map pipe : pipelines) { + if (!pipe.containsKey("index")) { + Map existingPipeline = fetchExistingPipeline(pipe); + if (existingPipeline != null) { + pipe.put("index", existingPipeline.get("index")); + } } - } - String serviceAccount = (String) stage.getContext().get("pipeline.serviceAccount"); - if (serviceAccount != null) { - updateServiceAccount(pipeline, serviceAccount); - } - final Boolean isSavingMultiplePipelines = - (Boolean) - Optional.ofNullable(stage.getContext().get("isSavingMultiplePipelines")).orElse(false); - final Boolean staleCheck = - (Boolean) Optional.ofNullable(stage.getContext().get("staleCheck")).orElse(false); - if (stage.getContext().get("pipeline.id") != null - && pipeline.get("id") == null - && !isSavingMultiplePipelines) { - pipeline.put("id", stage.getContext().get("pipeline.id")); - // We need to tell front50 to regenerate cron trigger id's - pipeline.put("regenerateCronTriggerIds", true); - } + String serviceAccount = (String) stage.getContext().get("pipeline.serviceAccount"); + if (serviceAccount != null) { + updateServiceAccount(pipe, serviceAccount); + } + + if (stage.getContext().get("pipeline.id") != null + && pipe.get("id") == null + && !isSavingMultiplePipelines) { + pipe.put("id", stage.getContext().get("pipeline.id")); - pipelineModelMutators.stream() - .filter(m -> m.supports(pipeline)) - .forEach(m -> m.mutate(pipeline)); + // We need to tell front50 to regenerate cron trigger id's + pipe.put("regenerateCronTriggerIds", true); + } + + pipelineModelMutators.stream().filter(m -> m.supports(pipe)).forEach(m -> m.mutate(pipe)); + } - Response response = front50Service.savePipeline(pipeline, staleCheck); + Response response; + if (isBulkSavingPipelines) { + response = front50Service.savePipelines(pipelines, staleCheck); + } else { + response = front50Service.savePipeline(pipeline, staleCheck); + } Map outputs = new HashMap<>(); outputs.put("notification.type", "savepipeline"); - outputs.put("application", pipeline.get("application")); - outputs.put("pipeline.name", pipeline.get("name")); + outputs.put("application", stage.getContext().get("application")); + Map saveResult = new HashMap<>(); try { - Map savedPipeline = - (Map) objectMapper.readValue(response.getBody().in(), Map.class); - outputs.put("pipeline.id", savedPipeline.get("id")); + saveResult = (Map) objectMapper.readValue(response.getBody().in(), Map.class); } catch (Exception e) { - log.error("Unable to deserialize saved pipeline, reason: ", e.getMessage()); + log.error("Unable to deserialize save pipeline(s) result, reason: ", e); + } - if (pipeline.containsKey("id")) { - outputs.put("pipeline.id", pipeline.get("id")); - } + if (isBulkSavingPipelines) { + outputs.put("bulksave", saveResult); + } else { + outputs.put("pipeline.name", pipeline.get("name")); + outputs.put("pipeline.id", saveResult.getOrDefault("id", pipeline.getOrDefault("id", ""))); } final ExecutionStatus status; @@ -161,14 +193,16 @@ private void updateServiceAccount(Map pipeline, String serviceAc } private Map fetchExistingPipeline(Map newPipeline) { - String applicationName = (String) newPipeline.get("application"); String newPipelineID = (String) newPipeline.get("id"); - if (!StringUtils.isEmpty(newPipelineID)) { - return front50Service.getPipelines(applicationName).stream() - .filter(m -> m.containsKey("id")) - .filter(m -> m.get("id").equals(newPipelineID)) - .findFirst() - .orElse(null); + if (StringUtils.isNotEmpty(newPipelineID)) { + try { + return front50Service.getPipeline(newPipelineID); + } catch (SpinnakerHttpException e) { + // Return a null if pipeline with expected id not found + if (e.getResponseCode() == HTTP_NOT_FOUND) { + log.debug("Existing pipeline with id {} not found. Returning null.", newPipelineID); + } + } } return null; } diff --git a/orca-front50/src/test/groovy/com/netflix/spinnaker/orca/front50/tasks/SavePipelineTaskSpec.groovy b/orca-front50/src/test/groovy/com/netflix/spinnaker/orca/front50/tasks/SavePipelineTaskSpec.groovy index 0b9e092444..f38a092735 100644 --- a/orca-front50/src/test/groovy/com/netflix/spinnaker/orca/front50/tasks/SavePipelineTaskSpec.groovy +++ b/orca-front50/src/test/groovy/com/netflix/spinnaker/orca/front50/tasks/SavePipelineTaskSpec.groovy @@ -17,12 +17,16 @@ package com.netflix.spinnaker.orca.front50.tasks import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.collect.ImmutableMap +import com.netflix.spinnaker.orca.api.pipeline.TaskResult import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType import com.netflix.spinnaker.orca.front50.Front50Service import com.netflix.spinnaker.orca.front50.PipelineModelMutator import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl +import groovy.json.JsonOutput import retrofit.client.Response +import retrofit.mime.TypedString import spock.lang.Specification import spock.lang.Subject @@ -42,10 +46,12 @@ class SavePipelineTaskSpec extends Specification { def pipeline = [ application: 'orca', name: 'my pipeline', + id: 'my id', stages: [] ] def stage = new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "whatever", [ - pipeline: Base64.encoder.encodeToString(objectMapper.writeValueAsString(pipeline).bytes) + pipeline: Base64.encoder.encodeToString(objectMapper.writeValueAsString(pipeline).bytes), + application: 'orca' ]) when: @@ -61,7 +67,8 @@ class SavePipelineTaskSpec extends Specification { result.context == ImmutableMap.copyOf([ 'notification.type': 'savepipeline', 'application': 'orca', - 'pipeline.name': 'my pipeline' + 'pipeline.name': 'my pipeline', + 'pipeline.id': 'my id' ]) } @@ -84,7 +91,7 @@ class SavePipelineTaskSpec extends Specification { ] when: - front50Service.getPipelines(_) >> [existingPipeline] + front50Service.getPipeline(_ as String) >> existingPipeline front50Service.savePipeline(_, _) >> { Map newPipeline, Boolean staleCheck -> receivedIndex = newPipeline.get("index") new Response('http://front50', 200, 'OK', [], null) @@ -216,4 +223,123 @@ class SavePipelineTaskSpec extends Specification { then: result.status == ExecutionStatus.FAILED_CONTINUE } + + def "should save multiple pipelines"() { + given: + def pipelines = [ + [ + application: 'test_app1', + name: 'pipeline1', + id: "id1", + index: 1 + ], + [ + application: 'test_app1', + name: 'pipeline2', + id: "id2", + index: 2 + ], + [ + application: 'test_app2', + name: 'pipeline1', + id: "id3", + index: 1 + ], + [ + application: 'test_ap2', + name: 'pipeline2', + id: "id4", + index: 2 + ] + ] + def stage = new StageExecutionImpl( + new PipelineExecutionImpl(ExecutionType.ORCHESTRATION, "bulk_save_app"), + "savePipeline", + [ + application: "bulk_save_app", + pipelines: Base64.encoder.encodeToString(objectMapper.writeValueAsString(pipelines).bytes), + isBulkSavingPipelines: true + ] + ) + + when: + def result = task.execute(stage) + + then: + 1 * front50Service.savePipelines(pipelines, _) >> { + new Response('http://front50', + 200, + 'OK', + [], + new TypedString(new JsonOutput().toJson( + [ + successful_pipelines_count: 4, + successful_pipelines: ["pipeline1", "pipeline2", "pipeline3", "pipeline4"], + failed_pipelines_count: 0, + failed_pipelines: [] + ] + )) + ) + } + result == TaskResult.builder(ExecutionStatus.SUCCEEDED) + .context([ + "notification.type": "savepipeline", + application: "bulk_save_app", + bulksave: [ + successful_pipelines_count: 4, + successful_pipelines: ["pipeline1", "pipeline2", "pipeline3", "pipeline4"], + failed_pipelines_count: 0, + failed_pipelines: [] + ]]) + .build() + } + + def "should fail save multiple pipelines if no pipelines provided"() { + given: + def stage = new StageExecutionImpl( + new PipelineExecutionImpl(ExecutionType.ORCHESTRATION, "bulk_save_app"), + "savePipeline", + [ + application : "bulk_save_app", + isBulkSavingPipelines: true + ] + ) + + when: + task.execute(stage) + + then: + def error = thrown(IllegalArgumentException) + error.getMessage() == "pipelines context must be provided when saving multiple pipelines" + } + + def "should fail task when front 50 save pipelines call fails"() { + given: + def pipelines = [ + [ + application: 'test_app1', + name: 'pipeline1', + id: "id1", + index: 1 + ] + ] + def stage = new StageExecutionImpl( + new PipelineExecutionImpl(ExecutionType.ORCHESTRATION, "bulk_save_app"), + "savePipeline", + [ + application : "bulk_save_app", + isBulkSavingPipelines: true, + pipelines: Base64.encoder.encodeToString(objectMapper.writeValueAsString(pipelines).bytes) + ] + ) + + when: + def result = task.execute(stage) + + then: + 1 * front50Service.savePipelines(pipelines, _) >> { + new Response('http://front50', 500, 'OK', [], null) + } + result.status == ExecutionStatus.TERMINAL + } }