This repository was archived by the owner on Aug 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathTEST_protocol_details.py
5203 lines (4607 loc) · 270 KB
/
TEST_protocol_details.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright Notice:
# Copyright 2016-2019 DMTF. All rights reserved.
# License: BSD 3-Clause License. For full text see link: https://github.com/DMTF/Redfish-Service-Conformance-Check/blob/main/LICENSE.md
#####################################################################################################
# File: rfs_check.py
# Description: Redfish service conformance check tool. This module contains implemented Section 6
# assertions for SUT.These assertions are based on operational checks on Redfish Service to verify
# that it conforms to the normative statements from the Redfish specification.
# See assertions in redfish-assertions.xlxs for assertions summary
#
# Verified/operational Python revisions (Windows OS) :
# 2.7.10
# 3.4.3
#
# Initial code released : 01/2016
# Steve Krig ~ Intel
# Fatima Saleem ~ Intel
# Priyanka Kumari ~ Texas Tech University
#####################################################################################################
# Priyanka- making changes here to access schema files in import sys-adding from schema and from collections
import sys
from schema import SchemaModel
from collections import OrderedDict
from xml.dom import minidom
import re
import rf_utility
import datetime
import xml.etree.ElementTree as ET
import urllib
# map python 2 vs 3 imports
if (sys.version_info < (3, 0)):
# Python 2
Python3 = False
from urlparse import urlparse
from StringIO import StringIO
from httplib import HTTPSConnection, HTTPConnection, HTTPResponse
import urllib
import urllib2
from urllib import urlopen
else:
# Python 3
Python3 = True
from urllib.parse import urlparse
from io import StringIO
from http.client import HTTPSConnection, HTTPConnection, HTTPResponse
from urllib.request import urlopen
import ssl
import re
import json
import argparse
import base64
import datetime
import types
import socket
import select
import os
from collections import OrderedDict
import time
# current spec followed for these assertions
REDFISH_SPEC_VERSION = "Version 1.0.5"
#####################################################################################################
# Name: CacheURI
# Description: Cache a few random URI's in order to speed up the tool
#####################################################################################################
cached_uri = None
cached_uri_no_member = None
def cacheURI(self):
global cached_uri
global cached_uri_no_member
cached_uri = self.uris
cached_uri_no_member = self.uris_no_members
#####################################################################################################
# Name: Assertion_1_2_3(self, log)
# Description: This is General Assertion Template for writing assertion code
#####################################################################################################
def Assertion_1_2_3(self, log) : # self here is service's instance..
#set id in log object, this should be same as the excel
log.AssertionID = '1.2.3'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
# authorization ~ this controls whether an HTTP request contains an authorization
# header or not -~ this can be set to 'on' or 'off' ~ if its 'off' then no
authorization = 'on'
## Assertion verification logic goes here...
#sample intermediate log string going to the text logfile and the xlxs file
log.assertion_log('line', "~ GET %s" % self.Redfish_URIs['Protocol_Version'])
#sample intermediate log string to text logfile
log.assertion_log('TX_COMMENT', "~ GET %s" % self.Redfish_URIs['Protocol_Version'])
#Note: any assertion which FAILs or WARNs should place an explanation in the the reporting spreadsheet
#be careful with volume of text here ~ needs to fit in a spreadsheet cell..
if (assertion_status != log.PASS):
# this text will go only into the comment section of the xlxs assertion run spreadsheet
log.assertion_log('XL_COMMENT', ('~ GET %s : %s %s' % (self.Redfish_URIs['Protocol_Version'], assertion_status, "a meaningful failure note")) )
## log completion status
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 1.2.3
#####################################################################################################
# Name: Assertion_6_1_8_2(self, log)
# Description:
# GET: Object or Collection retrieval
#####################################################################################################
def Assertion_6_1_8_2(self, log) :
log.AssertionID = '6.1.8.2'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
# get SUTs relative uris dictionary
relative_uris = self.relative_uris
authorization = 'on'
rq_headers = self.request_headers()
# loop for each uri in relative uris dict
for relative_uri in cached_uri:
json_payload, headers, status = self.http_cached_GET(relative_uris[relative_uri], rq_headers, authorization)
assertion_status_ = self.response_status_check(relative_uris[relative_uri], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.1.8.2
##
# these next 3 assertions should be run in series ~ create/cleanup a new user account...
# ...POST/create a new user account
# ...PATCH/update the new user account -~ this assertion expects that a user acct for 'testuser' has been previosly created..
# ...DELETE the new user account
#
##
#####################################################################################################
# Name: Assertion_6_1_8_1(self, log)
# Description:
# POST: Object create, Object action, Eventing
# Spec:
# The service shall set the Location header to the URI of the newly created resource.
# The response to a successful create request should be 201 (Created) and may include
# a response body containing the "representation of the newly created resource".
# For POST request body see requiredOnCreate annotation in schemas
# TODO try a sample post, patch and delete for every resource that allows the methods, run it in series
#####################################################################################################
def Assertion_6_1_8_1(self, log) :
log.AssertionID = '6.1.8.1'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
authorization = 'on'
rq_headers = self.request_headers()
user_name = self.SUT_prop['LoginName']
password = self.SUT_prop['Password']
root_link_key = 'SessionService'
sample = dict()
if root_link_key in self.sut_toplevel_uris and self.sut_toplevel_uris[root_link_key]['url']:
json_payload, headers, status = self.http_cached_GET(self.sut_toplevel_uris[root_link_key]['url'], rq_headers, authorization)
assertion_status_ = self.response_status_check(self.sut_toplevel_uris[root_link_key]['url'], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status, assertion_status_)
if assertion_status_ != log.PASS:
pass
elif not json_payload:
assertion_status = log.WARN
log.assertion_log('line', 'No response body returned for resource %s. This assertion for the resource could not be completed' % (self.sut_toplevel_uris[root_link_key]['url']))
else:
## get Sessions collection from payload
try :
key = 'Sessions'
session_collection = (json_payload[key])['@odata.id']
except :
assertion_status = log.WARN
log.assertion_log('line', "~ \'Sessions\' not found in the payload from GET %s" % (self.sut_toplevel_uris[root_link_key]['url']))
else:
json_payload, headers, status = self.http_cached_GET(session_collection, rq_headers, authorization)
assertion_status_ = self.response_status_check(session_collection, status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
pass
elif not headers:
assertion_status = log.WARN
else:
if (assertion_status == log.PASS): # Ok to create the session now
rq_body = {'UserName': user_name, 'Password': password}
authorization = 'off'
rq_headers = self.request_headers()
log.assertion_log('TX_COMMENT', 'Requesting POST on resource %s with request body %s' % (session_collection, rq_body))
json_payload, headers, status = self.http_POST(session_collection, rq_headers, rq_body, authorization)
assertion_status_ = self.response_status_check(session_collection, status, log, rf_utility.HTTP_CREATED, 'POST')
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
pass
elif not headers:
assertion_status = log.WARN
else:
session_location = headers['location']
# perfcorm assertions 6_1_8_1_1, 6_1_8_1_2 to make sure session was created and returned expected headers/body
if Assertion_6_1_8_1_1(headers, session_collection, log) == log.PASS:
# check resource representation in response body
subassertion_status = Assertion_6_1_8_1_2(session_location, json_payload, self, log)
if subassertion_status:
# Check status
assertion_status = subassertion_status if (subassertion_status != log.PASS) else assertion_status
else:
assertion_status = log.WARN
log.assertion_log('line', "~ Uri to resource: %s not found in redfish top level links: %s" % (root_link_key, self.sut_toplevel_uris) )
log.AssertionID = '6.1.8.1'
log.assertion_log(assertion_status, None)
return assertion_status
#
## end Assertion 6.1.8.1
#####################################################################################################
# Name: Assertion_6_1_8_1_1(self, log) Authentication
# Description:
# The response to the POST request to create a user includes:
# a "Location header that contains a link to the newly created resource.
#####################################################################################################
def Assertion_6_1_8_1_1(headers, url, log):
log.AssertionID = '6.1.8.1.1'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
authorization = 'on'
response_key = 'location'
if response_key not in headers:
assertion_status = log.FAIL
log.assertion_log('line', "~ Expected header %s to be in the response header of %s ~ Not found" % (response_key, url))
log.assertion_log(assertion_status, None)
return assertion_status
#
## end Assertion 6.1.8.1.1
#####################################################################################################
# Name: Assertion_6_1_8_1_2(self, log)
# Description:
# The JSON response body 'may' contain a full representation of the newly created object
#####################################################################################################
def Assertion_6_1_8_1_2(location_url, json_payload, self, log):
log.AssertionID = '6.1.8.1.2'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
authorization = 'on'
rq_headers = self.request_headers()
# get the object using GET on location and compare it against the payload returned during POST
check_payload, headers, status = self.http_cached_GET(location_url, rq_headers, authorization)
assertion_status_ = self.response_status_check(location_url, status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
pass
elif not check_payload:
assertion_status = log.WARN
log.assertion_log('line', 'No response body returned for resource %s. This assertion for the resource could not be completed' % (location_url))
else:
assertion_status = verifyCreatedObject(json_payload, check_payload, log)
if assertion_status == log.FAIL:
# 'may' contain a full representation....
assertion_status = log.WARN
try:
log.assertion_log('line', "~ The response body does not contain a full representation of the newly created session object at %s" % (check_payload['@odata.id'] ))
except:
log.assertion_log('line', "~ The response body does not contain a full representation of the newly created session object")
log.assertion_log(assertion_status, None)
return assertion_status
#
## end Assertion 6.1.8.1.2
#####################################################################################################
# Name: verifyCreatedObject (object_payload, check_payload)
# Description:
# Verify an object represented by a json body on its creation with the object requested via GET for
# the same resource using its location from header
#####################################################################################################
def verifyCreatedObject(object_payload, check_payload, log):
exclude_keys = [
'@odata.id',
'@odata.context',
'@odata.type',
'@odata.etag'
]
assertion_status = log.PASS
if not isinstance(object_payload, dict):
assertion_status = log.FAIL
log.assertion_log('line', '~ The response body (type {}) returned from POST is not a JSON object, so cannot check it against the GET response'
.format(type(object_payload)))
return assertion_status
# check for mismatch...
for key in check_payload.keys():
if key in exclude_keys:
# Allow for the service to have added @odata by skipping them
# (they will not be in the object we created so the checks for
# these tags in the original object will fail)
continue
if (key in object_payload):
if (check_payload[key] != object_payload[key]):
assertion_status = log.FAIL
log.assertion_log('line', "~ Property \'%s\' : \'%s\' does not match what was specified at Creation %s" % (key, check_payload[key], object_payload[key]) )
else:
assertion_status = log.FAIL
log.assertion_log('line', "~ The response body does not contain \'%s\' Property of the newly created object" % (key ))
return assertion_status
#####################################################################################################
# Name: Assertion_6_1_8_3(self, log)
# Description:
# PATCH Object update
#####################################################################################################
def Assertion_6_1_8_3(self, log) :
log.AssertionID = '6.1.8.3'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
authorization = 'on'
rq_headers = self.request_headers()
user_name = 'xxuserxx'
root_link_key = 'AccountService'
if root_link_key in self.sut_toplevel_uris and self.sut_toplevel_uris[root_link_key]['url']:
# GET user accounts
json_payload, headers, status = self.http_cached_GET(self.sut_toplevel_uris[root_link_key]['url'], rq_headers, authorization)
assertion_status_ = self.response_status_check(self.sut_toplevel_uris[root_link_key]['url'], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
pass
elif not json_payload:
assertion_status = log.WARN
log.assertion_log('line', 'No response body returned for resource %s. This assertion for the resource could not be completed' % (self.sut_toplevel_uris[root_link_key]['url']))
else:
## get Accounts collection from payload
try :
key = 'Accounts'
acc_collection = (json_payload[key])['@odata.id']
except : # no user accounts?
assertion_status = log.WARN
log.assertion_log('line', "~ no accounts collection was returned from GET %s" % self.sut_toplevel_uris[root_link_key]['url'])
else:
found = False
members = self.get_resource_members(acc_collection)
# check for user we want to PATCH
for json_payload, headers in members:
if json_payload is None:
continue
if json_payload['UserName'] == user_name:
found = True
account_url = json_payload['@odata.id']
patch_key = 'RoleId'
patch_value = 'Operator'
rq_body = {'UserName': user_name, 'Password': '12345678' , 'RoleId' : patch_value}
rq_headers['Content-Type'] = rf_utility.content_type['json']
json_payload_, headers_, status_ = self.http_PATCH(account_url, rq_headers, rq_body, authorization)
assertion_status_ = self.response_status_check(account_url, status_, log, request_type = 'PATCH')
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
break
else:
log.assertion_log('line', "~ note: PATCH %s PASS" % (account_url))
#check if the patch succeeded:
json_payload, json_headers, status = self.http_cached_GET(account_url, rq_headers, authorization)
assertion_status_ = self.response_status_check(account_url, status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
log.assertion_log('line', "~ note: Unable to verify if PATCH succeeded, status %s" % status)
break
elif json_payload:
#if patch_value not in json_payload, FAIL
if 'RoleId' in json_payload:
if patch_value not in json_payload['RoleId']:
assertion_status = log.FAIL
log.assertion_log('line', "~ note : Expected value of patched %s : %s found %s" % (patch_key, patch_value, json_payload[patch_key]) )
break
if found == False:
assertion_status = log.WARN
log.assertion_log('line', "~ note: PATCH could not be verified")
else:
assertion_status = log.WARN
log.assertion_log('line', "~ Uri to resource: %s not found in redfish top level links: %s" % (root_link_key, self.sut_toplevel_uris) )
log.assertion_log(assertion_status, None)
return assertion_status
## end Assertion 6.1.8.3
#####################################################################################################
# Name: Assertion_6_1_8_4(self, log)
# Description: Test DELETE method by creating a Session and then deleting it
# Method: DELETE Object delete
#####################################################################################################
def Assertion_6_1_8_4(self, log):
log.AssertionID = '6.1.8.4'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
authorization = 'on'
rq_headers = self.request_headers()
user_name = self.SUT_prop['LoginName']
password = self.SUT_prop['Password']
root_link_key = 'SessionService'
if root_link_key in self.sut_toplevel_uris and self.sut_toplevel_uris[root_link_key]['url']:
# perform a GET on the SessionService
json_payload, headers, status = self.http_cached_GET(self.sut_toplevel_uris[root_link_key]['url'], rq_headers,
authorization)
assertion_status_ = self.response_status_check(self.sut_toplevel_uris[root_link_key]['url'], status, log)
assertion_status = log.status_fixup(assertion_status, assertion_status_)
if assertion_status_ != log.PASS:
pass
elif not json_payload:
assertion_status = log.WARN
log.assertion_log('line',
'No response body returned for resource %s. This assertion could not be completed' %
(self.sut_toplevel_uris[root_link_key]['url']))
else:
# get Sessions collection from payload
try:
key = 'Sessions'
session_collection = (json_payload[key])['@odata.id']
except:
assertion_status = log.WARN
log.assertion_log('line', "~ \'Sessions\' not found in the payload from GET %s" % (
self.sut_toplevel_uris[root_link_key]['url']))
else:
# perform a GET on the Sessions collection
json_payload, headers, status = self.http_cached_GET(session_collection, rq_headers, authorization)
assertion_status_ = self.response_status_check(session_collection, status, log)
assertion_status = log.status_fixup(assertion_status, assertion_status_)
if assertion_status_ != log.PASS:
pass
elif not headers:
assertion_status = log.WARN
log.assertion_log('line', "~ No headers returned with \'Sessions\' payload from GET %s" % (
self.sut_toplevel_uris[root_link_key]['url']))
else:
# Ok to create the session now
rq_body = {'UserName': user_name, 'Password': password}
post_rq_headers = self.request_headers()
log.assertion_log('TX_COMMENT', 'Requesting POST on resource %s with request body %s' % (
session_collection, {'UserName': user_name, 'Password': '********'}))
authorization = 'off'
json_payload, headers, status = self.http_POST(session_collection, post_rq_headers,
rq_body, authorization)
assertion_status_ = self.response_status_check(session_collection, status, log,
rf_utility.HTTP_CREATED, request_type='POST')
assertion_status = log.status_fixup(assertion_status, assertion_status_)
if assertion_status_ != log.PASS:
if json_payload is not None:
log.assertion_log('line', 'Response body from failed POST: {}'.format(json_payload))
elif not headers or 'location' not in headers:
assertion_status = log.WARN
log.assertion_log('line', "~ No Location header returned from POST %s" % session_collection)
else:
session_location = headers.get('location')
x_auth_token = headers.get('x-auth-token')
log.assertion_log('line', 'DEBUG response headers: Location: {}; X-Auth-Token: {}'
.format(session_location, x_auth_token))
delete_rq_headers = self.request_headers()
if x_auth_token is not None:
delete_rq_headers['X-Auth-Token'] = x_auth_token
else:
log.assertion_log('line',
"~ No X-Auth-Token header returned from POST %s" % session_collection)
authorization = 'off'
json_payload, headers, status = self.http_DELETE(session_location, delete_rq_headers,
authorization)
assertion_status_ = self.response_status_check(session_location, status, log,
request_type='DELETE')
assertion_status = log.status_fixup(assertion_status, assertion_status_)
if assertion_status_ != log.PASS:
if json_payload is not None:
log.assertion_log('line', 'Response body from failed DELETE: {}'.format(json_payload))
else:
assertion_status = log.WARN
log.assertion_log('line', "~ Uri to resource: %s not found in redfish top level links: %s" % (
root_link_key, self.sut_toplevel_uris))
log.AssertionID = '6.1.8.4'
log.assertion_log(assertion_status, None)
return assertion_status
#
## end Assertion 6.1.8.4
###################################################################################################
# Name: Assertion_6_1_9(self, log)
# Description: Other HTTP methods are not allowed and shall receive a 405 response.
# Method: TRACE ~ should not be supported, expected status 405
# OPTIONS ~ is not a Required one but it can be supported. Depends on server.
###################################################################################################
def Assertion_6_1_9(self, log) :
log.AssertionID = '6.1.9'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
relative_uris = self.relative_uris
authorization = 'on'
rq_headers = self.request_headers()
for relative_uri in cached_uri:
json_payload, headers, status = self.http_TRACE(relative_uris[relative_uri], rq_headers, None, authorization)
assertion_status_ = self.response_status_check(relative_uris[relative_uri], status, log, rf_utility.HTTP_METHODNOTALLOWED, 'TRACE')
if(status== 405):
assertion_status = log.PASS
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
'''json_payload, headers, status = self.http_OPTIONS(relative_uris[relative_uri], rq_headers, None, authorization)
assertion_status_ = self.response_status_check(relative_uris[relative_uri], status, log, rf_utility.HTTP_METHODNOTALLOWED, 'OPTIONS')
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)'''
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.1.9
###################################################################################################
# Name: Assertion_6_1_11(self, log)
# Description:
# All resources shall be made available using the JSON media type "application/json".
###################################################################################################
def Assertion_6_1_11(self, log) :
log.AssertionID = '6.1.11'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
relative_uris = self.relative_uris
rq_headers = self.request_headers()
header_key = 'Content-Type'
header_value = rf_utility.accept_type['json']
rq_headers[header_key] = header_value
authorization = 'on'
for relative_uri in cached_uri:
json_payload, headers, status = self.http_cached_GET(relative_uris[relative_uri], rq_headers, authorization)
assertion_status_ = self.response_status_check(relative_uris[relative_uri], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.1.11
###################################################################################################
# Name: Assertion 6_1_12(self,log)
# Description:
# Redfish services shall make every resource available in a representation based on JSON, as specified in RFC4627.
# Receivers shall not reject a message because it is encoded in JSON, and shall offer at least one response representation based on JSON.
###################################################################################################
def Assertion_6_1_12(self,log) :
log.AssertionID = '6.1.12'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
relative_uris = self.relative_uris
print('Checking if all responses are in JSON')
authorization = 'on'
rq_headers = self.request_headers()
for relative_uri in cached_uri:
json_payload, headers, status = self.http_cached_GET(relative_uris[relative_uri], rq_headers, authorization)
if status == 200 :
if not json_payload:
print ('The response received is not in JSON %s'% json_payload)
assertion_status = log.FAIL
else :
continue
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.1.12
#####################################################################################################
# Name: Assertion_6_1_14(self, log)
# Description:
# ETag headers: For certain requests and responses implementation shall support returning ETag headers
#####################################################################################################
def Assertion_6_1_14(self, log) :
log.AssertionID = '6.1.14'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
# get SUTs relative uris dictionary
relative_uris = self.relative_uris
authorization = 'on'
rq_headers = self.request_headers()
# loop for each uri in relative uris dict
for relative_uri in cached_uri:
json_payload, headers, status = self.http_cached_GET(relative_uris[relative_uri], rq_headers, authorization)
if status == rf_utility.HTTP_OK:
if 'etag' in headers:
print('ETag header is present in {}'.format(relative_uris[relative_uri]))
print('The ETag is {}'.format(headers.get('etag')))
assertion_status_ = log.PASS
else:
typename = None
if json_payload is not None and '@odata.type' in json_payload:
namespace, typename = rf_utility.parse_odata_type(json_payload.get('@odata.type'))
if typename == 'ManagerAccount':
assertion_status_ = log.FAIL
log.assertion_log('line', '~ ETag header missing in response from {}; required for {} resources'
.format(relative_uris[relative_uri], typename))
else:
assertion_status_ = log.WARN
print('ETag header missing in response from {}'.format(relative_uris[relative_uri]))
assertion_status = log.status_fixup(assertion_status, assertion_status_)
log.assertion_log(assertion_status, None)
return assertion_status
#
## end Assertion 6.1.14
###################################################################################################
# Name: Assertion_6_1_13(self, log)
# Description:
# Services should support gzip compression when requested by the client. if service cannot respond
# with gzip, 406 should be returned
###################################################################################################
def Assertion_6_1_13(self, log) :
log.AssertionID = '6.1.13'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
authorization = 'on'
relative_uris = self.relative_uris
rq_headers = self.request_headers()
header_key = 'Accept-Encoding'
header_value = 'gzip'
rq_headers[header_key] = header_value
for relative_uri in cached_uri:
json_payload, headers, status = self.http_cached_GET(relative_uris[relative_uri], rq_headers, authorization)
assertion_status_ = self.response_status_check(relative_uris[relative_uri], status, log, rf_utility.HTTP_NOTACCEPTABLE)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status == log.FAIL and status == rf_utility.HTTP_OK:
log.assertion_log('line', '%s:%s is also an acceptable status' % (rf_utility.HTTP_OK, rf_utility.HTTP_status_string(rf_utility.HTTP_OK)) )
assertion_status = log.PASS
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.1.13
###################################################################################################
# Name: Assertion_6_2_3(self, log)
# Description:
# A GET on the resource "/redfish" shall return the following body: json { "v1": "/redfish/v1/" }
###################################################################################################
def Assertion_6_2_3(self, log) :
log.AssertionID = '6.2.3'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
authorization = 'on'
rq_headers = self.request_headers()
json_payload, headers, status = self.http_cached_GET(self.Redfish_URIs['Protocol_Version'], rq_headers, authorization)
assertion_status_ = self.response_status_check(self.Redfish_URIs['Protocol_Version'], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
pass
elif not json_payload:
assertion_status_ = log.WARN
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
log.assertion_log('line', 'No response body returned for resource %s. This assertion for the resource could not be completed' % (self.Redfish_URIs['Protocol_Version']))
else:
# parse the json data returned and verify
key ='v1'
value = '/redfish/v1/'
if key not in json_payload:
assertion_status = log.FAIL
log.assertion_log('line', "~ unable to locate 'v1' key in JSON payload returned from GET %s" % (self.Redfish_URIs['Protocol_Version']))
elif (json_payload[key] != value) :
assertion_status = log.FAIL
log.assertion_log('line', "~ invalid value returned from GET %s. Expected %s" % (json_payload[key], self.Redfish_URIs['Protocol_Version'], value))
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.2.3
###################################################################################################
# Name: Assertion 6_1_0(self,log)- HTTP_NOT_FOUND status added by Priyanka TTU
# Description:
# The following method check the URI status of 404 and is reported if found.
###################################################################################################
def Assertion_6_1_0(self,log) :
log.AssertionID = '6.1.0'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
relative_uris = self.relative_uris
print('Checking all URIs')
authorization = 'on'
rq_headers = self.request_headers()
for relative_uri in cached_uri:
json_payload, headers, status = self.http_cached_GET(relative_uris[relative_uri], rq_headers, authorization)
'''if status == rf_utility.HTTP_OK:
print('HTTP OK - 200 for ', relative_uri )'''
if status == rf_utility.HTTP_NOT_FOUND:
print ('HTTP Not Found- 404 for ', relative_uri , json_payload)
log.assertion_log('TX_COMMENT',"WARN: GET %s failed : HTTP status %s:%s" % (relative_uris[relative_uri], status, rf_utility.HTTP_status_string(status)) )
continue
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.1.0
######################################################################################################################################################
# Name: Assertion 6_1_1(self,log)
# Description:
# A Redfish interface shall be exposed through a web service endpoint implemented using Hypertext Transfer Protocols, version 1.1 (RFCL616).
######################################################################################################################################################
def Assertion_6_1_1(self,log) :
log.AssertionID = '6.1.1'
assertion_status = log.WARN
log.assertion_log('BEGIN_ASSERTION', None)
relative_uris = self.relative_uris
print('A web service endpoint is endpoint is the URL where your service can be accessed by a client application. It should always contain protocol://host:port/partOfThePath. In case of Redfish interface it shall be https ')
authorization = 'on'
rq_headers = self.request_headers()
print('This assertion is checked by accessing the all redfish root resources through HTTP and if this passes, this is passes, else failed')
json_payload, headers, status_1 = self.http_cached_GET(self.Redfish_URIs['Service_Root'], rq_headers, authorization)
json_payload, headers, status_2 = self.http_cached_GET(self.Redfish_URIs['Protocol_Version'], rq_headers, authorization)
json_payload, headers, status_3 = self.http_cached_GET(self.Redfish_URIs['Service_Odata_Doc'], rq_headers, authorization)
# Update the Accept header in the request since the metadata doc is XML
rq_headers['Accept'] = rf_utility.accept_type['xml']
json_payload, headers, status_4 = self.http_cached_GET(self.Redfish_URIs['Service_Metadata_Doc'], rq_headers, authorization)
print('The different status are %s, %s, %s, %s' %(status_1, status_2, status_3, status_4))
if ( status_1 == status_2 == status_3 == status_4 == 200) :
assertion_status = log.PASS
else:
print ('This is a fail')
assertion_status = log.FAIL
#log.assertion_log('TX_COMMENT',"WARN: GET failed : HTTP status %s:%s" %( assertion_status, rf_utility.HTTP_status_string(assertion_status)) )
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.1.1
#########################################################################################################################################################################
# Name: Assertion 6_1_2(self,log)
# Description:
# Each unique instance of a resource shall be identified by a URI; thus a URI cannot reference multiple resources though it may reference a single collection resource.
#########################################################################################################################################################################
def Assertion_6_1_2(self,log) :
log.AssertionID = '6.1.2'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
print('URI provides a simple and extensible means for identifying a resource and each unique instance of a resource is identified by a URI. It is defined in clause 5, Reference Resolution, of RFC3986. ')
authorization = 'on'
relative_uris = self.relative_uris
rq_headers = self.request_headers()
uri = set ()
uniq = []
for relative_uri in cached_uri:
json_payload, headers, status = self.http_cached_GET(relative_uris[relative_uri], rq_headers, authorization)
if status == rf_utility.HTTP_OK:
print('HTTP OK - 200 for ', relative_uri )
if relative_uri not in uri :
uniq.append(relative_uri)
uri.add(relative_uri)
continue
else :
assertion_status = log.FAIL
break
elif status == rf_utility.HTTP_NOT_FOUND:
print ('HTTP Not Found- 404 for ', relative_uri , json_payload)
log.assertion_log('TX_COMMENT',"WARN: GET %s failed : HTTP status %s:%s" % (relative_uris[relative_uri], status, rf_utility.HTTP_status_string(status)) )
continue
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.1.2
###################################################################################################
# Name: Assertion_6_3_1(self, log)
# Description:
# The following Redfish-defined URIs shall be supported by a Redfish service and authorization
# should not be required to get the uri
# /redfish The URI that is used to return the protocol version
# /redfish/v1/ The URI for the Redfish Service Root
# /redfish/v1/odata The URI for the Redfish OData Service Document
# /redfish/v1/metadata The URI for the Redfish schema Document
###################################################################################################
def Assertion_6_3_1(self, log) :
log.AssertionID = '6.3.1'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
# authorization should not be required for these GETs
authorization = 'off'
rq_headers = self.request_headers()
json_payload, headers, status = self.http_cached_GET(self.Redfish_URIs['Protocol_Version'] , rq_headers, authorization)
assertion_status_ = self.response_status_check(self.Redfish_URIs['Protocol_Version'], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status == log.PASS:
log.assertion_log('line',"~ GET %s : HTTP status %s:%s" % (self.Redfish_URIs['Protocol_Version'], status, rf_utility.HTTP_status_string(status)) )
json_payload, headers, status = self.http_cached_GET(self.Redfish_URIs['Service_Root'], rq_headers, authorization)
assertion_status_ = self.response_status_check(self.Redfish_URIs['Service_Root'], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
pass
else:
log.assertion_log('line',"~ GET %s : HTTP status %s:%s" % (self.Redfish_URIs['Service_Root'], status, rf_utility.HTTP_status_string(status)) )
if assertion_status == log.PASS and json_payload:
# dont need to do this?
oem_key = 'OEM'
# do a quick-check on the json payload -~ get the oem name from the root service payload
stat, oem_name = rf_utility.json_get_key_value(json_payload, oem_key)
if (stat != False):
#might be useful info...
self.SUT_OEM['key'] = oem_key
self.SUT_OEM['name'] = oem_name
json_payload, headers, status = self.http_cached_GET(self.Redfish_URIs['Service_Odata_Doc'], rq_headers, authorization)
assertion_status_ = self.response_status_check(self.Redfish_URIs['Service_Odata_Doc'], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status == log.PASS:
log.assertion_log('line',"~ GET %s : HTTP status %s:%s" % (self.Redfish_URIs['Service_Odata_Doc'], status, rf_utility.HTTP_status_string(status)) )
rq_headers ['Accept'] = rf_utility.accept_type['xml']
json_payload, headers, status = self.http_cached_GET(self.Redfish_URIs['Service_Metadata_Doc'], rq_headers, authorization)
assertion_status_ = self.response_status_check(self.Redfish_URIs['Service_Metadata_Doc'], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status == log.PASS:
log.assertion_log('line',"~ GET %s : HTTP status %s:%s" % (self.Redfish_URIs['Service_Metadata_Doc'], status, rf_utility.HTTP_status_string(status)) )
## log completion status
log.assertion_log(assertion_status, None)
return assertion_status
#
## end Assertion 6.3.1
###################################################################################################
# Name: Assertion_6_3_2(self, log)
# Description:
# the following URI without a trailing slash shall be either Redirected to the Associated
# Redfish-defined URI shown below or else shall be treated by the service as the equivalent URI
# to the associated Redfish-defined URI:
# /redfish/v1 /redfish/v1/
###################################################################################################
def Assertion_6_3_2(self, log) :
log.AssertionID = '6.3.2'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
authorization = 'on'
rq_headers = self.request_headers()
root_redirect = [self.Redfish_URIs['Service_Root'][:-1]][0]
json_payload, headers, status = self.http_cached_GET(root_redirect, rq_headers, authorization)
assertion_status_ = self.response_status_check(root_redirect, status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)
if assertion_status_ != log.PASS:
log.assertion_log('line', "~ Expected the service to treat %s equivalent to %s" % (root_redirect, self.Redfish_URIs['Service_Root']))
## log completion status
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.3.2
###################################################################################################
# Name: Assertion_6_3_3(self, log) WIP unclear only tested in python 2.7.x
# Description:
# All relative URIs used by the service shall start with a double forward slash ("//") and include
# the authority (e.g. //mgmt.vendor.com/redfish/v1/Systems) or a single forward slash ("/") and
# include the absolute-path e.g:/redfish/v1/Systems
#
# Note: From https://www.w3.org/Protocols/rfcc2616/rfcc2616-sec5.html:
# one form is : GET http://www.w3.org/pub/WWW/TheProject.html HTTP/1.1
# antoher form is : GET /pub/WWW/TheProject.html HTTP/1.1
###################################################################################################
def Assertion_6_3_3(self, log) :
log.AssertionID = '6.3.3'
assertion_status = log.PASS
log.assertion_log('BEGIN_ASSERTION', None)
relative_uris = self.relative_uris
# default to https unless overridden via "UseHttp" in the SUT config
proto = "https"
if "UseHttp" in self.SUT_prop:
use_http = self.SUT_prop.get("UseHttp").lower()
if use_http in ["yes", "true", "on"]:
proto = "http"
authorization = 'on'
rq_headers = self.request_headers()
#1. single slash
# example: GET /pub/WWW/TheProject.html HTTP/1.1
# Host: www.w3.org
'''for relative_uri in cached_uri:
json_payload, headers, status = self.http_cached_GET(relative_uris[relative_uri], rq_headers, authorization)
assertion_status_ = self.response_status_check(relative_uris[relative_uri], status, log)
# manage assertion status
assertion_status = log.status_fixup(assertion_status,assertion_status_)'''
#TODO
# httplib has url form limitation, using urllib2 for python 2.7
# http://www.w3.org/pub/WWW/TheProject.html HTTP/1.1
# try path with //auth/path
url = proto + "://" + self.SUT_prop['DnsName'] + self.Redfish_URIs['Service_Root'] + 'Systems'
# Made changes in accessing the url and opening the url and reading - Priyanka.
json_payload, headers, status = self.http_cached_GET(url, rq_headers, authorization)
print('Status is %s' %status)
if status != 200:
assertion_status = log.FAIL
log.assertion_log(assertion_status, None)
return (assertion_status)
#
## end Assertion 6.3.3
###################################################################################################
# Name: Assertion_6_4_1(self, log)
# Description:
# Redfish Services shall understand and be able to process the headers if in the specification the
# value in the Required Column is set to "Yes" or "Conditional". It covers 6.4.2 as well- WIP Priyanka
###################################################################################################