# example_gpt_aws_s3_rds.py
# Provisions an S3 bucket and a MySQL RDS instance, then verifies that data
# can be written to and read back from both.
import os
import time
import uuid

import boto3
from botocore.exceptions import ClientError

# Fill in your region and credentials before running; they are intentionally
# left blank here. (boto3 also reads these from the environment or
# ~/.aws/credentials on its own, so the explicit keyword arguments passed to
# each client below are redundant but kept for clarity.)
os.environ['AWS_DEFAULT_REGION'] = ''
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''

def create_s3_bucket(bucket_name):
    """Create an S3 bucket in the configured region."""
    s3 = boto3.client('s3',
                      aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                      aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                      region_name=os.environ['AWS_DEFAULT_REGION'])
    try:
        # Buckets outside us-east-1 need an explicit LocationConstraint; for
        # us-east-1 the CreateBucketConfiguration must be omitted entirely.
        s3.create_bucket(Bucket=bucket_name,
                         CreateBucketConfiguration={
                             'LocationConstraint': os.environ['AWS_DEFAULT_REGION']})
    except ClientError as e:
        print(e)

def create_rds_instance(instance_name, master_username, master_password):
    """Create a small MySQL RDS instance."""
    rds = boto3.client('rds',
                       aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                       aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                       region_name=os.environ['AWS_DEFAULT_REGION'])
    try:
        rds.create_db_instance(
            AllocatedStorage=20,
            DBInstanceIdentifier=instance_name,
            Engine='mysql',
            MasterUsername=master_username,
            MasterUserPassword=master_password,
            # db.t3.micro replaces the original db.t2.micro, which recent
            # MySQL engine versions no longer support.
            DBInstanceClass='db.t3.micro',
            # No explicit security groups: the VPC's default group applies. It
            # must allow inbound MySQL (port 3306) from this machine for the
            # connection test below to succeed.
            VpcSecurityGroupIds=[],
            AvailabilityZone=os.environ['AWS_DEFAULT_REGION'] + 'a'
        )
    except ClientError as e:
        print(e)
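
# As an alternative to the manual polling loop in test_data_transfer below,
# here is a minimal sketch (not part of the original script) using boto3's
# built-in waiter, which blocks until the instance reports 'available':
#
#     rds.get_waiter('db_instance_available').wait(
#         DBInstanceIdentifier=instance_name,
#         WaiterConfig={'Delay': 10, 'MaxAttempts': 60},
#     )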

def test_data_transfer(bucket_name, instance_name, master_username, master_password):
    """Write a test object to S3, then write and read back a row in RDS."""
    # Imported here so the rest of the script works without pymysql installed.
    import pymysql
    s3 = boto3.resource('s3',
                        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                        region_name=os.environ['AWS_DEFAULT_REGION'])
    rds = boto3.client('rds',
                       aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                       aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                       region_name=os.environ['AWS_DEFAULT_REGION'])
    try:
        s3.Object(bucket_name, 'test_data.txt').put(Body='Hello World!')
    except ClientError as e:
        print(e)
    # Poll until the instance is ready; provisioning typically takes several
    # minutes.
    instance = rds.describe_db_instances(DBInstanceIdentifier=instance_name)['DBInstances'][0]
    while instance['DBInstanceStatus'] != 'available':
        time.sleep(10)
        instance = rds.describe_db_instances(DBInstanceIdentifier=instance_name)['DBInstances'][0]
    try:
        connection = pymysql.connect(
            host=instance['Endpoint']['Address'],
            user=master_username,
            password=master_password,
            db='mysql'
        )
        with connection.cursor() as cursor:
            cursor.execute('CREATE DATABASE IF NOT EXISTS test_db')
            cursor.execute('USE test_db')
            cursor.execute('CREATE TABLE IF NOT EXISTS test_data '
                           '(id INT AUTO_INCREMENT PRIMARY KEY, content TEXT NOT NULL)')
            cursor.execute("INSERT INTO test_data (content) VALUES ('Hello World!')")
            connection.commit()
            cursor.execute('SELECT * FROM test_data')
            result = cursor.fetchone()
            assert result[1] == 'Hello World!'
        connection.close()
        print('Data transfer test passed.')
    except pymysql.MySQLError as e:
        print(e)

def deploy_solution(bucket_name, instance_name, master_username, master_password):
    """Create uniquely named resources and run the data-transfer test."""
    # A UUID suffix keeps the bucket name globally unique across all of S3.
    unique_id = str(uuid.uuid4())
    create_s3_bucket(bucket_name + '-' + unique_id)
    create_rds_instance(instance_name + '-' + unique_id, master_username, master_password)
    test_data_transfer(bucket_name + '-' + unique_id,
                       instance_name + '-' + unique_id,
                       master_username, master_password)


if __name__ == '__main__':
    deploy_solution('my-test-s3-bucket', 'my-test-rds-instance', 'myuser', 'mypassword')
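
# Teardown sketch (an assumption, not part of the original script): the
# deployment above creates billable resources, so something like this cleans
# them up once the test has passed. The generated uuid-suffixed names would
# have to be captured from deploy_solution first.
#
# def destroy_solution(bucket_name, instance_name):
#     s3 = boto3.client('s3', region_name=os.environ['AWS_DEFAULT_REGION'])
#     rds = boto3.client('rds', region_name=os.environ['AWS_DEFAULT_REGION'])
#     # A bucket must be emptied before it can be deleted.
#     s3.delete_object(Bucket=bucket_name, Key='test_data.txt')
#     s3.delete_bucket(Bucket=bucket_name)
#     # SkipFinalSnapshot discards the data, acceptable for a throwaway test.
#     rds.delete_db_instance(DBInstanceIdentifier=instance_name, SkipFinalSnapshot=True)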