Skip to content

Commit d9f524a

Browse files
committed
Merge pull request #142 from shabtaisharon/transfer_speeds
Add instrumentation to measure transfer speeds and print it to the log
2 parents 7e30f23 + 9354f38 commit d9f524a

File tree

10 files changed

+170
-57
lines changed

10 files changed

+170
-57
lines changed

ds3-sdk-samples/src/main/java/com/spectralogic/ds3client/samples/BulkPutExample.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.spectralogic.ds3client.Ds3ClientBuilder;
2020
import com.spectralogic.ds3client.helpers.Ds3ClientHelpers;
2121
import com.spectralogic.ds3client.helpers.FileObjectPutter;
22-
import com.spectralogic.ds3client.models.Credentials;
2322
import com.spectralogic.ds3client.models.bulk.Ds3Object;
2423
import com.spectralogic.ds3client.serializer.XmlProcessingException;
2524

@@ -32,10 +31,7 @@ public class BulkPutExample {
3231

3332
public static void main(final String args[]) throws IOException, SignatureException, XmlProcessingException {
3433

35-
try (final Ds3Client client = Ds3ClientBuilder.create("endpoint:8080",
36-
new Credentials("accessId", "secretKey"))
37-
.withHttps(false)
38-
.build()) {
34+
try (final Ds3Client client = Ds3ClientBuilder.fromEnv().withHttps(false).build()) {
3935

4036
// Wrap the Ds3Client with the helper functions
4137
final Ds3ClientHelpers helper = Ds3ClientHelpers.wrap(client);

ds3-sdk-samples/src/main/java/com/spectralogic/ds3client/samples/Ds3BulkGetExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public static void main(final String args[]) throws IOException, SignatureExcept
3939
// Get a client builder and then build a client instance. This is the main entry point to the SDK.
4040
try (final Ds3Client client = Ds3ClientBuilder.fromEnv().withHttps(false).build()) {
4141

42-
final String bucket = "bucketName"; //The bucket we are interested in getting objects from.
42+
final String bucket = "my_bucket"; //The bucket we are interested in getting objects from.
4343

4444
// Get the list of objects from the bucket that you want to perform the bulk get with.
4545
final GetBucketResponse response = client.getBucket(new GetBucketRequest(bucket));

ds3-sdk/src/main/java/com/spectralogic/ds3client/Ds3ClientImpl.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@
1515

1616
package com.spectralogic.ds3client;
1717

18-
import java.io.IOException;
19-
import java.security.SignatureException;
20-
21-
2218
import com.spectralogic.ds3client.commands.*;
2319
import com.spectralogic.ds3client.commands.notifications.*;
2420
import com.spectralogic.ds3client.models.bulk.Node;
2521
import com.spectralogic.ds3client.networking.ConnectionDetails;
2622
import com.spectralogic.ds3client.networking.NetworkClient;
2723

24+
import java.io.IOException;
25+
import java.security.SignatureException;
26+
2827
public class Ds3ClientImpl implements Ds3Client {
2928
private final NetworkClient netClient;
3029

@@ -86,7 +85,8 @@ public GetObjectResponse getObject(final GetObjectRequest request) throws IOExce
8685
return new GetObjectResponse(
8786
this.netClient.getResponse(request),
8887
request.getDestinationChannel(),
89-
this.netClient.getConnectionDetails().getBufferSize()
88+
this.netClient.getConnectionDetails().getBufferSize(),
89+
request.getObjectName()
9090
);
9191
}
9292

@@ -176,7 +176,7 @@ public NotificationResponse createObjectCachedNotification(final CreateObjectCac
176176
}
177177

178178
@Override
179-
public NotificationResponse getObjectCachedNotification(GetObjectCachedNotificationRequest request) throws IOException, SignatureException {
179+
public NotificationResponse getObjectCachedNotification(final GetObjectCachedNotificationRequest request) throws IOException, SignatureException {
180180
return new NotificationResponse(this.netClient.getResponse(request));
181181
}
182182

@@ -191,7 +191,7 @@ public NotificationResponse createJobCompletedNotification(final CreateJobComple
191191
}
192192

193193
@Override
194-
public NotificationResponse getJobCompletedNotification(GetJobCompletedNotificationRequest request) throws IOException, SignatureException {
194+
public NotificationResponse getJobCompletedNotification(final GetJobCompletedNotificationRequest request) throws IOException, SignatureException {
195195
return new NotificationResponse(this.netClient.getResponse(request));
196196
}
197197

@@ -201,77 +201,77 @@ public DeleteNotificationResponse deleteJobCompleteNotification(final DeleteJobC
201201
}
202202

203203
@Override
204-
public NotificationResponse createJobCreatedNotification(CreateJobCreatedNotificationRequest request) throws IOException, SignatureException {
204+
public NotificationResponse createJobCreatedNotification(final CreateJobCreatedNotificationRequest request) throws IOException, SignatureException {
205205
return new NotificationResponse(this.netClient.getResponse(request));
206206
}
207207

208208
@Override
209-
public NotificationResponse getJobCreatedNotification(GetJobCreatedNotificationRequest request) throws IOException, SignatureException {
209+
public NotificationResponse getJobCreatedNotification(final GetJobCreatedNotificationRequest request) throws IOException, SignatureException {
210210
return new NotificationResponse(this.netClient.getResponse(request));
211211
}
212212

213213
@Override
214-
public DeleteNotificationResponse deleteJobCreatedNotification(DeleteJobCreatedNotificationRequest request) throws IOException, SignatureException {
214+
public DeleteNotificationResponse deleteJobCreatedNotification(final DeleteJobCreatedNotificationRequest request) throws IOException, SignatureException {
215215
return new DeleteNotificationResponse(this.netClient.getResponse(request));
216216
}
217217

218218
@Override
219-
public NotificationResponse createObjectLostNotification(CreateObjectLostNotificationRequest request) throws IOException, SignatureException {
219+
public NotificationResponse createObjectLostNotification(final CreateObjectLostNotificationRequest request) throws IOException, SignatureException {
220220
return new NotificationResponse(this.netClient.getResponse(request));
221221
}
222222

223223
@Override
224-
public NotificationResponse getObjectLostNotification(GetObjectLostNotificationRequest request) throws IOException, SignatureException {
224+
public NotificationResponse getObjectLostNotification(final GetObjectLostNotificationRequest request) throws IOException, SignatureException {
225225
return new NotificationResponse(this.netClient.getResponse(request));
226226
}
227227

228228
@Override
229-
public DeleteNotificationResponse deleteObjectLostNotification(DeleteObjectLostNotificationRequest request) throws IOException, SignatureException {
229+
public DeleteNotificationResponse deleteObjectLostNotification(final DeleteObjectLostNotificationRequest request) throws IOException, SignatureException {
230230
return new DeleteNotificationResponse(this.netClient.getResponse(request));
231231
}
232232

233233
@Override
234-
public NotificationResponse createObjectPersistedNotification(CreateObjectPersistedNotificationRequest request) throws IOException, SignatureException {
234+
public NotificationResponse createObjectPersistedNotification(final CreateObjectPersistedNotificationRequest request) throws IOException, SignatureException {
235235
return new NotificationResponse(this.netClient.getResponse(request));
236236
}
237237

238238
@Override
239-
public NotificationResponse getObjectPersistedNotification(GetObjectPersistedNotificationRequest request) throws IOException, SignatureException {
239+
public NotificationResponse getObjectPersistedNotification(final GetObjectPersistedNotificationRequest request) throws IOException, SignatureException {
240240
return new NotificationResponse(this.netClient.getResponse(request));
241241
}
242242

243243
@Override
244-
public DeleteNotificationResponse deleteObjectPersistedNotification(DeleteObjectPersistedNotificationRequest request) throws IOException, SignatureException {
244+
public DeleteNotificationResponse deleteObjectPersistedNotification(final DeleteObjectPersistedNotificationRequest request) throws IOException, SignatureException {
245245
return new DeleteNotificationResponse(this.netClient.getResponse(request));
246246
}
247247

248248
@Override
249-
public NotificationResponse createPartitionFailureNotification(CreatePartitionFailureNotificationRequest request) throws IOException, SignatureException {
249+
public NotificationResponse createPartitionFailureNotification(final CreatePartitionFailureNotificationRequest request) throws IOException, SignatureException {
250250
return new NotificationResponse(this.netClient.getResponse(request));
251251
}
252252

253253
@Override
254-
public NotificationResponse getPartitionFailureNotification(GetPartitionFailureNotificationRequest request) throws IOException, SignatureException {
254+
public NotificationResponse getPartitionFailureNotification(final GetPartitionFailureNotificationRequest request) throws IOException, SignatureException {
255255
return new NotificationResponse(this.netClient.getResponse(request));
256256
}
257257

258258
@Override
259-
public DeleteNotificationResponse deletePartitionFailureNotification(DeletePartitionFailureNotificationRequest request) throws IOException, SignatureException {
259+
public DeleteNotificationResponse deletePartitionFailureNotification(final DeletePartitionFailureNotificationRequest request) throws IOException, SignatureException {
260260
return new DeleteNotificationResponse(this.netClient.getResponse(request));
261261
}
262262

263263
@Override
264-
public NotificationResponse createTapeFailureNotification(CreateTapeFailureNotificationRequest request) throws IOException, SignatureException {
264+
public NotificationResponse createTapeFailureNotification(final CreateTapeFailureNotificationRequest request) throws IOException, SignatureException {
265265
return new NotificationResponse(this.netClient.getResponse(request));
266266
}
267267

268268
@Override
269-
public NotificationResponse getTapeFailureNotification(GetTapeFailureNotificationRequest request) throws IOException, SignatureException {
269+
public NotificationResponse getTapeFailureNotification(final GetTapeFailureNotificationRequest request) throws IOException, SignatureException {
270270
return new NotificationResponse(this.netClient.getResponse(request));
271271
}
272272

273273
@Override
274-
public DeleteNotificationResponse deleteTapeFailureNotification(DeleteTapeFailureNotificationRequest request) throws IOException, SignatureException {
274+
public DeleteNotificationResponse deleteTapeFailureNotification(final DeleteTapeFailureNotificationRequest request) throws IOException, SignatureException {
275275
return new DeleteNotificationResponse(this.netClient.getResponse(request));
276276
}
277277

ds3-sdk/src/main/java/com/spectralogic/ds3client/Ds3InputStreamEntity.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515

1616
package com.spectralogic.ds3client;
1717

18+
import com.spectralogic.ds3client.exceptions.ContentLengthNotMatchException;
1819
import com.spectralogic.ds3client.utils.IOUtils;
20+
import com.spectralogic.ds3client.utils.PerformanceUtils;
1921
import org.apache.http.entity.ContentType;
2022
import org.apache.http.entity.InputStreamEntity;
2123

@@ -26,9 +28,11 @@
2628
public class Ds3InputStreamEntity extends InputStreamEntity {
2729

2830
private int bufferSize = 1024 * 1024;
31+
private final String path;
2932

30-
public Ds3InputStreamEntity(final InputStream inStream, final long length, final ContentType contentType) {
33+
public Ds3InputStreamEntity(final InputStream inStream, final long length, final ContentType contentType, final String path) {
3134
super(inStream, length, contentType);
35+
this.path = path;
3236
}
3337

3438
public void setBufferSize(final int bufferSize) {
@@ -41,6 +45,14 @@ public long getBufferSize() {
4145

4246
@Override
4347
public void writeTo(final OutputStream outStream) throws IOException {
44-
IOUtils.copy(this.getContent(), outStream, bufferSize);
48+
final long startTime = PerformanceUtils.getCurrentTime();
49+
final long totalBytes = IOUtils.copy(this.getContent(), outStream, bufferSize);
50+
final long endTime = PerformanceUtils.getCurrentTime();
51+
52+
if (this.getContentLength() != -1 && totalBytes != this.getContentLength()) {
53+
throw new ContentLengthNotMatchException(path, this.getContentLength(), totalBytes);
54+
}
55+
56+
PerformanceUtils.logMbps(startTime, endTime, totalBytes, path, true);
4557
}
4658
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/NetworkClientImpl.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ private HttpRequest buildHttpRequest() throws IOException {
187187
if (this.content != null) {
188188
final BasicHttpEntityEnclosingRequest httpRequest = new BasicHttpEntityEnclosingRequest(verb, path);
189189

190-
final Ds3InputStreamEntity entityStream = new Ds3InputStreamEntity(this.content, this.ds3Request.getSize(), ContentType.create(this.ds3Request.getContentType()));
190+
final Ds3InputStreamEntity entityStream = new Ds3InputStreamEntity(this.content, this.ds3Request.getSize(), ContentType.create(this.ds3Request.getContentType()), this.ds3Request.getPath());
191191
entityStream.setBufferSize(NetworkClientImpl.this.connectionDetails.getBufferSize());
192192
httpRequest.setEntity(entityStream);
193193
return httpRequest;
@@ -260,21 +260,21 @@ private String canonicalizeResource(final String path, final Map<String, String>
260260
return canonicalizedResource.toString();
261261
}
262262

263-
private String canonicalizeAmzHeaders(
264-
final Multimap<String, String> customHeaders) {
265-
StringBuilder ret = new StringBuilder();
266-
for (final Map.Entry<String, Collection<String>> header : customHeaders
267-
.asMap().entrySet()) {
268-
final String key = header.getKey().toLowerCase();
269-
if (key.startsWith(PutObjectRequest.AMZ_META_HEADER)
270-
&& header.getValue().size() > 0) {
271-
ret.append(key).append(":");
272-
ret.append(Joiner.on(",").join(header.getValue()));
273-
ret.append('\n');
274-
}
275-
}
276-
return ret.toString();
277-
}
263+
private String canonicalizeAmzHeaders(
264+
final Multimap<String, String> customHeaders) {
265+
final StringBuilder ret = new StringBuilder();
266+
for (final Map.Entry<String, Collection<String>> header : customHeaders
267+
.asMap().entrySet()) {
268+
final String key = header.getKey().toLowerCase();
269+
if (key.startsWith(PutObjectRequest.AMZ_META_HEADER)
270+
&& header.getValue().size() > 0) {
271+
ret.append(key).append(":");
272+
ret.append(Joiner.on(",").join(header.getValue()));
273+
ret.append('\n');
274+
}
275+
}
276+
return ret.toString();
277+
}
278278

279279
private String buildHash() throws IOException {
280280
return this.ds3Request.getChecksum().match(new HashGeneratingMatchHandler(this.content, this.checksumType));

ds3-sdk/src/main/java/com/spectralogic/ds3client/commands/GetObjectResponse.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515

1616
package com.spectralogic.ds3client.commands;
1717

18+
import com.spectralogic.ds3client.exceptions.ContentLengthNotMatchException;
1819
import com.spectralogic.ds3client.networking.Metadata;
1920
import com.spectralogic.ds3client.networking.WebResponse;
2021
import com.spectralogic.ds3client.utils.IOUtils;
22+
import com.spectralogic.ds3client.utils.PerformanceUtils;
2123

2224
import java.io.IOException;
2325
import java.io.InputStream;
@@ -26,9 +28,9 @@
2628
public class GetObjectResponse extends AbstractResponse {
2729
private Metadata metadata;
2830
private long objectSize;
29-
public GetObjectResponse(final WebResponse response, final WritableByteChannel destinationChannel, final int bufferSize) throws IOException {
31+
public GetObjectResponse(final WebResponse response, final WritableByteChannel destinationChannel, final int bufferSize, final String objName) throws IOException {
3032
super(response);
31-
download(destinationChannel, bufferSize);
33+
download(destinationChannel, bufferSize, objName);
3234
}
3335

3436
public Metadata getMetadata() {
@@ -42,12 +44,20 @@ protected void processResponse() throws IOException {
4244
this.objectSize = getSizeFromHeaders(this.getResponse().getHeaders());
4345
}
4446

45-
protected void download(final WritableByteChannel destinationChannel, final int bufferSize) throws IOException {
47+
protected void download(final WritableByteChannel destinationChannel, final int bufferSize, final String objName) throws IOException {
4648
try (
4749
final WebResponse response = this.getResponse();
4850
final InputStream responseStream = response.getResponseStream()) {
49-
IOUtils.copy(responseStream, destinationChannel, bufferSize);
51+
final long startTime = PerformanceUtils.getCurrentTime();
52+
final long totalBytes = IOUtils.copy(responseStream, destinationChannel, bufferSize);
5053
destinationChannel.close();
54+
final long endTime = PerformanceUtils.getCurrentTime();
55+
56+
if (this.objectSize != -1 && totalBytes != this.objectSize) {
57+
throw new ContentLengthNotMatchException(objName, objectSize, totalBytes);
58+
}
59+
60+
PerformanceUtils.logMbps(startTime, endTime, totalBytes, objName, false);
5161
}
5262
}
5363

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* ******************************************************************************
3+
* Copyright 2014-2015 Spectra Logic Corporation. All Rights Reserved.
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
5+
* this file except in compliance with the License. A copy of the License is located at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* or in the "license" file accompanying this file.
10+
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
* specific language governing permissions and limitations under the License.
13+
* ****************************************************************************
14+
*/
15+
16+
package com.spectralogic.ds3client.exceptions;
17+
18+
import java.io.IOException;
19+
20+
public class ContentLengthNotMatchException extends IOException {
21+
private final String fileName;
22+
private final long contentLenght;
23+
private final long totalBytes;
24+
public ContentLengthNotMatchException(final String fileName, final long contentLenght, final long totalBytes) {
25+
super(String.format("The Content length for %s (%d) not match the number of byte read (%d)", fileName, contentLenght, totalBytes));
26+
27+
this.fileName = fileName;
28+
this.contentLenght = contentLenght;
29+
this.totalBytes = totalBytes;
30+
}
31+
32+
public String getFileName() {
33+
return this.fileName;
34+
}
35+
36+
public long getContentLenght() {
37+
return this.contentLenght;
38+
}
39+
40+
public long getTotalBytes() {
41+
return this.totalBytes;
42+
}
43+
}

ds3-sdk/src/main/java/com/spectralogic/ds3client/utils/IOUtils.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,41 @@
2222
import java.nio.channels.WritableByteChannel;
2323

2424
public class IOUtils {
25-
public static void copy(
25+
26+
public static long copy(
2627
final InputStream inputStream,
2728
final OutputStream outputStream,
2829
final int bufferSize)
2930
throws IOException {
3031
final byte[] buffer = new byte[bufferSize];
3132
int len;
33+
long totalBytes = 0;
34+
3235
while ((len = inputStream.read(buffer)) != -1) {
36+
totalBytes += len;
3337
outputStream.write(buffer, 0, len);
3438
}
39+
40+
return totalBytes;
3541
}
3642

37-
public static void copy(
43+
public static long copy(
3844
final InputStream inputStream,
3945
final WritableByteChannel writableByteChannel,
4046
final int bufferSize)
4147
throws IOException {
4248
final byte[] buffer = new byte[bufferSize];
4349
final ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
4450
int len;
51+
long totalBytes = 0;
52+
4553
while ((len = inputStream.read(buffer)) != -1) {
54+
totalBytes += len;
4655
byteBuffer.position(0);
4756
byteBuffer.limit(len);
4857
writableByteChannel.write(byteBuffer);
4958
}
59+
return totalBytes;
5060
}
5161
}
5262

0 commit comments

Comments
 (0)