Skip to content

Commit a5d9240

Browse files
committed
Merging
2 parents 7b0a5f5 + 6fd709c commit a5d9240

File tree

7 files changed

+128
-19
lines changed

7 files changed

+128
-19
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* ******************************************************************************
3+
* Copyright 2014-2015 Spectra Logic Corporation. All Rights Reserved.
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
5+
* this file except in compliance with the License. A copy of the License is located at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* or in the "license" file accompanying this file.
10+
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
* specific language governing permissions and limitations under the License.
13+
* ****************************************************************************
14+
*/
15+
16+
package com.spectralogic.ds3client.exceptions;
17+
18+
import java.io.IOException;
19+
20+
public class Ds3NoMoreRetriesException extends IOException {
21+
22+
public Ds3NoMoreRetriesException (final int retryAfter) {
23+
super (String.format("Reached the maximum number of retries (%d)", retryAfter));
24+
}
25+
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ public static Ds3ClientHelpers wrap(final Ds3Client client) {
8282
return new Ds3ClientHelpersImpl(client);
8383
}
8484

85+
public static Ds3ClientHelpers wrap(final Ds3Client client, final int retryAfter) {
86+
return new Ds3ClientHelpersImpl(client, retryAfter);
87+
}
88+
8589
/**
8690
* Performs a bulk put job creation request and returns an {@link WriteJob}.
8791
* See {@link WriteJob} for information on how to write the objects for the job.

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpersImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,15 @@ class Ds3ClientHelpersImpl extends Ds3ClientHelpers {
4040

4141
private static final int DEFAULT_MAX_KEYS = 1000;
4242
private final Ds3Client client;
43+
private final int retryAfter;
4344

44-
Ds3ClientHelpersImpl(final Ds3Client client) {
45+
public Ds3ClientHelpersImpl(final Ds3Client client) {
46+
this(client, -1);
47+
}
48+
49+
public Ds3ClientHelpersImpl(final Ds3Client client, final int retryAfter) {
4550
this.client = client;
51+
this.retryAfter = retryAfter;
4652
}
4753

4854
@Override
@@ -70,7 +76,7 @@ private Ds3ClientHelpers.Job innerStartWriteJob(final String bucket,
7076
.withPriority(options.getPriority())
7177
.withWriteOptimization(options.getWriteOptimization())
7278
.withMaxUploadSize(options.getMaxUploadSize()));
73-
return new WriteJobImpl(this.client, prime.getResult());
79+
return new WriteJobImpl(this.client, prime.getResult(), this.retryAfter);
7480
}
7581

7682
@Override
@@ -97,7 +103,7 @@ private Ds3ClientHelpers.Job innerStartReadJob(final String bucket, final Iterab
97103

98104
final ImmutableMultimap<String, Range> partialRanges = PartialObjectHelpers.getPartialObjectsRanges(objects);
99105

100-
return new ReadJobImpl(this.client, prime.getResult(), partialRanges);
106+
return new ReadJobImpl(this.client, prime.getResult(), partialRanges, this.retryAfter);
101107
}
102108

103109
@Override
@@ -134,7 +140,7 @@ public Ds3ClientHelpers.Job recoverWriteJob(final UUID jobId) throws SignatureEx
134140
throw new JobRecoveryException(RequestType.PUT.toString(), jobResponse.getMasterObjectList().getRequestType().toString() );
135141
}
136142

137-
return new WriteJobImpl(this.client, jobResponse.getMasterObjectList());
143+
return new WriteJobImpl(this.client, jobResponse.getMasterObjectList(), this.retryAfter);
138144
}
139145

140146
@Override
@@ -145,7 +151,7 @@ public Ds3ClientHelpers.Job recoverReadJob(final UUID jobId) throws SignatureExc
145151
throw new JobRecoveryException(RequestType.GET.toString(), jobResponse.getMasterObjectList().getRequestType().toString() );
146152
}
147153

148-
return new ReadJobImpl(this.client, jobResponse.getMasterObjectList(), ImmutableMultimap.<String, Range>of());
154+
return new ReadJobImpl(this.client, jobResponse.getMasterObjectList(), ImmutableMultimap.<String, Range>of(), this.retryAfter);
149155
}
150156

151157
@Override

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/ReadJobImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.spectralogic.ds3client.commands.GetAvailableJobChunksRequest;
2424
import com.spectralogic.ds3client.commands.GetAvailableJobChunksResponse;
2525
import com.spectralogic.ds3client.commands.GetObjectRequest;
26+
import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException;
2627
import com.spectralogic.ds3client.helpers.ChunkTransferrer.ItemTransferrer;
2728
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.ObjectChannelBuilder;
2829
import com.spectralogic.ds3client.helpers.util.PartialObjectHelpers;
@@ -42,14 +43,17 @@ class ReadJobImpl extends JobImpl {
4243
private final JobPartTracker partTracker;
4344
private final List<Objects> chunks;
4445
private final ImmutableMap<String, ImmutableMultimap<BulkObject, Range>> blobToRanges;
46+
private final int retryAfter; // Negative retryAfter value represent infinity retries
47+
private int retryAfterLeft; // The number of retries left
4548

46-
public ReadJobImpl(final Ds3Client client, final MasterObjectList masterObjectList, final ImmutableMultimap<String, Range> objectRanges) {
49+
public ReadJobImpl(final Ds3Client client, final MasterObjectList masterObjectList, final ImmutableMultimap<String, Range> objectRanges, final int retryAfter) {
4750
super(client, masterObjectList);
4851

4952
this.chunks = this.masterObjectList.getObjects();
5053
this.partTracker = JobPartTrackerFactory
5154
.buildPartTracker(Iterables.concat(chunks));
5255
this.blobToRanges = PartialObjectHelpers.mapRangesToBlob(masterObjectList.getObjects(), objectRanges);
56+
this.retryAfter = this.retryAfterLeft = retryAfter;
5357
}
5458

5559
@Override
@@ -100,9 +104,15 @@ private void transferNextChunks(final ChunkTransferrer chunkTransferrer)
100104
case AVAILABLE: {
101105
final MasterObjectList availableMol = availableJobChunks.getMasterObjectList();
102106
chunkTransferrer.transferChunks(availableMol.getNodes(), availableMol.getObjects());
107+
retryAfterLeft = retryAfter; // Reset the number of retries to the initial value
103108
break;
104109
}
105110
case RETRYLATER: {
111+
if (retryAfterLeft == 0) {
112+
throw new Ds3NoMoreRetriesException(this.retryAfter);
113+
}
114+
retryAfterLeft--;
115+
106116
Thread.sleep(availableJobChunks.getRetryAfterSeconds() * 1000);
107117
break;
108118
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/helpers/WriteJobImpl.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.spectralogic.ds3client.commands.AllocateJobChunkRequest;
2323
import com.spectralogic.ds3client.commands.AllocateJobChunkResponse;
2424
import com.spectralogic.ds3client.commands.PutObjectRequest;
25+
import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException;
2526
import com.spectralogic.ds3client.helpers.ChunkTransferrer.ItemTransferrer;
2627
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.ObjectChannelBuilder;
2728
import com.spectralogic.ds3client.models.Range;
@@ -42,10 +43,10 @@ class WriteJobImpl extends JobImpl {
4243
static private final Logger LOG = LoggerFactory.getLogger(WriteJobImpl.class);
4344
private final JobPartTracker partTracker;
4445
private final List<Objects> filteredChunks;
46+
private final int retryAfter; // Negative retryAfter value represent infinity retries
47+
private int retryAfterLeft; // The number of retries left
4548

46-
public WriteJobImpl(
47-
final Ds3Client client,
48-
final MasterObjectList masterObjectList) {
49+
public WriteJobImpl(final Ds3Client client, final MasterObjectList masterObjectList, final int retryAfter) {
4950
super(client, masterObjectList);
5051
if (this.masterObjectList == null || this.masterObjectList.getObjects() == null) {
5152
LOG.info("Job has no data to transfer");
@@ -57,7 +58,7 @@ public WriteJobImpl(
5758
this.partTracker = JobPartTrackerFactory
5859
.buildPartTracker(Iterables.concat(filteredChunks));
5960
}
60-
61+
this.retryAfter = this.retryAfterLeft = retryAfter;
6162
}
6263

6364
@Override
@@ -123,9 +124,15 @@ private Objects tryAllocateChunk(final Objects filtered) throws IOException, Sig
123124
LOG.info("AllocatedJobChunkResponse status: " + response.getStatus().toString());
124125
switch (response.getStatus()) {
125126
case ALLOCATED:
127+
retryAfterLeft = retryAfter; // Reset the number of retries to the initial value
126128
return response.getObjects();
127129
case RETRYLATER:
128130
try {
131+
if (retryAfterLeft == 0) {
132+
throw new Ds3NoMoreRetriesException(this.retryAfter);
133+
}
134+
retryAfterLeft--;
135+
129136
final int retryAfter = response.getRetryAfterSeconds() * 1000;
130137
LOG.debug("Will retry allocate chunk call after " + retryAfter + " seconds");
131138
Thread.sleep(retryAfter);

ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/Ds3ClientHelpers_Test.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.common.collect.Lists;
2020
import com.spectralogic.ds3client.Ds3Client;
2121
import com.spectralogic.ds3client.commands.*;
22+
import com.spectralogic.ds3client.exceptions.Ds3NoMoreRetriesException;
2223
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.Job;
2324
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers.ObjectChannelBuilder;
2425
import com.spectralogic.ds3client.models.Contents;
@@ -29,7 +30,6 @@
2930
import com.spectralogic.ds3client.networking.ConnectionDetails;
3031
import com.spectralogic.ds3client.serializer.XmlProcessingException;
3132
import com.spectralogic.ds3client.utils.ByteArraySeekableByteChannel;
32-
3333
import org.junit.Assert;
3434
import org.junit.Test;
3535
import org.mockito.Mockito;
@@ -45,6 +45,7 @@
4545
import static org.hamcrest.Matchers.*;
4646
import static org.junit.Assert.*;
4747
import static org.mockito.Mockito.mock;
48+
import static org.mockito.Mockito.when;
4849

4950
public class Ds3ClientHelpers_Test {
5051
private static final String MYBUCKET = "mybucket";
@@ -556,4 +557,56 @@ public void testStripLeadingPath() {
556557
assertEquals(Ds3ClientHelpers.stripLeadingPath("bar", "foo/"), "bar");
557558
}
558559

560+
@Test(expected = Ds3NoMoreRetriesException.class)
561+
public void testWriteObjectsWithRetryAfter() throws SignatureException, IOException, XmlProcessingException {
562+
final Ds3Client ds3Client = buildDs3ClientForBulk();
563+
564+
final BulkPutResponse bulkPutResponse = buildBulkPutResponse();
565+
Mockito.when(ds3Client.bulkPut(Mockito.any(BulkPutRequest.class))).thenReturn(bulkPutResponse);
566+
567+
final AllocateJobChunkResponse allocateResponse1 = buildAllocateResponse1();
568+
Mockito.when(ds3Client.allocateJobChunk(hasChunkId(CHUNK_ID_1)))
569+
.thenReturn(allocateResponse1);
570+
571+
final PutObjectResponse response = mock(PutObjectResponse.class);
572+
Mockito.when(ds3Client.putObject(putRequestHas(MYBUCKET, "foo", jobId, 0, "foo co"))).thenReturn(response);
573+
574+
final Job job = Ds3ClientHelpers.wrap(ds3Client, 1).startWriteJob(MYBUCKET, Lists.newArrayList(
575+
new Ds3Object("foo", 12)
576+
));
577+
578+
job.transfer(new ObjectChannelBuilder() {
579+
@Override
580+
public SeekableByteChannel buildChannel(final String key) throws IOException {
581+
// We don't care about the contents since we just want to know that the exception handling works correctly.
582+
return new ByteArraySeekableByteChannel();
583+
}
584+
});
585+
}
586+
587+
@Test(expected = Ds3NoMoreRetriesException.class)
588+
public void testReadObjectsWithRetryAfter() throws SignatureException, IOException, XmlProcessingException {
589+
final Ds3Client ds3Client = mock(Ds3Client.class);
590+
591+
final BulkGetResponse buildBulkGetResponse = buildBulkGetResponse();
592+
Mockito.when(ds3Client.bulkGet(hasChunkOrdering(ChunkClientProcessingOrderGuarantee.NONE))).thenReturn(buildBulkGetResponse);
593+
594+
final GetAvailableJobChunksResponse jobChunksResponse = mock(GetAvailableJobChunksResponse.class);
595+
when(jobChunksResponse.getStatus()).thenReturn(GetAvailableJobChunksResponse.Status.RETRYLATER);
596+
597+
Mockito.when(ds3Client.getAvailableJobChunks(hasJobId(jobId))).thenReturn(jobChunksResponse);
598+
599+
final Job job = Ds3ClientHelpers.wrap(ds3Client, 1).startReadJob(MYBUCKET, Lists.newArrayList(
600+
new Ds3Object("foo")
601+
));
602+
603+
job.transfer(new ObjectChannelBuilder() {
604+
@Override
605+
public SeekableByteChannel buildChannel(final String key) throws IOException {
606+
// We don't care about the contents since we just want to know that the exception handling works correctly.
607+
return new ByteArraySeekableByteChannel();
608+
}
609+
});
610+
611+
}
559612
}

ds3-sdk/src/test/java/com/spectralogic/ds3client/helpers/FileObjectPutter_Test.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,21 @@
1010
import java.nio.file.Path;
1111
import java.nio.file.StandardOpenOption;
1212

13-
import static org.hamcrest.CoreMatchers.is;
14-
import static org.hamcrest.CoreMatchers.notNullValue;
13+
import static org.hamcrest.CoreMatchers.*;
1514
import static org.junit.Assert.assertThat;
15+
import static org.junit.Assume.assumeThat;
1616

1717
public class FileObjectPutter_Test {
1818

1919
final private static String testString = "This is some test data.";
2020
final private static byte[] testData = testString.getBytes(Charset.forName("UTF-8"));
2121

22+
/**
23+
* This test cannot run on Windows without extra privileges
24+
*/
2225
@Test
2326
public void testSymlink() throws IOException {
27+
assumeThat(System.getProperty("os.name"), not(containsString("Windows")));
2428
final Path tempDir = Files.createTempDirectory("ds3_file_object_putter_");
2529
final Path tempPath = Files.createTempFile(tempDir, "temp_", ".txt");
2630

@@ -62,14 +66,14 @@ public void testRegularFile() throws IOException {
6266
}
6367
final FileObjectPutter putter = new FileObjectPutter(tempDir);
6468

65-
final SeekableByteChannel newChannel = putter.buildChannel(tempPath.getFileName().toString());
66-
assertThat(newChannel, is(notNullValue()));
69+
try (final SeekableByteChannel newChannel = putter.buildChannel(tempPath.getFileName().toString())) {
70+
assertThat(newChannel, is(notNullValue()));
6771

68-
final ByteBuffer buff = ByteBuffer.allocate(testData.length);
69-
assertThat(newChannel.read(buff), is(testData.length));
70-
71-
assertThat(new String(buff.array(), Charset.forName("UTF-8")), is(testString));
72+
final ByteBuffer buff = ByteBuffer.allocate(testData.length);
73+
assertThat(newChannel.read(buff), is(testData.length));
7274

75+
assertThat(new String(buff.array(), Charset.forName("UTF-8")), is(testString));
76+
}
7377
} finally {
7478
Files.deleteIfExists(tempPath);
7579
Files.deleteIfExists(tempDir);

0 commit comments

Comments
 (0)