Moved expansion logic from scheduler to transfer-service #100

Open
wants to merge 2 commits into base: master
@@ -17,6 +17,10 @@ public class EntityInfo {
private long size;
private int chunkSize;

private String name;
private String parent;
private String checksum;

@Override
public String toString(){
ObjectMapper objectMapper = new ObjectMapper();
@@ -13,4 +13,18 @@ public EndpointCredential(String accountId){
this.accountId = accountId;
}

//Convenience downcasts: return the concrete credential subtype, or null when the credential is of a different kind.
public static AccountEndpointCredential getAccountCredential(EndpointCredential endpointCredential){
if(endpointCredential instanceof AccountEndpointCredential){
return (AccountEndpointCredential) endpointCredential;
}else{
return null;
}
}
public static OAuthEndpointCredential getOAuthCredential(EndpointCredential endpointCredential){
if(endpointCredential instanceof OAuthEndpointCredential){
return (OAuthEndpointCredential) endpointCredential;
}else{
return null;
}
}
}
@@ -14,6 +14,7 @@
import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -42,6 +43,9 @@ public class CarbonJobMeasure {
@Value("${ods.user}")
private String odsUser;

@Autowired
private ExpanderService expanderService; //expands each candidate job's file list before it is returned for scheduling

public CarbonJobMeasure(IMap<UUID, HazelcastJsonValue> carbonIntensityMap, IMap<UUID, HazelcastJsonValue> fileTransferScheduleMap, PmeterParser pmeterParser, ObjectMapper objectMapper) {
this.carbonIntensityMap = carbonIntensityMap;
this.fileTransferScheduleMap = fileTransferScheduleMap;
@@ -61,7 +65,8 @@ public List<TransferJobRequest> getPotentialJobsFromMap() {
Collection<HazelcastJsonValue> jsonJobs = this.fileTransferScheduleMap.values(potentialJobs);
return jsonJobs.stream().map(hazelcastJsonValue -> {
try {
return this.objectMapper.readValue(hazelcastJsonValue.getValue(), TransferJobRequest.class);
TransferJobRequest job = this.objectMapper.readValue(hazelcastJsonValue.getValue(), TransferJobRequest.class);
return expanderService.expandFiles(job);
} catch (JsonProcessingException e) {
logger.error("Json Processing Exception: {}\n With message: {}", e, e.getMessage());
}
@@ -0,0 +1,25 @@
package org.onedatashare.transferservice.odstransferservice.service;

import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;

import java.util.List;

public abstract class DestinationChunkSize {

/**
 * Default chunk-size assignment. A subclass should override this when its protocol
 * needs the chunk size to be determined by the destination (e.g. an upload session).
 * @param expandedFiles the fully expanded list of files to transfer
 * @param basePath the destination base path
 * @param userChunkSize the chunk size requested by the user
 * @return the same list with a chunk size set on every file
 */
public List<EntityInfo> destinationChunkSize(List<EntityInfo> expandedFiles, String basePath, Integer userChunkSize){
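//cap the user-requested chunk size at roughly 15 MB; anything larger is clamped to ~14.9 MB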
if(userChunkSize > 15000000){
userChunkSize = 14900000;
}
for(EntityInfo fileInfo : expandedFiles){
fileInfo.setChunkSize(userChunkSize);
}
return expandedFiles;
}

}
@@ -0,0 +1,62 @@
package org.onedatashare.transferservice.odstransferservice.service;

import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
import org.onedatashare.transferservice.odstransferservice.model.TransferJobRequest;
import org.onedatashare.transferservice.odstransferservice.service.expanders.*;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class ExpanderService {
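//Expands the job's source into the concrete list of files to transfer, using the protocol-specific expander, and writes the expanded list back onto the source.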

public TransferJobRequest expandFiles(TransferJobRequest job){
TransferJobRequest.Source source = job.getSource();
switch(source.getType()){
case vfs -> {
VfsExpander vfsExpander = new VfsExpander();
List<EntityInfo> fileInfo = vfsExpander.expandedFileSystem(source.getInfoList(), source.getFileSourcePath());
job.getSource().setInfoList(fileInfo);
}

case http -> {
HttpExpander httpExpander = new HttpExpander();
List<EntityInfo> fileInfo = httpExpander.expandedFileSystem(source.getInfoList(), source.getFileSourcePath());
job.getSource().setInfoList(fileInfo);
}

case box -> {
BoxExpander boxExpander = new BoxExpander();
List<EntityInfo> fileInfo = boxExpander.expandedFileSystem(source.getInfoList(), source.getFileSourcePath());
job.getSource().setInfoList(fileInfo);
}
case dropbox -> {
DropBoxExpander dropBoxExpander = new DropBoxExpander();
List<EntityInfo> fileInfo = dropBoxExpander.expandedFileSystem(source.getInfoList(), source.getFileSourcePath());
job.getSource().setInfoList(fileInfo);
}
case gdrive -> {
GDriveExpander gDriveExpander = new GDriveExpander();
List<EntityInfo> fileInfo = gDriveExpander.expandedFileSystem(source.getInfoList(), source.getFileSourcePath());
job.getSource().setInfoList(fileInfo);
}
case s3 -> {
S3Expander s3Expander = new S3Expander();
List<EntityInfo> fileInfo = s3Expander.expandedFileSystem(source.getInfoList(), source.getFileSourcePath());
job.getSource().setInfoList(fileInfo);
}
case sftp -> {
SFTPExpander sftpExpander = new SFTPExpander();
List<EntityInfo> fileInfo = sftpExpander.expandedFileSystem(source.getInfoList(), source.getFileSourcePath());
job.getSource().setInfoList(fileInfo);
}
case ftp -> {
FTPExpander ftpExpander = new FTPExpander();
List<EntityInfo> fileInfo = ftpExpander.expandedFileSystem(source.getInfoList(), source.getFileSourcePath());
job.getSource().setInfoList(fileInfo);
}
}
return job;
}
}
@@ -0,0 +1,97 @@
package org.onedatashare.transferservice.odstransferservice.service.expanders;

import com.box.sdk.*;
import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
import org.onedatashare.transferservice.odstransferservice.model.credential.EndpointCredential;
import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential;
import org.onedatashare.transferservice.odstransferservice.service.DestinationChunkSize;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

@Component
public class BoxExpander extends DestinationChunkSize implements FileExpander {

BoxAPIConnection connection;
Logger logger = LoggerFactory.getLogger(BoxExpander.class);

@Override
public void createClient(EndpointCredential credential) {
OAuthEndpointCredential oAuthEndpointCredential = EndpointCredential.getOAuthCredential(credential);
connection = new BoxAPIConnection(oAuthEndpointCredential.getToken());
}

@Override
public List<EntityInfo> expandedFileSystem(List<EntityInfo> userSelectedResources, String basePath) {
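//Resolve each user-selected resource: try it as a Box file first; if that fails, treat it as a folder and queue it for traversal.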
List<EntityInfo> transferFiles = new ArrayList<>();
Stack<BoxFolder> travStack = new Stack<>();//this will only hold folders to traverse
if(userSelectedResources.isEmpty()) return new ArrayList<>(); //we need to signal the cancellation of this transfer job request.
for(EntityInfo selectedResource : userSelectedResources){
boolean isFile = false;
try{
BoxFile temp = new BoxFile(this.connection, selectedResource.getId());
transferFiles.add(boxFileToEntityInfo(temp));
isFile = true;
}catch (BoxAPIException ignored){
logger.info("Tried to open {} as a file but it did not work", selectedResource.toString());
isFile = false;
}
if(!isFile){
try{
BoxFolder temp = new BoxFolder(this.connection, selectedResource.getId());
travStack.push(temp);
}catch (BoxAPIException ignored){
logger.info("Tried to open {} as a folder but it did not work", selectedResource.toString());
}
}
}
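//depth-first traversal: pop a folder, add its files, and push its subfolders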
while(!travStack.isEmpty()){
BoxFolder folder = travStack.pop();
for(BoxItem.Info child : folder){
if (child instanceof BoxFile.Info) {
BoxFile.Info fileInfo = (BoxFile.Info) child;
BoxFile boxFile = new BoxFile(this.connection, fileInfo.getID());
transferFiles.add(boxFileToEntityInfo(boxFile));
} else if (child instanceof BoxFolder.Info) {
BoxFolder.Info folderInfo = (BoxFolder.Info) child;
BoxFolder childFolder = new BoxFolder(this.connection, folderInfo.getID());
travStack.push(childFolder);
}
}
}
return transferFiles;
}

@Override
public List<EntityInfo> destinationChunkSize(List<EntityInfo> expandedFiles, String basePath, Integer userChunkSize) {
BoxFolder destinationUploadFolder = new BoxFolder(this.connection, basePath);
for(EntityInfo entityInfo : expandedFiles){
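//files under 20 MB are sent as a single chunk; larger files take the part size dictated by a Box upload session created in the destination folder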
if(entityInfo.getSize() < 1024*1024*20){
entityInfo.setChunkSize(Math.toIntExact(entityInfo.getSize()));
}else{
BoxFileUploadSession.Info uploadSession = destinationUploadFolder.createUploadSession(entityInfo.getId(), entityInfo.getSize());
entityInfo.setChunkSize(uploadSession.getPartSize());
}
}
return expandedFiles;
}


public EntityInfo boxFileToEntityInfo(BoxFile boxFile) {
BoxFile.Info boxFileInfo = boxFile.getInfo();
EntityInfo fileInfo = new EntityInfo();
fileInfo.setId(boxFileInfo.getID());
fileInfo.setName(boxFileInfo.getName());
//todo - should this be entire path or just parent id?
fileInfo.setPath(boxFileInfo.getParent().getID());
fileInfo.setSize(boxFileInfo.getSize());
//todo - check if etag or sha1
fileInfo.setChecksum(boxFileInfo.getSha1());
fileInfo.setParent(boxFileInfo.getParent().getID());
return fileInfo;
}
}
@@ -0,0 +1,107 @@
package org.onedatashare.transferservice.odstransferservice.service.expanders;

import com.dropbox.core.DbxException;
import com.dropbox.core.DbxRequestConfig;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.FileMetadata;
import com.dropbox.core.v2.files.FolderMetadata;
import com.dropbox.core.v2.files.Metadata;
import org.onedatashare.transferservice.odstransferservice.model.EntityInfo;
import org.onedatashare.transferservice.odstransferservice.model.credential.EndpointCredential;
import org.onedatashare.transferservice.odstransferservice.service.DestinationChunkSize;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Stack;

@Component
public class DropBoxExpander extends DestinationChunkSize implements FileExpander {

private DbxClientV2 client;

@Value("${dropbox.identifier}")
private String odsClientID = "OneDataShare-DIDCLab";


@Override
public void createClient(EndpointCredential credential) {
DbxRequestConfig config = DbxRequestConfig.newBuilder(odsClientID).build();
this.client = new DbxClientV2(config, ((EndpointCredential.getOAuthCredential(credential))).getToken());
}

@Override
public List<EntityInfo> expandedFileSystem(List<EntityInfo> userSelectedResources, String parentPath) {
Stack<Metadata> traversalQueue = new Stack<>();
List<EntityInfo> expandedFiles = new ArrayList<>();
if (parentPath == null || parentPath.isEmpty()) parentPath = "";
//Expand all the files.
if (userSelectedResources == null || userSelectedResources.isEmpty()) {
List<Metadata> resources = listOp(parentPath);
for (Metadata resource : resources) {
if (resource instanceof FileMetadata) {
expandedFiles.add(metaDataToFileInfo((FileMetadata) resource));
} else if (resource instanceof FolderMetadata) {
traversalQueue.push(resource);
}
}
} else {
for (EntityInfo fileInfo : userSelectedResources) {
List<Metadata> dropBoxFiles = listOp(fileInfo.getPath());
dropBoxFiles.forEach(metadata -> {
if (metadata instanceof FileMetadata) {
expandedFiles.add(metaDataToFileInfo((FileMetadata) metadata));
} else if (metadata instanceof FolderMetadata) {
traversalQueue.push(metadata);
}
});
}
}
while (!traversalQueue.isEmpty()) {
FolderMetadata folderMetadata = (FolderMetadata) traversalQueue.pop();
List<Metadata> folderList = listOp(folderMetadata.getPathLower());
for (Metadata res : folderList) {
if (res instanceof FileMetadata) {
expandedFiles.add(metaDataToFileInfo((FileMetadata) res));
} else if (res instanceof FolderMetadata) {
traversalQueue.push(res);
}
}
}
return expandedFiles;
}

public EntityInfo metaDataToFileInfo(FileMetadata file) {
EntityInfo fileInfo = new EntityInfo();
fileInfo.setSize(file.getSize());
fileInfo.setId(file.getId());
fileInfo.setPath(file.getPathLower());
return fileInfo;
}

public List<Metadata> listOp(String path) {
try {
return this.client.files().listFolderBuilder(path).start().getEntries();
} catch (DbxException e) {
//path is not a listable folder (or the listing failed); fall through and try it as a single file
}
try{
return Collections.singletonList(this.client.files().getMetadata(path));
} catch (DbxException e){
//neither a folder nor a readable file; return an empty list
}
return new ArrayList<>();
}

@Override
public List<EntityInfo> destinationChunkSize(List<EntityInfo> expandedFiles, String basePath, Integer userChunkSize){
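//files under 8 MB are sent in a single chunk; user chunk sizes under 4 MB are bumped to ~10 MB, otherwise the user's requested chunk size is kept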
for(EntityInfo fileInfo : expandedFiles){
if(fileInfo.getSize() < 8L << 20){
fileInfo.setChunkSize(Long.valueOf(fileInfo.getSize()).intValue());
}else if(userChunkSize < 4L << 20){
fileInfo.setChunkSize(10000000);
}else{
fileInfo.setChunkSize(userChunkSize);
}
}
return expandedFiles;
}
}