diff --git a/pom.xml b/pom.xml
index ab72a72f..cbc2d2db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,13 @@
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>4.1.5</version>
+        </dependency>
+
        <dependency>
            <groupId>com.influxdb</groupId>
            <artifactId>influxdb-client-java</artifactId>
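Note: httpasyncclient 4.1.x is what supplies the pipelining entry point this PR builds on. A minimal standalone sketch of that API, for reviewers unfamiliar with it (class name, host, and paths are placeholders, not part of this PR):

```java
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

public class PipeliningDemo {
    public static void main(String[] args) throws Exception {
        try (CloseableHttpPipeliningClient client = HttpAsyncClients.createPipelining()) {
            client.start(); // the client must be started before execute()
            HttpHost target = HttpHost.create("http://example.com");
            // Both GETs go out on one connection back-to-back; responses
            // come back in order, with no round trip between the requests.
            Future<List<HttpResponse>> future = client.execute(
                    target, Arrays.asList(new HttpGet("/a"), new HttpGet("/b")), null);
            for (HttpResponse response : future.get()) {
                System.out.println(response.getStatusLine());
            }
        }
    }
}
```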
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
index e465a825..7956437d 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
@@ -60,6 +60,9 @@ public class ODSConstants {
public static final String CONCURRENCY = "concurrency";
public static final String PARALLELISM = "parallelism";
public static final String PIPELINING = "pipelining";
+
+ public static final String HTTP_PIPELINING = "http-pipelining";
+
public static final String OPTIMIZER = "optimizer";
public static final String RETRY = "retry";
public static final String APP_NAME = "appName";
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferOptions.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferOptions.java
index 3043d2c3..4c4b67e3 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferOptions.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferOptions.java
@@ -15,4 +15,5 @@ public class TransferOptions {
private int concurrencyThreadCount;
private int parallelThreadCount;
private int pipeSize;
+ private int httpPipelining;
}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ApacheHttpConnectionPool.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ApacheHttpConnectionPool.java
new file mode 100644
index 00000000..a9751ee6
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ApacheHttpConnectionPool.java
@@ -0,0 +1,101 @@
+package org.onedatashare.transferservice.odstransferservice.pools;
+
+import lombok.Getter;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@Getter
+public class ApacheHttpConnectionPool implements ObjectPool<CloseableHttpPipeliningClient> {
+
+    AccountEndpointCredential credential;
+    int bufferSize;
+    int connectionCount;
+    public LinkedBlockingQueue<CloseableHttpPipeliningClient> connectionPool;
+    private boolean compress;
+
+    public ApacheHttpConnectionPool(AccountEndpointCredential credential, int bufferSize) {
+        this.credential = credential;
+        this.bufferSize = bufferSize;
+        this.connectionPool = new LinkedBlockingQueue<>();
+    }
+
+    public void setCompress(boolean compress) {
+        this.compress = compress;
+    }
+
+    public boolean getCompress() {
+        return this.compress;
+    }
+
+    public int getSize() {
+        return this.connectionPool.size();
+    }
+
+    @Override
+    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
+        // Clients are created idle; borrowObject() starts them on first use
+        CloseableHttpPipeliningClient httpClient = HttpAsyncClients.createPipelining();
+        connectionPool.add(httpClient);
+    }
+
+    @Override
+    public void addObjects(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            this.addObject();
+        }
+    }
+
+    @Override
+    public CloseableHttpPipeliningClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
+        CloseableHttpPipeliningClient httpClient = this.connectionPool.take();
+        // A freshly added client has not been started yet; a returned one is already running
+        if (!httpClient.isRunning()) {
+            httpClient.start();
+        }
+        return httpClient;
+    }
+
+    @Override
+    public void clear() throws Exception, UnsupportedOperationException {
+        this.connectionPool.clear();
+    }
+
+    @Override
+    public void close() {
+        // Drain the queue and close each pooled client
+        CloseableHttpPipeliningClient httpClient;
+        while ((httpClient = this.connectionPool.poll()) != null) {
+            try {
+                httpClient.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public int getNumActive() {
+        return 0; // borrowed clients are not tracked
+    }
+
+    @Override
+    public int getNumIdle() {
+        return this.connectionPool.size();
+    }
+
+    @Override
+    public void invalidateObject(CloseableHttpPipeliningClient closeableHttpPipeliningClient) throws Exception {
+        closeableHttpPipeliningClient.close();
+        this.connectionPool.remove(closeableHttpPipeliningClient);
+    }
+
+    @Override
+    public void returnObject(CloseableHttpPipeliningClient closeableHttpPipeliningClient) throws Exception {
+        // Keep the client open and running so the next borrower can reuse it;
+        // closing it here would hand out dead clients (invalidateObject() is for that).
+        this.connectionPool.add(closeableHttpPipeliningClient);
+    }
+}
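Note: intended usage of this pool, sketched with hypothetical variables (`credential`, `chunkSize`, `concurrency`); ConnectionBag below does the real wiring:

```java
ApacheHttpConnectionPool pool = new ApacheHttpConnectionPool(credential, chunkSize);
pool.addObjects(concurrency);                 // pre-create one client per concurrent step
CloseableHttpPipeliningClient client = pool.borrowObject(); // blocks until a client is free
try {
    // issue pipelined requests with the borrowed client ...
} finally {
    pool.returnObject(client);                // keeps the client open for the next borrower
}
pool.close();                                 // closes every pooled client at teardown
```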
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/ConnectionBag.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/ConnectionBag.java
index c67ded0d..c063ee68 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/ConnectionBag.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/ConnectionBag.java
@@ -7,8 +7,9 @@
import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.pools.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
@@ -18,12 +19,14 @@
@Component
public class ConnectionBag {
private Logger logger = LoggerFactory.getLogger(ConnectionBag.class);
-
+ @Value("${ods.apache.httpclient.enabled}")
+ boolean apacheHttpClientEnabled;
private JschSessionPool sftpReaderPool;
private JschSessionPool sftpWriterPool;
private FtpConnectionPool ftpReaderPool;
private FtpConnectionPool ftpWriterPool;
private HttpConnectionPool httpReaderPool;
+ private ApacheHttpConnectionPool apacheHttpReaderPool;
private S3ConnectionPool s3ReaderPool;
private S3ConnectionPool s3WriterPool;
private GDriveConnectionPool googleDriveWriterPool;
@@ -75,7 +78,10 @@ public void preparePools(TransferJobRequest request) {
if (request.getSource().getType().equals(EndpointType.http)) {
readerMade = true;
readerType = EndpointType.http;
- this.createHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
+ if (apacheHttpClientEnabled)
+     this.createApacheHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
+ else
+     this.createHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
}
if (request.getDestination().getType().equals(EndpointType.gdrive)) {
writerMade = true;
@@ -98,7 +104,10 @@ public void closePools() {
if (readerType != null) {
switch (readerType) {
case http:
- this.httpReaderPool.close();
+ if (apacheHttpClientEnabled)
+     this.apacheHttpReaderPool.close();
+ else
+     this.httpReaderPool.close();
break;
case ftp:
this.ftpReaderPool.close();
@@ -178,6 +187,18 @@ private void createGoogleDriveWriterPool(OAuthEndpointCredential oauthDestCreden
this.compression = false;
}
+ public void createApacheHttpReaderPool(AccountEndpointCredential credential, int connectionCount, int chunkSize) {
+     // The Apache pool does not support compression yet, so it is always off here
+     this.apacheHttpReaderPool = new ApacheHttpConnectionPool(credential, chunkSize);
+     this.apacheHttpReaderPool.setCompress(false);
+     try {
+         this.apacheHttpReaderPool.addObjects(connectionCount);
+     } catch (Exception ex) {
+         logger.error("Failed to pre-create {} Apache HTTP clients", connectionCount, ex);
+     }
+     this.compression = false;
+ }
+
public void createS3ReaderPool(AccountEndpointCredential credential, TransferOptions transferOptions) {
this.compression = transferOptions.getCompress();
this.s3ReaderPool = new S3ConnectionPool(credential, transferOptions);
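Note: `@Value("${ods.apache.httpclient.enabled}")` with no default will fail context startup on any deployment whose properties file predates this change. A defaulted placeholder (a suggestion, not in this diff) avoids that:

```java
@Value("${ods.apache.httpclient.enabled:false}")
boolean apacheHttpClientEnabled;
```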
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
index 13d6092c..b2128e10 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
@@ -23,6 +23,7 @@
import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveReader;
import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveResumableWriter;
import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveSimpleWriter;
+import org.onedatashare.transferservice.odstransferservice.service.step.http.ApacheHttpReader;
import org.onedatashare.transferservice.odstransferservice.service.step.http.HttpReader;
import org.onedatashare.transferservice.odstransferservice.service.step.scp.SCPReader;
import org.onedatashare.transferservice.odstransferservice.service.step.scp.SCPWriter;
@@ -48,6 +49,7 @@
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.retry.policy.SimpleRetryPolicy;
@@ -75,6 +77,10 @@ public class JobControl extends DefaultBatchConfigurer {
Logger logger = LoggerFactory.getLogger(JobControl.class);
+
+ @Value("${ods.apache.httpclient.enabled}")
+ boolean apacheHttpClientEnabled;
+
@Autowired
ThreadPoolManager threadPoolManager;
@@ -141,9 +147,17 @@ private List<Flow> createConcurrentFlow(List<EntityInfo> infoList, String basePa
protected AbstractItemCountingItemStreamItemReader<DataChunk> getRightReader(EndpointType type, EntityInfo fileInfo) {
switch (type) {
case http:
- HttpReader hr = new HttpReader(fileInfo, request.getSource().getVfsSourceCredential());
- hr.setPool(connectionBag.getHttpReaderPool());
- return hr;
+
+ if (apacheHttpClientEnabled) {
+     ApacheHttpReader hr = new ApacheHttpReader(fileInfo, request.getSource().getVfsSourceCredential());
+     hr.setPool(connectionBag.getApacheHttpReaderPool());
+     return hr;
+ } else {
+     HttpReader hr = new HttpReader(fileInfo, request.getSource().getVfsSourceCredential());
+     hr.setPool(connectionBag.getHttpReaderPool());
+     return hr;
+ }
+
case vfs:
VfsReader vfsReader = new VfsReader(request.getSource().getVfsSourceCredential(), fileInfo);
return vfsReader;
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
index 2bd8a2a0..93b08c9f 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
@@ -47,6 +47,7 @@ public JobParameters translate(JobParametersBuilder builder, TransferJobRequest
builder.addLong(CONCURRENCY, (long) request.getOptions().getConcurrencyThreadCount());
builder.addLong(PARALLELISM, (long) request.getOptions().getParallelThreadCount());
builder.addLong(PIPELINING, (long) request.getOptions().getPipeSize());
+ builder.addLong(HTTP_PIPELINING, (long) request.getOptions().getHttpPipelining());
builder.addString(CHUNK_SIZE, String.valueOf(request.getChunkSize()));
builder.addString(COMPRESS, String.valueOf(request.getOptions().getCompress()));
builder.addLong(RETRY, (long) request.getOptions().getRetry());
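Note: Spring Batch treats parameters added this way as identifying, so two otherwise-equal jobs that differ only in `httpPipelining` count as distinct job instances. If that is not intended, the non-identifying overload exists (a suggestion, not what this diff does):

```java
builder.addLong(HTTP_PIPELINING, (long) request.getOptions().getHttpPipelining(), false);
```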
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/http/ApacheHttpReader.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/http/ApacheHttpReader.java
new file mode 100644
index 00000000..322fd5cf
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/http/ApacheHttpReader.java
@@ -0,0 +1,160 @@
+package org.onedatashare.transferservice.odstransferservice.service.step.http;
+
+import org.apache.http.*;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
+import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
+import org.onedatashare.transferservice.odstransferservice.model.DataChunk;
+import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
+import org.onedatashare.transferservice.odstransferservice.model.FilePart;
+import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
+import org.onedatashare.transferservice.odstransferservice.pools.ApacheHttpConnectionPool;
+import org.onedatashare.transferservice.odstransferservice.service.FilePartitioner;
+import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.annotation.BeforeStep;
+import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
+import org.springframework.util.ClassUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.*;
+
+public class ApacheHttpReader extends AbstractItemCountingItemStreamItemReader<DataChunk> {
+
+    Logger logger = LoggerFactory.getLogger(ApacheHttpReader.class);
+    String sBasePath;
+    String fileName;
+    FilePartitioner filePartitioner;
+    EntityInfo fileInfo;
+    CloseableHttpPipeliningClient client;
+    ApacheHttpConnectionPool apacheHttpConnectionPool;
+    Boolean range;
+    AccountEndpointCredential sourceCred;
+    Boolean compressable;
+    Boolean compress;
+    HttpHost host;
+    long httpPipeSize;
+    private String uri;
+
+    public ApacheHttpReader(EntityInfo fileInfo, AccountEndpointCredential credential) {
+        this.setExecutionContextName(ClassUtils.getShortName(ApacheHttpReader.class));
+        this.fileInfo = fileInfo;
+        this.filePartitioner = new FilePartitioner(fileInfo.getChunkSize());
+        this.sourceCred = credential;
+    }
+
+    public void setPool(ApacheHttpConnectionPool connectionPool) {
+        this.apacheHttpConnectionPool = connectionPool;
+    }
+
+    @BeforeStep
+    public void beforeStep(StepExecution stepExecution) throws IOException, InterruptedException {
+        JobParameters params = stepExecution.getJobExecution().getJobParameters();
+        this.httpPipeSize = params.getLong(HTTP_PIPELINING);
+        this.sBasePath = params.getString(SOURCE_BASE_PATH);
+        this.filePartitioner.createParts(this.fileInfo.getSize(), this.fileInfo.getId());
+        this.compress = this.apacheHttpConnectionPool.getCompress();
+        this.fileName = fileInfo.getId();
+        this.uri = sourceCred.getUri();
+    }
+
+
+    @Override
+    protected DataChunk doRead() throws Exception {
+        int requestCount = 0;
+        List<HttpRequest> requests = new ArrayList<>();
+        long startPosition = -1, partIndex = -1, resultChunkSize = 0;
+        while (requestCount < httpPipeSize) {
+            FilePart filePart = this.filePartitioner.nextPart();
+            if (filePart == null) break;
+            // Keep the smallest start offset and part index seen in this batch
+            if (startPosition == -1) {
+                startPosition = filePart.getStart();
+            } else {
+                startPosition = Math.min(filePart.getStart(), startPosition);
+            }
+            if (partIndex == -1) {
+                partIndex = filePart.getPartIdx();
+            } else {
+                partIndex = Math.min(filePart.getPartIdx(), partIndex);
+            }
+            // Sum all part sizes; the whole batch is emitted as one DataChunk
+            resultChunkSize += filePart.getSize();
+            HttpGet get = new HttpGet(fileInfo.getPath());
+            get.setProtocolVersion(HttpVersion.HTTP_1_1);
+            get.setHeader(range ? ODSConstants.RANGE : ODSConstants.AccessControlExposeHeaders,
+                    range ? String.format(ODSConstants.byteRange, filePart.getStart(), filePart.getEnd()) : ODSConstants.ContentRange);
+            if (compress && compressable) {
+                get.setHeader(ODSConstants.ACCEPT_ENCODING, ODSConstants.GZIP);
+            }
+            requests.add(get);
+            requestCount++;
+        }
+        if (requests.isEmpty()) {
+            return null;
+        }
+        Future<List<HttpResponse>> future = client.execute(host, requests, null);
+        List<HttpResponse> responses = future.get();
+        DataChunk chunk;
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+            for (HttpResponse response : responses) {
+                response.getEntity().getContent().transferTo(outputStream);
+            }
+            chunk = ODSUtility.makeChunk(resultChunkSize, outputStream.toByteArray(), startPosition, (int) partIndex, this.fileName);
+        }
+        logger.info(chunk.toString());
+        return chunk;
+    }
+
+    @Override
+    protected void doOpen() throws Exception {
+        this.client = this.apacheHttpConnectionPool.borrowObject();
+        this.host = HttpHost.create(uri);
+        // Probe with a two-byte ranged GET: a 206 reply means the server honors
+        // Range headers; a Content-Encoding header means it can compress.
+        HttpGet probe = new HttpGet(fileInfo.getPath());
+        probe.setHeader(ODSConstants.ACCEPT_ENCODING, ODSConstants.GZIP);
+        probe.setHeader(ODSConstants.RANGE, String.format(ODSConstants.byteRange, 0, 1));
+        Future<List<HttpResponse>> future = client.execute(host, Arrays.asList(probe), null);
+        List<HttpResponse> responses = future.get();
+        responses.forEach(response -> {
+            this.range = response.getStatusLine().getStatusCode() == 206;
+            this.compressable = response.containsHeader(ODSConstants.CONTENT_ENCODING);
+        });
+        if (compressable && compress) this.fileName = this.fileName + ".gzip";
+    }
+
+    @Override
+    protected void doClose() throws Exception {
+        this.apacheHttpConnectionPool.returnObject(this.client);
+        this.client = null;
+    }
+
+
+}
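Note on the read path: doRead() coalesces up to httpPipeSize parts per call, so with a 5 MB chunk size and httpPipeSize = 4, one call issues four ranged GETs over a single connection and emits one 20 MB DataChunk anchored at the smallest offset and part index in the batch. The doOpen() probe reduces to the following standalone sketch (literal header strings stand in for the ODSConstants values; the path is a placeholder):

```java
// 206 Partial Content => the server honors byte ranges and the reader may slice;
// 200 OK              => the Range header was ignored and slicing is off.
HttpGet probe = new HttpGet("/file.bin");
probe.setHeader("Range", "bytes=0-1");   // ask for just the first two bytes
List<HttpResponse> responses = client.execute(host, Arrays.asList(probe), null).get();
boolean rangeSupported = responses.get(0).getStatusLine().getStatusCode() == 206;
```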
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 5f73a508..66ed950d 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -60,4 +60,7 @@ spring.datasource.hikari.maximum-pool-size=${HIKARI_POOL_SIZE:33}
transfer.service.concurrency=${MAX_CONCURRENCY:32}
transfer.service.parallelism=${MAX_PARALLELISM:32}
-transfer.service.pipelining=${MAX_PIPELINING:32}
\ No newline at end of file
+transfer.service.pipelining=${MAX_PIPELINING:32}
+
+# HTTP
+ods.apache.httpclient.enabled=false
\ No newline at end of file
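Note: the flags above all use the `${ENV:default}` pattern; the new property could follow suit so deployments can flip the client without editing the file (a suggestion, not what this diff does):

```properties
ods.apache.httpclient.enabled=${ODS_APACHE_HTTPCLIENT_ENABLED:false}
```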