-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathadd-amazon-s3-access.py
101 lines (78 loc) · 4.08 KB
/
add-amazon-s3-access.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python
import boto3
import click
import time
def create_data_set(dataexchange, data_set_name):
data_set_creation_response = dataexchange.create_data_set(
AssetType='S3_DATA_ACCESS',
Name=data_set_name,
Description='S3 Data Access data set'
)
return data_set_creation_response['Id']
def add_shares(dataexchange, data_set_id, revision_id, bucket, prefix, key, kms_keys):
# Ensure that all prefixes end with a trailing slash; otherwise, these would not be resolved by Amazon S3
def format_prefix(p):
if p.endswith('/'):
return p
return p + '/'
prefixes = list(map(format_prefix, prefix))
def wrap_kms_key(k):
return {
"KmsKeyArn": k
}
create_job_details = {
'CreateS3DataAccessFromS3Bucket': {
'AssetSource': {
'Bucket': bucket,
'KeyPrefixes': prefixes,
'Keys': key
},
'DataSetId': data_set_id,
'RevisionId': revision_id
}
}
if (len(kms_keys) > 0):
create_job_details['CreateS3DataAccessFromS3Bucket']['AssetSource']['KmsKeysToGrant'] = list(map(wrap_kms_key, kms_keys))
create_job_response = dataexchange.create_job(
Details=create_job_details,
Type='CREATE_S3_DATA_ACCESS_FROM_S3_BUCKET'
)
job_id = create_job_response['Id']
job_state = create_job_response['State']
if (job_state == 'ERROR'):
raise click.ClickException(f'Data set creation failed with status {job_state}!')
return job_id
def wait_for_job_to_complete(dataexchange, job_id):
dataexchange.start_job(JobId=job_id)
while True:
job_status_response = dataexchange.get_job(JobId=job_id)
job_state = job_status_response['State']
if job_state in ['ABORTED', 'FAILED', 'ERROR']:
raise click.ClickException(f'Data set creation failed with status {job_state}!')
if job_state == 'COMPLETED':
return
click.echo(f'Still waiting for job {job_id} to finish.')
time.sleep(5)
@click.command()
@click.option('--data-set-name', required=True, help='Name of the AWS Data Exchange data set to create.')
@click.option('--bucket', required=True, help='Name of the Amazon S3 bucket that contains the prefixes and keys to share.')
@click.option('--data-set-id', help='If supplied, the data set ID to which the data access will be added. Existing access will be replaced.')
@click.option('--region', default='us-east-1', help='AWS Region of the Amazon S3 bucket, and where the data set will be.')
@click.option('--prefix', default=[], help='Prefix of an Amazon S3 location to share. Multiple values permitted.', multiple=True)
@click.option('--key', default=[], help='Key of an Amazon S3 object to share. Multiple values permitted.', multiple=True)
@click.option('--kms-key-arn', default=[], help='Amazon Resource Name of the KMS key used to encrypt the shared objects. Multiple values permitted.', multiple=True)
def main(data_set_name, bucket, data_set_id, region, prefix, key, kms_key_arn):
if (len(prefix) + len(key)) > 5:
raise click.UsageError('No more than a total of 5 prefixes and keys can be provided.')
if (len(kms_key_arn)) > 10:
raise click.UsageError('No more than a total of 10 KMS keys can be provided.')
dataexchange = boto3.client('dataexchange', region_name=region)
data_set_id_to_use = create_data_set(dataexchange, data_set_name) if data_set_id is None else data_set_id
create_revision_response = dataexchange.create_revision(DataSetId=data_set_id_to_use)
revision_id = create_revision_response['Id']
job_id = add_shares(dataexchange, data_set_id_to_use, revision_id, bucket, prefix, key, kms_key_arn)
wait_for_job_to_complete(dataexchange, job_id)
finalize_revision_response = dataexchange.update_revision(DataSetId=data_set_id_to_use, RevisionId=revision_id, Finalized=True)
click.echo(f'Data set {data_set_id_to_use} configured with Amazon S3 data access.')
if __name__ == '__main__':
main()