Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/reauth credentials #74

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.client.RestTemplate;

import javax.sql.DataSource;

Expand All @@ -31,5 +33,12 @@ public JobRepository roachRepository(DataSource dataSource, PlatformTransactionM
factory.setMaxVarCharLength(1000);
return factory.getObject();
}

@LoadBalanced
@Bean
public RestTemplate credentialTemplate(){
RestTemplate credenTemplate = new RestTemplate();
return credenTemplate;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public class ODSConstants {
public static final String FILE_SIZE = "fileSize";
public static final String FILE_PATH = "filePath";
public static final String FILE_ID = "file_id";

public static final String DROPBOX= "dropbox";
public static final String GOOGLEDRIVE = "gdrive";
public static final String BOX= "box";
public static final String TIME = "time";
public static final String SOURCE_ACCOUNT_ID_PASS = "sourceAccountIdPass";
public static final String SOURCE_HOST = "sourceURI";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.onedatashare.transferservice.odstransferservice.model.credential;

import lombok.Getter;

/**
* Base class for storing one user credential
*/

@Getter
public class EndpointCredential{
protected String accountId;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.onedatashare.transferservice.odstransferservice.service;

import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;


@Service
public class AuthenticateCredentials {
RestTemplate credentialTemplate;
String baseUrl = "http://EndPointCredentialService/{}/{}/{}";

public AuthenticateCredentials(RestTemplate credentialTemplate){
this.credentialTemplate = credentialTemplate;
}
public OAuthEndpointCredential checkExpiryAndGenerateNew(String userId, String type, String accountId){
String endpoint = baseUrl.replaceFirst("\\{}", userId)
.replaceFirst("\\{}", type)
.replaceFirst("\\{}", accountId);
return this.credentialTemplate.getForObject(endpoint, OAuthEndpointCredential.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public class JobControl extends DefaultBatchConfigurer {
@Autowired
JobRepository roachRepository;

@Autowired
AuthenticateCredentials authenticateCredentials;


@Lazy
@Bean
Expand Down Expand Up @@ -161,18 +164,18 @@ protected AbstractItemCountingItemStreamItemReader<DataChunk> getRightReader(End
amazonS3Reader.setPool(connectionBag.getS3ReaderPool());
return amazonS3Reader;
case box:
BoxReader boxReader = new BoxReader(request.getSource().getOauthSourceCredential(), fileInfo);
BoxReader boxReader = new BoxReader(request.getSource().getOauthSourceCredential(), fileInfo,this.authenticateCredentials);
boxReader.setMaxRetry(this.request.getOptions().getRetry());
return boxReader;
case dropbox:
DropBoxReader dropBoxReader = new DropBoxReader(request.getSource().getOauthSourceCredential(), fileInfo);
DropBoxReader dropBoxReader = new DropBoxReader(request.getSource().getOauthSourceCredential(), fileInfo, this.authenticateCredentials);
return dropBoxReader;
case scp:
SCPReader reader = new SCPReader(fileInfo);
reader.setPool(connectionBag.getSftpReaderPool());
return reader;
case gdrive:
GDriveReader dDriveReader = new GDriveReader(request.getSource().getOauthSourceCredential(), fileInfo);
GDriveReader dDriveReader = new GDriveReader(request.getSource().getOauthSourceCredential(), fileInfo, this.authenticateCredentials);
return dDriveReader;
}
return null;
Expand Down Expand Up @@ -208,11 +211,11 @@ protected ItemWriter<DataChunk> getRightWriter(EndpointType type, EntityInfo fil
BoxWriterSmallFile boxWriterSmallFile = new BoxWriterSmallFile(request.getDestination().getOauthDestCredential(), fileInfo, this.metricsCollector, this.influxCache);
return boxWriterSmallFile;
} else {
BoxWriterLargeFile boxWriterLargeFile = new BoxWriterLargeFile(request.getDestination().getOauthDestCredential(), fileInfo, this.metricsCollector, this.influxCache);
BoxWriterLargeFile boxWriterLargeFile = new BoxWriterLargeFile(request.getDestination().getOauthDestCredential(), fileInfo, this.metricsCollector, this.influxCache, this.authenticateCredentials);
return boxWriterLargeFile;
}
case dropbox:
DropBoxChunkedWriter dropBoxChunkedWriter = new DropBoxChunkedWriter(request.getDestination().getOauthDestCredential(), this.metricsCollector, this.influxCache);
DropBoxChunkedWriter dropBoxChunkedWriter = new DropBoxChunkedWriter(request.getDestination().getOauthDestCredential(), this.metricsCollector, this.influxCache, this.authenticateCredentials);
return dropBoxChunkedWriter;
case scp:
SCPWriter scpWriter = new SCPWriter(fileInfo, this.metricsCollector, this.influxCache);
Expand All @@ -223,7 +226,7 @@ protected ItemWriter<DataChunk> getRightWriter(EndpointType type, EntityInfo fil
GDriveSimpleWriter writer = new GDriveSimpleWriter(request.getDestination().getOauthDestCredential(),fileInfo);
return writer;
}else{
GDriveResumableWriter writer = new GDriveResumableWriter(request.getDestination().getOauthDestCredential(),fileInfo);
GDriveResumableWriter writer = new GDriveResumableWriter(request.getDestination().getOauthDestCredential(),fileInfo, this.authenticateCredentials);
writer.setPool(connectionBag.getGoogleDriveWriterPool());
return writer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxFile;
import com.box.sdk.BoxUser;
import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
import org.onedatashare.transferservice.odstransferservice.model.DataChunk;
import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
import org.onedatashare.transferservice.odstransferservice.model.FilePart;
import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.service.AuthenticateCredentials;
import org.onedatashare.transferservice.odstransferservice.service.FilePartitioner;
import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility;
import org.slf4j.Logger;
Expand All @@ -17,6 +20,8 @@

import java.io.ByteArrayOutputStream;

import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.OWNER_ID;

public class BoxReader extends AbstractItemCountingItemStreamItemReader<DataChunk> {

private OAuthEndpointCredential credential;
Expand All @@ -27,21 +32,35 @@ public class BoxReader extends AbstractItemCountingItemStreamItemReader<DataChun
int retry;
Logger logger = LoggerFactory.getLogger(BoxReader.class);

public BoxReader(OAuthEndpointCredential credential, EntityInfo fileInfo) {
String usersEmail;
String ownerId;

private AuthenticateCredentials authenticateCredentials;

public BoxReader(OAuthEndpointCredential credential, EntityInfo fileInfo, AuthenticateCredentials authenticateCredentials) {
this.credential = credential;
this.setName(ClassUtils.getShortName(BoxReader.class));
filePartitioner = new FilePartitioner(fileInfo.getChunkSize());
this.fileInfo = fileInfo;
retry = 1;
this.authenticateCredentials = authenticateCredentials;
}

@BeforeStep
public void beforeStep(StepExecution stepExecution){
this.ownerId = stepExecution.getJobParameters().getString(OWNER_ID);
}
/**
* Read in those chunks
*
* @return
*/
@Override
protected DataChunk doRead() {
if(this.credential.isTokenExpires()){
this.credential = authenticateCredentials.checkExpiryAndGenerateNew( usersEmail, ODSConstants.BOX, ownerId);
doOpen();
}
FilePart filePart = filePartitioner.nextPart();
if (filePart == null) return null;
ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
Expand All @@ -60,6 +79,9 @@ protected DataChunk doRead() {
protected void doOpen() {
filePartitioner.createParts(this.fileInfo.getSize(), this.fileInfo.getId());
this.boxAPIConnection = new BoxAPIConnection(credential.getToken());
BoxUser user = new BoxUser(this.boxAPIConnection, this.credential.getAccountId());
BoxUser.Info userInfo = user.getInfo("login");
this.usersEmail = userInfo.getLogin();
this.currentFile = new BoxFile(this.boxAPIConnection, this.fileInfo.getId());
this.currentFile.getInfo("name");
this.boxAPIConnection.setMaxRetryAttempts(this.retry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import com.box.sdk.BoxFileUploadSession;
import com.box.sdk.BoxFileUploadSessionPart;
import com.box.sdk.BoxFolder;
import com.box.sdk.BoxUser;
import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
import org.onedatashare.transferservice.odstransferservice.model.DataChunk;
import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.service.AuthenticateCredentials;
import org.onedatashare.transferservice.odstransferservice.service.InfluxCache;
import org.onedatashare.transferservice.odstransferservice.service.cron.MetricsCollector;
import org.onedatashare.transferservice.odstransferservice.service.step.ODSBaseWriter;
Expand All @@ -26,14 +29,15 @@
import java.util.List;

import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.DEST_BASE_PATH;
import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.OWNER_ID;

/**
* This class is responsible for writing to Box using the chunked upload approach & small file upload
* Ideally we should separate this out I think.
*/
public class BoxWriterLargeFile extends ODSBaseWriter implements ItemWriter<DataChunk> {

private final OAuthEndpointCredential credential;
private OAuthEndpointCredential credential;
private BoxAPIConnection boxAPIConnection;
EntityInfo fileInfo;
private HashMap<String, BoxFileUploadSession> fileMap;
Expand All @@ -42,22 +46,30 @@ public class BoxWriterLargeFile extends ODSBaseWriter implements ItemWriter<Data
String destinationBasePath;
BoxFolder boxFolder;
Logger logger = LoggerFactory.getLogger(BoxWriterLargeFile.class);
String usersEmail;
String ownerId;
private AuthenticateCredentials authenticateCredentials;

public BoxWriterLargeFile(OAuthEndpointCredential oAuthDestCredential, EntityInfo fileInfo, MetricsCollector metricsCollector, InfluxCache influxCache) {
public BoxWriterLargeFile(OAuthEndpointCredential oAuthDestCredential, EntityInfo fileInfo, MetricsCollector metricsCollector, InfluxCache influxCache, AuthenticateCredentials authenticateCredentials) {
super(metricsCollector, influxCache);
this.boxAPIConnection = new BoxAPIConnection(oAuthDestCredential.getToken());
this.fileInfo = fileInfo;
this.fileMap = new HashMap<>();
this.digestMap = new HashMap<>();
this.parts = new ArrayList<>();
this.credential = oAuthDestCredential;
this.authenticateCredentials = authenticateCredentials;
BoxUser user = new BoxUser(this.boxAPIConnection, this.credential.getAccountId());
BoxUser.Info userInfo = user.getInfo("login");
this.usersEmail = userInfo.getLogin();
}

@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.destinationBasePath = stepExecution.getJobParameters().getString(DEST_BASE_PATH); //path to place the files
this.boxFolder = new BoxFolder(this.boxAPIConnection, this.destinationBasePath);
this.stepExecution = stepExecution;
this.ownerId = stepExecution.getJobParameters().getString(OWNER_ID);
}

/**
Expand Down Expand Up @@ -111,6 +123,10 @@ private boolean ready(String fileName) {
*/
@Override
public void write(List<? extends DataChunk> items) throws NoSuchAlgorithmException {
if(this.credential.isTokenExpires()){
this.credential = authenticateCredentials.checkExpiryAndGenerateNew( usersEmail, ODSConstants.BOX, ownerId);
this.boxAPIConnection = new BoxAPIConnection(this.credential.getToken());
}
String fileName = items.get(0).getFileName();
prepareForUpload(fileName);
BoxFileUploadSession session = this.fileMap.get(fileName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package org.onedatashare.transferservice.odstransferservice.service.step.dropbox;

import com.box.sdk.BoxUser;
import com.dropbox.core.DbxException;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.CommitInfo;
import com.dropbox.core.v2.files.FileMetadata;
import com.dropbox.core.v2.files.UploadSessionCursor;
import com.dropbox.core.v2.users.FullAccount;
import lombok.SneakyThrows;
import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants;
import org.onedatashare.transferservice.odstransferservice.model.DataChunk;
import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.service.AuthenticateCredentials;
import org.onedatashare.transferservice.odstransferservice.service.InfluxCache;
import org.onedatashare.transferservice.odstransferservice.service.cron.MetricsCollector;
import org.onedatashare.transferservice.odstransferservice.service.step.ODSBaseWriter;
Expand All @@ -25,10 +29,11 @@
import java.util.List;

import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.DEST_BASE_PATH;
import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.OWNER_ID;

public class DropBoxChunkedWriter extends ODSBaseWriter implements ItemWriter<DataChunk> {

private final OAuthEndpointCredential credential;
private OAuthEndpointCredential credential;
private String destinationPath;
private DbxClientV2 client;
String sessionId;
Expand All @@ -37,10 +42,16 @@ public class DropBoxChunkedWriter extends ODSBaseWriter implements ItemWriter<Da
private String fileName;
private FileMetadata uploadSessionFinishUploader;

private AuthenticateCredentials authenticateCredentials;

public DropBoxChunkedWriter(OAuthEndpointCredential credential, MetricsCollector metricsCollector, InfluxCache influxCache) {
String ownerId;
String usersEmail;


public DropBoxChunkedWriter(OAuthEndpointCredential credential, MetricsCollector metricsCollector, InfluxCache influxCache, AuthenticateCredentials authenticateCredentials) {
super(metricsCollector, influxCache);
this.credential = credential;
this.authenticateCredentials = authenticateCredentials;
}

@BeforeStep
Expand All @@ -51,7 +62,8 @@ public void beforeStep(StepExecution stepExecution) throws DbxException {
sessionId = this.client.files().uploadSessionStart().finish().getSessionId();
this.stepExecution = stepExecution;
this.cursor = new UploadSessionCursor(sessionId, 0);

this.ownerId = stepExecution.getJobParameters().getString(OWNER_ID);
this.usersEmail = client.users().getCurrentAccount().getEmail();
}

@SneakyThrows
Expand All @@ -66,6 +78,10 @@ public ExitStatus afterStep(StepExecution stepExecution) {

@Override
public void write(List<? extends DataChunk> items) throws Exception {
if(this.credential.isTokenExpires()){
this.credential = authenticateCredentials.checkExpiryAndGenerateNew( usersEmail, ODSConstants.DROPBOX, ownerId);
this.client = new DbxClientV2(ODSUtility.dbxRequestConfig, this.credential.getToken());
}
for(DataChunk chunk : items){
this.client.files().uploadSessionAppendV2(cursor).uploadAndFinish(new ByteArrayInputStream(chunk.getData()));
this.cursor = new UploadSessionCursor(sessionId, chunk.getStartPosition() + chunk.getSize());
Expand Down
Loading