Commit 00d58af

Merge pull request #40 from umnmsi/checksum
Add support for checksums
2 parents cba9dc8 + e95065f commit 00d58af

File tree

2 files changed: +115 -4 lines changed


ds3/ds3network.py

Lines changed: 9 additions & 4 deletions
@@ -202,10 +202,14 @@ def send_request(self, request):

         # add additonal header information if specficied in the request. This might be a byte range for example
         amz_headers = {}
+        checksum = ''
         for key, value in request.headers.items():
             if key == 'Content-Length':
                 # Add to headers, but not to amz-headers
                 headers[key] = value
+            elif key in ['Content-CRC32', 'Content-CRC32C', 'Content-MD5', 'Content-SHA256', 'Content-SHA512']:
+                headers[key] = value
+                checksum = value
             elif not key.startswith('x-amz-meta-'):
                 amz_headers['x-amz-meta-' + key] = self.canonicalize_header_value(value)
             else:
@@ -217,6 +221,7 @@ def send_request(self, request):
         canonicalized_amz_header = self.canonicalized_amz_headers(amz_headers)
         headers['Content-Type'] = 'application/octet-stream'
         headers['Authorization'] = self.build_authorization(verb=request.http_verb,
+                                                            checksum=checksum,
                                                             date=date,
                                                             content_type='application/octet-stream',
                                                             canonicalized_amz_header=canonicalized_amz_header,
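The branch added above recognizes five checksum headers (Content-CRC32, Content-CRC32C, Content-MD5, Content-SHA256, Content-SHA512), forwards the supplied value unchanged, and also threads it into the request signature via the checksum keyword shown in the next hunk. The value itself is computed by the caller; a small, hedged sketch for Content-MD5, assuming the conventional base64-of-raw-digest form used by the standard HTTP Content-MD5 header (the new CRC32 sample further down follows the same pattern):

```python
# Hedged helper for a caller-supplied Content-MD5 value; base64 of the raw
# 16-byte digest is the standard HTTP Content-MD5 encoding. Whether the other
# checksum headers expect the same base64-of-digest form is an assumption here.
import base64
import hashlib

def content_md5(data: bytes) -> str:
    return base64.b64encode(hashlib.md5(data).digest()).decode()

# e.g. headers={"Content-MD5": content_md5(payload)} on a PutObjectRequest
```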
@@ -262,18 +267,18 @@ def canonicalize_path(self, request_path, query_params):
                 path += '=' + str(query_params['uploads'])
         return path
 
-    def build_authorization(self, verb='', date='', content_type='', resource='', canonicalized_amz_header=''):
+    def build_authorization(self, verb='', checksum='', date='', content_type='', resource='', canonicalized_amz_header=''):
         ###Build the S3 authorization###
-        signature = self.aws_signature(self.credentials.key, verb=verb, content_type=content_type,
+        signature = self.aws_signature(self.credentials.key, verb=verb, checksum=checksum, content_type=content_type,
                                        date=date, canonicalized_amz_header=canonicalized_amz_header,
                                        canonicalized_resource=resource)
         return 'AWS ' + self.credentials.accessId + ':' + signature
 
-    def aws_signature(self, key, verb='GET', md5='', content_type='', date='', canonicalized_amz_header='',
+    def aws_signature(self, key, verb='GET', checksum='', content_type='', date='', canonicalized_amz_header='',
                       canonicalized_resource=''):
         ###compute and sign S3 signature###
         signature_string = verb + '\n'
-        signature_string += md5 + '\n'
+        signature_string += checksum + '\n'
         signature_string += content_type + '\n'
         signature_string += date + '\n'
         signature_string += canonicalized_amz_header
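With the checksum threaded through build_authorization and aws_signature, it occupies the slot in the string-to-sign that previously only ever carried an MD5 value. A minimal sketch of the resulting signature computation, assuming the usual AWS v2 style HMAC-SHA1 signing over that string (the HMAC step is not shown in this hunk, so treat it as an assumption about the rest of aws_signature):

```python
# Illustrative reconstruction of the string-to-sign and its signature; names
# such as secret_key and encoded_checksum are placeholders, not SDK fields.
import base64
import hashlib
import hmac

def sign_v2(secret_key, verb, checksum, content_type, date, amz_headers, resource):
    string_to_sign = (verb + '\n' +
                      checksum + '\n' +        # e.g. a base64 encoded CRC32
                      content_type + '\n' +
                      date + '\n' +
                      amz_headers + resource)
    digest = hmac.new(secret_key.encode(), string_to_sign.encode(), hashlib.sha1).digest()
    return base64.b64encode(digest).decode()

# Authorization: 'AWS ' + access_id + ':' + sign_v2(...)
```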

samples/puttingDataWithCRC32.py

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
# Copyright 2014-2017 Spectra Logic Corporation. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use
# this file except in compliance with the License. A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file.
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from ds3 import ds3
import os
import time
import sys
import base64
import zlib

client = ds3.createClientFromEnv()

bucketName = "books"

# make sure the bucket that we will be sending objects to exists
client.put_bucket(ds3.PutBucketRequest(bucketName))

# create your list of objects that will be sent to DS3
# this example assumes that these files exist on the file system

fileList = ["beowulf.txt", "sherlock_holmes.txt", "tale_of_two_cities.txt", "ulysses.txt"]

# this method is used to map a file path to a Ds3PutObject
def fileNameToDs3PutObject(filePath, prefix=""):
    size = os.stat(pathForResource(filePath)).st_size
    return ds3.Ds3PutObject(prefix + filePath, size)

# this method is used to get the os specific path for an object located in the resources folder
def pathForResource(resourceName):
    currentPath = os.path.dirname(str(__file__))
    return os.path.join(currentPath, "resources", resourceName)

# get the sizes for each file
fileList = list(map(fileNameToDs3PutObject, fileList))

# submit the put bulk request to DS3
bulkResult = client.put_bulk_job_spectra_s3(ds3.PutBulkJobSpectraS3Request(bucketName, fileList))

# the bulk request will split the files over several chunks if it needs to.
# we then need to ask what chunks we can send, and then send them making
# sure we don't resend the same chunks

# create a set of the chunk ids which will be used to track
# what chunks have not been sent
chunkIds = set([x['ChunkId'] for x in bulkResult.result['ObjectsList']])

# while we still have chunks to send
while len(chunkIds) > 0:
    # get a list of the available chunks that we can send
    availableChunks = client.get_job_chunks_ready_for_client_processing_spectra_s3(
        ds3.GetJobChunksReadyForClientProcessingSpectraS3Request(bulkResult.result['JobId']))

    chunks = availableChunks.result['ObjectsList']

    # check to make sure we got some chunks, if we did not
    # sleep and retry. This could mean that the cache is full
    if len(chunks) == 0:
        time.sleep(availableChunks.retryAfter)
        continue

    # for each chunk that is available, check to make sure
    # we have not sent it, and if not, send that object
    for chunk in chunks:
        if not chunk['ChunkId'] in chunkIds:
            continue

        chunkIds.remove(chunk['ChunkId'])
        for obj in chunk['ObjectList']:
            # it is possible that if we start resending a chunk, due to the program crashing, that
            # some objects will already be in cache. Check to make sure that they are not, and then
            # send the object to Spectra S3
            if obj['InCache'] == 'false':
                localFileName = "resources/" + obj['Name']
                objectDataStream = open(localFileName, "rb")
                objectDataStream.seek(int(obj['Offset']), 0)
                objectChunk = objectDataStream.read(int(obj['Length']))
                checksum = zlib.crc32(objectChunk)
                encodedChecksum = base64.b64encode(checksum.to_bytes((checksum.bit_length() + 7) // 8, byteorder='big')).decode()
                objectDataStream.seek(int(obj['Offset']), 0)
                client.put_object(ds3.PutObjectRequest(bucketName,
                                                       obj['Name'],
                                                       obj['Length'],
                                                       objectDataStream,
                                                       offset=int(obj['Offset']),
                                                       job=bulkResult.result['JobId'],
                                                       headers={"Content-CRC32": encodedChecksum}))

# we now verify that all our objects have been sent to DS3
bucketResponse = client.get_bucket(ds3.GetBucketRequest(bucketName))

for obj in bucketResponse.result['ContentsList']:
    print(obj['Key'])

# delete the bucket by first deleting all the objects, and then deleting the bucket
for obj in bucketResponse.result['ContentsList']:
    client.delete_object(ds3.DeleteObjectRequest(bucketName, obj['Key']))

client.delete_bucket(ds3.DeleteBucketRequest(bucketName))
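One encoding detail in the sample worth noting: checksum.to_bytes((checksum.bit_length() + 7) // 8, byteorder='big') emits only as many bytes as the CRC value needs, so a checksum with leading zero bytes (or an empty read) encodes to fewer than four bytes. If the server expects the full 32-bit value, which is an assumption rather than something this commit states, a fixed four-byte width is the safer variant:

```python
# Hedged variant of the sample's CRC32 encoding with a fixed 32-bit width.
import base64
import zlib

def encoded_crc32(chunk: bytes) -> str:
    crc = zlib.crc32(chunk) & 0xffffffff  # unsigned 32-bit CRC
    return base64.b64encode(crc.to_bytes(4, byteorder='big')).decode()
```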
