Skip to content

Commit 935edcb

Browse files
committed
[wip] SCTP heartbeat ack chunk
1 parent 3375e7f commit 935edcb

File tree

2 files changed

+114
-1
lines changed

2 files changed

+114
-1
lines changed

microschc/protocol/sctp.py

+24
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ def _parse_chunk(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]:
202202
chunk_fields: List[FieldDescriptor] = self._parse_chunk_selective_ack(chunk_value)
203203
elif chunk_type_value == SCTPChunkTypes.HEARTBEAT:
204204
chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat(chunk_value)
205+
elif chunk_type_value == SCTPChunkTypes.HEARTBEAT_ACK:
206+
chunk_fields: List[FieldDescriptor] = self._parse_chunk_heartbeat_ack(chunk_value)
205207
else:
206208
chunk_fields: List[FieldDescriptor] = [FieldDescriptor(id=SCTPFields.CHUNK_VALUE, position=0, value=chunk_value)]
207209
fields.extend(chunk_fields)
@@ -428,6 +430,28 @@ def _parse_chunk_heartbeat(self, buffer: Buffer) -> List[FieldDescriptor]:
428430
parameters = parameters[bits_consumed:]
429431

430432
return fields
433+
434+
def _parse_chunk_heartbeat_ack(self, buffer: Buffer) -> List[FieldDescriptor]:
435+
"""
436+
0 1 2 3
437+
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
438+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
439+
| Type = 5 | Chunk Flags | Heartbeat Length |
440+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
441+
\ \
442+
/ Heartbeat Information TLV (Variable-Length) /
443+
\ \
444+
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
445+
"""
446+
fields: List[FieldDescriptor] = []
447+
448+
parameters = buffer
449+
while parameters.length > 0:
450+
parameter_fields, bits_consumed = self._parse_parameter(parameters)
451+
fields.extend(parameter_fields)
452+
parameters = parameters[bits_consumed:]
453+
454+
return fields
431455

