13
13
14
14
from enum import Enum
15
15
from functools import reduce
16
- from typing import Callable , Dict , List , Tuple
16
+ from typing import Callable , Dict , List , Tuple , Type
17
17
from microschc .binary .buffer import Buffer , Padding
18
18
from microschc .parser import HeaderParser , ParserError
19
19
from microschc .protocol .compute import ComputeFunctionDependenciesType , ComputeFunctionType
20
- from microschc .rfc8724 import FieldDescriptor , HeaderDescriptor , RuleFieldDescriptor
20
+ from microschc .protocol .registry import ProtocolsIDs , REGISTER_PARSER , PARSERS
21
+ from microschc .rfc8724 import FieldDescriptor , HeaderDescriptor
21
22
22
- IPv6_HEADER_ID = 'IPv6'
23
+ IPV6_HEADER_ID = 'IPv6'
23
24
24
25
class IPv6Fields (str , Enum ):
25
- VERSION = f'{ IPv6_HEADER_ID } :Version'
26
- TRAFFIC_CLASS = f'{ IPv6_HEADER_ID } :Traffic Class'
27
- FLOW_LABEL = f'{ IPv6_HEADER_ID } :Flow Label'
28
- PAYLOAD_LENGTH = f'{ IPv6_HEADER_ID } :Payload Length'
29
- NEXT_HEADER = f'{ IPv6_HEADER_ID } :Next Header'
30
- HOP_LIMIT = f'{ IPv6_HEADER_ID } :Hop Limit'
31
- SRC_ADDRESS = f'{ IPv6_HEADER_ID } :Source Address'
32
- DST_ADDRESS = f'{ IPv6_HEADER_ID } :Destination Address'
33
-
26
+ VERSION = f'{ IPV6_HEADER_ID } :Version'
27
+ TRAFFIC_CLASS = f'{ IPV6_HEADER_ID } :Traffic Class'
28
+ FLOW_LABEL = f'{ IPV6_HEADER_ID } :Flow Label'
29
+ PAYLOAD_LENGTH = f'{ IPV6_HEADER_ID } :Payload Length'
30
+ NEXT_HEADER = f'{ IPV6_HEADER_ID } :Next Header'
31
+ HOP_LIMIT = f'{ IPV6_HEADER_ID } :Hop Limit'
32
+ SRC_ADDRESS = f'{ IPV6_HEADER_ID } :Source Address'
33
+ DST_ADDRESS = f'{ IPV6_HEADER_ID } :Destination Address'
34
+
35
+ IPV6_SUPPORTED_PAYLOAD_PROTOCOLS : List [ProtocolsIDs ] = [
36
+ ProtocolsIDs .UDP ,
37
+ ProtocolsIDs .SCTP
38
+ ]
34
39
35
40
class IPv6Parser (HeaderParser ):
36
41
37
- def __init__ (self ) -> None :
38
- super ().__init__ (name = IPv6_HEADER_ID )
42
+ def __init__ (self , predict_next :bool = False ) -> None :
43
+ super ().__init__ (name = IPV6_HEADER_ID , predict_next = predict_next )
44
+ self .predict_next : bool = predict_next
39
45
40
46
def parse (self , buffer :Buffer ) -> HeaderDescriptor :
41
47
"""
@@ -87,7 +93,7 @@ def parse(self, buffer:Buffer) -> HeaderDescriptor:
87
93
destination_address :Buffer = buffer [192 :320 ]
88
94
89
95
header_descriptor :HeaderDescriptor = HeaderDescriptor (
90
- id = IPv6_HEADER_ID ,
96
+ id = IPV6_HEADER_ID ,
91
97
length = 320 ,
92
98
fields = [
93
99
FieldDescriptor (id = IPv6Fields .VERSION , position = 0 , value = version ),
@@ -100,6 +106,15 @@ def parse(self, buffer:Buffer) -> HeaderDescriptor:
100
106
FieldDescriptor (id = IPv6Fields .DST_ADDRESS , position = 0 , value = destination_address )
101
107
]
102
108
)
109
+
110
+ if self .predict_next is True :
111
+ next_header_value : int = next_header .value (type = 'unsigned int' )
112
+ if next_header_value in IPV6_SUPPORTED_PAYLOAD_PROTOCOLS :
113
+ next_parser_class : Type [HeaderParser ] = PARSERS [next_header_value ]
114
+ next_parser : HeaderParser = next_parser_class (predict_next = True )
115
+ next_header_descriptor : HeaderDescriptor = next_parser .parse (buffer [320 :])
116
+ header_descriptor .fields .extend (next_header_descriptor .fields )
117
+ header_descriptor .length += next_header_descriptor .length
103
118
return header_descriptor
104
119
105
120
def _compute_payload_length ( decompressed_fields : List [Tuple [str , Buffer ]], rule_field_position :int ) -> Buffer :
@@ -117,3 +132,4 @@ def _compute_payload_length( decompressed_fields: List[Tuple[str, Buffer]], rule
117
132
IPv6Fields .PAYLOAD_LENGTH : (_compute_payload_length , {})
118
133
}
119
134
135
+ REGISTER_PARSER (protocol_id = ProtocolsIDs .IPV6 , parser_class = IPv6Parser )
0 commit comments