diff --git a/microschc/__init__.py b/microschc/__init__.py index 538eb5d..435d64b 100644 --- a/microschc/__init__.py +++ b/microschc/__init__.py @@ -1 +1 @@ -__version__ = '0.16.4' +__version__ = '0.17.0' diff --git a/microschc/protocol/sctp.py b/microschc/protocol/sctp.py new file mode 100644 index 0000000..48c0d45 --- /dev/null +++ b/microschc/protocol/sctp.py @@ -0,0 +1,624 @@ +""" +SCTP header declarations + +Declarations for the SCTP protocol header as defined in RFC9260 [1]. + +[1] "RFC9260: Stream Control Transmission Protocol, R. Stewart et al. +""" + + +from enum import Enum + +SCTP_HEADER_ID = 'SCTP' + +from enum import Enum +from typing import Dict, List, Tuple +from microschc.binary.buffer import Buffer +from microschc.parser import HeaderParser, ParserError +from microschc.rfc8724 import FieldDescriptor, HeaderDescriptor + +SCTP_HEADER_ID = 'SCTP' + +class SCTPFields(str, Enum): + SOURCE_PORT = f'{SCTP_HEADER_ID}:Source Port' + DESTINATION_PORT = f'{SCTP_HEADER_ID}:Destination Port' + VERIFICATION_TAG = f'{SCTP_HEADER_ID}:Verification Tag' + CHECKSUM = f'{SCTP_HEADER_ID}:Checksum' + CHUNK_TYPE = f'{SCTP_HEADER_ID}:Chunk Type' + CHUNK_FLAGS = f'{SCTP_HEADER_ID}:Chunk Flags' + CHUNK_LENGTH = f'{SCTP_HEADER_ID}:Chunk Length' + CHUNK_VALUE = f'{SCTP_HEADER_ID}:Chunk Value' + CHUNK_PADDING = f'{SCTP_HEADER_ID}:Chunk Padding' + + CHUNK_DATA_TSN = f'{SCTP_HEADER_ID}:Data TSN' + CHUNK_DATA_STREAM_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Stream Identifier S' + CHUNK_DATA_STREAM_SEQUENCE_NUMBER = f'{SCTP_HEADER_ID}:Data Stream Sequence Number n' + CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER = f'{SCTP_HEADER_ID}:Data Payload Protocol Identifier' + CHUNK_DATA_PAYLOAD = f'{SCTP_HEADER_ID}:Data Payload' + + CHUNK_INIT_INITIATE_TAG = f'{SCTP_HEADER_ID}:Init Initiate Tag' + CHUNK_INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Init Advertised Receiver Window Credit' + CHUNK_INIT_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Number of Outbound Streams' + CHUNK_INIT_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Number of Inbound Streams' + CHUNK_INIT_INITIAL_TSN = f'{SCTP_HEADER_ID}:Init Initial TSN' + + CHUNK_INIT_ACK_INITIATE_TAG = f'{SCTP_HEADER_ID}:Init Ack Initiate Tag' + CHUNK_INIT_ACK_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Init Ack Advertised Receiver Window Credit' + CHUNK_INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Ack Number of Outbound Streams' + CHUNK_INIT_ACK_NUMBER_OF_INBOUND_STREAMS = f'{SCTP_HEADER_ID}:Init Ack Number of Inbound Streams' + CHUNK_INIT_ACK_INITIAL_TSN = f'{SCTP_HEADER_ID}:Init Ack Initial TSN' + + CHUNK_SACK_CUMULATIVE_TSN_ACK = f'{SCTP_HEADER_ID}:Selective Ack Cumulative TSN Ack' + CHUNK_SACK_ADVERTISED_RECEIVER_WINDOW_CREDIT = f'{SCTP_HEADER_ID}:Selective Ack Advertised Receiver Window Credit' + CHUNK_SACK_NUMBER_GAP_ACK_BLOCKS = f'{SCTP_HEADER_ID}:Selective Ack Number Gap Ack Blocks' + CHUNK_SACK_NUMBER_DUPLICATE_TSNS = f'{SCTP_HEADER_ID}:Selective Ack Number Duplicate TSNs' + CHUNK_SACK_GAP_ACK_BLOCK_START = f'{SCTP_HEADER_ID}:Selective Ack Gap Ack BLock Start' + CHUNK_SACK_GAP_ACK_BLOCK_END = f'{SCTP_HEADER_ID}:Selective Ack Gap Ack BLock End' + CHUNK_SACK_DUPLICATE_TSN = f'{SCTP_HEADER_ID}:Selective Ack Duplicate TSN' + + CHUNK_SHUTDOWN_CUMULATIVE_TSN_ACK = f'{SCTP_HEADER_ID}:Shutdown Cumulative TSN' + + CHUNK_COOKIE_ECHO_COOKIE = f'{SCTP_HEADER_ID}:Cookie Echo Cookie' + + PARAMETER_TYPE = f'{SCTP_HEADER_ID}:Parameter Type' + PARAMETER_LENGTH = f'{SCTP_HEADER_ID}:Parameter Length' + PARAMETER_VALUE = f'{SCTP_HEADER_ID}:Parameter Value' + PARAMETER_PADDING = f'{SCTP_HEADER_ID}:Parameter Padding' + + + +class SCTPChunkTypes(int, Enum): + # ID Value Chunk Type + # ----- ---------- + # 0 - Payload Data (DATA) + # 1 - Initiation (INIT) + # 2 - Initiation Acknowledgement (INIT ACK) + # 3 - Selective Acknowledgement (SACK) + # 4 - Heartbeat Request (HEARTBEAT) + # 5 - Heartbeat Acknowledgement (HEARTBEAT ACK) + # 6 - Abort (ABORT) + # 7 - Shutdown (SHUTDOWN) + # 8 - Shutdown Acknowledgement (SHUTDOWN ACK) + # 9 - Operation Error (ERROR) + # 10 - State Cookie (COOKIE ECHO) + # 11 - Cookie Acknowledgement (COOKIE ACK) + # 12 - Reserved for Explicit Congestion Notification Echo (ECNE) + # 13 - Reserved for Congestion Window Reduced (CWR) + # 14 - Shutdown Complete (SHUTDOWN COMPLETE) + # 15 to 62 - reserved by IETF + # 63 - IETF-defined Chunk Extensions + # 64 to 126 - reserved by IETF + # 127 - IETF-defined Chunk Extensions + # 128 to 190 - reserved by IETF + # 191 - IETF-defined Chunk Extensions + # 192 to 254 - reserved by IETF + # 255 - IETF-defined Chunk Extensions + DATA = 0 + INIT = 1 + INIT_ACK = 2 + SACK = 3 + HEARTBEAT = 4 + HEARTBEAT_ACK = 5 + ABORT = 6 + SHUTDOWN = 7 + SHUTDOWN_ACK = 8 + ERROR = 9 + COOKIE_ECHO = 10 + COOKIE_ACK = 11 + ECNE = 12 + CWR = 13 + SHUTDOWN_COMPLETE = 14 + + + +class SCTPParser(HeaderParser): + def __init__(self) -> None: + super().__init__(name=SCTP_HEADER_ID) + + def match(self, buffer: Buffer) -> bool: + return buffer.length >= 12 * 8 # SCTP header is at least 12 bytes + + def parse(self, buffer: Buffer) -> HeaderDescriptor: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Source Port Number | Destination Port Number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Verification Tag | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Checksum | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + | Chunk #1 | + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + | Chunk #2 | + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + if buffer.length < 12 * 8: + raise ParserError(buffer=buffer, message=f'length too short: {buffer.length} < 96') + + # Source Port: 16 bits + source_port: Buffer = buffer[0:16] + # Destination Port: 16 bits + destination_port: Buffer = buffer[16:32] + # Verification Tag: 32 bits + verification_tag: Buffer = buffer[32:64] + # Checksum: 32 bits + checksum: Buffer = buffer[64:96] + + header_fields: List[FieldDescriptor] = [ + FieldDescriptor(id=SCTPFields.SOURCE_PORT, position=0, value=source_port), + FieldDescriptor(id=SCTPFields.DESTINATION_PORT, position=0, value=destination_port), + FieldDescriptor(id=SCTPFields.VERIFICATION_TAG, position=0, value=verification_tag), + FieldDescriptor(id=SCTPFields.CHECKSUM, position=0, value=checksum), + ] + + chunks: Buffer = buffer[96:] + + while chunks.length > 0: + chunks_fields, chunks_bits_consumed = self._parse_chunk(chunks) + chunks = chunks[chunks_bits_consumed:] + header_fields.extend(chunks_fields) + + + header_descriptor: HeaderDescriptor = HeaderDescriptor( + id=SCTP_HEADER_ID, + length=buffer.length, + fields=header_fields + ) + return header_descriptor + + + def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: + fields: List[FieldDescriptor] = [] + + # Chunk Type: 8 bits + chunk_type: Buffer = buffer[0:8] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_TYPE, position=0, value=chunk_type)) + + # Chunk Flags: 8 bits + chunk_flags: Buffer = buffer[8:16] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_FLAGS, position=0, value=chunk_flags)) + + # Chunk Length: 16 bits + chunk_length: Buffer = buffer[16:32] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_LENGTH, position=0, value=chunk_length)) + + chunk_length_value: int = chunk_length.value(type='unsigned int') * 8 + + # Chunk Value: variable length + chunk_value_length = chunk_length_value - 32 # Length includes the 4 bytes of type, flags, and length + if chunk_value_length > 0: + chunk_type_value: int = chunk_type.value() + chunk_value: Buffer = buffer[32: 32 + chunk_value_length] + + if chunk_type_value == SCTPChunkTypes.DATA: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_data(chunk_value) + elif chunk_type_value == SCTPChunkTypes.INIT: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_init(chunk_value) + elif chunk_type_value == SCTPChunkTypes.INIT_ACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_init_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.SACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_selective_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.HEARTBEAT: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat(chunk_value) + elif chunk_type_value == SCTPChunkTypes.HEARTBEAT_ACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.ABORT: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_abort(chunk_value) + elif chunk_type_value == SCTPChunkTypes.SHUTDOWN: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_shutdown(chunk_value) + elif chunk_type_value == SCTPChunkTypes.SHUTDOWN_ACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_shutdown_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.ERROR: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_error(chunk_value) + elif chunk_type_value == SCTPChunkTypes.COOKIE_ECHO: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_cookie_echo(chunk_value) + elif chunk_type_value == SCTPChunkTypes.COOKIE_ACK: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_cookie_ack(chunk_value) + elif chunk_type_value == SCTPChunkTypes.SHUTDOWN_COMPLETE: + chunk_fields: List[FieldDescriptor] = self._parse_chunk_shutdown_complete(chunk_value) + else: + chunk_fields: List[FieldDescriptor] = [FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)] + fields.extend(chunk_fields) + + chunk_padding_length: int = (32 - chunk_length_value%32)%32 + if chunk_padding_length > 0: + chunk_padding: Buffer = buffer[chunk_length_value: chunk_length_value+chunk_padding_length] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_PADDING, position=0, value=chunk_padding)) + + + bits_consumed = chunk_length_value + chunk_padding_length + + return fields, bits_consumed + + + def _parse_chunk_data(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 0 | Reserved|U|B|E| Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | TSN | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Stream Identifier S | Stream Sequence Number n | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Payload Protocol Identifier | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / User Data (seq n of Stream S) / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + tsn: Buffer = buffer[0:32] + stream_identifier_s: Buffer = buffer[32:48] + stream_sequence_number_n: Buffer = buffer[48:64] + payload_protocol_identifier: Buffer = buffer[64:96] + user_data: Buffer = buffer[96:] + + fields.extend( + [ + FieldDescriptor(id=SCTPFields.CHUNK_DATA_TSN, value=tsn, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_DATA_STREAM_IDENTIFIER, value=stream_identifier_s, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_DATA_STREAM_SEQUENCE_NUMBER, value=stream_sequence_number_n, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER, value=payload_protocol_identifier, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_DATA_PAYLOAD, value=user_data, position=0) + ] + ) + return fields + + def _parse_chunk_init(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 1 | Chunk Flags | Chunk Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Initiate Tag | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Advertised Receiver Window Credit (a_rwnd) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Outbound Streams | Number of Inbound Streams | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Initial TSN | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Optional/Variable-Length Parameters / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + initiate_tag: Buffer = buffer[0:32] + advertised_receiver_window_credit: Buffer = buffer[32:64] + number_outbound_streams: Buffer = buffer[64:80] + number_inbound_streams: Buffer = buffer[80:96] + initial_tsn: Buffer = buffer[96:128] + + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_INIT_INITIATE_TAG, value=initiate_tag, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT, value=advertised_receiver_window_credit, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_NUMBER_OF_OUTBOUND_STREAMS, value=number_outbound_streams, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_NUMBER_OF_INBOUND_STREAMS, value=number_inbound_streams, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_INITIAL_TSN, value=initial_tsn, position=0) + ]) + + parameters = buffer[128:] + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields + + def _parse_chunk_init_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 2 | Chunk Flags | Chunk Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Initiate Tag | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Advertised Receiver Window Credit (a_rwnd) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Outbound Streams | Number of Inbound Streams | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Initial TSN | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Optional/Variable-Length Parameters / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + initiate_tag: Buffer = buffer[0:32] + advertised_receiver_window_credit: Buffer = buffer[32:64] + number_outbound_streams: Buffer = buffer[64:80] + number_inbound_streams: Buffer = buffer[80:96] + initial_tsn: Buffer = buffer[96:128] + + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_INITIATE_TAG, value=initiate_tag, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_ADVERTISED_RECEIVER_WINDOW_CREDIT, value=advertised_receiver_window_credit, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS, value=number_outbound_streams, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_NUMBER_OF_INBOUND_STREAMS, value=number_inbound_streams, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_INIT_ACK_INITIAL_TSN, value=initial_tsn, position=0) + ]) + + parameters = buffer[128:] + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields + + + def _parse_chunk_selective_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 3 | Chunk Flags | Chunk Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Cumulative TSN Ack | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Advertised Receiver Window Credit (a_rwnd) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Gap Ack Blocks = N | Number of Duplicate TSNs = M | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Gap Ack Block #1 Start | Gap Ack Block #1 End | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ ... \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Gap Ack Block #N Start | Gap Ack Block #N End | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Duplicate TSN 1 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ ... \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Duplicate TSN M | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + cumulative_tsn_ack: Buffer = buffer[0:32] + advertised_receiver_window_credit: Buffer = buffer[32:64] + number_gap_ack_blocks: Buffer = buffer[64:80] + number_duplicate_tsns: Buffer = buffer[80:96] + + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_SACK_CUMULATIVE_TSN_ACK, value=cumulative_tsn_ack, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_SACK_ADVERTISED_RECEIVER_WINDOW_CREDIT, value=advertised_receiver_window_credit, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_SACK_NUMBER_GAP_ACK_BLOCKS, value=number_gap_ack_blocks, position=0), + FieldDescriptor(id=SCTPFields.CHUNK_SACK_NUMBER_DUPLICATE_TSNS, value=number_duplicate_tsns, position=0) + ]) + + remainer: Buffer = buffer[96:] + # Gap ack Blocks + number_gap_ack_blocks_value: int = number_gap_ack_blocks.value() + + for _ in range(number_gap_ack_blocks_value): + gap_ack_block_start: Buffer = remainer[0:16] + gap_ack_block_end: Buffer = remainer[16:32] + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_SACK_GAP_ACK_BLOCK_START, value=gap_ack_block_start), + FieldDescriptor(id=SCTPFields.CHUNK_SACK_GAP_ACK_BLOCK_END, value=gap_ack_block_end) + ]) + remainer = remainer[32:] + + # Duplicate TSNs + number_duplicate_tsns_value: int = number_duplicate_tsns.value() + for _ in range(number_duplicate_tsns_value): + duplicate_tsn: Buffer = remainer[0:32] + fields.extend([ + FieldDescriptor(id=SCTPFields.CHUNK_SACK_DUPLICATE_TSN, value=duplicate_tsn), + ]) + remainer = remainer[32:] + + return fields + + def _parse_chunk_heartbeat(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 4 | Chunk Flags | Heartbeat Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Heartbeat Information TLV (Variable-Length) / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + parameters = buffer + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields + + def _parse_chunk_heartbeat_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 5 | Chunk Flags | Heartbeat Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Heartbeat Information TLV (Variable-Length) / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + parameters = buffer + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields + + def _parse_chunk_abort(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 6 | Reserved |T| Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / zero or more Error Causes / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + parameters = buffer + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields + + def _parse_chunk_shutdown(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 7 | Chunk Flags | Length = 8 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Cumulative TSN Ack | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + fields: List[FieldDescriptor] = [] + cumulative_tsn_ack: Buffer = buffer[0:32] + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_SHUTDOWN_CUMULATIVE_TSN_ACK, value=cumulative_tsn_ack, position=0)) + + return fields + + def _parse_chunk_shutdown_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 8 | Chunk Flags | Length = 4 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + return fields + + def _parse_chunk_error(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 9 | Chunk Flags | Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / one or more Error Causes / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + parameters = buffer + while parameters.length > 0: + parameter_fields, bits_consumed = self._parse_parameter(parameters) + fields.extend(parameter_fields) + parameters = parameters[bits_consumed:] + + return fields + + def _parse_chunk_cookie_echo(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 10 | Chunk Flags | Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / Cookie / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + + cookie: Buffer = buffer + fields.append(FieldDescriptor(id=SCTPFields.CHUNK_COOKIE_ECHO_COOKIE, value=cookie, position=0)) + + return fields + + def _parse_chunk_cookie_ack(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 11 | Chunk Flags | Length = 4 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + return fields + + def _parse_chunk_shutdown_complete(self, buffer: Buffer) -> List[FieldDescriptor]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Type = 14 | Chunk Flags | Length = 4 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + return fields + + def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]: + """ + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Parameter Type | Parameter Length | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + \ \ + / Parameter Value / + \ \ + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + """ + fields: List[FieldDescriptor] = [] + parameter_type: Buffer = buffer[0:16] + parameter_length: Buffer = buffer[16:32] + + fields.extend([ + FieldDescriptor(id=SCTPFields.PARAMETER_TYPE, value=parameter_type, position=0), + FieldDescriptor(id=SCTPFields.PARAMETER_LENGTH, value=parameter_length, position=0), + ]) + + parameter_length_value: int = parameter_length.value() * 8 + parameter_value_length: int = parameter_length_value - 32 + if parameter_value_length > 0: + parameter_value: Buffer = buffer[32: parameter_length_value] + fields.append(FieldDescriptor(id=SCTPFields.PARAMETER_VALUE, value=parameter_value, position=0)) + + parameter_padding_length: int = (32 - parameter_value_length%32)%32 + if parameter_padding_length > 0: + parameter_padding: Buffer = buffer[parameter_length_value: parameter_length_value + parameter_padding_length] + fields.append( + FieldDescriptor(id=SCTPFields.PARAMETER_PADDING, value=parameter_padding, position=0) + ) + return fields, parameter_length_value + parameter_padding_length + + + + \ No newline at end of file diff --git a/tests/protocol/test_sctp.py b/tests/protocol/test_sctp.py new file mode 100644 index 0000000..4e05e26 --- /dev/null +++ b/tests/protocol/test_sctp.py @@ -0,0 +1,1107 @@ +from typing import Dict, List, Tuple +from microschc.protocol.sctp import SCTPParser, SCTPFields +from microschc.parser.parser import HeaderDescriptor +from microschc.rfc8724 import FieldDescriptor +from microschc.binary.buffer import Buffer + +def test_sctp_parser_import(): + """test: SCTP header parser import and instanciation + The test instanciate an SCTP parser and checks for import errors + """ + parser = SCTPParser() + assert( isinstance(parser, SCTPParser) ) + +def test_sctp_parser_parse_data(): + """test: SCTP header parser parses SCTP Header with DATA chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x25\x0f' + - id='Destination Port Number' length=16 position=0 value=b'\x96\x0c' + - id='Verification Tag' length=32 position=0 value=b'\xc9\x59\x6d\xb9' + - id='Checksum' length=32 position=0 value=b'\x00\x00\x00\x00' + - id='Chunk Type' length=8 position=0 value=b'\x00' + - id='Chunk Flags' length=8 position=0 value=b'\x03' + - id='Chunk Length' length=16 position=0 value=b'\x00\x49' + - id='Data TSN' length=32 position=0 value=b'\xbc\x5b\xc0\x68' + - id='Data Stream Identifier S' length=16 position=0 value=b'\x00\x00' + - id='Data Stream Sequence Number n' length=16 position=0 value=b'\x00\x00' + - id='Data Payload Protocol Identifier' length=16 position=0 value=b'\x00\x00\x00\x3c' + - id='Data Payload' length=32 position=0 value=b'\x00\x15\x00\x35\x00\x00\x04\x00\x1b\x00\x08\x00\x02\xf8\x39\x10' + b'\x00\x01\x02\x00\x52\x40\x09\x03\x00\x66\x72\x65\x65\x35\x67\x63' + b'\x00\x66\x00\x10\x00\x00\x00\x00\x01\x00\x02\xf8\x39\x00\x00\x10' + b'\x08\x01\x02\x03\x00\x15\x40\x01\x40' + - id='Chunk Padding' length=32 position=0 value=b'\x00\x00\x00' + + """ + + valid_sctp_packet:bytes = bytes(b'\x25\x0f\x96\x0c\xc9\x59\x6d\xb9\x00\x00\x00\x00\x00\x03\x00\x49' + b'\xbc\x5b\xc0\x68\x00\x00\x00\x00\x00\x00\x00\x3c' + b'\x00\x15\x00\x35\x00\x00\x04\x00\x1b\x00\x08\x00\x02\xf8\x39\x10' + b'\x00\x01\x02\x00\x52\x40\x09\x03\x00\x66\x72\x65\x65\x35\x67\x63' + b'\x00\x66\x00\x10\x00\x00\x00\x00\x01\x00\x02\xf8\x39\x00\x00\x10' + b'\x08\x01\x02\x03\x00\x15\x40\x01\x40\x00\x00\x00' + ) + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 13 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x25\x0f', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x96\x0c', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\xc9\x59\x6d\xb9', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x00\x00\x00\x00', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x00', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x03', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x49', length=16) + + # - data chunk header fields + data_tsn_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert data_tsn_fd.id == SCTPFields.CHUNK_DATA_TSN + assert data_tsn_fd.position == 0 + assert data_tsn_fd.value == Buffer(content=b'\xbc\x5b\xc0\x68', length=32) + + data_stream_identifier_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert data_stream_identifier_fd.id == SCTPFields.CHUNK_DATA_STREAM_IDENTIFIER + assert data_stream_identifier_fd.position == 0 + assert data_stream_identifier_fd.value == Buffer(content=b'\x00\x00', length=16) + + data_stream_sequence_number_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert data_stream_sequence_number_fd.id == SCTPFields.CHUNK_DATA_STREAM_SEQUENCE_NUMBER + assert data_stream_sequence_number_fd.position == 0 + assert data_stream_sequence_number_fd.value == Buffer(content=b'\x00\x00', length=16) + + data_payload_protocol_identifier_fd:FieldDescriptor = sctp_header_descriptor.fields[10] + assert data_payload_protocol_identifier_fd.id == SCTPFields.CHUNK_DATA_PAYLOAD_PROTOCOL_IDENTIFIER + assert data_payload_protocol_identifier_fd.position == 0 + assert data_payload_protocol_identifier_fd.value == Buffer(content=b'\x00\x00\x00\x3c', length=32) + + + data_payload_fd:FieldDescriptor = sctp_header_descriptor.fields[11] + assert data_payload_fd.id == SCTPFields.CHUNK_DATA_PAYLOAD + assert data_payload_fd.position == 0 + assert data_payload_fd.value == Buffer(content= b'\x00\x15\x00\x35\x00\x00\x04\x00\x1b\x00\x08\x00\x02\xf8\x39\x10' + b'\x00\x01\x02\x00\x52\x40\x09\x03\x00\x66\x72\x65\x65\x35\x67\x63' + b'\x00\x66\x00\x10\x00\x00\x00\x00\x01\x00\x02\xf8\x39\x00\x00\x10' + b'\x08\x01\x02\x03\x00\x15\x40\x01\x40', + length=456) + + chunk_padding_fd:FieldDescriptor = sctp_header_descriptor.fields[12] + assert chunk_padding_fd.id == SCTPFields.CHUNK_PADDING + assert chunk_padding_fd.position == 0 + assert chunk_padding_fd.value == Buffer(content=b'\x00\x00\x00', length=24) + + +def test_sctp_parser_parse_init(): + """test: SCTP header parser parses SCTP Header with INIT chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x07' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x07' + - id='Verification Tag' length=32 position=0 value=b'\x00\x00\x00\x00' + - id='Checksum' length=32 position=0 value=b'\x37\x61\xa7\x46' + - id='Chunk Type' length=8 position=0 value=b'\x01' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x20' + - id='Initiate Tag length=32 position=0 value=b'\x43\x23\x25\x44' + - id='Advertised Receiver Window Credit' length=32 position=0 value=b'\x00\x00\xff\xff' + - id='Number of Outbound Streams' length=16 position=0 value=b'\x00\x11' + - id='Number of Inbound Streams' length=16 position=0 value=b'\x00\x11' + - id='Initial TSN length=32 position=0 value=b'\x5c\xfe\x37\x9f' + - id='Parameter Type' length=16 position=0 value=b'\xc0\x00' + - id='Parameter Length' length=16 position=0 value=b'\x00\x04' + - id='Parameter Type' length=16 position=0 value=b'\x00\x0c' + - id='Parameter Length' length=16 position=0 value=b'\x00\x06' + - id='Parameter Value' length=16 position=0 value=b'\x00\x05' + - id='Parameter Padding' length=16 position=0 value=b'\x00\x00' + + """ + + valid_sctp_packet:bytes = bytes(b'\x00\x07\x00\x07\x00\x00\x00\x00\x37\x61\xa7\x46\x01\x00\x00\x20' + b'\x43\x23\x25\x44\x00\x00\xff\xff\x00\x11\x00\x11\x5c\xfe\x37\x9f' + b'\xc0\x00\x00\x04\x00\x0c\x00\x06\x00\x05\x00\x00' + ) + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 18 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x00\x00\x00\x00', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x37\x61\xa7\x46', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x01', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x20', length=16) + + initiate_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert initiate_tag_fd.id == SCTPFields.CHUNK_INIT_INITIATE_TAG + assert initiate_tag_fd.position == 0 + assert initiate_tag_fd.value == Buffer(content=b'\x43\x23\x25\x44', length=32) + + advertised_receiver_window_credit_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert advertised_receiver_window_credit_fd.id == SCTPFields.CHUNK_INIT_ADVERTISED_RECEIVER_WINDOW_CREDIT + assert advertised_receiver_window_credit_fd.position == 0 + assert advertised_receiver_window_credit_fd.value == Buffer(content=b'\x00\x00\xff\xff', length=32) + + number_outbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert number_outbound_streams_fd.id == SCTPFields.CHUNK_INIT_NUMBER_OF_OUTBOUND_STREAMS + assert number_outbound_streams_fd.position == 0 + assert number_outbound_streams_fd.value == Buffer(content=b'\x00\x11', length=16) + + number_inbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[10] + assert number_inbound_streams_fd.id == SCTPFields.CHUNK_INIT_NUMBER_OF_INBOUND_STREAMS + assert number_inbound_streams_fd.position == 0 + assert number_inbound_streams_fd.value == Buffer(content=b'\x00\x11', length=16) + + initial_tsn_fd:FieldDescriptor = sctp_header_descriptor.fields[11] + assert initial_tsn_fd.id == SCTPFields.CHUNK_INIT_INITIAL_TSN + assert initial_tsn_fd.position == 0 + assert initial_tsn_fd.value == Buffer(content=b'\x5c\xfe\x37\x9f', length=32) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[12] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\xc0\x00', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[13] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x04', length=16) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[14] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\x00\x0c', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[15] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x06', length=16) + + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[16] + assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE + assert parameter_value_fd.position == 0 + assert parameter_value_fd.value == Buffer(content=b'\x00\x05', length=16) + + parameter_padding_fd:FieldDescriptor = sctp_header_descriptor.fields[17] + assert parameter_padding_fd.id == SCTPFields.PARAMETER_PADDING + assert parameter_padding_fd.position == 0 + assert parameter_padding_fd.value == Buffer(content=b'\x00\x00', length=16) + + + + + +def test_sctp_parser_parse_init_ack(): + """test: SCTP header parser parses SCTP Header with INIT_ACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x07' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x07' + - id='Verification Tag' length=32 position=0 value=b'\x43\x23\x25\x44' + - id='Checksum' length=32 position=0 value=b'\xc9\x01\x85\x24' + - id='Chunk Type' length=8 position=0 value=b'\x02' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x80' + - id='Initiate Tag length=32 position=0 value=b'\x00\x00\x0e\xb0' + - id='Advertised Receiver Window Credit' length=32 position=0 value=b'\x00\x00\x10\x00' + - id='Number of Outbound Streams' length=16 position=0 value=b'\x00\x11' + - id='Number of Inbound Streams' length=16 position=0 value=b'\x00\x11' + - id='Initial TSN length=32 position=0 value=b'\x00\x00\x36\x14' + - id='Parameter Type' length=16 position=0 value=b'\x00\x07' + - id='Parameter Length' length=16 position=0 value=b'\x00\x68' + - id='Parameter Value' length=800 position=0 value=b'\x00\x00\x0e\xb0\x00\x00\x10\x00\x00\x11\x00\x11' + b'\x00\x00\x36\x14\x43\x23\x25\x44\x00\x00\xff\xff\x00\x11\x00\x11' + b'\x5c\xfe\x37\x9f\x07\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xa2\x85\xb1\x3f\x10\x27\x00\x00\x17\xcd\x8f\x1c\x11\x76\x9b\x04' + b'\x55\xc0\xd0\xf2\x2c\x3e\x7c\x35\x00\x01\x00\x01\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x05\x00\x08\xc0\xa8\xaa\x38\x00\x05\x00\x08' + b'\xc0\xa8\xaa\x08\xc0\x00\x00\x04' + - id='Parameter Type' length=16 position=0 value=b'\xc0\x00' + - id='Parameter Length' length=16 position=0 value=b'\x00\x04' + + """ + + valid_sctp_packet:bytes = bytes(b'\x00\x07\x00\x07\x43\x23\x25\x44\xc9\x01\x85\x24\x02\x00\x00\x80' + b'\x00\x00\x0e\xb0\x00\x00\x10\x00\x00\x11\x00\x11\x00\x00\x36\x14' + b'\x00\x07\x00\x68\x00\x00\x0e\xb0\x00\x00\x10\x00\x00\x11\x00\x11' + b'\x00\x00\x36\x14\x43\x23\x25\x44\x00\x00\xff\xff\x00\x11\x00\x11' + b'\x5c\xfe\x37\x9f\x07\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xa2\x85\xb1\x3f\x10\x27\x00\x00\x17\xcd\x8f\x1c\x11\x76\x9b\x04' + b'\x55\xc0\xd0\xf2\x2c\x3e\x7c\x35\x00\x01\x00\x01\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x05\x00\x08\xc0\xa8\xaa\x38\x00\x05\x00\x08' + b'\xc0\xa8\xaa\x08\xc0\x00\x00\x04\xc0\x00\x00\x04' + ) + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 17 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x43\x23\x25\x44', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\xc9\x01\x85\x24', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x02', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x80', length=16) + + initiate_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert initiate_tag_fd.id == SCTPFields.CHUNK_INIT_ACK_INITIATE_TAG + assert initiate_tag_fd.position == 0 + assert initiate_tag_fd.value == Buffer(content=b'\x00\x00\x0e\xb0', length=32) + + advertised_receiver_window_credit_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert advertised_receiver_window_credit_fd.id == SCTPFields.CHUNK_INIT_ACK_ADVERTISED_RECEIVER_WINDOW_CREDIT + assert advertised_receiver_window_credit_fd.position == 0 + assert advertised_receiver_window_credit_fd.value == Buffer(content=b'\x00\x00\x10\x00', length=32) + + number_outbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert number_outbound_streams_fd.id == SCTPFields.CHUNK_INIT_ACK_NUMBER_OF_OUTBOUND_STREAMS + assert number_outbound_streams_fd.position == 0 + assert number_outbound_streams_fd.value == Buffer(content=b'\x00\x11', length=16) + + number_inbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[10] + assert number_inbound_streams_fd.id == SCTPFields.CHUNK_INIT_ACK_NUMBER_OF_INBOUND_STREAMS + assert number_inbound_streams_fd.position == 0 + assert number_inbound_streams_fd.value == Buffer(content=b'\x00\x11', length=16) + + initial_tsn_fd:FieldDescriptor = sctp_header_descriptor.fields[11] + assert initial_tsn_fd.id == SCTPFields.CHUNK_INIT_ACK_INITIAL_TSN + assert initial_tsn_fd.position == 0 + assert initial_tsn_fd.value == Buffer(content=b'\x00\x00\x36\x14', length=32) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[12] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\x00\x07', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[13] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x68', length=16) + + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[14] + assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE + assert parameter_value_fd.position == 0 + assert parameter_value_fd.value == Buffer(content=b'\x00\x00\x0e\xb0\x00\x00\x10\x00\x00\x11\x00\x11' + b'\x00\x00\x36\x14\x43\x23\x25\x44\x00\x00\xff\xff\x00\x11\x00\x11' + b'\x5c\xfe\x37\x9f\x07\x00\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xa2\x85\xb1\x3f\x10\x27\x00\x00\x17\xcd\x8f\x1c\x11\x76\x9b\x04' + b'\x55\xc0\xd0\xf2\x2c\x3e\x7c\x35\x00\x01\x00\x01\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x05\x00\x08\xc0\xa8\xaa\x38\x00\x05\x00\x08' + b'\xc0\xa8\xaa\x08\xc0\x00\x00\x04', + length=800) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[15] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\xc0\x00', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[16] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x04', length=16) + + +def test_sctp_parser_selective_ack(): + """test: SCTP header parser parses SCTP Header with SACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x07' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x07' + - id='Verification Tag' length=32 position=0 value=b'\x00\x00\x0e\xb0' + - id='Checksum' length=32 position=0 value=b'\xba\x04\x32\x58' + - id='Chunk Type' length=8 position=0 value=b'\x03' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x10' + - id='Cumulative TSN Ack' length=32 position=0 value=b'\x00\x00\x36\x1c' + - id='Advertised Receiver Window Credit' length=32 position=0 value=b'\x00\x00\xff\xff' + - id='Number of Gap Ack Blocks' length=16 position=0 value=b'\x00\x00' + - id='Number of Duplicate TSNs' length=16 position=0 value=b'\x00\x00' + """ + #TODO: Find SACK chunk with Gap Ack Blocks and Duplicate TSNs. + + valid_sctp_packet:bytes = bytes(b'\x00\x07\x00\x07\x00\x00\x0e\xb0\xba\x04\x32\x58\x03\x00\x00\x10' + b'\x00\x00\x36\x1c\x00\x00\xff\xff\x00\x00\x00\x00') + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 11 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x07', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x00\x00\x0e\xb0', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\xba\x04\x32\x58', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x03', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x10', length=16) + + cumulative_tsn_ack_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert cumulative_tsn_ack_fd.id == SCTPFields.CHUNK_SACK_CUMULATIVE_TSN_ACK + assert cumulative_tsn_ack_fd.position == 0 + assert cumulative_tsn_ack_fd.value == Buffer(content=b'\x00\x00\x36\x1c', length=32) + + advertised_receiver_window_credit_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert advertised_receiver_window_credit_fd.id == SCTPFields.CHUNK_SACK_ADVERTISED_RECEIVER_WINDOW_CREDIT + assert advertised_receiver_window_credit_fd.position == 0 + assert advertised_receiver_window_credit_fd.value == Buffer(content=b'\x00\x00\xff\xff', length=32) + + number_gap_ack_blocks_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert number_gap_ack_blocks_fd.id == SCTPFields.CHUNK_SACK_NUMBER_GAP_ACK_BLOCKS + assert number_gap_ack_blocks_fd.position == 0 + assert number_gap_ack_blocks_fd.value == Buffer(content=b'\x00\x00', length=16) + + number_inbound_streams_fd:FieldDescriptor = sctp_header_descriptor.fields[10] + assert number_inbound_streams_fd.id == SCTPFields.CHUNK_SACK_NUMBER_DUPLICATE_TSNS + assert number_inbound_streams_fd.position == 0 + assert number_inbound_streams_fd.value == Buffer(content=b'\x00\x00', length=16) + + +def test_sctp_parser_parse_heartbeat(): + """test: SCTP header parser parses SCTP Header with HEARTBEAT chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x0b\x59' + - id='Destination Port Number' length=16 position=0 value=b'\x0b\x59' + - id='Verification Tag' length=32 position=0 value=b'\x00\x00\x0e\x50' + - id='Checksum' length=32 position=0 value=b'\x53\xc3\x05\x5f' + - id='Chunk Type' length=8 position=0 value=b'\x04' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x18' + - id='Parameter Type' length=16 position=0 value=b'\x00\x01' + - id='Parameter Length' length=16 position=0 value=b'\x00\x14' + - id='Parameter Value' length=16 position=0 value=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00' + + """ + + valid_sctp_packet:bytes = bytes(b'\x0b\x59\x0b\x59\x00\x00\x0e\x50\x53\xc3\x05\x5f\x04\x00\x00\x18' + b'\x00\x01\x00\x14\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e' + b'\x00\x00\x00\x00' + + ) + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 10 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x0b\x59', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x0b\x59', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x00\x00\x0e\x50', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x53\xc3\x05\x5f', length=32) + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x04', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x18', length=16) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\x00\x01', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x14', length=16) + + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE + assert parameter_value_fd.position == 0 + assert parameter_value_fd.value == Buffer(content=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00', length=128) + + +def test_sctp_parser_parse_heartbeat_ack(): + """test: SCTP header parser parses SCTP Header with HEARTBEAT ACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x0b\x59' + - id='Destination Port Number' length=16 position=0 value=b'\x0b\x59' + - id='Verification Tag' length=32 position=0 value=b'\x0d\x53\xe6\xfe' + - id='Checksum' length=32 position=0 value=b'\x8c\x8e\x07\x46' + - id='Chunk Type' length=8 position=0 value=b'\x05' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x18' + - id='Parameter Type' length=16 position=0 value=b'\x00\x01' + - id='Parameter Length' length=16 position=0 value=b'\x00\x14' + - id='Parameter Value' length=16 position=0 value=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00' + + """ + + valid_sctp_packet:bytes = bytes(b'\x0b\x59\x0b\x59\x0d\x53\xe6\xfe\x8c\x8e\x07\x46\x05\x00\x00\x18' + b'\x00\x01\x00\x14\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e' + b'\x00\x00\x00\x00') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 10 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x0b\x59', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x0b\x59', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x0d\x53\xe6\xfe', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x8c\x8e\x07\x46', length=32) + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x05', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x18', length=16) + + parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE + assert parameter_type_fd.position == 0 + assert parameter_type_fd.value == Buffer(content=b'\x00\x01', length=16) + + parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[8] + assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH + assert parameter_length_fd.position == 0 + assert parameter_length_fd.value == Buffer(content=b'\x00\x14', length=16) + + parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[9] + assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE + assert parameter_value_fd.position == 0 + + +def test_sctp_parser_parse_abort(): + # TODO: find PCPAP with abort chunk + pass + +def test_sctp_parser_parse_shutdown(): + """test: SCTP header parser parses SCTP Header with a SHUTDOWN chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x80\x45' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x50' + - id='Verification Tag' length=32 position=0 value=b'\x13\x89\x60\x07' + - id='Checksum' length=32 position=0 value=b'\x75\x58\xd7\xdb' + - id='Chunk Type' length=8 position=0 value=b'\x07' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x08' + - id='Cumulative TSN Ack' length=32 position=0 value=b'\xaa\x86\x96\x09' + """ + + valid_sctp_packet:bytes = bytes(b'\x80\x45\x00\x50\x13\x89\x60\x07\x75\x58\xd7\xdb\x07\x00\x00\x08' + b'\xaa\x86\x96\x09') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 8 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x80\x45', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x13\x89\x60\x07', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x75\x58\xd7\xdb', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x07', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x08', length=16) + + cumulative_tsn_ack_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert cumulative_tsn_ack_fd.id == SCTPFields.CHUNK_SHUTDOWN_CUMULATIVE_TSN_ACK + assert cumulative_tsn_ack_fd.position == 0 + assert cumulative_tsn_ack_fd.value == Buffer(content=b'\xaa\x86\x96\x09', length=32) + +def test_sctp_parser_parse_shutdown_ack(): + """test: SCTP header parser parses SCTP Header with a SHUTDOWN ACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x50' + - id='Destination Port Number' length=16 position=0 value=b'\x80\x45' + - id='Verification Tag' length=32 position=0 value=b'\xe9\xb7\x3b\xc3' + - id='Checksum' length=32 position=0 value=b'\xc7\x4e\x59\xcd' + - id='Chunk Type' length=8 position=0 value=b'\x08' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x04' + """ + + valid_sctp_packet:bytes = bytes(b'\x00\x50\x80\x45\xe9\xb7\x3b\xc3\xc7\x4e\x59\xcd\x08\x00\x00\x04') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 7 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x80\x45', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\xe9\xb7\x3b\xc3', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\xc7\x4e\x59\xcd', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x08', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x04', length=16) + + +def test_sctp_parser_parse_shutdown_complete(): + """test: SCTP header parser parses SCTP Header with a SHUTDOWN COMPLETE chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x80\x45' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x50' + - id='Verification Tag' length=32 position=0 value=b'\x13\x89\x60\x07' + - id='Checksum' length=32 position=0 value=b'\xae\x51\x7e\x10' + - id='Chunk Type' length=8 position=0 value=b'\x0e' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x04' + """ + + valid_sctp_packet:bytes = bytes(b'\x80\x45\x00\x50\x13\x89\x60\x07\xae\x51\x7e\x10\x0e\x00\x00\x04') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 7 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x80\x45', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x13\x89\x60\x07', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\xae\x51\x7e\x10', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x0e', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x04', length=16) + +def test_sctp_parser_parse_error(): + # TODO: find PCPAP with error chunk + pass + +def test_sctp_parser_parse_cookie_echo(): + """test: SCTP header parser parses SCTP Header with a COOKIE ECHO chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x80\x44' + - id='Destination Port Number' length=16 position=0 value=b'\x00\x50' + - id='Verification Tag' length=32 position=0 value=b'\xd2\x6a\xc1\xe5' + - id='Checksum' length=32 position=0 value=b'\x75\x14\xa4\x9a' + - id='Chunk Type' length=8 position=0 value=b'\x0a' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\xc4' + - id='Cookie' length=1536 position=0 value=b'\xb3\x49\x30\x15\xe1\xc2\x76\x25\xf5\x3a\xb8\x18\x55\x55\xb8\xdb' + b'\x7a\x0f\x43\xdd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xe5\xc1\x6a\xd2\x46\x9c\xb9\x3b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xf4\xc5\xc5\x43\x95\x2b\x00\x00\x0a\x00\x0a\x00\x16\x2a\x00\x64' + b'\x02\x00\x44\x80\x9b\xe6\x18\x9b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x01\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x3c\x3b\xb9\x9c\x46' + b'\x00\x01\xa0\x00\x00\x0a\xff\xff\x2b\x2d\x7e\xb2\x00\x05\x00\x08' + b'\x9b\xe6\x18\x9b\x00\x05\x00\x08\x9b\xe6\x18\x9c\x00\x0c\x00\x06' + b'\x00\x05\x00\x00\x80\x00\x00\x04\xc0\x00\x00\x04\xc0\x06\x00\x08' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + """ + + valid_sctp_packet:bytes = bytes(b'\x80\x44\x00\x50\xd2\x6a\xc1\xe5\x75\x14\xa4\x9a\x0a\x00\x00\xc4' + b'\xb3\x49\x30\x15\xe1\xc2\x76\x25\xf5\x3a\xb8\x18\x55\x55\xb8\xdb' + b'\x7a\x0f\x43\xdd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xe5\xc1\x6a\xd2\x46\x9c\xb9\x3b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xf4\xc5\xc5\x43\x95\x2b\x00\x00\x0a\x00\x0a\x00\x16\x2a\x00\x64' + b'\x02\x00\x44\x80\x9b\xe6\x18\x9b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x01\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x3c\x3b\xb9\x9c\x46' + b'\x00\x01\xa0\x00\x00\x0a\xff\xff\x2b\x2d\x7e\xb2\x00\x05\x00\x08' + b'\x9b\xe6\x18\x9b\x00\x05\x00\x08\x9b\xe6\x18\x9c\x00\x0c\x00\x06' + b'\x00\x05\x00\x00\x80\x00\x00\x04\xc0\x00\x00\x04\xc0\x06\x00\x08' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + + ) + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 8 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x80\x44', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\xd2\x6a\xc1\xe5', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x75\x14\xa4\x9a', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x0a', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\xc4', length=16) + + cookie_fd:FieldDescriptor = sctp_header_descriptor.fields[7] + assert cookie_fd.id == SCTPFields.CHUNK_COOKIE_ECHO_COOKIE + assert cookie_fd.position == 0 + assert cookie_fd.value == Buffer(content=b'\xb3\x49\x30\x15\xe1\xc2\x76\x25\xf5\x3a\xb8\x18\x55\x55\xb8\xdb' + b'\x7a\x0f\x43\xdd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xe5\xc1\x6a\xd2\x46\x9c\xb9\x3b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\xf4\xc5\xc5\x43\x95\x2b\x00\x00\x0a\x00\x0a\x00\x16\x2a\x00\x64' + b'\x02\x00\x44\x80\x9b\xe6\x18\x9b\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x50\x00\x01\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x3c\x3b\xb9\x9c\x46' + b'\x00\x01\xa0\x00\x00\x0a\xff\xff\x2b\x2d\x7e\xb2\x00\x05\x00\x08' + b'\x9b\xe6\x18\x9b\x00\x05\x00\x08\x9b\xe6\x18\x9c\x00\x0c\x00\x06' + b'\x00\x05\x00\x00\x80\x00\x00\x04\xc0\x00\x00\x04\xc0\x06\x00\x08' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', + length=1536) + +def test_sctp_parser_parse_cookie_ack(): + """test: SCTP header parser parses SCTP Header with a COKKIE ACK chunk + + The packet is made of a SCTP header with the following fields: + - id='Source Port Number' length=16 position=0 value=b'\x00\x50' + - id='Destination Port Number' length=16 position=0 value=b'\x80\x44' + - id='Verification Tag' length=32 position=0 value=b'\x3b\xb9\x9c\x46' + - id='Checksum' length=32 position=0 value=b'\x81\xce\xde\x0b' + - id='Chunk Type' length=8 position=0 value=b'\x0e' + - id='Chunk Flags' length=8 position=0 value=b'\x00' + - id='Chunk Length' length=16 position=0 value=b'\x00\x04' + """ + + valid_sctp_packet:bytes = bytes(b'\x00\x50\x80\x44\x3b\xb9\x9c\x46\x81\xce\xde\x0b\x0b\x00\x00\x04') + + valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8) + parser:SCTPParser = SCTPParser() + + sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer) + + # test sctp_header_descriptor type + assert isinstance(sctp_header_descriptor, HeaderDescriptor) + + # test for sctp_header_descriptor.fields length + assert len(sctp_header_descriptor.fields) == 7 + + # test for sctp_header_descriptor.fields types + for field in sctp_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # assert field descriptors match SCTP header content + # - common header fields + source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0] + assert source_port_fd.id == SCTPFields.SOURCE_PORT + assert source_port_fd.position == 0 + assert source_port_fd.value == Buffer(content=b'\x00\x50', length=16) + + destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1] + assert destination_port_fd.id == SCTPFields.DESTINATION_PORT + assert destination_port_fd.position == 0 + assert destination_port_fd.value == Buffer(content=b'\x80\x44', length=16) + + verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2] + assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG + assert verification_tag_fd.position == 0 + assert verification_tag_fd.value == Buffer(content=b'\x3b\xb9\x9c\x46', length=32) + + checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3] + assert checksum_fd.id == SCTPFields.CHECKSUM + assert checksum_fd.position == 0 + assert checksum_fd.value == Buffer(content=b'\x81\xce\xde\x0b', length=32) + + + # - chunk common header fields + chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4] + assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE + assert chunk_type_fd.position == 0 + assert chunk_type_fd.value == Buffer(content=b'\x0b', length=8) + + chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5] + assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS + assert chunk_flags_fd.position == 0 + assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8) + + chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6] + assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH + assert chunk_length_fd.position == 0 + assert chunk_length_fd.value == Buffer(content=b'\x00\x04', length=16) \ No newline at end of file diff --git a/tests/test_microschc.py b/tests/test_microschc.py index dce3adb..4f5d419 100644 --- a/tests/test_microschc.py +++ b/tests/test_microschc.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == '0.16.4' + assert __version__ == '0.17.0'