Skip to content

Commit 26a4da3

Browse files
authored
Merge pull request #564 from scribe/cancel
Cancel
2 parents 69ad930 + a42c6d6 commit 26a4da3

File tree

15 files changed

+464
-77
lines changed

15 files changed

+464
-77
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*/
1515

1616
buildscript {
17-
ext.kotlin_version = '1.1.51'
17+
ext.kotlin_version = '1.2.70'
1818

1919
repositories {
2020
mavenCentral()
@@ -36,7 +36,7 @@ plugins {
3636

3737
allprojects {
3838
group = 'com.spectralogic.ds3'
39-
version = '5.0.3'
39+
version = '5.0.4'
4040
}
4141

4242
subprojects {

ds3-sdk-integration/src/test/java/com/spectralogic/ds3client/integration/PutJobManagement_Test.java

Lines changed: 137 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.google.common.collect.ImmutableList;
1919
import com.google.common.collect.ImmutableMap;
20+
import com.google.common.collect.Iterables;
2021
import com.google.common.collect.Lists;
2122
import com.spectralogic.ds3client.Ds3Client;
2223
import com.spectralogic.ds3client.Ds3ClientBuilder;
@@ -29,6 +30,7 @@
2930
import com.spectralogic.ds3client.helpers.events.FailureEvent;
3031
import com.spectralogic.ds3client.helpers.events.SameThreadEventRunner;
3132
import com.spectralogic.ds3client.helpers.options.WriteJobOptions;
33+
import com.spectralogic.ds3client.helpers.pagination.GetBucketKeyLoaderFactory;
3234
import com.spectralogic.ds3client.helpers.strategy.blobstrategy.*;
3335
import com.spectralogic.ds3client.helpers.strategy.channelstrategy.ChannelStrategy;
3436
import com.spectralogic.ds3client.helpers.strategy.channelstrategy.SequentialFileReaderChannelStrategy;
@@ -43,6 +45,7 @@
4345
import com.spectralogic.ds3client.utils.ByteArraySeekableByteChannel;
4446
import com.spectralogic.ds3client.utils.Platform;
4547
import com.spectralogic.ds3client.utils.ResourceUtils;
48+
import com.spectralogic.ds3client.utils.collections.LazyIterable;
4649
import com.spectralogic.ds3client.utils.hashing.ChecksumUtils;
4750
import com.spectralogic.ds3client.utils.hashing.Hasher;
4851
import org.apache.commons.io.FileUtils;
@@ -91,6 +94,11 @@ public class PutJobManagement_Test {
9194
private static TempStorageIds envStorageIds;
9295
private static UUID envDataPolicyId;
9396

97+
private static final String SOURCE_DIRECTORY = "little_files/";
98+
private static final String SOURCE_FILE_BASE_NAME = "tape";
99+
private static final String SOURCE_FILE_EXTENSION = "png";
100+
private static final String SOURCE_FILE_NAME = SOURCE_FILE_BASE_NAME + "." + SOURCE_FILE_EXTENSION;
101+
94102
@BeforeClass
95103
public static void startup() throws IOException {
96104
envDataPolicyId = TempStorageUtil.setupDataPolicy(TEST_ENV_NAME, false, ChecksumType.Type.MD5, client);
@@ -1946,31 +1954,7 @@ public void testPutting15000Files() throws IOException, URISyntaxException {
19461954
Path tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix);
19471955

19481956
try {
1949-
final String sourceDirectory = "little_files/";
1950-
final String sourceFileName = "tape.png";
1951-
1952-
final Path sourceFilePath = ResourceUtils.loadFileResource(sourceDirectory + sourceFileName);
1953-
1954-
Files.copy(sourceFilePath, Paths.get(tempDirectory.toString(), sourceFileName));
1955-
1956-
final List<String> fileNames = new ArrayList<>(15001);
1957-
fileNames.add(sourceFileName);
1958-
1959-
for (int i = 1; i <= 15000; ++i) {
1960-
final String destinationFileName = "tape" + i + ".png";
1961-
fileNames.add(destinationFileName);
1962-
Files.copy(sourceFilePath, Paths.get(tempDirectory.toString(), destinationFileName));
1963-
}
1964-
1965-
final List<Ds3Object> ds3Objects = new ArrayList<>();
1966-
1967-
for (final String fileName : fileNames) {
1968-
final Path filePath = Paths.get(tempDirectory.toString(), fileName);
1969-
final long fileSize = Files.size(filePath);
1970-
final Ds3Object ds3Object = new Ds3Object(fileName, fileSize);
1971-
1972-
ds3Objects.add(ds3Object);
1973-
}
1957+
final ImmutableList<Ds3Object> ds3Objects = copyFilesAndGenerateDsObjects(tempDirectory);
19741958

19751959
final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(client);
19761960
final Ds3ClientHelpers.Job writeJob = ds3ClientHelpers.startWriteJob(BUCKET_NAME, ds3Objects);
@@ -1982,26 +1966,150 @@ public void testPutting15000Files() throws IOException, URISyntaxException {
19821966
tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix);
19831967

19841968
final Ds3ClientHelpers.Job readJob = ds3ClientHelpers.startReadJob(BUCKET_NAME, Lists.newArrayList(
1985-
new Ds3Object(sourceFileName, Files.size(sourceFilePath))));
1969+
new Ds3Object(SOURCE_FILE_NAME, Files.size(sourceFilePath()))));
19861970

19871971
final GetJobSpectraS3Response jobSpectraS3Response = client.getJobSpectraS3(new GetJobSpectraS3Request(readJob.getJobId()));
19881972

19891973
assertThat(jobSpectraS3Response.getMasterObjectListResult(), is(notNullValue()));
19901974

19911975
readJob.transfer(new FileObjectGetter(tempDirectory));
19921976

1993-
final File originalFile = ResourceUtils.loadFileResource(sourceDirectory + sourceFileName).toFile();
1994-
final File fileCopiedFromBP = Paths.get(tempDirectory.toString(), sourceFileName).toFile();
1977+
final File originalFile = ResourceUtils.loadFileResource(SOURCE_DIRECTORY + SOURCE_FILE_NAME).toFile();
1978+
final File fileCopiedFromBP = Paths.get(tempDirectory.toString(), SOURCE_FILE_NAME).toFile();
19951979
assertTrue(FileUtils.contentEquals(originalFile, fileCopiedFromBP));
19961980
} catch (final org.apache.http.client.ClientProtocolException e) {
1997-
fail("This test makes sure that we don't run out of connections when transferring lots of small files. Oops");
1981+
fail("This test makes sure that we don't run out of connections when transferring lots of small files. Oops.");
19981982
} finally {
19991983
FileUtils.deleteDirectory(tempDirectory.toFile());
20001984
cancelAllJobsForBucket(client, BUCKET_NAME);
20011985
deleteAllContents(client, BUCKET_NAME);
20021986
}
20031987
}
20041988

1989+
private Path sourceFilePath() throws IOException, URISyntaxException {
1990+
return ResourceUtils.loadFileResource(SOURCE_DIRECTORY + SOURCE_FILE_NAME);
1991+
}
1992+
1993+
private ImmutableList<Ds3Object> copyFilesAndGenerateDsObjects(final Path destinationDirectory) throws IOException, URISyntaxException {
1994+
final Path sourceFilePath = ResourceUtils.loadFileResource(SOURCE_DIRECTORY + SOURCE_FILE_NAME);
1995+
1996+
final ImmutableList<String> fileNames = generateFileNames(SOURCE_FILE_BASE_NAME, SOURCE_FILE_EXTENSION, 15000);
1997+
copyFiles(sourceFilePath, destinationDirectory, fileNames);
1998+
1999+
return generateDs3Objects(destinationDirectory, fileNames);
2000+
}
2001+
2002+
private ImmutableList<String> generateFileNames(final String baseFileName, final String baseFileExtension, final int numFileNames) {
2003+
final ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
2004+
2005+
listBuilder.add(baseFileName + "." + baseFileExtension);
2006+
2007+
for (int i = 1; i < numFileNames; ++i) {
2008+
listBuilder.add(baseFileName + i + "." + baseFileExtension);
2009+
}
2010+
2011+
return listBuilder.build();
2012+
}
2013+
2014+
private void copyFiles(final Path sourceFilePath, final Path destinationDirectory, final ImmutableList<String> fileNames) throws IOException {
2015+
for (final String fileName : fileNames) {
2016+
Files.copy(sourceFilePath, Paths.get(destinationDirectory.toString(), fileName));
2017+
}
2018+
}
2019+
2020+
private ImmutableList<Ds3Object> generateDs3Objects(final Path destinationDirectory, final ImmutableList<String> fileNames) throws IOException {
2021+
final ImmutableList.Builder<Ds3Object> listBuilder = ImmutableList.builder();
2022+
2023+
for (final String fileName : fileNames) {
2024+
final Path filePath = Paths.get(destinationDirectory.toString(), fileName);
2025+
final long fileSize = Files.size(filePath);
2026+
final Ds3Object ds3Object = new Ds3Object(fileName, fileSize);
2027+
2028+
listBuilder.add(ds3Object);
2029+
}
2030+
2031+
return listBuilder.build();
2032+
}
2033+
2034+
@Test
2035+
public void testCancelingJob() throws URISyntaxException, InterruptedException {
2036+
final String tempPathPrefix = null;
2037+
Path tempDirectory;
2038+
2039+
try {
2040+
tempDirectory = Files.createTempDirectory(Paths.get("."), tempPathPrefix);
2041+
} catch (final IOException e) {
2042+
fail("Could not create temp folder.");
2043+
return;
2044+
}
2045+
2046+
try {
2047+
final ImmutableList<Ds3Object> ds3Objects = copyFilesAndGenerateDsObjects(tempDirectory);
2048+
2049+
final Ds3ClientHelpers ds3ClientHelpers = Ds3ClientHelpers.wrap(client);
2050+
2051+
final Ds3ClientHelpers.Job writeJob = ds3ClientHelpers.startWriteJob(BUCKET_NAME, ds3Objects);
2052+
2053+
final AtomicBoolean caughtExceptionWhileCanceling = new AtomicBoolean(false);
2054+
final AtomicBoolean cancelHandlerCalled = new AtomicBoolean(false);
2055+
final CountDownLatch countDownLatch = new CountDownLatch(1);
2056+
2057+
final AtomicBoolean hasBeenCancelled = new AtomicBoolean(false);
2058+
writeJob.attachDataTransferredListener(dataTransferred -> {
2059+
if(hasBeenCancelled.compareAndSet(false, true)) {
2060+
new Thread(() -> {
2061+
try {
2062+
writeJob.cancel();
2063+
} catch (IOException e) {
2064+
fail();
2065+
} finally {
2066+
countDownLatch.countDown();
2067+
}
2068+
}).start();
2069+
}
2070+
});
2071+
2072+
writeJob.attachCanceledEventObserver(new CanceledEventObserver(eventData -> {
2073+
cancelHandlerCalled.set(true);
2074+
assertEquals(writeJob.getJobId(), eventData);
2075+
}));
2076+
2077+
writeJob.transfer(new FileObjectPutter(tempDirectory));
2078+
2079+
countDownLatch.await();
2080+
2081+
assertFalse(caughtExceptionWhileCanceling.get());
2082+
assertTrue(cancelHandlerCalled.get());
2083+
2084+
final String prefix = "";
2085+
final String nextMarker = null;
2086+
final int maxKeys = 15000;
2087+
final String delimiter = null;
2088+
final int numRetries = 5;
2089+
2090+
final GetBucketKeyLoaderFactory<Contents> getBucketKeyLoaderFactory = new GetBucketKeyLoaderFactory<>(client, BUCKET_NAME, prefix, delimiter, nextMarker, maxKeys, numRetries, GetBucketKeyLoaderFactory.contentsFunction);
2091+
final LazyIterable<?> iterable = new LazyIterable<>(getBucketKeyLoaderFactory);
2092+
final int numThingsInBucket = Iterables.size(iterable);
2093+
2094+
assertTrue(numThingsInBucket < maxKeys);
2095+
2096+
final GetCanceledJobSpectraS3Response getCanceledJobSpectraS3Response = client.getCanceledJobSpectraS3(new GetCanceledJobSpectraS3Request(writeJob.getJobId().toString()));
2097+
final CanceledJob canceledJob = getCanceledJobSpectraS3Response.getCanceledJobResult();
2098+
assertEquals(writeJob.getJobId(), canceledJob.getId());
2099+
} catch (final org.apache.http.client.ClientProtocolException e) {
2100+
fail("This test makes sure that we don't run out of connections when transferring lots of small files. Oops.");
2101+
} catch (final IOException e) {
2102+
fail("IOException from something other than job cancelation.");
2103+
} finally {
2104+
try {
2105+
FileUtils.deleteDirectory(tempDirectory.toFile());
2106+
deleteAllContents(client, BUCKET_NAME);
2107+
} catch (final IOException e) {
2108+
LOG.error("Failure cleaning up.", e);
2109+
}
2110+
}
2111+
}
2112+
20052113
@Test
20062114
public void testThatFifoIsNotProcessed() throws IOException, InterruptedException {
20072115
Assume.assumeFalse(Platform.isWindows());

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.spectralogic.ds3client.helpers.options.ReadJobOptions;
2222
import com.spectralogic.ds3client.helpers.options.WriteJobOptions;
2323
import com.spectralogic.ds3client.helpers.pagination.FileSystemKey;
24+
import com.spectralogic.ds3client.helpers.strategy.transferstrategy.CanceledEventObserver;
2425
import com.spectralogic.ds3client.helpers.strategy.transferstrategy.TransferStrategy;
2526
import com.spectralogic.ds3client.models.Contents;
2627
import com.spectralogic.ds3client.models.bulk.Ds3Object;
@@ -127,6 +128,25 @@ void transfer(final ObjectChannelBuilder channelBuilder)
127128
* @throws IOException
128129
*/
129130
void transfer() throws IOException;
131+
132+
/**
133+
* Cancels a transfer in progress in the current job.
134+
* @throws IOException
135+
*/
136+
void cancel() throws IOException;
137+
138+
/**
139+
* Attaches a handler that fires when a job transfer in progress is canceled.
140+
* @param canceledEventObserver An event notification object that carries the job UUID.
141+
* @return An observer instance that can be later used for de-registration.
142+
*/
143+
CanceledEventObserver attachCanceledEventObserver(final CanceledEventObserver canceledEventObserver);
144+
145+
/**
146+
* Remove an event handler previously registered with a call to {@link #attachCanceledEventObserver(CanceledEventObserver)}
147+
* @param canceledEventObserver An instance of {@link CanceledEventObserver} returned from a call to {@link #attachCanceledEventObserver(CanceledEventObserver)}
148+
*/
149+
void removeCanceledEventObserver(final CanceledEventObserver canceledEventObserver);
130150
}
131151

132152
/**

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/JobImpl.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package com.spectralogic.ds3client.helpers;
1717

1818
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.Job;
19+
import com.spectralogic.ds3client.helpers.strategy.transferstrategy.CanceledEventObserver;
1920
import com.spectralogic.ds3client.helpers.strategy.transferstrategy.EventDispatcher;
21+
import com.spectralogic.ds3client.helpers.strategy.transferstrategy.TransferStrategy;
2022
import com.spectralogic.ds3client.helpers.strategy.transferstrategy.TransferStrategyBuilder;
2123

2224
import org.slf4j.Logger;
@@ -130,6 +132,18 @@ public void removeObjectCompletedListener(final ObjectCompletedListener listener
130132
eventDispatcher().removeObjectCompletedListener(listener);
131133
}
132134

135+
@Override
136+
public CanceledEventObserver attachCanceledEventObserver(final CanceledEventObserver canceledEventObserver) {
137+
checkRunning();
138+
return eventDispatcher().attachCanceledEventObserver(canceledEventObserver);
139+
}
140+
141+
@Override
142+
public void removeCanceledEventObserver(final CanceledEventObserver canceledEventObserver) {
143+
checkRunning();
144+
eventDispatcher().removeCanceledEventObserver(canceledEventObserver);
145+
}
146+
133147
@Override
134148
public void transfer(final Ds3ClientHelpers.ObjectChannelBuilder channelBuilder) throws IOException {
135149
transferStrategyBuilder.withChannelBuilder(channelBuilder);
@@ -142,4 +156,10 @@ protected TransferStrategyBuilder transferStrategyBuilder() {
142156
protected EventDispatcher eventDispatcher() {
143157
return transferStrategyBuilder.eventDispatcher();
144158
}
159+
160+
protected void cancel(final TransferStrategy transferStrategy) throws IOException {
161+
if (transferStrategy != null) {
162+
transferStrategy.cancel();
163+
}
164+
}
145165
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,9 @@ public void transfer() throws IOException {
7777
running = true;
7878
transferStrategy.transfer();
7979
}
80+
81+
@Override
82+
public void cancel() throws IOException {
83+
cancel(transferStrategy);
84+
}
8085
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,10 @@ public void transfer() throws IOException {
8181
running = true;
8282
transferStrategy.transfer();
8383
}
84+
85+
@Override
86+
public void cancel() throws IOException {
87+
cancel(transferStrategy);
88+
}
8489
}
8590

0 commit comments

Comments
 (0)