# __init__.py (forked from microsoft/AzureTRE)

import datetime
import json
import logging
import os
import uuid
from typing import Optional

import azure.functions as func
from pydantic import BaseModel, parse_obj_as

from exceptions import NoFilesInRequestException, TooManyFilesInRequestException
from shared_code import blob_operations, constants
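

# The Service Bus message body is expected to be a JSON envelope whose "data"
# field carries the request properties modelled below (see extract_properties).
# Illustrative payload; all values are hypothetical:
#
#   {
#     "data": {
#       "request_id": "8a2e...",
#       "new_status": "submitted",
#       "previous_status": "draft",
#       "type": "import",
#       "workspace_unique_identifier_suffix": "abc123"
#     }
#   }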


class RequestProperties(BaseModel):
    request_id: str
    new_status: str
    previous_status: Optional[str]
    type: str
    workspace_unique_identifier_suffix: str


class ContainersCopyMetadata:
    source_account_name: str
    dest_account_name: str

    def __init__(self, source_account_name: str, dest_account_name: str):
        self.source_account_name = source_account_name
        self.dest_account_name = dest_account_name
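

# Bindings sketch: main() below assumes a Service Bus trigger plus two Event
# Grid output bindings whose names match the parameter names. A minimal
# function.json along these lines is assumed (queue, connection and topic
# setting names below are hypothetical):
#
#   {
#     "bindings": [
#       {"name": "msg", "type": "serviceBusTrigger", "direction": "in",
#        "queueName": "status-changed", "connection": "SERVICE_BUS_CONNECTION"},
#       {"name": "stepResultEvent", "type": "eventGrid", "direction": "out",
#        "topicEndpointUri": "EVENT_GRID_STEP_RESULT_TOPIC_URI",
#        "topicKeySetting": "EVENT_GRID_STEP_RESULT_TOPIC_KEY"},
#       {"name": "dataDeletionEvent", "type": "eventGrid", "direction": "out",
#        "topicEndpointUri": "EVENT_GRID_DATA_DELETION_TOPIC_URI",
#        "topicKeySetting": "EVENT_GRID_DATA_DELETION_TOPIC_KEY"}
#     ]
#   }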


def main(msg: func.ServiceBusMessage, stepResultEvent: func.Out[func.EventGridOutputEvent], dataDeletionEvent: func.Out[func.EventGridOutputEvent]):
    # Initialise up front so the except blocks below never reference an unbound
    # name when extract_properties() or get_request_files() raises.
    request_properties = None
    request_files = None
    try:
        request_properties = extract_properties(msg)
        request_files = get_request_files(request_properties) if request_properties.new_status == constants.STAGE_SUBMITTED else None
        handle_status_changed(request_properties, stepResultEvent, dataDeletionEvent, request_files)
    except NoFilesInRequestException:
        set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.NO_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
    except TooManyFilesInRequestException:
        set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.TOO_MANY_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
    except Exception:
        if request_properties is None:
            # Nothing to report a failure against; re-raise so the message dead-letters.
            raise
        set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.UNKNOWN_REASON_MESSAGE, request_files=request_files)


def handle_status_changed(request_properties: RequestProperties, stepResultEvent: func.Out[func.EventGridOutputEvent], dataDeletionEvent: func.Out[func.EventGridOutputEvent], request_files):
    new_status = request_properties.new_status
    previous_status = request_properties.previous_status
    req_id = request_properties.request_id
    unique_suffix = request_properties.workspace_unique_identifier_suffix
    request_type = request_properties.type

    logging.info(f'Processing request with id {req_id}. new status is {new_status}, previous status is {previous_status}, unique_suffix is {unique_suffix}, type is {request_type}')

    if new_status == constants.STAGE_DRAFT:
        account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, workspace_unique_identifier_suffix=unique_suffix)
        blob_operations.create_container(account_name, req_id)
        return

    if new_status == constants.STAGE_CANCELLED:
        storage_account_name = get_storage_account(previous_status, request_type, unique_suffix)
        container_to_delete_url = blob_operations.get_blob_url(account_name=storage_account_name, container_name=req_id)
        set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url=container_to_delete_url)
        return

    if new_status == constants.STAGE_SUBMITTED:
        set_output_event_to_report_request_files(stepResultEvent, request_properties, request_files)

    if is_require_data_copy(new_status):
        logging.info('Request with id %s requires data copy between storage accounts', req_id)
        containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, workspace_unique_identifier_suffix=unique_suffix)
        blob_operations.create_container(containers_metadata.dest_account_name, req_id)
        blob_operations.copy_data(containers_metadata.source_account_name, containers_metadata.dest_account_name, req_id)
        return

    # Other statuses do not require a data copy, so nothing further to do.
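

# Status handling summary (as implemented above):
#   draft         -> create a container for the request in the draft account
#   cancelled     -> emit a data-deletion event for the request's container
#   submitted     -> report the enumerated request files, then fall through to the copy
#   copy statuses -> copy the request's container from the previous stage's account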


def extract_properties(msg: func.ServiceBusMessage) -> RequestProperties:
    try:
        body = msg.get_body().decode('utf-8')
        logging.info('Python ServiceBus queue trigger processed message: %s', body)
        json_body = json.loads(body)
        result = parse_obj_as(RequestProperties, json_body["data"])
        if not result:
            raise Exception("Failed parsing request properties")
    except json.decoder.JSONDecodeError:
        logging.error(f'Error decoding object: {body}')
        raise
    except Exception as e:
        logging.error(f'Error extracting properties: {e}')
        raise

    return result


def is_require_data_copy(new_status: str) -> bool:
    return new_status.lower() in [constants.STAGE_SUBMITTED, constants.STAGE_APPROVAL_INPROGRESS, constants.STAGE_REJECTION_INPROGRESS, constants.STAGE_BLOCKING_INPROGRESS]


def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, workspace_unique_identifier_suffix: str) -> ContainersCopyMetadata:
    # Sanity check
    if not is_require_data_copy(new_status):
        raise Exception("Given new status is not supported")

    request_type = request_type.lower()
    if request_type != constants.IMPORT_TYPE and request_type != constants.EXPORT_TYPE:
        msg = "Airlock request type must be either '{}' or '{}'".format(constants.IMPORT_TYPE, constants.EXPORT_TYPE)
        logging.error(msg)
        raise Exception(msg)

    source_account_name = get_storage_account(previous_status, request_type, workspace_unique_identifier_suffix)
    dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, workspace_unique_identifier_suffix)
    return ContainersCopyMetadata(source_account_name, dest_account_name)
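

# Naming convention: the two lookups below compose account names from a
# per-stage prefix constant plus either the TRE id or the workspace's unique
# suffix. For example (prefix value assumed purely for illustration), with
# constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS == "stalimip" and
# TRE_ID == "mytre", an in-progress import lands in account "stalimipmytre".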


def get_storage_account(status: str, request_type: str, workspace_unique_identifier_suffix: str) -> str:
    tre_id = _get_tre_id()

    if request_type == constants.IMPORT_TYPE:
        if status == constants.STAGE_DRAFT:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
        elif status == constants.STAGE_APPROVED:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + workspace_unique_identifier_suffix
        elif status == constants.STAGE_REJECTED:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
        elif status == constants.STAGE_BLOCKED_BY_SCAN:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id
        elif status in [constants.STAGE_IN_REVIEW, constants.STAGE_SUBMITTED, constants.STAGE_APPROVAL_INPROGRESS, constants.STAGE_REJECTION_INPROGRESS, constants.STAGE_BLOCKING_INPROGRESS]:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id

    if request_type == constants.EXPORT_TYPE:
        if status == constants.STAGE_DRAFT:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + workspace_unique_identifier_suffix
        elif status == constants.STAGE_APPROVED:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
        elif status == constants.STAGE_REJECTED:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + workspace_unique_identifier_suffix
        elif status == constants.STAGE_BLOCKED_BY_SCAN:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + workspace_unique_identifier_suffix
        elif status in [constants.STAGE_IN_REVIEW, constants.STAGE_SUBMITTED, constants.STAGE_APPROVAL_INPROGRESS, constants.STAGE_REJECTION_INPROGRESS, constants.STAGE_BLOCKING_INPROGRESS]:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + workspace_unique_identifier_suffix

    error_message = f"Missing current storage account definition for status '{status}' and request type '{request_type}'."
    logging.error(error_message)
    raise Exception(error_message)


def get_storage_account_destination_for_copy(new_status: str, request_type: str, workspace_unique_identifier_suffix: str) -> str:
    tre_id = _get_tre_id()

    if request_type == constants.IMPORT_TYPE:
        if new_status == constants.STAGE_SUBMITTED:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
        elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + workspace_unique_identifier_suffix
        elif new_status == constants.STAGE_REJECTION_INPROGRESS:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
        elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id

    if request_type == constants.EXPORT_TYPE:
        if new_status == constants.STAGE_SUBMITTED:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + workspace_unique_identifier_suffix
        elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
        elif new_status == constants.STAGE_REJECTION_INPROGRESS:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + workspace_unique_identifier_suffix
        elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + workspace_unique_identifier_suffix

    error_message = f"Missing copy destination storage account definition for status '{new_status}' and request type '{request_type}'."
    logging.error(error_message)
    raise Exception(error_message)


def set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason, request_files):
    logging.exception(f"Failed processing Airlock request with ID: '{request_properties.request_id}', changing request status to '{constants.STAGE_FAILED}'.")
    stepResultEvent.set(
        func.EventGridOutputEvent(
            id=str(uuid.uuid4()),
            data={"completed_step": request_properties.new_status, "new_status": constants.STAGE_FAILED, "request_id": request_properties.request_id, "request_files": request_files, "status_message": failure_reason},
            subject=request_properties.request_id,
            event_type="Airlock.StepResult",
            event_time=datetime.datetime.utcnow(),
            data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))


def set_output_event_to_report_request_files(stepResultEvent, request_properties, request_files):
    logging.info(f'Sending file enumeration result for request with ID: {request_properties.request_id} result: {request_files}')
    stepResultEvent.set(
        func.EventGridOutputEvent(
            id=str(uuid.uuid4()),
            data={"completed_step": request_properties.new_status, "request_id": request_properties.request_id, "request_files": request_files},
            subject=request_properties.request_id,
            event_type="Airlock.StepResult",
            event_time=datetime.datetime.utcnow(),
            data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))


def set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url):
    logging.info(f'Sending container deletion event for request ID: {request_properties.request_id}. container URL: {container_url}')
    dataDeletionEvent.set(
        func.EventGridOutputEvent(
            id=str(uuid.uuid4()),
            data={"blob_to_delete": container_url},
            subject=request_properties.request_id,
            event_type="Airlock.DataDeletion",
            event_time=datetime.datetime.utcnow(),
            data_version=constants.DATA_DELETION_EVENT_DATA_VERSION
        )
    )


def get_request_files(request_properties: RequestProperties):
    storage_account_name = get_storage_account(request_properties.previous_status, request_properties.type, request_properties.workspace_unique_identifier_suffix)
    return blob_operations.get_request_files(account_name=storage_account_name, request_id=request_properties.request_id)


def _get_tre_id() -> str:
    try:
        tre_id = os.environ["TRE_ID"]
    except KeyError as e:
        logging.error(f'Missing environment variable: {e}')
        raise
    return tre_id
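

# Minimal sketch of exercising extract_properties() in a unit test, assuming a
# mocked Service Bus message (status strings and ids below are hypothetical):
#
#   from unittest.mock import MagicMock
#
#   msg = MagicMock()
#   msg.get_body.return_value = json.dumps({"data": {
#       "request_id": "r1", "new_status": "submitted",
#       "previous_status": "draft", "type": "import",
#       "workspace_unique_identifier_suffix": "abc123"}}).encode("utf-8")
#   props = extract_properties(msg)
#   assert props.request_id == "r1"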