diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bae73d0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,56 @@ +# Logs +logs +*.log +npm-debug.log* + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# nyc test coverage +.nyc_output + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directories +node_modules +jspm_packages + +# Optional npm cache directory +.npm + +# Optional REPL history +.node_repl_history + +# MacOs junk +.DS_Store + +# ignore Blotter environment file +.env + +# Librarian server key +librarian.key + +Librarian/configs/*.json + +# IntelliJ Configs +.idea/* +ThreadPoolDownload.iml + +target/* + +*.class \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f77c0d3 --- /dev/null +++ b/pom.xml @@ -0,0 +1,100 @@ + + + 4.0.0 + + main.java + ThreadPoolDownload + 1.0-SNAPSHOT + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + src/main/resources/META-INF/MANIFEST.MF + + + + + org.apache.maven.plugins + maven-shade-plugin + 1.6 + + true + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + com.download.NewMultipleDownloadManager + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + + + commons-io + commons-io + 2.5 + + + commons-cli + commons-cli + 1.3.1 + + + \ No newline at end of file diff --git a/src/main/java/com/download/ConnectionData.java b/src/main/java/com/download/ConnectionData.java new file mode 100644 index 0000000..91fe040 --- /dev/null +++ b/src/main/java/com/download/ConnectionData.java @@ -0,0 +1,59 @@ +package com.download; + +import java.net.URL; + +/** + * Created by harshmathur on 17/02/17. + */ +public class ConnectionData { + + private Integer job; + + private ConnectionStatus connectionStatus; + + private URL url; + + private final Long speed; + + public String getFileName() { + return fileName; + } + + private final String fileName; + + public ConnectionData(Integer job, ConnectionStatus connectionStatus, URL url, Long speed, String fileName) { + this.job = job; + this.connectionStatus = connectionStatus; + this.url = url; + this.speed = speed; + this.fileName = fileName; + } + + public Long getSpeed() { + return speed; + } + + public Integer getJob() { + return job; + } + + public void setJob(Integer job) { + this.job = job; + } + + public ConnectionStatus getConnectionStatus() { + return connectionStatus; + } + + public void setConnectionStatus(ConnectionStatus connectionStatus) { + this.connectionStatus = connectionStatus; + } + + public URL getUrl() { + return url; + } + + public void setUrl(URL url) { + this.url = url; + } +} diff --git a/src/main/java/com/download/ConnectionStatus.java b/src/main/java/com/download/ConnectionStatus.java new file mode 100644 index 0000000..ed4ef55 --- /dev/null +++ b/src/main/java/com/download/ConnectionStatus.java @@ -0,0 +1,14 @@ +package com.download; + +/** + * Created by harshmathur on 17/02/17. + */ +public enum ConnectionStatus { + PART_DOWNLOADED, + PART_DOWNLOAD_FAILED, + DOWNLOADED, + DOWNLOAD_FAILED, + MERGED, + MERGE_FAILED, + ZERO_LENGTH_CONTENT +} diff --git a/src/main/java/com/download/NewDownloadWorker.java b/src/main/java/com/download/NewDownloadWorker.java new file mode 100644 index 0000000..7a73133 --- /dev/null +++ b/src/main/java/com/download/NewDownloadWorker.java @@ -0,0 +1,79 @@ +package com.download; + +import com.sun.org.apache.xpath.internal.operations.Bool; +import org.apache.commons.io.FilenameUtils; + +import java.io.*; +import java.net.URL; +import java.net.URLConnection; +import java.util.Date; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Created by harshmathur on 14/02/17. + */ +public class NewDownloadWorker { + + private static final int BUFFER_SIZE = 256000; + private FileOutputStream out; + private final URL url; + private final Long numberofBytesPerConnection; + private final Integer connectionNumber; + private final ExecutorService service; + private final Boolean resume; + + public NewDownloadWorker(Integer connectionNumber, URL url, + Long numberofBytesPerConnection, Boolean resume, ExecutorService service) { + this.url = url; + this.numberofBytesPerConnection = numberofBytesPerConnection; + this.connectionNumber = connectionNumber; + this.service = service; + this.resume = resume; + } + + public ConnectionData download() { + Long startTime = new Date().getTime(); + Long endTime = null; + String fileName = Utilities.getFileName(url, Integer.toString(connectionNumber), "/tmp"); + try { + Long start = new Long(connectionNumber * numberofBytesPerConnection); + Long end = start+numberofBytesPerConnection-1; + if (this.resume) { + File fileExists = new File(fileName); + Long length = fileExists.length(); + if (length < numberofBytesPerConnection) { + start = start + length; + } else if (length == numberofBytesPerConnection) { + return new ConnectionData(connectionNumber, ConnectionStatus.PART_DOWNLOADED, this.url, Long.MAX_VALUE, fileName); + } + } + URLConnection connection = this.url.openConnection(); + connection.setRequestProperty("Range", "bytes="+start+"-"+end); + BufferedInputStream in = new BufferedInputStream(connection.getInputStream()); + byte[] buffer = new byte[BUFFER_SIZE]; + Integer bytesRead = -1; + this.out = new FileOutputStream(fileName); + while (true) { + bytesRead = in.read(buffer); + if(bytesRead == -1) { + break; + } + out.write(buffer, 0, bytesRead); + } + out.close(); + in.close(); + endTime = new Date().getTime(); + return new ConnectionData(connectionNumber, ConnectionStatus.PART_DOWNLOADED, url, (end-start)/(endTime-startTime), fileName); + } catch (IOException e) { + e.printStackTrace(); + return new ConnectionData(connectionNumber, ConnectionStatus.PART_DOWNLOAD_FAILED, url, 0L, null); + } + catch (Exception e) { + e.printStackTrace(); + return new ConnectionData(connectionNumber, ConnectionStatus.PART_DOWNLOAD_FAILED, url, 0L, null); + } + } +} diff --git a/src/main/java/com/download/NewMultipleDownloadManager.java b/src/main/java/com/download/NewMultipleDownloadManager.java new file mode 100644 index 0000000..5c9f379 --- /dev/null +++ b/src/main/java/com/download/NewMultipleDownloadManager.java @@ -0,0 +1,88 @@ +package com.download; +import org.apache.commons.cli.*; + + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * Created by harshmathur on 17/02/17. + */ +public class NewMultipleDownloadManager { + + public static void main(String args[]) { + Options options = new Options(); + + Option parallelism = new Option("j", "no-of-threads", true, "Parallelism"); + parallelism.setRequired(false); + options.addOption(parallelism); + + Option urlList = new Option("f", "urls", true, "URLs in comma separated format"); + urlList.setRequired(true); + options.addOption(urlList); + + Option maxNUmberOfConnections = new Option("x", "max-number-of-connections", true, "Maximum Number of Connections per URL"); + maxNUmberOfConnections.setRequired(true); + options.addOption(maxNUmberOfConnections); + + Option saveDirOption = new Option("d", "save-dir", true, "Directory to Save files"); + saveDirOption.setRequired(true); + options.addOption(saveDirOption); + + Option forceFresh = new Option("k", "fresh", false, "Force The Manager to Download all parts again, don't resume"); + forceFresh.setRequired(false); + options.addOption(forceFresh); + + CommandLineParser parser = new DefaultParser(); + HelpFormatter formatter = new HelpFormatter(); + CommandLine cmd; + + try { + cmd = parser.parse(options, args); + } catch (ParseException e) { + System.out.println(e.getMessage()); + formatter.printHelp("utility-name", options); + System.exit(1); + return; + } + + Integer numberOfThreads = Integer.parseInt(cmd.getOptionValue("no-of-threads")); + List urls = new ArrayList(Arrays.asList(cmd.getOptionValue("urls").split(","))); + Integer maxNumberOfConnections = Integer.parseInt(cmd.getOptionValue("max-number-of-connections")); + String saveDir = cmd.getOptionValue("save-dir"); + Boolean resume = !cmd.hasOption("fresh"); + + final ExecutorService service = Executors.newFixedThreadPool(numberOfThreads); + + List> downLoadStatus = urls.stream() + .map(url -> { + return CompletableFuture.supplyAsync(() -> { + return new NewURLDownloadManager( + url, saveDir, maxNumberOfConnections, resume, service).startDownload(); + + }, service); + }) + .map(urlDownloadFuture -> urlDownloadFuture.join()) + .collect(Collectors.toList()); + + CompletableFuture> urlDownloadStatusAll = Utilities.sequence(downLoadStatus, service); + CompletableFuture> urlDownloadStatusesList = urlDownloadStatusAll.thenApplyAsync(urlDownloadStatuses -> urlDownloadStatuses.stream() + .map(urlDownloadStatus -> printStatus(urlDownloadStatus)) + .collect(Collectors.toList()) + ); + + urlDownloadStatusesList.thenApply(urlDownloadStatuses -> { + service.shutdown(); + return null; + }); + + } + + public static URLDownloadStatus printStatus(URLDownloadStatus urlStatus) { + System.out.println("URL: "+urlStatus.getUrl()+" status: "+urlStatus.getStatus()+" speed: "+urlStatus.getSpeed() + " KBps"); + return urlStatus; + } +} diff --git a/src/main/java/com/download/NewURLDownloadManager.java b/src/main/java/com/download/NewURLDownloadManager.java new file mode 100644 index 0000000..a717849 --- /dev/null +++ b/src/main/java/com/download/NewURLDownloadManager.java @@ -0,0 +1,120 @@ +package com.download; + +import org.apache.commons.io.FilenameUtils; + +import java.io.*; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +/** + * Created by harshmathur on 17/02/17. + */ +public class NewURLDownloadManager { + + private final String uri; + private final String saveDir; + private static final int BUFFER_SIZE = 256000; + private final ExecutorService service; + private final Integer maxNumberOfConnections; + private final Boolean resume; + + public NewURLDownloadManager(String uri, String saveDir, + Integer maxNumberOfConnections, Boolean resume, ExecutorService service) { + this.uri = uri; + this.saveDir = saveDir; + this.service = service; + this.maxNumberOfConnections = maxNumberOfConnections; + this.resume = resume; + } + + public CompletableFuture startDownload() { + final URL url; + try { + url = new URL(this.uri); + URLConnection connection = url.openConnection(); + Long contentLength = connection.getContentLengthLong(); + if (contentLength < 1) { + CompletableFuture.completedFuture(new URLDownloadStatus(this.uri, ConnectionStatus.ZERO_LENGTH_CONTENT, 0L)); + } + final Integer numberOfConnections; + final Long numberofBytesPerConnection; + String acceptRanges = connection.getHeaderField("Accept-Ranges"); + if (acceptRanges != null && acceptRanges.equals("bytes")) { + numberOfConnections = this.maxNumberOfConnections; + } else { + numberOfConnections = 1; + } + numberofBytesPerConnection = contentLength / numberOfConnections; + + List connectionRange = Utilities.getRange(0, numberOfConnections); + + List> connectionDataListOfFutures = connectionRange + .stream() + .map(connectionNumber -> CompletableFuture.supplyAsync( + () -> getPartialData( + connectionNumber, url, numberofBytesPerConnection, resume, service + ), + service + ) + ) + .collect(Collectors.toList()); + + CompletableFuture> futureListConnectionData = Utilities.sequence(connectionDataListOfFutures, service); + + return futureListConnectionData.thenApplyAsync(connectionDatas -> merge(connectionDatas, url), service); + + } catch (MalformedURLException e) { + e.printStackTrace(); + return CompletableFuture.completedFuture(new URLDownloadStatus(uri, ConnectionStatus.DOWNLOAD_FAILED, 0L)); + } catch (IOException e) { + e.printStackTrace(); + return CompletableFuture.completedFuture(new URLDownloadStatus(uri, ConnectionStatus.DOWNLOAD_FAILED, 0L)); + } + } + + public ConnectionData getPartialData(Integer connectionNumber, + URL url, Long numberofBytesPerConnection, Boolean resume, ExecutorService service) { + NewDownloadWorker worker = new NewDownloadWorker(connectionNumber, url, numberofBytesPerConnection, resume, service); + return worker.download(); + } + + public URLDownloadStatus merge (List connectionDataList, URL url){ + try{ + Long totalSpeed = 0L; + InputStream inputStream = null; + FileOutputStream out = new FileOutputStream(Utilities.getFileName(url, "", this.saveDir)); + for (ConnectionData connectionData: connectionDataList) { + if (connectionData.getConnectionStatus() != ConnectionStatus.PART_DOWNLOADED) { + return new URLDownloadStatus(uri, ConnectionStatus.DOWNLOAD_FAILED, 0L); + } + totalSpeed += connectionData.getSpeed(); + inputStream = new FileInputStream(connectionData.getFileName()); + byte[] buffer = new byte[BUFFER_SIZE]; + Integer bytesRead = -1; + while((bytesRead = inputStream.read(buffer)) != -1) { + out.write(buffer, 0, bytesRead); + } + inputStream.close(); + new File(connectionData.getFileName()).delete(); + } + out.close(); + return new URLDownloadStatus(uri, ConnectionStatus.MERGED, totalSpeed); + } + catch(FileNotFoundException e) { + e.printStackTrace(); + return new URLDownloadStatus(uri, ConnectionStatus.MERGE_FAILED, 0L); + } catch (IOException e) { + e.printStackTrace(); + return new URLDownloadStatus(uri, ConnectionStatus.MERGE_FAILED, 0L); + } + catch (Exception e) { + e.printStackTrace(); + return new URLDownloadStatus(uri, ConnectionStatus.MERGE_FAILED, 0L); + } + } +} diff --git a/src/main/java/com/download/URLDownloadStatus.java b/src/main/java/com/download/URLDownloadStatus.java new file mode 100644 index 0000000..2636127 --- /dev/null +++ b/src/main/java/com/download/URLDownloadStatus.java @@ -0,0 +1,31 @@ +package com.download; + +/** + * Created by harshmathur on 02/03/17. + */ +public class URLDownloadStatus { + + public URLDownloadStatus(String url, ConnectionStatus status, Long speed) { + this.url = url; + this.status = status; + this.speed = speed; + } + + private String url; + + public String getUrl() { + return url; + } + + public ConnectionStatus getStatus() { + return status; + } + + private ConnectionStatus status; + + public Long getSpeed() { + return speed; + } + + private Long speed; +} diff --git a/src/main/java/com/download/Utilities.java b/src/main/java/com/download/Utilities.java new file mode 100644 index 0000000..66f3dc3 --- /dev/null +++ b/src/main/java/com/download/Utilities.java @@ -0,0 +1,44 @@ +package com.download; + +import org.apache.commons.io.FilenameUtils; + +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Created by harshmathur on 02/03/17. + */ +public class Utilities { + public static CompletableFuture> sequence(List> futures, Executor service) { + CompletableFuture allDoneFuture = + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); + return allDoneFuture.thenApplyAsync(v -> + futures.stream(). + map(future -> future.join()). + collect(Collectors.toList()), + service + ); + } + + public static List getRange(Integer first, Integer count) { + List ints = new ArrayList<>(); + for (int i = 0; i < count; i++) { + ints.add(first + i); + } + return ints; + } + + public static String getFileName(URL url, String connectionNumber, String saveDir) { + try { + return saveDir+"/"+ URLDecoder.decode(FilenameUtils.getName(url.getPath()), "UTF-8")+connectionNumber; + } catch (UnsupportedEncodingException e) { + return saveDir+"/"+ FilenameUtils.getName(url.getPath())+connectionNumber; + } + } +} diff --git a/src/main/resources/META-INF/MANIFEST.MF b/src/main/resources/META-INF/MANIFEST.MF new file mode 100644 index 0000000..ce2f420 --- /dev/null +++ b/src/main/resources/META-INF/MANIFEST.MF @@ -0,0 +1,10 @@ +Manifest-Version: 1.0 +Implementation-Title: com.download +Implementation-Version: 1.0-SNAPSHOT +Implementation-Vendor-Id: com.download +Build-Jdk: 1.8 +Built-By: harshmathur.1990 +Created-By: Apache Maven +Main-Class: com.download.NewMultipleDownloadManager +Can-Retransform-Classes: true +Can-Redefine-Classes: true