Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dead letter queue #63

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ public class RabbitMQConfig {
@Value("${ods.rabbitmq.routingkey}")
String routingKey;

@Value("${ods.rabbitmq.dead-letter-routing-key}")
private String deadLetterRoutingKey;

@Value("${ods.rabbitmq.dead-letter-queue}")
private String deadLetterQueueName;

@Bean
public Gson gson() {
GsonBuilder builder = new GsonBuilder()
Expand All @@ -34,6 +40,12 @@ Queue userQueue(){
return new Queue(this.queueName, true, false, false);
}

@Bean
public Queue deadLetterQueue() {
return new Queue(this.deadLetterQueueName, true,false, false);
}


@Bean
public DirectExchange exchange(){
return new DirectExchange(exchange);
Expand All @@ -45,4 +57,12 @@ public Binding binding(DirectExchange exchange, Queue userQueue){
.to(exchange)
.with(routingKey);
}


@Bean
public Binding deadLetterBinding(DirectExchange exchange, Queue deadLetterQueue){
return BindingBuilder.bind(deadLetterQueue)
.to(exchange)
.with(deadLetterRoutingKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
import org.onedatashare.transferservice.odstransferservice.Enum.EndpointType;
import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData;
import org.onedatashare.transferservice.odstransferservice.model.optimizer.TransferApplicationParams;
import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager;
import org.onedatashare.transferservice.odstransferservice.service.DatabaseService.CrudService;
import org.onedatashare.transferservice.odstransferservice.service.DeadLetterQueueService;
import org.onedatashare.transferservice.odstransferservice.service.JobControl;
import org.onedatashare.transferservice.odstransferservice.service.JobParamService;
import org.onedatashare.transferservice.odstransferservice.service.VfsExpander;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
Expand All @@ -47,7 +51,17 @@ public class RabbitMQConsumer {

VfsExpander vfsExpander;

public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamService jobParamService, JobLauncher asyncJobLauncher, JobControl jc, CrudService crudService, ThreadPoolManager threadPoolManager) {
DeadLetterQueueService deadLetterQueueService;

@Value("${ods.rabbitmq.exchange}")
private String deadLetterExchange;

@Value("${ods.rabbitmq.dead-letter-routing-key}")
private String deadLetterRoutingKey;

AmqpTemplate rmqTemplate;

public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamService jobParamService, JobLauncher asyncJobLauncher, JobControl jc, CrudService crudService, ThreadPoolManager threadPoolManager, AmqpTemplate rmqTemplate, DeadLetterQueueService deadLetterQueueService) {
this.vfsExpander = vfsExpander;
this.userQueue = userQueue;
this.jobParamService = jobParamService;
Expand All @@ -58,12 +72,25 @@ public RabbitMQConsumer(VfsExpander vfsExpander, Queue userQueue, JobParamServic
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
this.objectMapper.setDefaultPropertyInclusion(JsonInclude.Include.ALWAYS);
this.rmqTemplate = rmqTemplate;
this.deadLetterQueueService = deadLetterQueueService;
}

//Uncomment and use in future for processing the messages from DeadLetterQueue
// @RabbitListener(queues = "ods.dead-letter-queue")
// public void handleFailedMessage( Message message) {
// Object payload = new SimpleMessageConverter().fromMessage(message);
// if (payload instanceof DeadLetterQueueData) {
// DeadLetterQueueData myObject = (DeadLetterQueueData) payload;
// }
// }

@RabbitListener(queues = "#{userQueue}")
public void consumeDefaultMessage(final Message message) {
String jsonStr = new String(message.getBody());
logger.info("Message recv: {}", jsonStr);
JobParameters parameters = null;
List <Throwable> failureException = new ArrayList<>();
try {
TransferJobRequest request = objectMapper.readValue(jsonStr, TransferJobRequest.class);
logger.info("Job Recieved: {}",request.toString());
Expand All @@ -72,15 +99,17 @@ public void consumeDefaultMessage(final Message message) {
request.getSource().setInfoList(new ArrayList<>(fileExpandedList));
}
try {
JobParameters parameters = jobParamService.translate(new JobParametersBuilder(), request);
parameters = jobParamService.translate(new JobParametersBuilder(), request);
crudService.insertBeforeTransfer(request);
jc.setRequest(request);
asyncJobLauncher.run(jc.concurrentJobDefinition(), parameters);
return;
} catch (Exception e) {
failureException.add(e);
e.printStackTrace();
}
} catch (JsonProcessingException e) {
failureException.add(e);
logger.debug("Failed to parse jsonStr:{} to TransferJobRequest.java", jsonStr);
}
try {
Expand All @@ -89,6 +118,9 @@ public void consumeDefaultMessage(final Message message) {
this.threadPoolManager.applyOptimizer(params.getConcurrency(), params.getParallelism());
} catch (JsonProcessingException e) {
logger.info("Did not apply transfer params due to parsing message failure");
failureException.add(e);
DeadLetterQueueData failedMessage = deadLetterQueueService.convertDataToDLQ( parameters, failureException, null);
rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage);
e.printStackTrace();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.onedatashare.transferservice.odstransferservice.model;

import lombok.Data;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;

@Data
public class DeadLetterQueueData implements Serializable {
public List <Throwable> failureException;
public JobParameters jobParameters;
public Collection<StepExecution> stepExecutions;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.onedatashare.transferservice.odstransferservice.service;

import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@Service
public class DeadLetterQueueService {

public DeadLetterQueueData convertDataToDLQ(JobParameters jobParameters, List<Throwable> failureExceptions, Collection<StepExecution> stepExecutions){
DeadLetterQueueData data = new DeadLetterQueueData();
data.failureException = new ArrayList<>(failureExceptions);
data.jobParameters = jobParameters;
data.stepExecutions = new ArrayList<>(stepExecutions);
return data;
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.onedatashare.transferservice.odstransferservice.service.listner;

import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
import org.onedatashare.transferservice.odstransferservice.model.DeadLetterQueueData;
import org.onedatashare.transferservice.odstransferservice.model.optimizer.OptimizerCreateRequest;
import org.onedatashare.transferservice.odstransferservice.model.optimizer.OptimizerDeleteRequest;
import org.onedatashare.transferservice.odstransferservice.pools.ThreadPoolManager;
import org.onedatashare.transferservice.odstransferservice.service.ConnectionBag;
import org.onedatashare.transferservice.odstransferservice.service.DeadLetterQueueService;
import org.onedatashare.transferservice.odstransferservice.service.OptimizerService;
import org.onedatashare.transferservice.odstransferservice.service.cron.MetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -45,12 +48,24 @@ public class JobCompletionListener extends JobExecutionListenerSupport {
int maxPipe;
boolean optimizerEnable;

public JobCompletionListener(ThreadPoolManager threadPoolManager, OptimizerService optimizerService, MetricsCollector metricsCollector, ConnectionBag connectionBag) {
DeadLetterQueueService deadLetterQueueService;

@Value("${ods.rabbitmq.exchange}")
private String deadLetterExchange;

@Value("${ods.rabbitmq.dead-letter-routing-key}")
private String deadLetterRoutingKey;

AmqpTemplate rmqTemplate;

public JobCompletionListener(ThreadPoolManager threadPoolManager, OptimizerService optimizerService, MetricsCollector metricsCollector, ConnectionBag connectionBag, AmqpTemplate rmqTemplate, DeadLetterQueueService deadLetterQueueService) {
this.threadPoolManager = threadPoolManager;
this.optimizerService = optimizerService;
this.metricsCollector = metricsCollector;
this.connectionBag = connectionBag;
this.optimizerEnable = false;
this.rmqTemplate = rmqTemplate;
this.deadLetterQueueService = deadLetterQueueService;
}


Expand All @@ -60,7 +75,7 @@ public void beforeJob(JobExecution jobExecution) {
long fileCount = jobExecution.getJobParameters().getLong(ODSConstants.FILE_COUNT);
String optimizerType = jobExecution.getJobParameters().getString(ODSConstants.OPTIMIZER);
if(optimizerType != null){
if(!optimizerType.equals("None")) {
if(!optimizerType.equals("None") && !optimizerType.isEmpty()) {
OptimizerCreateRequest createRequest = new OptimizerCreateRequest(appName, maxConc, maxParallel, maxPipe, optimizerType, fileCount);
optimizerService.createOptimizerBlocking(createRequest);
this.optimizerEnable = true;
Expand All @@ -84,6 +99,11 @@ public void afterJob(JobExecution jobExecution) {
this.optimizerService.deleteOptimizerBlocking(new OptimizerDeleteRequest(appName));
this.optimizerEnable = false;
}
String exitCode = jobExecution.getExitStatus().getExitCode();
if(!exitCode.equals("EXECUTING") && !exitCode.equals("COMPLETED")){
DeadLetterQueueData failedMessage = deadLetterQueueService.convertDataToDLQ(jobExecution.getJobParameters(), jobExecution.getFailureExceptions(), jobExecution.getStepExecutions());
rmqTemplate.convertAndSend(deadLetterExchange,deadLetterRoutingKey,failedMessage);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,11 @@ public ExitStatus afterStep(StepExecution stepExecution) throws IOException {
return stepExecution.getExitStatus();
}

public void prepareDirectories() {
public void prepareDirectories() throws IOException{
try {
Files.createDirectories(Paths.get(this.destinationPath));
} catch (FileAlreadyExistsException fileAlreadyExistsException) {
logger.warn("Already have the file with this path \t" + this.filePath.toString());
} catch (IOException e) {
e.printStackTrace();
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ spring.batch.job.enabled=false

#RabitMQ
ods.rabbitmq.exchange=ods.exchange
ods.rabbitmq.dead-letter-routing-key= ods.dead-letter-routing-key
ods.rabbitmq.dead-letter-queue= ods.dead-letter-queue
#for vfs nodes this should be the APP_NAME which is always lowercase.
ods.rabbitmq.queue=${CONNECTOR_QUEUE:transferQueue}
ods.rabbitmq.routingkey=${CONNECTOR_QUEUE:ods.routing}
Expand Down