Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented HTTP pipelining using Apache HTTP Client #67

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.5</version>
</dependency>

<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public class ODSConstants {
public static final String CONCURRENCY = "concurrency";
public static final String PARALLELISM = "parallelism";
public static final String PIPELINING = "pipelining";

public static final String HTTP_PIPELINING = "http-pipelining";

public static final String OPTIMIZER = "optimizer";
public static final String RETRY = "retry";
public static final String APP_NAME = "appName";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ public class TransferOptions {
private int concurrencyThreadCount;
private int parallelThreadCount;
private int pipeSize;
private int httpPipelining;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.onedatashare.transferservice.odstransferservice.pools;

import lombok.Getter;
import org.apache.commons.pool2.ObjectPool;
import org.apache.http.HttpClientConnection;
import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;

import java.io.IOException;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;

@Getter
public class ApacheHttpConnectionPool implements ObjectPool<CloseableHttpPipeliningClient> {

    // Credential for the HTTP endpoint the pooled clients will talk to.
    AccountEndpointCredential credential;
    // Requested chunk/buffer size in bytes; carried for readers, not applied to the client itself.
    int bufferSize;
    int connectionCount;
    // Idle clients available for borrowing. Thread-safe queue so concurrent
    // reader threads can borrow/return without external locking.
    public LinkedBlockingQueue<CloseableHttpPipeliningClient> connectionPool;
    private boolean compress;

    public ApacheHttpConnectionPool(AccountEndpointCredential credential, int bufferSize) {
        this.credential = credential;
        this.bufferSize = bufferSize;
        this.connectionPool = new LinkedBlockingQueue<>();
    }

    public void setCompress(boolean compress) {
        this.compress = compress;
    }

    public boolean getCompress() {
        return this.compress;
    }

    /** @return number of idle clients currently sitting in the pool. */
    public int getSize() {
        return this.connectionPool.size();
    }

    /** Creates one pipelining client and adds it to the pool (not yet started). */
    @Override
    public void addObject() throws Exception {
        this.connectionPool.add(HttpAsyncClients.createPipelining());
    }

    /** Pre-populates the pool with {@code count} clients. */
    @Override
    public void addObjects(int count) throws Exception {
        for (int i = 0; i < count; i++) {
            this.addObject();
        }
    }

    /**
     * Blocks until a client is available, starts it, and hands it out.
     * start() is idempotent for an already-running client, so clients that
     * were previously returned to the pool are safe to borrow again.
     */
    @Override
    public CloseableHttpPipeliningClient borrowObject() throws Exception {
        CloseableHttpPipeliningClient httpClient = this.connectionPool.take();
        httpClient.start();
        return httpClient;
    }

    @Override
    public void clear() throws Exception {
        this.connectionPool.clear();
    }

    /**
     * Closes every pooled client. Drain-then-close (instead of iterating the
     * queue while removing from it) so one failing close() no longer aborts
     * the loop and leaks the remaining clients; the first failure is rethrown
     * after all clients have been given a chance to close.
     */
    @Override
    public void close() {
        IOException firstFailure = null;
        CloseableHttpPipeliningClient httpClient;
        while ((httpClient = this.connectionPool.poll()) != null) {
            try {
                httpClient.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw new RuntimeException("Failed to close pooled HTTP client(s)", firstFailure);
        }
    }

    // Borrowed (active) clients are not tracked by this pool.
    @Override
    public int getNumActive() {
        return 0;
    }

    @Override
    public int getNumIdle() {
        // Fix: report the real idle count instead of a hard-coded 0.
        return this.connectionPool.size();
    }

    /** Permanently discards a (presumably broken) borrowed client. */
    @Override
    public void invalidateObject(CloseableHttpPipeliningClient closeableHttpPipeliningClient) throws Exception {
        closeableHttpPipeliningClient.close();
        this.connectionPool.remove(closeableHttpPipeliningClient);
    }

    /**
     * Returns a borrowed client to the pool for reuse.
     * Fix: the original close()d the client before re-queueing it, so every
     * returned client was unusable on its next borrow. The client must stay
     * open while idle; it is only closed by invalidateObject() or close().
     */
    @Override
    public void returnObject(CloseableHttpPipeliningClient closeableHttpPipeliningClient) throws Exception {
        this.connectionPool.add(closeableHttpPipeliningClient);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import org.onedatashare.transferservice.odstransferservice.model.credential.AccountEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.pools.*;
import org.onedatashare.transferservice.odstransferservice.pools.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
Expand All @@ -18,12 +20,14 @@
@Component
public class ConnectionBag {
private Logger logger = LoggerFactory.getLogger(ConnectionBag.class);

@Value("${ods.apache.httpclient.enabled}")
boolean apacheHttpClientEnabled;
private JschSessionPool sftpReaderPool;
private JschSessionPool sftpWriterPool;
private FtpConnectionPool ftpReaderPool;
private FtpConnectionPool ftpWriterPool;
private HttpConnectionPool httpReaderPool;
private ApacheHttpConnectionPool apacheHttpReaderPool;
private S3ConnectionPool s3ReaderPool;
private S3ConnectionPool s3WriterPool;
private GDriveConnectionPool googleDriveWriterPool;
Expand Down Expand Up @@ -75,7 +79,10 @@ public void preparePools(TransferJobRequest request) {
if (request.getSource().getType().equals(EndpointType.http)) {
readerMade = true;
readerType = EndpointType.http;
this.createHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
if(apacheHttpClientEnabled)
this.createApacheHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
else
this.createHttpReaderPool(request.getSource().getVfsSourceCredential(), request.getOptions().getConcurrencyThreadCount(), request.getChunkSize());
}
if (request.getDestination().getType().equals(EndpointType.gdrive)) {
writerMade = true;
Expand All @@ -98,7 +105,10 @@ public void closePools() {
if (readerType != null) {
switch (readerType) {
case http:
this.httpReaderPool.close();
if(!apacheHttpClientEnabled)
this.httpReaderPool.close();
else
this.apacheHttpReaderPool.close();
break;
case ftp:
this.ftpReaderPool.close();
Expand Down Expand Up @@ -178,6 +188,18 @@ private void createGoogleDriveWriterPool(OAuthEndpointCredential oauthDestCreden
this.compression = false;
}

public void createApacheHttpReaderPool(AccountEndpointCredential credential, int connectionCount, int chunkSize) {

this.apacheHttpReaderPool = new ApacheHttpConnectionPool(credential, chunkSize);
this.apacheHttpReaderPool.setCompress(false);
try {
this.apacheHttpReaderPool.addObjects(connectionCount);
}catch(Exception ex){
ex.printStackTrace();
}
this.compression = false;
}

public void createS3ReaderPool(AccountEndpointCredential credential, TransferOptions transferOptions) {
this.compression = transferOptions.getCompress();
this.s3ReaderPool = new S3ConnectionPool(credential, transferOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveReader;
import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveResumableWriter;
import org.onedatashare.transferservice.odstransferservice.service.step.googleDrive.GDriveSimpleWriter;
import org.onedatashare.transferservice.odstransferservice.service.step.http.ApacheHttpReader;
import org.onedatashare.transferservice.odstransferservice.service.step.http.HttpReader;
import org.onedatashare.transferservice.odstransferservice.service.step.scp.SCPReader;
import org.onedatashare.transferservice.odstransferservice.service.step.scp.SCPWriter;
Expand All @@ -48,6 +49,7 @@
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.retry.policy.SimpleRetryPolicy;
Expand Down Expand Up @@ -75,6 +77,10 @@ public class JobControl extends DefaultBatchConfigurer {

Logger logger = LoggerFactory.getLogger(JobControl.class);


@Value("${ods.apache.httpclient.enabled}")
boolean apacheHttpClientEnabled;

@Autowired
ThreadPoolManager threadPoolManager;

Expand Down Expand Up @@ -141,9 +147,17 @@ private List<Flow> createConcurrentFlow(List<EntityInfo> infoList, String basePa
protected AbstractItemCountingItemStreamItemReader<DataChunk> getRightReader(EndpointType type, EntityInfo fileInfo) {
switch (type) {
case http:
HttpReader hr = new HttpReader(fileInfo, request.getSource().getVfsSourceCredential());
hr.setPool(connectionBag.getHttpReaderPool());
return hr;

if(apacheHttpClientEnabled){
ApacheHttpReader hr = new ApacheHttpReader(fileInfo, request.getSource().getVfsSourceCredential());
hr.setPool(connectionBag.getApacheHttpReaderPool());
return hr;
}else {
HttpReader hr = new HttpReader(fileInfo, request.getSource().getVfsSourceCredential());
hr.setPool(connectionBag.getHttpReaderPool());
return hr;
}

case vfs:
VfsReader vfsReader = new VfsReader(request.getSource().getVfsSourceCredential(), fileInfo);
return vfsReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public JobParameters translate(JobParametersBuilder builder, TransferJobRequest
builder.addLong(CONCURRENCY, (long) request.getOptions().getConcurrencyThreadCount());
builder.addLong(PARALLELISM, (long) request.getOptions().getParallelThreadCount());
builder.addLong(PIPELINING, (long) request.getOptions().getPipeSize());
builder.addLong(HTTP_PIPELINING,(long) request.getOptions().getHttpPipelining());
builder.addString(CHUNK_SIZE, String.valueOf(request.getChunkSize()));
builder.addString(COMPRESS, String.valueOf(request.getOptions().getCompress()));
builder.addLong(RETRY, (long) request.getOptions().getRetry());
Expand Down
Loading