432456
def _parse_parameter(self, buffer: Buffer) -> Tuple[List[FieldDescriptor], int]:
433457
"""

tests/protocol/test_sctp.py

+90-1
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,6 @@ def test_sctp_parser_parse_heartbeat():
581581
assert checksum_fd.position == 0
582582
assert checksum_fd.value == Buffer(content=b'\x53\xc3\x05\x5f', length=32)
583583

584-
585584
# - chunk common header fields
586585
chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4]
587586
assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE
@@ -608,6 +607,96 @@ def test_sctp_parser_parse_heartbeat():
608607
assert parameter_length_fd.position == 0
609608
assert parameter_length_fd.value == Buffer(content=b'\x00\x14', length=16)
610609

610+
parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[9]
611+
assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE
612+
assert parameter_value_fd.position == 0
613+
assert parameter_value_fd.value == Buffer(content=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00', length=128)
614+
615+
616+
def test_sctp_parser_parse_heartbeat_ack():
617+
"""test: SCTP header parser parses SCTP Header with HEARTBEAT ACK chunk
618+
619+
The packet is made of a SCTP header with the following fields:
620+
- id='Source Port Number' length=16 position=0 value=b'\x0b\x59'
621+
- id='Destination Port Number' length=16 position=0 value=b'\x0b\x59'
622+
- id='Verification Tag' length=32 position=0 value=b'\x0d\x53\xe6\xfe'
623+
- id='Checksum' length=32 position=0 value=b'\x8c\x8e\x07\x46'
624+
- id='Chunk Type' length=8 position=0 value=b'\x05'
625+
- id='Chunk Flags' length=8 position=0 value=b'\x00'
626+
- id='Chunk Length' length=16 position=0 value=b'\x00\x18'
627+
- id='Parameter Type' length=16 position=0 value=b'\x00\x01'
628+
- id='Parameter Length' length=16 position=0 value=b'\x00\x14'
629+
- id='Parameter Value' length=16 position=0 value=b'\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e\x00\x00\x00\x00'
630+
631+
"""
632+
633+
valid_sctp_packet:bytes = bytes(b'\x0b\x59\x0b\x59\x0d\x53\xe6\xfe\x8c\x8e\x07\x46\x05\x00\x00\x18'
634+
b'\x00\x01\x00\x14\x40\xe4\x4b\x92\x0a\x1c\x06\x2c\x1b\x66\xaf\x7e'
635+
b'\x00\x00\x00\x00')
636+
637+
valid_sctp_packet_buffer: Buffer = Buffer(content=valid_sctp_packet, length=len(valid_sctp_packet)*8)
638+
parser:SCTPParser = SCTPParser()
639+
640+
sctp_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_sctp_packet_buffer)
641+
642+
# test sctp_header_descriptor type
643+
assert isinstance(sctp_header_descriptor, HeaderDescriptor)
644+
645+
# test for sctp_header_descriptor.fields length
646+
assert len(sctp_header_descriptor.fields) == 10
647+
648+
# test for sctp_header_descriptor.fields types
649+
for field in sctp_header_descriptor.fields:
650+
assert isinstance(field, FieldDescriptor)
651+
652+
# assert field descriptors match SCTP header content
653+
# - common header fields
654+
source_port_fd:FieldDescriptor = sctp_header_descriptor.fields[0]
655+
assert source_port_fd.id == SCTPFields.SOURCE_PORT
656+
assert source_port_fd.position == 0
657+
assert source_port_fd.value == Buffer(content=b'\x0b\x59', length=16)
658+
659+
destination_port_fd:FieldDescriptor = sctp_header_descriptor.fields[1]
660+
assert destination_port_fd.id == SCTPFields.DESTINATION_PORT
661+
assert destination_port_fd.position == 0
662+
assert destination_port_fd.value == Buffer(content=b'\x0b\x59', length=16)
663+
664+
verification_tag_fd:FieldDescriptor = sctp_header_descriptor.fields[2]
665+
assert verification_tag_fd.id == SCTPFields.VERIFICATION_TAG
666+
assert verification_tag_fd.position == 0
667+
assert verification_tag_fd.value == Buffer(content=b'\x0d\x53\xe6\xfe', length=32)
668+
669+
checksum_fd:FieldDescriptor = sctp_header_descriptor.fields[3]
670+
assert checksum_fd.id == SCTPFields.CHECKSUM
671+
assert checksum_fd.position == 0
672+
assert checksum_fd.value == Buffer(content=b'\x8c\x8e\x07\x46', length=32)
673+
674+
# - chunk common header fields
675+
chunk_type_fd:FieldDescriptor = sctp_header_descriptor.fields[4]
676+
assert chunk_type_fd.id == SCTPFields.CHUNK_TYPE
677+
assert chunk_type_fd.position == 0
678+
assert chunk_type_fd.value == Buffer(content=b'\x05', length=8)
679+
680+
chunk_flags_fd:FieldDescriptor = sctp_header_descriptor.fields[5]
681+
assert chunk_flags_fd.id == SCTPFields.CHUNK_FLAGS
682+
assert chunk_flags_fd.position == 0
683+
assert chunk_flags_fd.value == Buffer(content=b'\x00', length=8)
684+
685+
chunk_length_fd:FieldDescriptor = sctp_header_descriptor.fields[6]
686+
assert chunk_length_fd.id == SCTPFields.CHUNK_LENGTH
687+
assert chunk_length_fd.position == 0
688+
assert chunk_length_fd.value == Buffer(content=b'\x00\x18', length=16)
689+
690+
parameter_type_fd:FieldDescriptor = sctp_header_descriptor.fields[7]
691+
assert parameter_type_fd.id == SCTPFields.PARAMETER_TYPE
692+
assert parameter_type_fd.position == 0
693+
assert parameter_type_fd.value == Buffer(content=b'\x00\x01', length=16)
694+
695+
parameter_length_fd:FieldDescriptor = sctp_header_descriptor.fields[8]
696+
assert parameter_length_fd.id == SCTPFields.PARAMETER_LENGTH
697+
assert parameter_length_fd.position == 0
698+
assert parameter_length_fd.value == Buffer(content=b'\x00\x14', length=16)
699+
611700
parameter_value_fd:FieldDescriptor = sctp_header_descriptor.fields[9]
612701
assert parameter_value_fd.id == SCTPFields.PARAMETER_VALUE
613702
assert parameter_value_fd.position == 0

0 commit comments

Comments
 (0)