From b0298c7681647dfd10f0124df6e8f0d245cffc46 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Sun, 8 Dec 2024 11:45:11 +0100 Subject: [PATCH 01/10] [wip] SCTP common header, common chunk header and data chunk parsing --- microschc/protocol/sctp.py | 261 ++++++++++++++++++++++++++++++++++++ tests/protocol/test_sctp.py | 132 ++++++++++++++++++ 2 files changed, 393 insertions(+) create mode 100644 microschc/protocol/sctp.py create mode 100644 tests/protocol/test_sctp.py diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py new file mode 100644 index 0000000..f860551 --- /dev/null +++ b/microschc/protocol/sctp.py @@ -0,0 +1,261 @@ +""" +SCTP header declarations + +Declarations for the SCTP protocol header as defined in RFC9260 [1]. + +Note 1: Hop by Hop Options, Routing header parsing is not implemented yet. +Note 2: Fragment header parsing is not implemented as fragmentation and reassembly + are handled by SCHC-RF. +Note 3: Authentication and Encapsulating Security payload parsing is not implemented yet. + +[1] "RFC9260: Stream Control Transmission Protocol, R. Stewart et al. +""" + + +from enum import Enum + +SCTP_HEADER_ID = 'SCTP' + +class SCTPFields(str, Enum): + SOURCE_PORT_NUMBER = f'{SCTP_HEADER_ID}:Source Port Number' + DESTINATION_PORT_NUMBER = f'{SCTP_HEADER_ID}:Destination Port Number' + VERIFICATION_TAG = f'{SCTP_HEADER_ID}:Verification Tag' + CHECKSUM = f'{SCTP_HEADER_ID}:Checksum' + CHUNK_TYPE = f'{SCTP_HEADER_ID}:Chunk Type' + CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' + CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' + DATA_RES = f'{SCTP_HEADER_ID}:Data Res' + DATA_I = f'{SCTP_HEADER_ID}:Data I' + DATA_U = f'{SCTP_HEADER_ID}:Data U' + DATA_B = f'{SCTP_HEADER_ID}:Data B' + DATA_E = f'{SCTP_HEADER_ID}:Data E' + DATA_LENGTH = f'{SCTP_HEADER_ID}:Data Length' + DATA_TSN = f'{SCTP_HEADER_ID}:Data TSN' + DATA_STREAM_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Stream Identifier S' + DATA_STREAM_SEQUENCE_NUMBER = f'{SCTP_HEADER_ID}:Data Stream Sequence Number n' + DATA_PAYLOAD_PROTOCOL_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Payload Protocol Identifier' + INIT_CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' + INIT_CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' + INIT_INITIATE_TAG = f'{SCTP_HEADER_ID}:Initiate Tag' + INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Advertised Receiver Window Credit' + INIT_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Number of Outbound Streams' + INIT_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Number of Inbound Streams' + INIT_INITIAL_TSN = f'{SCTP_HEADER_ID}:Initial TSN' + + INIT_ACK_CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' + INIT_ACK_CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' + INIT_ACK_INITIATE_TAG = f'{SCTP_HEADER_ID}:Initiate Tag' + INIT_ACK_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Advertised Receiver Window Credit' + INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Number of Outbound Streams' + INIT_ACK_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Number of Inbound Streams' + INIT_ACK_INITIAL_TSN = f'{SCTP_HEADER_ID}:Initial TSN' + + +from enum import Enum +from typing import Dict, List, Tuple +from microschc.binary.buffer import Buffer +from microschc.parser import HeaderParser, ParserError +from microschc.rfc8724 import FieldDescriptor, HeaderDescriptor + +SCTP_HEADER_ID = 'SCTP' + +class SCTPFields(str, Enum): + SOURCE_PORT = f'{SCTP_HEADER_ID}:Source Port' + DESTINATION_PORT = f'{SCTP_HEADER_ID}:Destination Port' + VERIFICATION_TAG = f'{SCTP_HEADER_ID}:Verification Tag' + CHECKSUM = f'{SCTP_HEADER_ID}:Checksum' + CHUNK_TYPE = f'{SCTP_HEADER_ID}:Chunk Type' + CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' + CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' + CHUNK_VALUE = f'{SCTP_HEADER_ID}:Chunk Value' + CHUNK_PADDING = f'{SCTP_HEADER_ID}:Chunk Padding' + + CHUNK_DATA_TSN = f'{SCTP_HEADER_ID}:Data TSN' + CHUNK_DATA_STREAM_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Stream Identifier S' + CHUNK_DATA_STREAM_SEQUENCE_NUMBER = f'{SCTP_HEADER_ID}:Data Stream Sequence Number n' + CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Payload Protocol Identifier' + CHUNK_DATA_PAYLOAD = f'{SCTP_HEADER_ID}:Data Payload' + +class SCTPChunkTypes(int, Enum): + # ID Value Chunk Type + # ----- ---------- + # 0 - Payload Data (DATA) + # 1 - Initiation (INIT) + # 2 - Initiation Acknowledgement (INIT ACK) + # 3 - Selective Acknowledgement (SACK) + # 4 - Heartbeat Request (HEARTBEAT) + # 5 - Heartbeat Acknowledgement (HEARTBEAT ACK) + # 6 - Abort (ABORT) + # 7 - Shutdown (SHUTDOWN) + # 8 - Shutdown Acknowledgement (SHUTDOWN ACK) + # 9 - Operation Error (ERROR) + # 10 - State Cookie (COOKIE ECHO) + # 11 - Cookie Acknowledgement (COOKIE ACK) + # 12 - Reserved for Explicit Congestion Notification Echo (ECNE) + # 13 - Reserved for Congestion Window Reduced (CWR) + # 14 - Shutdown Complete (SHUTDOWN COMPLETE) + # 15 to 62 - reserved by IETF + # 63 - IETF-defined Chunk Extensions + # 64 to 126 - reserved by IETF + # 127 - IETF-defined Chunk Extensions + # 128 to 190 - reserved by IETF + # 191 - IETF-defined Chunk Extensions + # 192 to 254 - reserved by IETF + # 255 - IETF-defined Chunk Extensions + DATA = 0 + INIT = 1 + INIT_ACK = 2 + SACK = 3 + HEARTBEAT = 4 + HEARTBEAT_ACK = 5 + ABORT = 6 + SHUTDOWN = 7 + SHUTDOWN_ACK = 8 + ERROR = 9 + COOKIE_ECHO = 10 + COOKIE_ACK = 11 + ECNE = 12 + CWR = 13 + SHUTDOWN_COMPLETE = 14 + + + +class SCTPParser(HeaderParser): + def __init__(self) -> None: + super().__init__(name=SCTP_HEADER_ID) + + def match(self, buffer: Buffer) -> bool: + return buffer.length >= 12 * 8 # SCTP header is at least 12 bytes + + def parse(self, buffer: Buffer) -> HeaderDescriptor: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Source Port Number | Destination Port Number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Verification Tag | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Checksum | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + | Chunk #1 | + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + | Chunk #2 | + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + if buffer.length < 12 * 8: + raise ParserError(buffer=buffer, message=f'length too short: {buffer.length} < 96') + + # Source Port: 16 bits + source_port: Buffer = buffer[0:16] + # Destination Port: 16 bits + destination_port: Buffer = buffer[16:32] + # Verification Tag: 32 bits + verification_tag: Buffer = buffer[32:64] + # Checksum: 32 bits + checksum: Buffer = buffer[64:96] + + header_fields: List[FieldDescriptor] = [ + FieldDescriptor(id=SCTPFields.SOURCE_PORT, position=0, value=source_port), + FieldDescriptor(id=SCTPFields.DESTINATION_PORT, position=0, value=destination_port), + FieldDescriptor(id=SCTPFields.VERIFICATION_TAG, position=0, value=verification_tag), + FieldDescriptor(id=SCTPFields.CHECKSUM, position=0, value=checksum), + ] + + chunks: Buffer = buffer[96:] + + while chunks.length > 0: + chunks_fields, chunks_bits_consumed = self._parse_chunk(chunks) + chunks = chunks[chunks_bits_consumed:] + header_fields.extend(chunks_fields) + + + header_descriptor: HeaderDescriptor = HeaderDescriptor( + id=SCTP_HEADER_ID, + length=buffer.length, + fields=header_fields + ) + return header_descriptor + + + def _parse_chunk(self, buffer: Buffer) -> tuple[List[FieldDescriptor], int]: + fields: List[FieldDescriptor] = [] + + # Chunk Type: 8 bits + chunk_type: Buffer = buffer[0:8] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_TYPE, position=0, value=chunk_type)) + + # Chunk Flags: 8 bits + chunk_flags: Buffer = buffer[8:16] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_FLAGS, position=0, value=chunk_flags)) + + # Chunk Length: 16 bits + chunk_length: Buffer = buffer[16:32] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_LENGTH, position=0, value=chunk_length)) + + chunk_length_value: int = chunk_length.value(type='unsigned int') * 8 + + # Chunk Value: variable length + chunk_value_length = chunk_length_value - 32 # Length includes the 4 bytes of type, flags, and length + if chunk_value_length > 0: + chunk_type_value: int = chunk_type.value() + chunk_value: Buffer = buffer[32: 32 + chunk_value_length] + + if chunk_type_value == SCTPChunkTypes.DATA: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_data(chunk_value) + fields.extend(chunk_fields) + else: + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)) + + chunk_padding_length: int = (32 - chunk_length_value%32)%32 + if chunk_padding_length > 0: + chunk_padding: Buffer = buffer[chunk_length_value: chunk_length_value+chunk_padding_length] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_PADDING, position=0, value=chunk_padding)) + + + bits_consumed = chunk_length_value + chunk_padding_length + + return fields, bits_consumed + + + def _parse_chunk_data(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 0 | Reserved|U|B|E| Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | TSN | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Stream Identifier S | Stream Sequence Number n | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Payload Protocol Identifier | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / User Data (seq n of Stream S) / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + tsn: Buffer = buffer[0:32] + stream_identifier_s: Buffer = buffer[32:48] + stream_sequence_number_n: Buffer = buffer[48:64] + payload_protocol_identifier: Buffer = buffer[64:96] + user_data: Buffer = buffer[96:] + + fields.extend( + [ + FieldDescriptor(id=SCTPFields.CHUNK_DATA_TSN, value=tsn, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_DATA_STREAM_IDENTIFIER, value=stream_identifier_s, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_DATA_STREAM_SEQUENCE_NUMBER, value=stream_sequence_number_n, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER, value=payload_protocol_identifier, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_DATA_PAYLOAD, value=user_data, position=0) + ] + ) + return fields + + \ No newline at end of file diff --git a/tests/protocol/test_sctp.py b/tests/protocol/test_sctp.py new file mode 100644 index 0000000..4706ad7 --- /dev/null +++ b/tests/protocol/test_sctp.py @@ -0,0 +1,132 @@ +from typing import Dict, List, Tuple +from microschc.protocol.sctp import SCTPParser, SCTPFields +from microschc.parser.parser import HeaderDescriptor +from microschc.rfc8724 import FieldDescriptor +from microschc.binary.buffer import Buffer + +def test_sctp_parser_import(): + """test: SCTP header parser import and instanciation + The test instanciate an SCTP parser and checks for import errors + """ + parser = SCTPParser() + assert( isinstance(parser, SCTPParser) ) + +def test_sctp_parser_parse(): + """test: SCTP header parser parses SCTP Header + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x25\x0f' + - id='Destination Port Number' length=16 position=0 value=b'\x96\x0c' + - id='Verification Tag' length=32 position=0 value=b'\xc9\x59\x6d\xb9' + - id='Checksum' length=32 position=0 value=b'\x00\x00\x00\x00' + - id='Chunk Type' length=8 position=0 value=b'\x00' + - id='Chunk Flags' length=8 position=0 value=b'\x03' + - id='Chunk Length' length=16 position=0 value=b'\x00\x49' + - id='Data TSN' length=32 position=0 value=b'\xbc\x5b\xc0\x68' + - id='Data Stream Identifier S' length=16 position=0 value=b'\x00\x00' + - id='Data Stream Sequence Number n' length=16 position=0 value=b'\x00\x00' + - id='Data Payload Protocol Identifier' length=16 position=0 value=b'\x00\x00\x00\x3c' + - id='Data Payload' length=32 position=0 value=b'\x00\x15\x00\x35\x00\x00\x04\x00\x1b\x00\x08\x00\x02\xf8\x39\x10' + b'\x00\x01\x02\x00\x52\x40\x09\x03\x00\x66\x72\x65\x65\x35\x67\x63' + b'\x00\x66\x00\x10\x00\x00\x00\x00\x01\x00\x02\xf8\x39\x00\x00\x10' + b'\x08\x01\x02\x03\x00\x15\x40\x01\x40' + - id='Chunk Padding' length=32 position=0 value=b'\x00\x00\x00' + + """ + + valid_sctp_packet:bytes = bytes(b'\x25\x0f\x96\x0c\xc9\x59\x6d\xb9\x00\x00\x00\x00\x00\x03\x00\x49' + b'\xbc\x5b\xc0\x68\x00\x00\x00\x00\x00\x00\x00\x3c' + b'\x00\x15\x00\x35\x00\x00\x04\x00\x1b\x00\x08\x00\x02\xf8\x39\x10' + b'\x00\x01\x02\x00\x52\x40\x09\x03\x00\x66\x72\x65\x65\x35\x67\x63' + b'\x00\x66\x00\x10\x00\x00\x00\x00\x01\x00\x02\xf8\x39\x00\x00\x10' + b'\x08\x01\x02\x03\x00\x15\x40\x01\x40\x00\x00\x00' + ) + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 13 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x25\x0f', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x96\x0c', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\xc9\x59\x6d\xb9', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x00\x00\x00\x00', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x00', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x03', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x49', length=16) + + # - data chunk header fields + data_tsn_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert data_tsn_fd.id == SCTPFields.CHUNK_DATA_TSN + assert data_tsn_fd.position == 0 + assert data_tsn_fd.value == Buffer(content=b'\xbc\x5b\xc0\x68', length=32) + + data_stream_identifier_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert data_stream_identifier_fd.id == SCTPFields.CHUNK_DATA_STREAM_IDENTIFIER + assert data_stream_identifier_fd.position == 0 + assert data_stream_identifier_fd.value == Buffer(content=b'\x00\x00', length=16) + + data_stream_sequence_number_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert data_stream_sequence_number_fd.id == SCTPFields.CHUNK_DATA_STREAM_SEQUENCE_NUMBER + assert data_stream_sequence_number_fd.position == 0 + assert data_stream_sequence_number_fd.value == Buffer(content=b'\x00\x00', length=16) + + data_payload_protocol_identifier_fd:FieldDescriptor = sctp_header_descriptor.fields[10] + assert data_payload_protocol_identifier_fd.id == SCTPFields.CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER + assert data_payload_protocol_identifier_fd.position == 0 + assert data_payload_protocol_identifier_fd.value == Buffer(content=b'\x00\x00\x00\x3c', length=32) + + + data_payload_fd:FieldDescriptor = sctp_header_descriptor.fields[11] + assert data_payload_fd.id == SCTPFields.CHUNK_DATA_PAYLOAD + assert data_payload_fd.position == 0 + assert data_payload_fd.value == Buffer(content= b'\x00\x15\x00\x35\x00\x00\x04\x00\x1b\x00\x08\x00\x02\xf8\x39\x10' + b'\x00\x01\x02\x00\x52\x40\x09\x03\x00\x66\x72\x65\x65\x35\x67\x63' + b'\x00\x66\x00\x10\x00\x00\x00\x00\x01\x00\x02\xf8\x39\x00\x00\x10' + b'\x08\x01\x02\x03\x00\x15\x40\x01\x40', + length=456) + + chunk_padding_fd:FieldDescriptor = sctp_header_descriptor.fields[12] + assert chunk_padding_fd.id == SCTPFields.CHUNK_PADDING + assert chunk_padding_fd.position == 0 + assert chunk_padding_fd.value == Buffer(content=b'\x00\x00\x00', length=24) \ No newline at end of file From 89971588ad5fa942e99c9b7e4b6b9fe40c14d391 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Sun, 8 Dec 2024 11:53:17 +0100 Subject: [PATCH 02/10] [bugfix] typing of _parse_chunk fixed --- microschc/protocol/sctp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py index f860551..aafb94c 100644 --- a/microschc/protocol/sctp.py +++ b/microschc/protocol/sctp.py @@ -182,7 +182,7 @@ def parse(self, buffer: Buffer) -> HeaderDescriptor: return header_descriptor - def _parse_chunk(self, buffer: Buffer) -> tuple[List[FieldDescriptor], int]: + def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: fields: List[FieldDescriptor] = [] # Chunk Type: 8 bits From 6827423007cbab1c911090fc5e5a88a21471b197 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Sun, 8 Dec 2024 22:30:18 +0100 Subject: [PATCH 03/10] [wip] SCTP init chunk parsing and parameters --- microschc/protocol/sctp.py | 101 +++++++++++++++++++++++-- tests/protocol/test_sctp.py | 143 +++++++++++++++++++++++++++++++++++- 2 files changed, 236 insertions(+), 8 deletions(-) diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py index aafb94c..270c496 100644 --- a/microschc/protocol/sctp.py +++ b/microschc/protocol/sctp.py @@ -3,11 +3,6 @@ Declarations for the SCTP protocol header as defined in RFC9260 [1]. -Note 1: Hop by Hop Options, Routing header parsing is not implemented yet. -Note 2: Fragment header parsing is not implemented as fragmentation and reassembly - are handled by SCHC-RF. -Note 3: Authentication and Encapsulating Security payload parsing is not implemented yet. - [1] "RFC9260: Stream Control Transmission Protocol, R. Stewart et al. """ @@ -76,6 +71,19 @@ class SCTPFields(str, Enum): CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Payload Protocol Identifier' CHUNK_DATA_PAYLOAD = f'{SCTP_HEADER_ID}:Data Payload' + CHUNK_INIT_INITIATE_TAG = f'{SCTP_HEADER_ID}:Init Initiate Tag' + CHUNK_INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Init Advertised Receiver Window Credit' + CHUNK_INIT_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Number of Outbound Streams' + CHUNK_INIT_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Number of Inbound Streams' + CHUNK_INIT_INITIAL_TSN = f'{SCTP_HEADER_ID}:Init Initial TSN' + + PARAMETER_TYPE = f'{SCTP_HEADER_ID}:Parameter Type' + PARAMETER_LENGTH = f'{SCTP_HEADER_ID}:Parameter Length' + PARAMETER_VALUE = f'{SCTP_HEADER_ID}:Parameter Value' + PARAMETER_PADDING = f'{SCTP_HEADER_ID}:Parameter Padding' + + + class SCTPChunkTypes(int, Enum): # ID Value Chunk Type # ----- ---------- @@ -208,6 +216,9 @@ def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: if chunk_type_value == SCTPChunkTypes.DATA: chunk_fields: List[FieldDescriptor] = self._parse_chunk_data(chunk_value) fields.extend(chunk_fields) + elif chunk_type_value == SCTPChunkTypes.INIT: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_init(chunk_value) + fields.extend(chunk_fields) else: fields.append(FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)) @@ -257,5 +268,83 @@ def _parse_chunk_data(self, buffer: Buffer) -> List[FieldDescriptor]: ] ) return fields + + def _parse_chunk_init(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 1 | Chunk Flags | Chunk Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Initiate Tag | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Advertised Receiver Window Credit (a_rwnd) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Outbound Streams | Number of Inbound Streams | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Initial TSN | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Optional/Variable-Length Parameters / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + initiate_tag: Buffer = buffer[0:32] + advertised_receiver_window_credit: Buffer = buffer[32:64] + number_outbound_streams: Buffer = buffer[64:80] + number_inbound_streams: Buffer = buffer[80:96] + initial_tsn: Buffer = buffer[96:128] + + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_INIT_INITIATE_TAG, value=initiate_tag, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT, value=advertised_receiver_window_credit, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_NUMBER_OF_OUTBOUND_STREAMS, value=number_outbound_streams, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_NUMBER_OF_INBOUND_STREAMS, value=number_inbound_streams, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_INITIAL_TSN, value=initial_tsn, position=0) + ]) + + parameters = buffer[128:] + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] - \ No newline at end of file + return fields + + def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Parameter Type | Parameter Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Parameter Value / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + parameter_type: Buffer = buffer[0:16] + parameter_length: Buffer = buffer[16:32] + + fields.extend([ + FieldDescriptor(id=SCTPFields.PARAMETER_TYPE, value=parameter_type, position=0), + FieldDescriptor(id=SCTPFields.PARAMETER_LENGTH, value=parameter_length, position=0), + ]) + + parameter_length_value: int = parameter_length.value() * 8 + parameter_value_length: int = parameter_length_value - 32 + if parameter_value_length > 0: + parameter_value: Buffer = buffer[32: parameter_length_value] + fields.append(FieldDescriptor(id=SCTPFields.PARAMETER_VALUE, value=parameter_value, position=0)) + + parameter_padding_length: int = (32 - parameter_value_length%32)%32 + if parameter_padding_length > 0: + parameter_padding: Buffer = buffer[parameter_length_value: parameter_length_value + parameter_padding_length] + fields.append( + FieldDescriptor(id=SCTPFields.PARAMETER_PADDING, value=parameter_padding, position=0) + ) + return fields, parameter_length_value + parameter_padding_length + + \ No newline at end of file diff --git a/tests/protocol/test_sctp.py b/tests/protocol/test_sctp.py index 4706ad7..9fe729f 100644 --- a/tests/protocol/test_sctp.py +++ b/tests/protocol/test_sctp.py @@ -11,7 +11,7 @@ def test_sctp_parser_import(): parser = SCTPParser() assert( isinstance(parser, SCTPParser) ) -def test_sctp_parser_parse(): +def test_sctp_parser_parse_data(): """test: SCTP header parser parses SCTP Header The packet is made of a SCTP header with the following fields: @@ -129,4 +129,143 @@ def test_sctp_parser_parse(): chunk_padding_fd:FieldDescriptor = sctp_header_descriptor.fields[12] assert chunk_padding_fd.id == SCTPFields.CHUNK_PADDING assert chunk_padding_fd.position == 0 - assert chunk_padding_fd.value == Buffer(content=b'\x00\x00\x00', length=24) \ No newline at end of file + assert chunk_padding_fd.value == Buffer(content=b'\x00\x00\x00', length=24) + + +def test_sctp_parser_parse_init(): + """test: SCTP header parser parses SCTP Header + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x07' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x07' + - id='Verification Tag' length=32 position=0 value=b'\x00\x00\x00\x00' + - id='Checksum' length=32 position=0 value=b'\x37\x61\xa7\x46' + - id='Chunk Type' length=8 position=0 value=b'\x01' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x20' + - id='Initiate Tag length=32 position=0 value=b'\x43\x23\x25\x44' + - id='Advertised Receiver Window Credit' length=32 position=0 value=b'\x00\x00\xff\xff' + - id='Number of Outbound Streams' length=16 position=0 value=b'\x00\x11' + - id='Number of Inbound Streams' length=16 position=0 value=b'\x00\x11' + - id='Initial TSN length=32 position=0 value=b'\x5c\xfe\x37\x9f' + - id='Parameter Type' length=16 position=0 value=b'\xc0\x00' + - id='Parameter Length' length=16 position=0 value=b'\x00\x04' + - id='Parameter Type' length=16 position=0 value=b'\x00\x0c' + - id='Parameter Length' length=16 position=0 value=b'\x00\x06' + - id='Parameter Value' length=16 position=0 value=b'\x00\x05' + - id='Parameter Padding' length=16 position=0 value=b'\x00\x00' + + """ + + valid_sctp_packet:bytes = bytes(b'\x00\x07\x00\x07\x00\x00\x00\x00\x37\x61\xa7\x46\x01\x00\x00\x20' + b'\x43\x23\x25\x44\x00\x00\xff\xff\x00\x11\x00\x11\x5c\xfe\x37\x9f' + b'\xc0\x00\x00\x04\x00\x0c\x00\x06\x00\x05\x00\x00' + ) + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 18 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x00\x00\x00\x00', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x37\x61\xa7\x46', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x01', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x20', length=16) + + initiate_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert initiate_tag_fd.id == SCTPFields.CHUNK_INIT_INITIATE_TAG + assert initiate_tag_fd.position == 0 + assert initiate_tag_fd.value == Buffer(content=b'\x43\x23\x25\x44', length=32) + + advertised_receiver_window_credit_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert advertised_receiver_window_credit_fd.id == SCTPFields.CHUNK_INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT + assert advertised_receiver_window_credit_fd.position == 0 + assert advertised_receiver_window_credit_fd.value == Buffer(content=b'\x00\x00\xff\xff', length=32) + + number_outbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert number_outbound_streams_fd.id == SCTPFields.CHUNK_INIT_NUMBER_OF_OUTBOUND_STREAMS + assert number_outbound_streams_fd.position == 0 + assert number_outbound_streams_fd.value == Buffer(content=b'\x00\x11', length=16) + + number_inbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[10] + assert number_inbound_streams_fd.id == SCTPFields.CHUNK_INIT_NUMBER_OF_INBOUND_STREAMS + assert number_inbound_streams_fd.position == 0 + assert number_inbound_streams_fd.value == Buffer(content=b'\x00\x11', length=16) + + initial_tsn_fd:FieldDescriptor = sctp_header_descriptor.fields[11] + assert initial_tsn_fd.id == SCTPFields.CHUNK_INIT_INITIAL_TSN + assert initial_tsn_fd.position == 0 + assert initial_tsn_fd.value == Buffer(content=b'\x5c\xfe\x37\x9f', length=32) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[12] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\xc0\x00', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[13] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x04', length=16) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[14] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\x00\x0c', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[15] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x06', length=16) + + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[16] + assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE + assert parameter_value_fd.position == 0 + assert parameter_value_fd.value == Buffer(content=b'\x00\x05', length=16) + + parameter_padding_fd:FieldDescriptor = sctp_header_descriptor.fields[17] + assert parameter_padding_fd.id == SCTPFields.PARAMETER_PADDING + assert parameter_padding_fd.position == 0 + assert parameter_padding_fd.value == Buffer(content=b'\x00\x00', length=16) \ No newline at end of file From a98e2c41f7a41426ac94d1bd9d72fc901580d1a6 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Sun, 8 Dec 2024 22:57:05 +0100 Subject: [PATCH 04/10] [wip] SCTP init ack chunk parsing --- microschc/protocol/sctp.py | 104 +++++++++++++++++------ tests/protocol/test_sctp.py | 163 +++++++++++++++++++++++++++++++++++- 2 files changed, 238 insertions(+), 29 deletions(-) diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py index 270c496..ec58226 100644 --- a/microschc/protocol/sctp.py +++ b/microschc/protocol/sctp.py @@ -55,32 +55,38 @@ class SCTPFields(str, Enum): SCTP_HEADER_ID = 'SCTP' class SCTPFields(str, Enum): - SOURCE_PORT = f'{SCTP_HEADER_ID}:Source Port' - DESTINATION_PORT = f'{SCTP_HEADER_ID}:Destination Port' - VERIFICATION_TAG = f'{SCTP_HEADER_ID}:Verification Tag' - CHECKSUM = f'{SCTP_HEADER_ID}:Checksum' - CHUNK_TYPE = f'{SCTP_HEADER_ID}:Chunk Type' - CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' - CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' - CHUNK_VALUE = f'{SCTP_HEADER_ID}:Chunk Value' - CHUNK_PADDING = f'{SCTP_HEADER_ID}:Chunk Padding' - - CHUNK_DATA_TSN = f'{SCTP_HEADER_ID}:Data TSN' - CHUNK_DATA_STREAM_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Stream Identifier S' - CHUNK_DATA_STREAM_SEQUENCE_NUMBER = f'{SCTP_HEADER_ID}:Data Stream Sequence Number n' - CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Payload Protocol Identifier' - CHUNK_DATA_PAYLOAD = f'{SCTP_HEADER_ID}:Data Payload' - - CHUNK_INIT_INITIATE_TAG = f'{SCTP_HEADER_ID}:Init Initiate Tag' - CHUNK_INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Init Advertised Receiver Window Credit' - CHUNK_INIT_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Number of Outbound Streams' - CHUNK_INIT_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Number of Inbound Streams' - CHUNK_INIT_INITIAL_TSN = f'{SCTP_HEADER_ID}:Init Initial TSN' - - PARAMETER_TYPE = f'{SCTP_HEADER_ID}:Parameter Type' - PARAMETER_LENGTH = f'{SCTP_HEADER_ID}:Parameter Length' - PARAMETER_VALUE = f'{SCTP_HEADER_ID}:Parameter Value' - PARAMETER_PADDING = f'{SCTP_HEADER_ID}:Parameter Padding' + SOURCE_PORT = f'{SCTP_HEADER_ID}:Source Port' + DESTINATION_PORT = f'{SCTP_HEADER_ID}:Destination Port' + VERIFICATION_TAG = f'{SCTP_HEADER_ID}:Verification Tag' + CHECKSUM = f'{SCTP_HEADER_ID}:Checksum' + CHUNK_TYPE = f'{SCTP_HEADER_ID}:Chunk Type' + CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' + CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' + CHUNK_VALUE = f'{SCTP_HEADER_ID}:Chunk Value' + CHUNK_PADDING = f'{SCTP_HEADER_ID}:Chunk Padding' + + CHUNK_DATA_TSN = f'{SCTP_HEADER_ID}:Data TSN' + CHUNK_DATA_STREAM_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Stream Identifier S' + CHUNK_DATA_STREAM_SEQUENCE_NUMBER = f'{SCTP_HEADER_ID}:Data Stream Sequence Number n' + CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Payload Protocol Identifier' + CHUNK_DATA_PAYLOAD = f'{SCTP_HEADER_ID}:Data Payload' + + CHUNK_INIT_INITIATE_TAG = f'{SCTP_HEADER_ID}:Init Initiate Tag' + CHUNK_INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Init Advertised Receiver Window Credit' + CHUNK_INIT_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Number of Outbound Streams' + CHUNK_INIT_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Number of Inbound Streams' + CHUNK_INIT_INITIAL_TSN = f'{SCTP_HEADER_ID}:Init Initial TSN' + + CHUNK_INIT_ACK_INITIATE_TAG = f'{SCTP_HEADER_ID}:Init Ack Initiate Tag' + CHUNK_INIT_ACK_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Init Ack Advertised Receiver Window Credit' + CHUNK_INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Ack Number of Outbound Streams' + CHUNK_INIT_ACK_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Ack Number of Inbound Streams' + CHUNK_INIT_ACK_INITIAL_TSN = f'{SCTP_HEADER_ID}:Init Ack Initial TSN' + + PARAMETER_TYPE = f'{SCTP_HEADER_ID}:Parameter Type' + PARAMETER_LENGTH = f'{SCTP_HEADER_ID}:Parameter Length' + PARAMETER_VALUE = f'{SCTP_HEADER_ID}:Parameter Value' + PARAMETER_PADDING = f'{SCTP_HEADER_ID}:Parameter Padding' @@ -219,6 +225,9 @@ def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: elif chunk_type_value == SCTPChunkTypes.INIT: chunk_fields: List[FieldDescriptor] = self._parse_chunk_init(chunk_value) fields.extend(chunk_fields) + elif chunk_type_value == SCTPChunkTypes.INIT_ACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_init_ack(chunk_value) + fields.extend(chunk_fields) else: fields.append(FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)) @@ -311,6 +320,49 @@ def _parse_chunk_init(self, buffer: Buffer) -> List[FieldDescriptor]: parameters = parameters[bits_consumed:] return fields + + def _parse_chunk_init_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 2 | Chunk Flags | Chunk Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Initiate Tag | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Advertised Receiver Window Credit (a_rwnd) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Outbound Streams | Number of Inbound Streams | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Initial TSN | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Optional/Variable-Length Parameters / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + initiate_tag: Buffer = buffer[0:32] + advertised_receiver_window_credit: Buffer = buffer[32:64] + number_outbound_streams: Buffer = buffer[64:80] + number_inbound_streams: Buffer = buffer[80:96] + initial_tsn: Buffer = buffer[96:128] + + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_INITIATE_TAG, value=initiate_tag, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_ADVERTISED_RECEIVER_WINDOW_CREDIT, value=advertised_receiver_window_credit, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS, value=number_outbound_streams, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_NUMBER_OF_INBOUND_STREAMS, value=number_inbound_streams, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_INITIAL_TSN, value=initial_tsn, position=0) + ]) + + parameters = buffer[128:] + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: """ diff --git a/tests/protocol/test_sctp.py b/tests/protocol/test_sctp.py index 9fe729f..5484aa0 100644 --- a/tests/protocol/test_sctp.py +++ b/tests/protocol/test_sctp.py @@ -12,7 +12,7 @@ def test_sctp_parser_import(): assert( isinstance(parser, SCTPParser) ) def test_sctp_parser_parse_data(): - """test: SCTP header parser parses SCTP Header + """test: SCTP header parser parses SCTP Header with DATA chunk The packet is made of a SCTP header with the following fields: - id='Source Port Number' length=16 position=0 value=b'\x25\x0f' @@ -133,7 +133,7 @@ def test_sctp_parser_parse_data(): def test_sctp_parser_parse_init(): - """test: SCTP header parser parses SCTP Header + """test: SCTP header parser parses SCTP Header with INIT chunk The packet is made of a SCTP header with the following fields: - id='Source Port Number' length=16 position=0 value=b'\x00\x07' @@ -268,4 +268,161 @@ def test_sctp_parser_parse_init(): parameter_padding_fd:FieldDescriptor = sctp_header_descriptor.fields[17] assert parameter_padding_fd.id == SCTPFields.PARAMETER_PADDING assert parameter_padding_fd.position == 0 - assert parameter_padding_fd.value == Buffer(content=b'\x00\x00', length=16) \ No newline at end of file + assert parameter_padding_fd.value == Buffer(content=b'\x00\x00', length=16) + + + + + +def test_sctp_parser_parse_init_ack(): + """test: SCTP header parser parses SCTP Header with INIT_ACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x07' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x07' + - id='Verification Tag' length=32 position=0 value=b'\x43\x23\x25\x44' + - id='Checksum' length=32 position=0 value=b'\xc9\x01\x85\x24' + - id='Chunk Type' length=8 position=0 value=b'\x02' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x80' + - id='Initiate Tag length=32 position=0 value=b'\x00\x00\x0e\xb0' + - id='Advertised Receiver Window Credit' length=32 position=0 value=b'\x00\x00\x10\x00' + - id='Number of Outbound Streams' length=16 position=0 value=b'\x00\x11' + - id='Number of Inbound Streams' length=16 position=0 value=b'\x00\x11' + - id='Initial TSN length=32 position=0 value=b'\x00\x00\x36\x14' + - id='Parameter Type' length=16 position=0 value=b'\x00\x07' + - id='Parameter Length' length=16 position=0 value=b'\x00\x68' + - id='Parameter Value' length=800 position=0 value=b'\x00\x00\x0e\xb0\x00\x00\x10\x00\x00\x11\x00\x11' + b'\x00\x00\x36\x14\x43\x23\x25\x44\x00\x00\xff\xff\x00\x11\x00\x11' + b'\x5c\xfe\x37\x9f\x07\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xa2\x85\xb1\x3f\x10\x27\x00\x00\x17\xcd\x8f\x1c\x11\x76\x9b\x04' + b'\x55\xc0\xd0\xf2\x2c\x3e\x7c\x35\x00\x01\x00\x01\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x05\x00\x08\xc0\xa8\xaa\x38\x00\x05\x00\x08' + b'\xc0\xa8\xaa\x08\xc0\x00\x00\x04' + - id='Parameter Type' length=16 position=0 value=b'\xc0\x00' + - id='Parameter Length' length=16 position=0 value=b'\x00\x04' + + """ + + valid_sctp_packet:bytes = bytes(b'\x00\x07\x00\x07\x43\x23\x25\x44\xc9\x01\x85\x24\x02\x00\x00\x80' + b'\x00\x00\x0e\xb0\x00\x00\x10\x00\x00\x11\x00\x11\x00\x00\x36\x14' + b'\x00\x07\x00\x68\x00\x00\x0e\xb0\x00\x00\x10\x00\x00\x11\x00\x11' + b'\x00\x00\x36\x14\x43\x23\x25\x44\x00\x00\xff\xff\x00\x11\x00\x11' + b'\x5c\xfe\x37\x9f\x07\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xa2\x85\xb1\x3f\x10\x27\x00\x00\x17\xcd\x8f\x1c\x11\x76\x9b\x04' + b'\x55\xc0\xd0\xf2\x2c\x3e\x7c\x35\x00\x01\x00\x01\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x05\x00\x08\xc0\xa8\xaa\x38\x00\x05\x00\x08' + b'\xc0\xa8\xaa\x08\xc0\x00\x00\x04\xc0\x00\x00\x04' + ) + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 17 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x43\x23\x25\x44', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\xc9\x01\x85\x24', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x02', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x80', length=16) + + initiate_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert initiate_tag_fd.id == SCTPFields.CHUNK_INIT_ACK_INITIATE_TAG + assert initiate_tag_fd.position == 0 + assert initiate_tag_fd.value == Buffer(content=b'\x00\x00\x0e\xb0', length=32) + + advertised_receiver_window_credit_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert advertised_receiver_window_credit_fd.id == SCTPFields.CHUNK_INIT_ACK_ADVERTISED_RECEIVER_WINDOW_CREDIT + assert advertised_receiver_window_credit_fd.position == 0 + assert advertised_receiver_window_credit_fd.value == Buffer(content=b'\x00\x00\x10\x00', length=32) + + number_outbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert number_outbound_streams_fd.id == SCTPFields.CHUNK_INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS + assert number_outbound_streams_fd.position == 0 + assert number_outbound_streams_fd.value == Buffer(content=b'\x00\x11', length=16) + + number_inbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[10] + assert number_inbound_streams_fd.id == SCTPFields.CHUNK_INIT_ACK_NUMBER_OF_INBOUND_STREAMS + assert number_inbound_streams_fd.position == 0 + assert number_inbound_streams_fd.value == Buffer(content=b'\x00\x11', length=16) + + initial_tsn_fd:FieldDescriptor = sctp_header_descriptor.fields[11] + assert initial_tsn_fd.id == SCTPFields.CHUNK_INIT_ACK_INITIAL_TSN + assert initial_tsn_fd.position == 0 + assert initial_tsn_fd.value == Buffer(content=b'\x00\x00\x36\x14', length=32) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[12] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\x00\x07', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[13] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x68', length=16) + + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[14] + assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE + assert parameter_value_fd.position == 0 + assert parameter_value_fd.value == Buffer(content=b'\x00\x00\x0e\xb0\x00\x00\x10\x00\x00\x11\x00\x11' + b'\x00\x00\x36\x14\x43\x23\x25\x44\x00\x00\xff\xff\x00\x11\x00\x11' + b'\x5c\xfe\x37\x9f\x07\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xa2\x85\xb1\x3f\x10\x27\x00\x00\x17\xcd\x8f\x1c\x11\x76\x9b\x04' + b'\x55\xc0\xd0\xf2\x2c\x3e\x7c\x35\x00\x01\x00\x01\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x05\x00\x08\xc0\xa8\xaa\x38\x00\x05\x00\x08' + b'\xc0\xa8\xaa\x08\xc0\x00\x00\x04', + length=800) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[15] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\xc0\x00', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[16] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x04', length=16) + + \ No newline at end of file From a76243c02788c1fba5f119a7df92c8019c3dc87f Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Sun, 8 Dec 2024 22:58:08 +0100 Subject: [PATCH 05/10] [clean-up] duplicate fields declaration --- microschc/protocol/sctp.py | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py index ec58226..49b913e 100644 --- a/microschc/protocol/sctp.py +++ b/microschc/protocol/sctp.py @@ -11,41 +11,6 @@ SCTP_HEADER_ID = 'SCTP' -class SCTPFields(str, Enum): - SOURCE_PORT_NUMBER = f'{SCTP_HEADER_ID}:Source Port Number' - DESTINATION_PORT_NUMBER = f'{SCTP_HEADER_ID}:Destination Port Number' - VERIFICATION_TAG = f'{SCTP_HEADER_ID}:Verification Tag' - CHECKSUM = f'{SCTP_HEADER_ID}:Checksum' - CHUNK_TYPE = f'{SCTP_HEADER_ID}:Chunk Type' - CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' - CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' - DATA_RES = f'{SCTP_HEADER_ID}:Data Res' - DATA_I = f'{SCTP_HEADER_ID}:Data I' - DATA_U = f'{SCTP_HEADER_ID}:Data U' - DATA_B = f'{SCTP_HEADER_ID}:Data B' - DATA_E = f'{SCTP_HEADER_ID}:Data E' - DATA_LENGTH = f'{SCTP_HEADER_ID}:Data Length' - DATA_TSN = f'{SCTP_HEADER_ID}:Data TSN' - DATA_STREAM_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Stream Identifier S' - DATA_STREAM_SEQUENCE_NUMBER = f'{SCTP_HEADER_ID}:Data Stream Sequence Number n' - DATA_PAYLOAD_PROTOCOL_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Payload Protocol Identifier' - INIT_CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' - INIT_CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' - INIT_INITIATE_TAG = f'{SCTP_HEADER_ID}:Initiate Tag' - INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Advertised Receiver Window Credit' - INIT_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Number of Outbound Streams' - INIT_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Number of Inbound Streams' - INIT_INITIAL_TSN = f'{SCTP_HEADER_ID}:Initial TSN' - - INIT_ACK_CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' - INIT_ACK_CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' - INIT_ACK_INITIATE_TAG = f'{SCTP_HEADER_ID}:Initiate Tag' - INIT_ACK_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Advertised Receiver Window Credit' - INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Number of Outbound Streams' - INIT_ACK_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Number of Inbound Streams' - INIT_ACK_INITIAL_TSN = f'{SCTP_HEADER_ID}:Initial TSN' - - from enum import Enum from typing import Dict, List, Tuple from microschc.binary.buffer import Buffer From a0793fd030233201bcc01a9aafe65efdc40bc564 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Mon, 9 Dec 2024 09:36:37 +0100 Subject: [PATCH 06/10] [wip] SCTO selective ack chunk parsing (gap blocks and duplicate tsns not tested) --- microschc/protocol/sctp.py | 87 +++++++++++++++++++++++++++++++-- tests/protocol/test_sctp.py | 95 +++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 5 deletions(-) diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py index 49b913e..e4061db 100644 --- a/microschc/protocol/sctp.py +++ b/microschc/protocol/sctp.py @@ -47,6 +47,14 @@ class SCTPFields(str, Enum): CHUNK_INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Ack Number of Outbound Streams' CHUNK_INIT_ACK_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Ack Number of Inbound Streams' CHUNK_INIT_ACK_INITIAL_TSN = f'{SCTP_HEADER_ID}:Init Ack Initial TSN' + + CHUNK_SACK_CUMULATIVE_TSN_ACK = f'{SCTP_HEADER_ID}:Selective Ack Cumulative TSN Ack' + CHUNK_SACK_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Selective Ack Advertised Receiver Window Credit' + CHUNK_SACK_NUMBER_GAP_ACK_BLOCKS = f'{SCTP_HEADER_ID}:Selective Ack Number Gap Ack Blocks' + CHUNK_SACK_NUMBER_DUPLICATE_TSNS = f'{SCTP_HEADER_ID}:Selective Ack Number Duplicate TSNs' + CHUNK_SACK_GAP_ACK_BLOCK_START = f'{SCTP_HEADER_ID}:Selective Ack Gap Ack BLock Start' + CHUNK_SACK_GAP_ACK_BLOCK_END = f'{SCTP_HEADER_ID}:Selective Ack Gap Ack BLock End' + CHUNK_SACK_DUPLICATE_TSN = f'{SCTP_HEADER_ID}:Selective Ack Duplicate TSN' PARAMETER_TYPE = f'{SCTP_HEADER_ID}:Parameter Type' PARAMETER_LENGTH = f'{SCTP_HEADER_ID}:Parameter Length' @@ -186,16 +194,17 @@ def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: if chunk_type_value == SCTPChunkTypes.DATA: chunk_fields: List[FieldDescriptor] = self._parse_chunk_data(chunk_value) - fields.extend(chunk_fields) + elif chunk_type_value == SCTPChunkTypes.INIT: chunk_fields: List[FieldDescriptor] = self._parse_chunk_init(chunk_value) - fields.extend(chunk_fields) elif chunk_type_value == SCTPChunkTypes.INIT_ACK: chunk_fields: List[FieldDescriptor] = self._parse_chunk_init_ack(chunk_value) - fields.extend(chunk_fields) + elif chunk_type_value == SCTPChunkTypes.SACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_selective_ack(chunk_value) else: - fields.append(FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)) - + chunk_fields: List[FieldDescriptor] = [FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)] + fields.extend(chunk_fields) + chunk_padding_length: int = (32 - chunk_length_value%32)%32 if chunk_padding_length > 0: chunk_padding: Buffer = buffer[chunk_length_value: chunk_length_value+chunk_padding_length] @@ -328,6 +337,74 @@ def _parse_chunk_init_ack(self, buffer: Buffer) -> List[FieldDescriptor]: parameters = parameters[bits_consumed:] return fields + + + def _parse_chunk_selective_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 3 | Chunk Flags | Chunk Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Cumulative TSN Ack | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Advertised Receiver Window Credit (a_rwnd) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Gap Ack Blocks = N | Number of Duplicate TSNs = M | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Gap Ack Block #1 Start | Gap Ack Block #1 End | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ ... \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Gap Ack Block #N Start | Gap Ack Block #N End | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Duplicate TSN 1 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ ... \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Duplicate TSN M | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + cumulative_tsn_ack: Buffer = buffer[0:32] + advertised_receiver_window_credit: Buffer = buffer[32:64] + number_gap_ack_blocks: Buffer = buffer[64:80] + number_duplicate_tsns: Buffer = buffer[80:96] + + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_SACK_CUMULATIVE_TSN_ACK, value=cumulative_tsn_ack, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_SACK_ADVERTISED_RECEIVER_WINDOW_CREDIT, value=advertised_receiver_window_credit, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_SACK_NUMBER_GAP_ACK_BLOCKS, value=number_gap_ack_blocks, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_SACK_NUMBER_DUPLICATE_TSNS, value=number_duplicate_tsns, position=0) + ]) + + remainer: Buffer = buffer[96:] + # Gap ack Blocks + number_gap_ack_blocks_value: int = number_gap_ack_blocks.value() + + for _ in range(number_gap_ack_blocks_value): + gap_ack_block_start: Buffer = remainer[0:16] + gap_ack_block_end: Buffer = remainer[16:32] + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_SACK_GAP_ACK_BLOCK_START, value=gap_ack_block_start), + FieldDescriptor(id=SCTPFields.CHUNK_SACK_GAP_ACK_BLOCK_END, value=gap_ack_block_end) + ]) + remainer = remainer[32:] + + # Duplicate TSNs + number_duplicate_tsns_value: int = number_duplicate_tsns.value() + for _ in range(number_duplicate_tsns_value): + duplicate_tsn: Buffer = remainer[0:32] + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_SACK_DUPLICATE_TSN, value=duplicate_tsn), + ]) + remainer = remainer[32:] + + return fields def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: """ diff --git a/tests/protocol/test_sctp.py b/tests/protocol/test_sctp.py index 5484aa0..0d03570 100644 --- a/tests/protocol/test_sctp.py +++ b/tests/protocol/test_sctp.py @@ -424,5 +424,100 @@ def test_sctp_parser_parse_init_ack(): assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH assert parameter_length_fd.position == 0 assert parameter_length_fd.value == Buffer(content=b'\x00\x04', length=16) + + +def test_sctp_parser_selective_ack(): + """test: SCTP header parser parses SCTP Header with SACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x07' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x07' + - id='Verification Tag' length=32 position=0 value=b'\x00\x00\x0e\xb0' + - id='Checksum' length=32 position=0 value=b'\xba\x04\x32\x58' + - id='Chunk Type' length=8 position=0 value=b'\x03' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x10' + - id='Cumulative TSN Ack' length=32 position=0 value=b'\x00\x00\x36\x1c' + - id='Advertised Receiver Window Credit' length=32 position=0 value=b'\x00\x00\xff\xff' + - id='Number of Gap Ack Blocks' length=16 position=0 value=b'\x00\x00' + - id='Number of Duplicate TSNs' length=16 position=0 value=b'\x00\x00' + """ + #TODO: Find SACK chunk with Gap Ack Blocks and Duplicate TSNs. + + valid_sctp_packet:bytes = bytes(b'\x00\x07\x00\x07\x00\x00\x0e\xb0\xba\x04\x32\x58\x03\x00\x00\x10' + b'\x00\x00\x36\x1c\x00\x00\xff\xff\x00\x00\x00\x00') + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 11 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x00\x00\x0e\xb0', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\xba\x04\x32\x58', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x03', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x10', length=16) + + cumulative_tsn_ack_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert cumulative_tsn_ack_fd.id == SCTPFields.CHUNK_SACK_CUMULATIVE_TSN_ACK + assert cumulative_tsn_ack_fd.position == 0 + assert cumulative_tsn_ack_fd.value == Buffer(content=b'\x00\x00\x36\x1c', length=32) + + advertised_receiver_window_credit_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert advertised_receiver_window_credit_fd.id == SCTPFields.CHUNK_SACK_ADVERTISED_RECEIVER_WINDOW_CREDIT + assert advertised_receiver_window_credit_fd.position == 0 + assert advertised_receiver_window_credit_fd.value == Buffer(content=b'\x00\x00\xff\xff', length=32) + + number_gap_ack_blocks_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert number_gap_ack_blocks_fd.id == SCTPFields.CHUNK_SACK_NUMBER_GAP_ACK_BLOCKS + assert number_gap_ack_blocks_fd.position == 0 + assert number_gap_ack_blocks_fd.value == Buffer(content=b'\x00\x00', length=16) + + number_inbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[10] + assert number_inbound_streams_fd.id == SCTPFields.CHUNK_SACK_NUMBER_DUPLICATE_TSNS + assert number_inbound_streams_fd.position == 0 + assert number_inbound_streams_fd.value == Buffer(content=b'\x00\x00', length=16) \ No newline at end of file From 3375e7f55f85a202e59cda808f59779bd90fddd0 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Mon, 9 Dec 2024 12:03:27 +0100 Subject: [PATCH 07/10] [wip] SCTP heartbeat chunk --- microschc/protocol/sctp.py | 27 ++++++++++- tests/protocol/test_sctp.py | 93 ++++++++++++++++++++++++++++++++++++- 2 files changed, 118 insertions(+), 2 deletions(-) diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py index e4061db..75fed39 100644 --- a/microschc/protocol/sctp.py +++ b/microschc/protocol/sctp.py @@ -194,13 +194,14 @@ def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: if chunk_type_value == SCTPChunkTypes.DATA: chunk_fields: List[FieldDescriptor] = self._parse_chunk_data(chunk_value) - elif chunk_type_value == SCTPChunkTypes.INIT: chunk_fields: List[FieldDescriptor] = self._parse_chunk_init(chunk_value) elif chunk_type_value == SCTPChunkTypes.INIT_ACK: chunk_fields: List[FieldDescriptor] = self._parse_chunk_init_ack(chunk_value) elif chunk_type_value == SCTPChunkTypes.SACK: chunk_fields: List[FieldDescriptor] = self._parse_chunk_selective_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.HEARTBEAT: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat(chunk_value) else: chunk_fields: List[FieldDescriptor] = [FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)] fields.extend(chunk_fields) @@ -405,6 +406,28 @@ def _parse_chunk_selective_ack(self, buffer: Buffer) -> List[FieldDescriptor]: remainer = remainer[32:] return fields + + def _parse_chunk_heartbeat(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 4 | Chunk Flags | Heartbeat Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Heartbeat Information TLV (Variable-Length) / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + parameters = buffer + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: """ @@ -440,5 +463,7 @@ def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: FieldDescriptor(id=SCTPFields.PARAMETER_PADDING, value=parameter_padding, position=0) ) return fields, parameter_length_value + parameter_padding_length + + \ No newline at end of file diff --git a/tests/protocol/test_sctp.py b/tests/protocol/test_sctp.py index 0d03570..1195a59 100644 --- a/tests/protocol/test_sctp.py +++ b/tests/protocol/test_sctp.py @@ -520,4 +520,95 @@ def test_sctp_parser_selective_ack(): assert number_inbound_streams_fd.position == 0 assert number_inbound_streams_fd.value == Buffer(content=b'\x00\x00', length=16) - \ No newline at end of file + +def test_sctp_parser_parse_heartbeat(): + """test: SCTP header parser parses SCTP Header with HEARTBEAT chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x0b\x59' + - id='Destination Port Number' length=16 position=0 value=b'\x0b\x59' + - id='Verification Tag' length=32 position=0 value=b'\x00\x00\x0e\x50' + - id='Checksum' length=32 position=0 value=b'\x53\xc3\x05\x5f' + - id='Chunk Type' length=8 position=0 value=b'\x04' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x18' + - id='Parameter Type' length=16 position=0 value=b'\x00\x01' + - id='Parameter Length' length=16 position=0 value=b'\x00\x14' + - id='Parameter Value' length=16 position=0 value=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00' + + """ + + valid_sctp_packet:bytes = bytes(b'\x0b\x59\x0b\x59\x00\x00\x0e\x50\x53\xc3\x05\x5f\x04\x00\x00\x18' + b'\x00\x01\x00\x14\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e' + b'\x00\x00\x00\x00' + + ) + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 10 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x0b\x59', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x0b\x59', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x00\x00\x0e\x50', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x53\xc3\x05\x5f', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x04', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x18', length=16) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\x00\x01', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x14', length=16) + + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE + assert parameter_value_fd.position == 0 + assert parameter_value_fd.value == Buffer(content=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00', length=128) \ No newline at end of file From 935edcb6c52842c2415ad62f0d0466726800b4c3 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Mon, 9 Dec 2024 14:11:23 +0100 Subject: [PATCH 08/10] [wip] SCTP heartbeat ack chunk --- microschc/protocol/sctp.py | 24 ++++++++++ tests/protocol/test_sctp.py | 91 ++++++++++++++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py index 75fed39..96431a4 100644 --- a/microschc/protocol/sctp.py +++ b/microschc/protocol/sctp.py @@ -202,6 +202,8 @@ def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: chunk_fields: List[FieldDescriptor] = self._parse_chunk_selective_ack(chunk_value) elif chunk_type_value == SCTPChunkTypes.HEARTBEAT: chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat(chunk_value) + elif chunk_type_value == SCTPChunkTypes.HEARTBEAT_ACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat_ack(chunk_value) else: chunk_fields: List[FieldDescriptor] = [FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)] fields.extend(chunk_fields) @@ -428,6 +430,28 @@ def _parse_chunk_heartbeat(self, buffer: Buffer) -> List[FieldDescriptor]: parameters = parameters[bits_consumed:] return fields + + def _parse_chunk_heartbeat_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 5 | Chunk Flags | Heartbeat Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Heartbeat Information TLV (Variable-Length) / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + parameters = buffer + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: """ diff --git a/tests/protocol/test_sctp.py b/tests/protocol/test_sctp.py index 1195a59..5e9d08b 100644 --- a/tests/protocol/test_sctp.py +++ b/tests/protocol/test_sctp.py @@ -581,7 +581,6 @@ def test_sctp_parser_parse_heartbeat(): assert checksum_fd.position == 0 assert checksum_fd.value == Buffer(content=b'\x53\xc3\x05\x5f', length=32) - # - chunk common header fields chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE @@ -608,6 +607,96 @@ def test_sctp_parser_parse_heartbeat(): assert parameter_length_fd.position == 0 assert parameter_length_fd.value == Buffer(content=b'\x00\x14', length=16) + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE + assert parameter_value_fd.position == 0 + assert parameter_value_fd.value == Buffer(content=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00', length=128) + + +def test_sctp_parser_parse_heartbeat_ack(): + """test: SCTP header parser parses SCTP Header with HEARTBEAT ACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x0b\x59' + - id='Destination Port Number' length=16 position=0 value=b'\x0b\x59' + - id='Verification Tag' length=32 position=0 value=b'\x0d\x53\xe6\xfe' + - id='Checksum' length=32 position=0 value=b'\x8c\x8e\x07\x46' + - id='Chunk Type' length=8 position=0 value=b'\x05' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x18' + - id='Parameter Type' length=16 position=0 value=b'\x00\x01' + - id='Parameter Length' length=16 position=0 value=b'\x00\x14' + - id='Parameter Value' length=16 position=0 value=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00' + + """ + + valid_sctp_packet:bytes = bytes(b'\x0b\x59\x0b\x59\x0d\x53\xe6\xfe\x8c\x8e\x07\x46\x05\x00\x00\x18' + b'\x00\x01\x00\x14\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e' + b'\x00\x00\x00\x00') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 10 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x0b\x59', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x0b\x59', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x0d\x53\xe6\xfe', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x8c\x8e\x07\x46', length=32) + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x05', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x18', length=16) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\x00\x01', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x14', length=16) + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[9] assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE assert parameter_value_fd.position == 0 From 07dd61664a7a24daee2f2020da76f3437cc7ead1 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Mon, 9 Dec 2024 15:42:38 +0100 Subject: [PATCH 09/10] [feat] SCTP parser with support for data, init, init_ack, sack, heartbeat, heartbeat ack, abort, shutdown, shutdown ack, error, cookie echo, cookie ack chunks --- microschc/protocol/sctp.py | 131 ++++++++++++ tests/protocol/test_sctp.py | 406 +++++++++++++++++++++++++++++++++++- 2 files changed, 536 insertions(+), 1 deletion(-) diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py index 96431a4..48c0d45 100644 --- a/microschc/protocol/sctp.py +++ b/microschc/protocol/sctp.py @@ -55,6 +55,10 @@ class SCTPFields(str, Enum): CHUNK_SACK_GAP_ACK_BLOCK_START = f'{SCTP_HEADER_ID}:Selective Ack Gap Ack BLock Start' CHUNK_SACK_GAP_ACK_BLOCK_END = f'{SCTP_HEADER_ID}:Selective Ack Gap Ack BLock End' CHUNK_SACK_DUPLICATE_TSN = f'{SCTP_HEADER_ID}:Selective Ack Duplicate TSN' + + CHUNK_SHUTDOWN_CUMULATIVE_TSN_ACK = f'{SCTP_HEADER_ID}:Shutdown Cumulative TSN' + + CHUNK_COOKIE_ECHO_COOKIE = f'{SCTP_HEADER_ID}:Cookie Echo Cookie' PARAMETER_TYPE = f'{SCTP_HEADER_ID}:Parameter Type' PARAMETER_LENGTH = f'{SCTP_HEADER_ID}:Parameter Length' @@ -204,6 +208,20 @@ def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat(chunk_value) elif chunk_type_value == SCTPChunkTypes.HEARTBEAT_ACK: chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.ABORT: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_abort(chunk_value) + elif chunk_type_value == SCTPChunkTypes.SHUTDOWN: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_shutdown(chunk_value) + elif chunk_type_value == SCTPChunkTypes.SHUTDOWN_ACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_shutdown_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.ERROR: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_error(chunk_value) + elif chunk_type_value == SCTPChunkTypes.COOKIE_ECHO: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_cookie_echo(chunk_value) + elif chunk_type_value == SCTPChunkTypes.COOKIE_ACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_cookie_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.SHUTDOWN_COMPLETE: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_shutdown_complete(chunk_value) else: chunk_fields: List[FieldDescriptor] = [FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)] fields.extend(chunk_fields) @@ -452,6 +470,119 @@ def _parse_chunk_heartbeat_ack(self, buffer: Buffer) -> List[FieldDescriptor]: parameters = parameters[bits_consumed:] return fields + + def _parse_chunk_abort(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 6 | Reserved |T| Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / zero or more Error Causes / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + parameters = buffer + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields + + def _parse_chunk_shutdown(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 7 | Chunk Flags | Length = 8 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Cumulative TSN Ack | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + fields: List[FieldDescriptor] = [] + cumulative_tsn_ack: Buffer = buffer[0:32] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_SHUTDOWN_CUMULATIVE_TSN_ACK, value=cumulative_tsn_ack, position=0)) + + return fields + + def _parse_chunk_shutdown_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 8 | Chunk Flags | Length = 4 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + return fields + + def _parse_chunk_error(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 9 | Chunk Flags | Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / one or more Error Causes / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + parameters = buffer + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields + + def _parse_chunk_cookie_echo(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 10 | Chunk Flags | Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / Cookie / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + cookie: Buffer = buffer + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_COOKIE_ECHO_COOKIE, value=cookie, position=0)) + + return fields + + def _parse_chunk_cookie_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 11 | Chunk Flags | Length = 4 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + return fields + + def _parse_chunk_shutdown_complete(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 14 | Chunk Flags | Length = 4 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + return fields def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: """ diff --git a/tests/protocol/test_sctp.py b/tests/protocol/test_sctp.py index 5e9d08b..4e05e26 100644 --- a/tests/protocol/test_sctp.py +++ b/tests/protocol/test_sctp.py @@ -700,4 +700,408 @@ def test_sctp_parser_parse_heartbeat_ack(): parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[9] assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE assert parameter_value_fd.position == 0 - assert parameter_value_fd.value == Buffer(content=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00', length=128) \ No newline at end of file + + +def test_sctp_parser_parse_abort(): + # TODO: find PCPAP with abort chunk + pass + +def test_sctp_parser_parse_shutdown(): + """test: SCTP header parser parses SCTP Header with a SHUTDOWN chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x80\x45' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x50' + - id='Verification Tag' length=32 position=0 value=b'\x13\x89\x60\x07' + - id='Checksum' length=32 position=0 value=b'\x75\x58\xd7\xdb' + - id='Chunk Type' length=8 position=0 value=b'\x07' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x08' + - id='Cumulative TSN Ack' length=32 position=0 value=b'\xaa\x86\x96\x09' + """ + + valid_sctp_packet:bytes = bytes(b'\x80\x45\x00\x50\x13\x89\x60\x07\x75\x58\xd7\xdb\x07\x00\x00\x08' + b'\xaa\x86\x96\x09') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 8 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x80\x45', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x13\x89\x60\x07', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x75\x58\xd7\xdb', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x07', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x08', length=16) + + cumulative_tsn_ack_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert cumulative_tsn_ack_fd.id == SCTPFields.CHUNK_SHUTDOWN_CUMULATIVE_TSN_ACK + assert cumulative_tsn_ack_fd.position == 0 + assert cumulative_tsn_ack_fd.value == Buffer(content=b'\xaa\x86\x96\x09', length=32) + +def test_sctp_parser_parse_shutdown_ack(): + """test: SCTP header parser parses SCTP Header with a SHUTDOWN ACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x50' + - id='Destination Port Number' length=16 position=0 value=b'\x80\x45' + - id='Verification Tag' length=32 position=0 value=b'\xe9\xb7\x3b\xc3' + - id='Checksum' length=32 position=0 value=b'\xc7\x4e\x59\xcd' + - id='Chunk Type' length=8 position=0 value=b'\x08' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x04' + """ + + valid_sctp_packet:bytes = bytes(b'\x00\x50\x80\x45\xe9\xb7\x3b\xc3\xc7\x4e\x59\xcd\x08\x00\x00\x04') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 7 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x80\x45', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\xe9\xb7\x3b\xc3', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\xc7\x4e\x59\xcd', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x08', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x04', length=16) + + +def test_sctp_parser_parse_shutdown_complete(): + """test: SCTP header parser parses SCTP Header with a SHUTDOWN COMPLETE chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x80\x45' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x50' + - id='Verification Tag' length=32 position=0 value=b'\x13\x89\x60\x07' + - id='Checksum' length=32 position=0 value=b'\xae\x51\x7e\x10' + - id='Chunk Type' length=8 position=0 value=b'\x0e' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x04' + """ + + valid_sctp_packet:bytes = bytes(b'\x80\x45\x00\x50\x13\x89\x60\x07\xae\x51\x7e\x10\x0e\x00\x00\x04') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 7 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x80\x45', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x13\x89\x60\x07', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\xae\x51\x7e\x10', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x0e', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x04', length=16) + +def test_sctp_parser_parse_error(): + # TODO: find PCPAP with error chunk + pass + +def test_sctp_parser_parse_cookie_echo(): + """test: SCTP header parser parses SCTP Header with a COOKIE ECHO chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x80\x44' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x50' + - id='Verification Tag' length=32 position=0 value=b'\xd2\x6a\xc1\xe5' + - id='Checksum' length=32 position=0 value=b'\x75\x14\xa4\x9a' + - id='Chunk Type' length=8 position=0 value=b'\x0a' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\xc4' + - id='Cookie' length=1536 position=0 value=b'\xb3\x49\x30\x15\xe1\xc2\x76\x25\xf5\x3a\xb8\x18\x55\x55\xb8\xdb' + b'\x7a\x0f\x43\xdd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xe5\xc1\x6a\xd2\x46\x9c\xb9\x3b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xf4\xc5\xc5\x43\x95\x2b\x00\x00\x0a\x00\x0a\x00\x16\x2a\x00\x64' + b'\x02\x00\x44\x80\x9b\xe6\x18\x9b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x01\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x3c\x3b\xb9\x9c\x46' + b'\x00\x01\xa0\x00\x00\x0a\xff\xff\x2b\x2d\x7e\xb2\x00\x05\x00\x08' + b'\x9b\xe6\x18\x9b\x00\x05\x00\x08\x9b\xe6\x18\x9c\x00\x0c\x00\x06' + b'\x00\x05\x00\x00\x80\x00\x00\x04\xc0\x00\x00\x04\xc0\x06\x00\x08' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + """ + + valid_sctp_packet:bytes = bytes(b'\x80\x44\x00\x50\xd2\x6a\xc1\xe5\x75\x14\xa4\x9a\x0a\x00\x00\xc4' + b'\xb3\x49\x30\x15\xe1\xc2\x76\x25\xf5\x3a\xb8\x18\x55\x55\xb8\xdb' + b'\x7a\x0f\x43\xdd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xe5\xc1\x6a\xd2\x46\x9c\xb9\x3b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xf4\xc5\xc5\x43\x95\x2b\x00\x00\x0a\x00\x0a\x00\x16\x2a\x00\x64' + b'\x02\x00\x44\x80\x9b\xe6\x18\x9b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x01\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x3c\x3b\xb9\x9c\x46' + b'\x00\x01\xa0\x00\x00\x0a\xff\xff\x2b\x2d\x7e\xb2\x00\x05\x00\x08' + b'\x9b\xe6\x18\x9b\x00\x05\x00\x08\x9b\xe6\x18\x9c\x00\x0c\x00\x06' + b'\x00\x05\x00\x00\x80\x00\x00\x04\xc0\x00\x00\x04\xc0\x06\x00\x08' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + + ) + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 8 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x80\x44', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\xd2\x6a\xc1\xe5', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x75\x14\xa4\x9a', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x0a', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\xc4', length=16) + + cookie_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert cookie_fd.id == SCTPFields.CHUNK_COOKIE_ECHO_COOKIE + assert cookie_fd.position == 0 + assert cookie_fd.value == Buffer(content=b'\xb3\x49\x30\x15\xe1\xc2\x76\x25\xf5\x3a\xb8\x18\x55\x55\xb8\xdb' + b'\x7a\x0f\x43\xdd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xe5\xc1\x6a\xd2\x46\x9c\xb9\x3b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xf4\xc5\xc5\x43\x95\x2b\x00\x00\x0a\x00\x0a\x00\x16\x2a\x00\x64' + b'\x02\x00\x44\x80\x9b\xe6\x18\x9b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x01\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x3c\x3b\xb9\x9c\x46' + b'\x00\x01\xa0\x00\x00\x0a\xff\xff\x2b\x2d\x7e\xb2\x00\x05\x00\x08' + b'\x9b\xe6\x18\x9b\x00\x05\x00\x08\x9b\xe6\x18\x9c\x00\x0c\x00\x06' + b'\x00\x05\x00\x00\x80\x00\x00\x04\xc0\x00\x00\x04\xc0\x06\x00\x08' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', + length=1536) + +def test_sctp_parser_parse_cookie_ack(): + """test: SCTP header parser parses SCTP Header with a COKKIE ACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x50' + - id='Destination Port Number' length=16 position=0 value=b'\x80\x44' + - id='Verification Tag' length=32 position=0 value=b'\x3b\xb9\x9c\x46' + - id='Checksum' length=32 position=0 value=b'\x81\xce\xde\x0b' + - id='Chunk Type' length=8 position=0 value=b'\x0e' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x04' + """ + + valid_sctp_packet:bytes = bytes(b'\x00\x50\x80\x44\x3b\xb9\x9c\x46\x81\xce\xde\x0b\x0b\x00\x00\x04') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 7 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x80\x44', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x3b\xb9\x9c\x46', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x81\xce\xde\x0b', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x0b', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x04', length=16) \ No newline at end of file From e1de08b8c3344b9ef5cbe6260e245f7f18058aa3 Mon Sep 17 00:00:00 2001 From: Quentin Lampin Date: Mon, 9 Dec 2024 15:54:14 +0100 Subject: [PATCH 10/10] [version] bump to 0.17.0 --- microschc/__init__.py | 2 +- tests/test_microschc.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/microschc/__init__.py b/microschc/__init__.py index 538eb5d..435d64b 100644 --- a/microschc/__init__.py +++ b/microschc/__init__.py @@ -1 +1 @@ -__version__ = '0.16.4' +__version__ = '0.17.0' diff --git a/tests/test_microschc.py b/tests/test_microschc.py index dce3adb..4f5d419 100644 --- a/tests/test_microschc.py +++ b/tests/test_microschc.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == '0.16.4' + assert __version__ == '0.17.0'