From 718c2a9e78d046e3881483d091d935d089b93f6b Mon Sep 17 00:00:00 2001
From: Venkata Sai Dheeraj Narayanabhatla <saidheerajnvs29@gmail.com>
Date: Fri, 7 Apr 2023 12:12:24 -0400
Subject: [PATCH] Implemented HTTP pipelining using Apache HTTP Client

Implemented HTTP pipelining using Apache HTTP Client
---
 pom.xml                                       |   7 +
 .../constant/ODSConstants.java                |   3 +
 .../model/TransferOptions.java                |   1 +
 .../pools/ApacheHttpConnectionPool.java       | 101 +++++++++++
 .../service/ConnectionBag.java                |  28 ++-
 .../service/JobControl.java                   |  20 ++-
 .../service/JobParamService.java              |   1 +
 .../service/step/http/ApacheHttpReader.java   | 160 ++++++++++++++++++
 src/main/resources/application.properties     |   5 +-
 9 files changed, 319 insertions(+), 7 deletions(-)
 create mode 100644 src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ApacheHttpConnectionPool.java
 create mode 100644 src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/http/ApacheHttpReader.java

diff --git a/pom.xml b/pom.xml
index ab72a72f..cbc2d2db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,13 @@
             <groupId>com.fasterxml.jackson.datatype</groupId>
             <artifactId>jackson-datatype-jsr310</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>4.1.5</version>
+        </dependency>
+
         <dependency>
             <groupId>com.influxdb</groupId>
             <artifactId>influxdb-client-java</artifactId>
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
index e465a825..7956437d 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java
@@ -60,6 +60,9 @@ public class ODSConstants {
     public static final String CONCURRENCY = "concurrency";
     public static final String PARALLELISM = "parallelism";
     public static final String PIPELINING = "pipelining";
+
+    public static final String HTTP_PIPELINING = "http-pipelining";
+
     public static final String OPTIMIZER = "optimizer";
     public static final String RETRY = "retry";
     public static final String APP_NAME = "appName";
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferOptions.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferOptions.java
index 3043d2c3..4c4b67e3 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferOptions.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/TransferOptions.java
@@ -15,4 +15,5 @@ public class TransferOptions {
     private int concurrencyThreadCount;
     private int parallelThreadCount;
     private int pipeSize;
+    private int httpPipelining;
 }
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ApacheHttpConnectionPool.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ApacheHttpConnectionPool.java
new file mode 100644
index 00000000..a9751ee6
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/pools/ApacheHttpConnectionPool.java
@@ -0,0 +1,119 @@
+package org.onedatashare.transferservice.odstransferservice.pools;
+
+import lombok.Getter;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Pool of Apache pipelining HTTP clients used by the HTTP reader path.
+ * Clients are created eagerly with addObjects(int); borrowObject() blocks
+ * until one is free, and returnObject(...) puts it back WITHOUT closing it.
+ */
+@Getter
+public class ApacheHttpConnectionPool implements ObjectPool<CloseableHttpPipeliningClient> {
+
+    AccountEndpointCredential credential;
+    int bufferSize;
+    int connectionCount;
+    public LinkedBlockingQueue<CloseableHttpPipeliningClient> connectionPool;
+    private boolean compress;
+
+    public ApacheHttpConnectionPool(AccountEndpointCredential credential, int bufferSize) {
+        this.credential = credential;
+        this.bufferSize = bufferSize;
+        this.connectionPool = new LinkedBlockingQueue<>();
+    }
+
+    public void setCompress(boolean compress) {
+        this.compress = compress;
+    }
+
+    public boolean getCompress() {
+        return this.compress;
+    }
+
+    /** Number of idle clients currently waiting in the pool. */
+    public int getSize() {
+        return this.connectionPool.size();
+    }
+
+    @Override
+    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
+        CloseableHttpPipeliningClient httpClient = HttpAsyncClients.createPipelining();
+        connectionPool.add(httpClient);
+    }
+
+    @Override
+    public void addObjects(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            this.addObject();
+        }
+    }
+
+    /**
+     * Blocks until a client is available. start() is a no-op on an
+     * already-started client, so re-borrowing a returned client is safe.
+     */
+    @Override
+    public CloseableHttpPipeliningClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
+        CloseableHttpPipeliningClient httpClient = this.connectionPool.take();
+        httpClient.start();
+        return httpClient;
+    }
+
+    @Override
+    public void clear() throws Exception, UnsupportedOperationException {
+        this.connectionPool.clear();
+    }
+
+    /**
+     * Drains the queue and closes every idle client. A failed close no
+     * longer aborts the loop: the remaining clients are still closed and
+     * the first failure is rethrown afterwards.
+     */
+    @Override
+    public void close() {
+        RuntimeException firstFailure = null;
+        CloseableHttpPipeliningClient httpClient;
+        while ((httpClient = this.connectionPool.poll()) != null) {
+            try {
+                httpClient.close();
+            } catch (IOException e) {
+                if (firstFailure == null) firstFailure = new RuntimeException(e);
+            }
+        }
+        if (firstFailure != null) throw firstFailure;
+    }
+
+    @Override
+    public int getNumActive() {
+        return 0;
+    }
+
+    @Override
+    public int getNumIdle() {
+        return this.connectionPool.size();
+    }
+
+    @Override
+    public void invalidateObject(CloseableHttpPipeliningClient closeableHttpPipeliningClient) throws Exception {
+        closeableHttpPipeliningClient.close();
+        this.connectionPool.remove(closeableHttpPipeliningClient);
+    }
+
+    /**
+     * Returns a client to the pool for reuse. The client is intentionally
+     * NOT closed here: a closed pipelining client cannot be started again,
+     * so closing on return would poison the pool for the next borrower.
+     */
+    @Override
+    public void returnObject(CloseableHttpPipeliningClient closeableHttpPipeliningClient) throws Exception {
+        this.connectionPool.add(closeableHttpPipeliningClient);
+    }
+}
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/ConnectionBag.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/ConnectionBag.java
index c67ded0d..c063ee68 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/ConnectionBag.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/ConnectionBag.java
@@ -7,8 +7,9 @@
 import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
 import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
 import org.onedatashare.transferservice.odstransferservice.pools.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
