diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java index be81ad00..502cf6da 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/RabbitMQConfig.java @@ -21,6 +21,12 @@ public class RabbitMQConfig { @Value("${ods.rabbitmq.routingkey}") String routingKey; + @Value("${ods.rabbitmq.dead-letter-routing-key}") + private String deadLetterRoutingKey; + + @Value("${ods.rabbitmq.dead-letter-queue}") + private String deadLetterQueueName; + @Bean public Gson gson() { GsonBuilder builder = new GsonBuilder() @@ -34,6 +40,12 @@ Queue userQueue(){ return new Queue(this.queueName, true, false, false); } + @Bean + public Queue deadLetterQueue() { + return new Queue(this.deadLetterQueueName, true,false, false); + } + + @Bean public DirectExchange exchange(){ return new DirectExchange(exchange); @@ -45,4 +57,12 @@ public Binding binding(DirectExchange exchange, Queue userQueue){ .to(exchange) .with(routingKey); } + + + @Bean + public Binding deadLetterBinding(DirectExchange exchange, Queue deadLetterQueue){ + return BindingBuilder.bind(deadLetterQueue) + .to(exchange) + .with(deadLetterRoutingKey); + } } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java index e5afc255..793e7a59 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/consumer/RabbitMQConsumer.java @@ -9,20 +9,24 @@ import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType; import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest; +import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData; import org.onedatashare.transferservice.odstransferservice.model.optimizer.TransferApplicationParams; import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager; import org.onedatashare.transferservice.odstransferservice.service.DatabaseService.CrudService; +import org.onedatashare.transferservice.odstransferservice.service.DeadLetterQueueService; import org.onedatashare.transferservice.odstransferservice.service.JobControl; import org.onedatashare.transferservice.odstransferservice.service.JobParamService; import org.onedatashare.transferservice.odstransferservice.service.VfsExpander; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -47,7 +51,17 @@ public class RabbitMQConsumer { VfsExpander vfsExpander; - public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamService jobParamService, JobLauncher asyncJobLauncher, JobControl jc, CrudService crudService, ThreadPoolManager threadPoolManager) { + DeadLetterQueueService deadLetterQueueService; + + @Value("${ods.rabbitmq.exchange}") + private String deadLetterExchange; + + @Value("${ods.rabbitmq.dead-letter-routing-key}") + private String deadLetterRoutingKey; + + AmqpTemplate rmqTemplate; + + public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamService jobParamService, JobLauncher asyncJobLauncher, JobControl jc, CrudService crudService, ThreadPoolManager threadPoolManager, AmqpTemplate rmqTemplate, DeadLetterQueueService deadLetterQueueService) { this.vfsExpander = vfsExpander; this.userQueue = userQueue; this.jobParamService = jobParamService; @@ -58,12 +72,25 @@ public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamServic this.objectMapper = new ObjectMapper(); this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); this.objectMapper.setDefaultPropertyInclusion(JsonInclude.Include.ALWAYS); + this.rmqTemplate = rmqTemplate; + this.deadLetterQueueService = deadLetterQueueService; } + //Uncomment and use in future for processing the messages from DeadLetterQueue +// @RabbitListener(queues = "ods.dead-letter-queue") +// public void handleFailedMessage( Message message) { +// Object payload = new SimpleMessageConverter().fromMessage(message); +// if (payload instanceof DeadLetterQueueData) { +// DeadLetterQueueData myObject = (DeadLetterQueueData) payload; +// } +// } + @RabbitListener(queues = "#{userQueue}") public void consumeDefaultMessage(final Message message) { String jsonStr = new String(message.getBody()); logger.info("Message recv: {}", jsonStr); + JobParameters parameters = null; + List failureException = new ArrayList<>(); try { TransferJobRequest request = objectMapper.readValue(jsonStr, TransferJobRequest.class); logger.info("Job Recieved: {}",request.toString()); @@ -72,15 +99,17 @@ public void consumeDefaultMessage(final Message message) { request.getSource().setInfoList(new ArrayList<>(fileExpandedList)); } try { - JobParameters parameters = jobParamService.translate(new JobParametersBuilder(), request); + parameters = jobParamService.translate(new JobParametersBuilder(), request); crudService.insertBeforeTransfer(request); jc.setRequest(request); asyncJobLauncher.run(jc.concurrentJobDefinition(), parameters); return; } catch (Exception e) { + failureException.add(e); e.printStackTrace(); } } catch (JsonProcessingException e) { + failureException.add(e); logger.debug("Failed to parse jsonStr:{} to TransferJobRequest.java", jsonStr); } try { @@ -89,6 +118,9 @@ public void consumeDefaultMessage(final Message message) { this.threadPoolManager.applyOptimizer(params.getConcurrency(), params.getParallelism()); } catch (JsonProcessingException e) { logger.info("Did not apply transfer params due to parsing message failure"); + failureException.add(e); + DeadLetterQueueData failedMessage = deadLetterQueueService.convertDataToDLQ( parameters, failureException, null); + rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage); e.printStackTrace(); } } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java new file mode 100644 index 00000000..e5e3ab0f --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/DeadLetterQueueData.java @@ -0,0 +1,17 @@ +package org.onedatashare.transferservice.odstransferservice.model; + +import lombok.Data; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; + +@Data +public class DeadLetterQueueData implements Serializable { + public List failureException; + public JobParameters jobParameters; + public Collection stepExecutions; +} + diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java new file mode 100644 index 00000000..95897327 --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/DeadLetterQueueService.java @@ -0,0 +1,23 @@ +package org.onedatashare.transferservice.odstransferservice.service; + +import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +@Service +public class DeadLetterQueueService { + + public DeadLetterQueueData convertDataToDLQ(JobParameters jobParameters, List failureExceptions, Collection stepExecutions){ + DeadLetterQueueData data = new DeadLetterQueueData(); + data.failureException = new ArrayList<>(failureExceptions); + data.jobParameters = jobParameters; + data.stepExecutions = new ArrayList<>(stepExecutions); + return data; + } + +} diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java index 6a92a8b7..30d96bdc 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/listner/JobCompletionListener.java @@ -1,14 +1,17 @@ package org.onedatashare.transferservice.odstransferservice.service.listner; import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants; +import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData; import org.onedatashare.transferservice.odstransferservice.model.optimizer.OptimizerCreateRequest; import org.onedatashare.transferservice.odstransferservice.model.optimizer.OptimizerDeleteRequest; import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager; import org.onedatashare.transferservice.odstransferservice.service.ConnectionBag; +import org.onedatashare.transferservice.odstransferservice.service.DeadLetterQueueService; import org.onedatashare.transferservice.odstransferservice.service.OptimizerService; import org.onedatashare.transferservice.odstransferservice.service.cron.MetricsCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpTemplate; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.beans.factory.annotation.Value; @@ -45,12 +48,24 @@ public class JobCompletionListener extends JobExecutionListenerSupport { int maxPipe; boolean optimizerEnable; - public JobCompletionListener(ThreadPoolManager threadPoolManager, OptimizerService optimizerService, MetricsCollector metricsCollector, ConnectionBag connectionBag) { + DeadLetterQueueService deadLetterQueueService; + + @Value("${ods.rabbitmq.exchange}") + private String deadLetterExchange; + + @Value("${ods.rabbitmq.dead-letter-routing-key}") + private String deadLetterRoutingKey; + + AmqpTemplate rmqTemplate; + + public JobCompletionListener(ThreadPoolManager threadPoolManager, OptimizerService optimizerService, MetricsCollector metricsCollector, ConnectionBag connectionBag, AmqpTemplate rmqTemplate, DeadLetterQueueService deadLetterQueueService) { this.threadPoolManager = threadPoolManager; this.optimizerService = optimizerService; this.metricsCollector = metricsCollector; this.connectionBag = connectionBag; this.optimizerEnable = false; + this.rmqTemplate = rmqTemplate; + this.deadLetterQueueService = deadLetterQueueService; } @@ -60,7 +75,7 @@ public void beforeJob(JobExecution jobExecution) { long fileCount = jobExecution.getJobParameters().getLong(ODSConstants.FILE_COUNT); String optimizerType = jobExecution.getJobParameters().getString(ODSConstants.OPTIMIZER); if(optimizerType != null){ - if(!optimizerType.equals("None")) { + if(!optimizerType.equals("None") && !optimizerType.isEmpty()) { OptimizerCreateRequest createRequest = new OptimizerCreateRequest(appName, maxConc, maxParallel, maxPipe, optimizerType, fileCount); optimizerService.createOptimizerBlocking(createRequest); this.optimizerEnable = true; @@ -84,6 +99,11 @@ public void afterJob(JobExecution jobExecution) { this.optimizerService.deleteOptimizerBlocking(new OptimizerDeleteRequest(appName)); this.optimizerEnable = false; } + String exitCode = jobExecution.getExitStatus().getExitCode(); + if(!exitCode.equals("EXECUTING") && !exitCode.equals("COMPLETED")){ + DeadLetterQueueData failedMessage = deadLetterQueueService.convertDataToDLQ(jobExecution.getJobParameters(), jobExecution.getFailureExceptions(), jobExecution.getStepExecutions()); + rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage); + } } } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/vfs/VfsWriter.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/vfs/VfsWriter.java index 20408e4b..29146d38 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/vfs/VfsWriter.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/vfs/VfsWriter.java @@ -53,13 +53,11 @@ public ExitStatus afterStep(StepExecution stepExecution) throws IOException { return stepExecution.getExitStatus(); } - public void prepareDirectories() { + public void prepareDirectories() throws IOException{ try { Files.createDirectories(Paths.get(this.destinationPath)); } catch (FileAlreadyExistsException fileAlreadyExistsException) { logger.warn("Already have the file with this path \t" + this.filePath.toString()); - } catch (IOException e) { - e.printStackTrace(); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f5ef659b..810b25d0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -32,6 +32,8 @@ spring.batch.job.enabled=false #RabitMQ ods.rabbitmq.exchange=ods.exchange +ods.rabbitmq.dead-letter-routing-key= ods.dead-letter-routing-key +ods.rabbitmq.dead-letter-queue= ods.dead-letter-queue #for vfs nodes this should be the APP_NAME which is always lowercase. ods.rabbitmq.queue=${CONNECTOR_QUEUE:transferQueue} ods.rabbitmq.routingkey=${CONNECTOR_QUEUE:ods.routing}