-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathintegration_test.py
executable file
·379 lines (286 loc) · 11.1 KB
/
integration_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
#!/usr/bin/env python
import argparse
import logging
import requests
import sys
import pprint
from datetime import datetime, timedelta
import maya
import time
import json
import base64
from zenroom import zenroom
# Emit every log record at INFO level or above, prefixed with a timestamp.
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
# Route prefixes for the three services exercised by this script. Each one is
# appended to the service base URL and followed by the RPC method name
# (e.g. ``<policystore-url> + POLICYSTORE_PREFIX + 'CreateEntitlementPolicy'``).
POLICYSTORE_PREFIX = '/twirp/decode.iot.policystore.PolicyStore/'
ENCODER_PREFIX = '/twirp/decode.iot.encoder.Encoder/'
DATASTORE_PREFIX = '/twirp/decode.iot.datastore.Datastore/'
def main():
    """Parse the command line and hand the parsed options to ``run``.

    ``--device-token`` is mandatory; the three service URLs default to
    localhost ports 8080 (datastore), 8082 (policystore) and 8081 (encoder).
    """
    # The description is deliberately flush-left: RawDescriptionHelpFormatter
    # prints it exactly as written, so any indentation here would show up in
    # the ``--help`` output.
    description = """
Execute a series of integration tests against DECODE components in order to
verify they are behaving as expected. When executed this command runs the
following sequence of steps:
* create a policy and verify we can read this policy back from the policy
store
* create an encrypted stream for a device that applies this policy
* read some data from the encrypted datastore for the policy
* decrypt the data using zenroom and display this to the screen
* clean up all created resources"""

    cli = argparse.ArgumentParser(
        description=description,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    cli.add_argument(
        '-v', '--verbose',
        dest='verbose',
        action='store_true',
        default=False,
        help='enable verbose mode',
    )
    cli.add_argument(
        '--device-token',
        dest='device_token',
        required=True,
        help='device token we wish to use for testing',
    )
    cli.add_argument(
        '--datastore',
        dest='datastore',
        default='http://localhost:8080',
        help='URL of the encrypted datastore',
    )
    cli.add_argument(
        '--policystore',
        dest='policystore',
        default='http://localhost:8082',
        help='URL of the policy store',
    )
    cli.add_argument(
        '--encoder',
        dest='encoder',
        default='http://localhost:8081',
        help='URL of the stream encoder',
    )

    # The handler travels on the namespace so it can be dispatched from there.
    cli.set_defaults(func=run)

    options = cli.parse_args()
    options.func(options)
def run(args):
    """Run the full integration test sequence.

    Steps, in order:

    1. create a policy in the policystore;
    2. list the policies and locate the one just created;
    3. create an encrypted stream on the encoder for that policy;
    4. poll the datastore for data written under the policy and decrypt it;
    5. delete the stream and the policy.

    Cleanup (step 5) runs in ``finally`` blocks, so resources created by
    earlier steps are removed even when a later step aborts with ``sys.exit``
    or raises. The helpers themselves exit the process on unexpected HTTP
    responses.

    Args:
        args: parsed command line namespace providing ``verbose``,
            ``device_token``, ``policystore``, ``encoder`` and ``datastore``.

    Raises:
        SystemExit: if the created policy cannot be found, if no data could be
            read and decrypted, or if any helper aborts.
    """
    logging.info('Starting integration test')

    # read configuration from our args
    verbose = args.verbose
    device_token = args.device_token
    policystore_url = args.policystore
    encoder_url = args.encoder
    datastore_url = args.datastore

    # create a policy
    policy_credentials = create_policy(policystore_url,
                                       create_policy_request(), verbose)

    # Stays None until a stream actually exists, so cleanup knows whether
    # there is anything to delete on the encoder.
    stream_credentials = None
    try:
        # read the policies back from the policystore
        policies = list_policies(policystore_url, verbose)

        # search for our created policy in the returned list; a response
        # without a 'policies' entry is treated the same as an empty list
        policy = next(
            (p for p in policies.get('policies', [])
             if p['policy_id'] == policy_credentials['policy_id']),
            None)
        if policy is None:
            sys.exit(
                "Unable to find created policy in list read from policy store")

        # create a stream
        stream_credentials = create_stream(
            encoder_url, create_stream_request(device_token, policy), verbose)

        # read some data from the datastore for the policy
        read_request = create_read_request(policy)
        success = read_data(datastore_url, read_request, verbose)
    finally:
        try:
            # delete the created stream (if we got that far)
            if stream_credentials is not None:
                delete_stream(encoder_url, stream_credentials, verbose)
        finally:
            # delete the created policy, even if the stream deletion failed
            delete_policy(policystore_url, policy_credentials, verbose)

    if success:
        logging.info('SUCCESS: All tests succeeded')
    else:
        sys.exit('FAILED: Failed to read data')
def create_policy_request():
    """Build the fixed policy definition used by the integration test.

    Returns:
        A freshly built dict with the policy ``public_key``, its ``label`` and
        a list of three per-sensor ``operations`` (SHARE, BIN, MOVING_AVG).
    """
    # NOTE(review): the key literal contains backslash-escaped slashes ("\/")
    # inside a raw string, so the backslashes are part of the value that is
    # sent - confirm this is what the services expect.
    public_key = r'BBLewg4VqLR38b38daE7Fj\/uhr543uGrEpyoPFgmFZK6EZ9g2XdK\/i65RrSJ6sJ96aXD3DJHY3Me2GJQO9\/ifjE='

    share_op = {'sensor_id': 10, 'action': 'SHARE'}
    bin_op = {'sensor_id': 53, 'action': 'BIN', 'bins': [30.0, 60.0, 90.0]}
    moving_avg_op = {'sensor_id': 55, 'action': 'MOVING_AVG', 'interval': 300}

    return {
        'public_key': public_key,
        'label': 'Integration Test Policy',
        'operations': [share_op, bin_op, moving_avg_op],
    }
def create_stream_request(device_token, policy):
    """Assemble the JSON payload sent to the encoder to create a stream.

    Args:
        device_token: token of the device the stream is created for.
        policy: policy dict; its ``policy_id`` and ``operations`` entries are
            copied into the request (``operations`` by reference).

    Returns:
        The request dict, including a fixed recipient public key, a fixed
        location and an ``INDOOR`` exposure.
    """
    fixed_location = {'longitude': 2.156746, 'latitude': 41.401642}

    payload = dict(
        device_token=device_token,
        policy_id=policy['policy_id'],
        recipient_public_key=r'BBLewg4VqLR38b38daE7Fj\/uhr543uGrEpyoPFgmFZK6EZ9g2XdK\/i65RrSJ6sJ96aXD3DJHY3Me2GJQO9\/ifjE=',
        location=fixed_location,
        exposure='INDOOR',
        operations=policy['operations'],
    )
    return payload
def create_read_request(policy):
    """Return the JSON object for a datastore read request.

    The request asks for data belonging to ``policy`` starting 15 minutes
    before the current time.

    Args:
        policy: policy dict; only its ``policy_id`` entry is read.

    Returns:
        A dict with ``policy_id`` and an RFC 3339 formatted ``start_time``.
    """
    # Local import: the module header only brings in datetime and timedelta.
    from datetime import timezone

    # Use an explicit, timezone-aware UTC timestamp. A naive local
    # ``datetime.now()`` would be taken as UTC when converted, shifting the
    # window by the machine's UTC offset on any host not running in UTC.
    window_start = datetime.now(timezone.utc) - timedelta(minutes=15)
    start_time = maya.MayaDT.from_datetime(window_start).rfc3339()
    return {'policy_id': policy['policy_id'], 'start_time': start_time}
def headers():
    """Build the HTTP headers attached to every request this script sends.

    Returns:
        A new dict on each call containing the ``user-agent`` and
        ``content-type`` headers.
    """
    common = {}
    common['user-agent'] = 'integration-tester'
    common['content-type'] = 'application/json'
    return common
def create_policy(policystore_url, create_policy_request, verbose):
    """Register a new entitlement policy with the policystore.

    POSTs ``create_policy_request`` as JSON to the CreateEntitlementPolicy
    route.

    Args:
        policystore_url: base URL of the policystore.
        create_policy_request: dict describing the policy to create.
        verbose: when true, log and pretty-print the outgoing request.

    Returns:
        The decoded JSON response; its ``policy_id`` and ``token`` entries
        are logged on success.

    Raises:
        SystemExit: if the policystore answers with anything but HTTP 200.
    """
    if verbose:
        logging.info('Creating policy')
        pprint.pprint(create_policy_request)

    endpoint = ''.join(
        (policystore_url, POLICYSTORE_PREFIX, 'CreateEntitlementPolicy'))
    response = requests.post(
        endpoint, headers=headers(), json=create_policy_request)

    if response.status_code == 200:
        created = response.json()
        logging.info(
            f'SUCCESS: Created policy - ID: {created["policy_id"]}, Token: {created["token"]}'
        )
        return created

    # Anything other than 200 is fatal: report it, dump the body and exit.
    logging.error(f'ERROR: Unexpected response: {response.status_code}')
    pprint.pprint(response.json())
    sys.exit('Failed to create policy')
def delete_policy(policystore_url, policy_credentials, verbose):
    """Remove an existing entitlement policy from the policystore.

    POSTs ``policy_credentials`` (the response returned by a previous create
    call) as JSON to the DeleteEntitlementPolicy route.

    Args:
        policystore_url: base URL of the policystore.
        policy_credentials: dict identifying and authorising the deletion.
        verbose: when true, log and pretty-print the outgoing request.

    Raises:
        SystemExit: if the policystore answers with anything but HTTP 200.
    """
    if verbose:
        logging.info('Deleting policy')
        pprint.pprint(policy_credentials)

    endpoint = ''.join(
        (policystore_url, POLICYSTORE_PREFIX, 'DeleteEntitlementPolicy'))
    response = requests.post(
        endpoint, headers=headers(), json=policy_credentials)

    if response.status_code == 200:
        logging.info('SUCCESS: Deleted policy')
        return

    # Anything other than 200 is fatal: report it, dump the body and exit.
    logging.error(f'ERROR: Unexpected response: {response.status_code}')
    pprint.pprint(response.json())
    sys.exit('Failed to delete policy')
def list_policies(policystore_url, verbose):
    """Fetch the list of entitlement policies known to the policystore.

    POSTs an empty JSON object to the ListEntitlementPolicies route.

    Args:
        policystore_url: base URL of the policystore.
        verbose: when true, log progress and pretty-print the response.

    Returns:
        The decoded JSON response.

    Raises:
        SystemExit: if the policystore answers with anything but HTTP 200.
    """
    if verbose:
        logging.info('Listing policies')

    endpoint = ''.join(
        (policystore_url, POLICYSTORE_PREFIX, 'ListEntitlementPolicies'))
    response = requests.post(endpoint, headers=headers(), json={})

    if response.status_code != 200:
        # Anything other than 200 is fatal: report it, dump the body and exit.
        logging.error(f'ERROR: Unexpected response: {response.status_code}')
        pprint.pprint(response.json())
        sys.exit('Failed to list policies')

    logging.info('SUCCESS: Listed policies')
    listing = response.json()

    if verbose:
        logging.info('Policies retrieved')
        pprint.pprint(listing)

    return listing
def create_stream(encoder_url, request, verbose):
    """Create a new encoded stream on the stream encoder.

    POSTs ``request`` as JSON to the CreateStream route.

    Args:
        encoder_url: base URL of the stream encoder.
        request: dict describing the stream to create.
        verbose: when true, log and pretty-print the outgoing request.

    Returns:
        The decoded JSON response; its ``stream_uid`` and ``token`` entries
        are logged on success.

    Raises:
        SystemExit: if the encoder answers with anything but HTTP 200.
    """
    if verbose:
        logging.info('Creating encoded stream')
        pprint.pprint(request)

    create_url = encoder_url + ENCODER_PREFIX + 'CreateStream'
    r = requests.post(create_url, headers=headers(), json=request)

    if r.status_code != 200:
        logging.error(f'ERROR: Unexpected response: {r.status_code}')
        pprint.pprint(r.json())
        # Name the operation that actually failed (the stream, not the policy).
        sys.exit('Failed to create stream')

    resp = r.json()
    logging.info(
        f'SUCCESS: Created stream - ID: {resp["stream_uid"]}, Token: {resp["token"]}'
    )
    return resp
def delete_stream(encoder_url, stream_credentials, verbose):
    """Remove a stream from the encoder using the credentials issued for it.

    POSTs ``stream_credentials`` as JSON to the DeleteStream route.

    Args:
        encoder_url: base URL of the stream encoder.
        stream_credentials: dict identifying and authorising the deletion.
        verbose: when true, log and pretty-print the outgoing request.

    Raises:
        SystemExit: if the encoder answers with anything but HTTP 200.
    """
    if verbose:
        logging.info('Deleting stream')
        pprint.pprint(stream_credentials)

    endpoint = ''.join((encoder_url, ENCODER_PREFIX, 'DeleteStream'))
    response = requests.post(
        endpoint, headers=headers(), json=stream_credentials)

    if response.status_code == 200:
        logging.info('SUCCESS: Stream deleted')
        return

    # Anything other than 200 is fatal: report it, dump the body and exit.
    logging.error(f'ERROR: Unexpected response: {response.status_code}')
    pprint.pprint(response.json())
    sys.exit('Failed to delete stream')
def read_data(datastore_url, read_request, verbose,
              timeout_seconds=120, poll_interval=10):
    """Poll the datastore until encrypted data is available, then decrypt it.

    POSTs ``read_request`` to the ReadData route repeatedly, printing a
    progress dot per attempt, until the response contains at least one event
    or the timeout elapses.

    Args:
        datastore_url: base URL of the encrypted datastore.
        read_request: dict sent as the JSON body of every read.
        verbose: when true, pretty-print the request and every response.
        timeout_seconds: how long to keep polling before giving up
            (default: 2 minutes).
        poll_interval: seconds to sleep between attempts (default: 10).

    Returns:
        The result of ``decrypt_data`` for the first event received, or
        ``False`` when the datastore returns a non-200 status or no data
        arrives before the timeout.
    """
    logging.info('Checking if data is available')
    if verbose:
        pprint.pprint(read_request)

    read_url = datastore_url + DATASTORE_PREFIX + 'ReadData'

    # A monotonic clock keeps the deadline unaffected by system clock changes.
    deadline = time.monotonic() + timeout_seconds

    while True:
        print('.', end='', flush=True)

        r = requests.post(read_url, headers=headers(), json=read_request)
        if r.status_code != 200:
            logging.error(f'ERROR: Unexpected response: {r.status_code}')
            pprint.pprint(r.json())
            return False

        resp = r.json()
        if verbose:
            pprint.pprint(resp)

        # Treat a missing or null 'events' field the same as an empty list
        # instead of failing with a KeyError/TypeError.
        events = resp.get('events') or []
        if events:
            print('')
            logging.info('SUCCESS: Read encrypted data')
            return decrypt_data(events[0], verbose)

        if time.monotonic() > deadline:
            print('')
            logging.warning(
                'ERROR: Failed to read any data for the policy. Please check device token and try again'
            )
            return False

        time.sleep(poll_interval)
def decrypt_data(event, verbose):
    """Decrypt one datastore event with zenroom and sanity-check the result.

    The base64 text in ``event['data']`` is decoded and passed, together with
    the script read from ``decrypt.lua`` (resolved against the current
    working directory) and a fixed community secret key, to
    ``zenroom.execute``. The result is parsed as JSON, and its ``data`` entry
    is parsed as JSON again.

    Args:
        event: dict whose ``data`` entry holds the base64 encoded payload.
        verbose: when true, log and pretty-print the decrypted data.

    Returns:
        ``True`` if the decrypted data contains a ``token`` key, otherwise
        ``False``.
    """
    secret_keys = json.dumps({
        'community_seckey': r'D19GsDTGjLBX23J281SNpXWUdu+oL6hdAJ0Zh6IrRHA='
    }).encode()

    with open('decrypt.lua') as lua_file:
        lua_source = lua_file.read()

    ciphertext = base64.decodebytes(event['data'].encode())
    raw_result = zenroom.execute(
        lua_source.encode(), keys=secret_keys, data=ciphertext)

    # The result is a JSON envelope whose 'data' field is itself a JSON string.
    envelope = json.loads(raw_result)
    payload = json.loads(envelope['data'])

    if verbose:
        logging.info("Decrypted data")
        pprint.pprint(payload)

    # verify the presence of an expected key
    if 'token' not in payload:
        logging.info('ERROR: Unexpected content after decryption')
        pprint.pprint(payload)
        return False

    logging.info('SUCCESS: Decrypted packet correctly')
    return True
# Run the integration tests only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()