@@ -18,12 +20,14 @@
 @Component
 public class ConnectionBag {
     private Logger logger = LoggerFactory.getLogger(ConnectionBag.class);
-
+    @Value("${ods.apache.httpclient.enabled}")
+    boolean apacheHttpClientEnabled;
     private JschSessionPool sftpReaderPool;
     private JschSessionPool sftpWriterPool;
     private FtpConnectionPool ftpReaderPool;
     private FtpConnectionPool ftpWriterPool;
     private HttpConnectionPool httpReaderPool;
+    private ApacheHttpConnectionPool apacheHttpReaderPool;
     private S3ConnectionPool s3ReaderPool;
     private S3ConnectionPool s3WriterPool;
     private GDriveConnectionPool googleDriveWriterPool;
@@ -75,7 +79,10 @@ public void preparePools(TransferJobRequest request) {
         if (request.getSource().getType().equals(EndpointType.http)) {
             readerMade = true;
             readerType = EndpointType.http;
-            this.createHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
+            if(apacheHttpClientEnabled)
+                this.createApacheHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
+            else
+                this.createHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
         }
         if (request.getDestination().getType().equals(EndpointType.gdrive)) {
             writerMade = true;
@@ -98,7 +105,10 @@ public void closePools() {
         if (readerType != null) {
             switch (readerType) {
                 case http:
-                    this.httpReaderPool.close();
+                    if(!apacheHttpClientEnabled)
+                        this.httpReaderPool.close();
+                    else
+                        this.apacheHttpReaderPool.close();
                     break;
                 case ftp:
                     this.ftpReaderPool.close();
@@ -178,6 +188,18 @@ private void createGoogleDriveWriterPool(OAuthEndpointCredential oauthDestCreden
         this.compression = false;
     }
 
+    public void createApacheHttpReaderPool(AccountEndpointCredential credential, int connectionCount, int chunkSize) {
+
+        this.apacheHttpReaderPool = new ApacheHttpConnectionPool(credential, chunkSize);
+        this.apacheHttpReaderPool.setCompress(false);
+        try {
+            this.apacheHttpReaderPool.addObjects(connectionCount);
+        }catch(Exception ex){
+            logger.error("Failed to pre-create {} Apache HTTP connections", connectionCount, ex);
+        }
+        this.compression = false;
+    }
+
     public void createS3ReaderPool(AccountEndpointCredential credential, TransferOptions transferOptions) {
         this.compression = transferOptions.getCompress();
         this.s3ReaderPool = new S3ConnectionPool(credential, transferOptions);
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
index 13d6092c..b2128e10 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java
@@ -23,6 +23,7 @@
 import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveReader;
 import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveResumableWriter;
 import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveSimpleWriter;
+import org.onedatashare.transferservice.odstransferservice.service.step.http.ApacheHttpReader;
 import org.onedatashare.transferservice.odstransferservice.service.step.http.HttpReader;
 import org.onedatashare.transferservice.odstransferservice.service.step.scp.SCPReader;
 import org.onedatashare.transferservice.odstransferservice.service.step.scp.SCPWriter;
@@ -48,6 +49,7 @@
 import org.springframework.batch.item.ItemWriter;
 import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.retry.policy.SimpleRetryPolicy;
@@ -75,6 +77,10 @@ public class JobControl extends DefaultBatchConfigurer {
 
     Logger logger = LoggerFactory.getLogger(JobControl.class);
 
+
+    @Value("${ods.apache.httpclient.enabled}")
+    boolean apacheHttpClientEnabled;
+
     @Autowired
     ThreadPoolManager threadPoolManager;
 
@@ -141,9 +147,17 @@ private List<Flow> createConcurrentFlow(List<EntityInfo> infoList, String basePa
     protected AbstractItemCountingItemStreamItemReader<DataChunk> getRightReader(EndpointType type, EntityInfo fileInfo) {
         switch (type) {
             case http:
-                HttpReader hr = new HttpReader(fileInfo, request.getSource().getVfsSourceCredential());
-                hr.setPool(connectionBag.getHttpReaderPool());
-                return hr;
+
+                if(apacheHttpClientEnabled){
+                    ApacheHttpReader hr = new ApacheHttpReader(fileInfo, request.getSource().getVfsSourceCredential());
+                    hr.setPool(connectionBag.getApacheHttpReaderPool());
+                    return hr;
+                }else {
+                    HttpReader hr = new HttpReader(fileInfo, request.getSource().getVfsSourceCredential());
+                    hr.setPool(connectionBag.getHttpReaderPool());
+                    return hr;
+                }
+
             case vfs:
                 VfsReader vfsReader = new VfsReader(request.getSource().getVfsSourceCredential(), fileInfo);
                 return vfsReader;
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
index 2bd8a2a0..93b08c9f 100644
--- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobParamService.java
@@ -47,6 +47,7 @@ public JobParameters translate(JobParametersBuilder builder, TransferJobRequest
         builder.addLong(CONCURRENCY, (long) request.getOptions().getConcurrencyThreadCount());
         builder.addLong(PARALLELISM, (long) request.getOptions().getParallelThreadCount());
         builder.addLong(PIPELINING, (long) request.getOptions().getPipeSize());
+        builder.addLong(HTTP_PIPELINING,(long) request.getOptions().getHttpPipelining());
         builder.addString(CHUNK_SIZE, String.valueOf(request.getChunkSize()));
         builder.addString(COMPRESS, String.valueOf(request.getOptions().getCompress()));
         builder.addLong(RETRY, (long) request.getOptions().getRetry());
diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/http/ApacheHttpReader.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/http/ApacheHttpReader.java
new file mode 100644
index 00000000..322fd5cf
--- /dev/null
+++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/http/ApacheHttpReader.java
@@ -0,0 +1,151 @@
+package org.onedatashare.transferservice.odstransferservice.service.step.http;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpVersion;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
+import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
+import org.onedatashare.transferservice.odstransferservice.model.DataChunk;
+import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
+import org.onedatashare.transferservice.odstransferservice.model.FilePart;
+import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
+import org.onedatashare.transferservice.odstransferservice.pools.ApacheHttpConnectionPool;
+import org.onedatashare.transferservice.odstransferservice.service.FilePartitioner;
+import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.annotation.BeforeStep;
+import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
+import org.springframework.util.ClassUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.*;
+
+/**
+ * Spring Batch reader that downloads a file over HTTP using Apache's
+ * pipelining client. Each doRead() issues up to httpPipeSize range
+ * requests in one pipeline and concatenates the bodies into a DataChunk.
+ */
+public class ApacheHttpReader extends AbstractItemCountingItemStreamItemReader<DataChunk> {
+    Logger logger = LoggerFactory.getLogger(ApacheHttpReader.class);
+    String sBasePath;
+    String fileName;
+    FilePartitioner filePartitioner;
+    EntityInfo fileInfo;
+    CloseableHttpPipeliningClient client;
+    ApacheHttpConnectionPool apacheHttpConnectionPool;
+    // true once doOpen() sees an HTTP 206, i.e. the server honours Range
+    boolean range;
+    AccountEndpointCredential sourceCred;
+    boolean compressable;
+    boolean compress;
+    HttpHost host;
+    long httpPipeSize;
+    private String uri;
+
+    public ApacheHttpReader(EntityInfo fileInfo, AccountEndpointCredential credential) {
+        this.setExecutionContextName(ClassUtils.getShortName(ApacheHttpReader.class));
+        this.fileInfo = fileInfo;
+        this.filePartitioner = new FilePartitioner(fileInfo.getChunkSize());
+        this.sourceCred = credential;
+    }
+
+    public void setPool(ApacheHttpConnectionPool connectionPool) {
+        this.apacheHttpConnectionPool = connectionPool;
+    }
+
+    @BeforeStep
+    public void beforeStep(StepExecution stepExecution) throws IOException, InterruptedException {
+        JobParameters params = stepExecution.getJobExecution().getJobParameters();
+        Long pipelining = params.getLong(HTTP_PIPELINING);
+        // A missing or non-positive parameter would make doRead() return null
+        // immediately and read nothing, so clamp to at least one request.
+        this.httpPipeSize = (pipelining == null) ? 1L : Math.max(1L, pipelining);
+        this.sBasePath = params.getString(SOURCE_BASE_PATH);
+        this.filePartitioner.createParts(this.fileInfo.getSize(), this.fileInfo.getId());
+        this.compress = this.apacheHttpConnectionPool.getCompress();
+        this.fileName = fileInfo.getId();
+        this.uri = sourceCred.getUri();
+    }
+
+    /**
+     * Builds up to httpPipeSize range requests, executes them as one
+     * pipeline, and merges the responses. Returns null (end of data)
+     * when the partitioner has no parts left.
+     */
+    @Override
+    protected DataChunk doRead() throws Exception {
+        int requestCount = 0;
+        List<HttpRequest> requests = new ArrayList<>();
+        long startPosition = -1, partIndex = -1, resultChunkSize = 0;
+        while (requestCount < httpPipeSize) {
+            FilePart filePart = this.filePartitioner.nextPart();
+            if (filePart == null) break;
+            // Stamp the merged chunk with the smallest offset/part index seen.
+            startPosition = (startPosition == -1) ? filePart.getStart() : Math.min(filePart.getStart(), startPosition);
+            partIndex = (partIndex == -1) ? filePart.getPartIdx() : Math.min(filePart.getPartIdx(), partIndex);
+            resultChunkSize += filePart.getSize();
+            HttpGet get = new HttpGet(fileInfo.getPath());
+            get.setProtocolVersion(HttpVersion.HTTP_1_1);
+            get.setHeader(range ? ODSConstants.RANGE : ODSConstants.AccessControlExposeHeaders,
+                    range ? String.format(ODSConstants.byteRange, filePart.getStart(), filePart.getEnd()) : ODSConstants.ContentRange);
+            if (compress && compressable) {
+                get.setHeader(ODSConstants.ACCEPT_ENCODING, ODSConstants.GZIP);
+            }
+            requests.add(get);
+            requestCount++;
+        }
+        if (requests.isEmpty()) {
+            return null;
+        }
+        Future<List<HttpResponse>> future = client.execute(host, requests, null);
+        List<HttpResponse> responses = future.get();
+        DataChunk chunk;
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+            for (HttpResponse response : responses) {
+                response.getEntity().getContent().transferTo(outputStream);
+            }
+            chunk = ODSUtility.makeChunk(resultChunkSize, outputStream.toByteArray(), startPosition, (int) partIndex, this.fileName);
+        }
+        logger.debug("{}", chunk);
+        return chunk;
+    }
+
+    /**
+     * Borrows a client and probes the server with a 2-byte range request
+     * to detect Range support (HTTP 206) and response compression.
+     */
+    @Override
+    protected void doOpen() throws Exception {
+        this.client = this.apacheHttpConnectionPool.borrowObject();
+        this.host = HttpHost.create(uri);
+        HttpGet probe = new HttpGet(fileInfo.getPath());
+        probe.setHeader(ODSConstants.ACCEPT_ENCODING, ODSConstants.GZIP);
+        probe.setHeader(ODSConstants.RANGE, String.format(ODSConstants.byteRange, 0, 1));
+        Future<List<HttpResponse>> future = client.execute(host, Collections.<HttpRequest>singletonList(probe), null);
+        for (HttpResponse response : future.get()) {
+            this.range = response.getStatusLine().getStatusCode() == 206;
+            this.compressable = response.containsHeader(ODSConstants.CONTENT_ENCODING);
+        }
+        if (compressable && compress) this.fileName = this.fileName + ".gzip";
+    }
+
+    /** Returns the borrowed client to the pool; safe even if doOpen() failed. */
+    @Override
+    protected void doClose() throws Exception {
+        if (this.client != null) {
+            this.apacheHttpConnectionPool.returnObject(this.client);
+            this.client = null;
+        }
+    }
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 5f73a508..66ed950d 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -60,4 +60,7 @@ spring.datasource.hikari.maximum-pool-size=${HIKARI_POOL_SIZE:33}
 
 transfer.service.concurrency=${MAX_CONCURRENCY:32}
 transfer.service.parallelism=${MAX_PARALLELISM:32}
-transfer.service.pipelining=${MAX_PIPELINING:32}
\ No newline at end of file
+transfer.service.pipelining=${MAX_PIPELINING:32}
+
+#HTTP
+ods.apache.httpclient.enabled=false
\ No newline at end of file