diff --git a/microschc/protocol/coap.py b/microschc/protocol/coap.py index e63fbd6..9bfd672 100644 --- a/microschc/protocol/coap.py +++ b/microschc/protocol/coap.py @@ -33,8 +33,8 @@ from enum import Enum, IntEnum from microschc.compat import StrEnum import re -from typing import Dict, List, Tuple, Type, Union -from microschc.binary.buffer import Buffer, Padding +from typing import Dict, List, Optional, Set, Tuple, Union +from microschc.binary.buffer import Buffer from microschc.parser import HeaderParser, ParserError from microschc.parser.parser import UnparserError from microschc.protocol.registry import REGISTER_PARSER, ProtocolsIDs @@ -83,45 +83,44 @@ class CoAPFields(StrEnum): OPTION_SIZE1 = f'{COAP_HEADER_ID}:Option Size1' OPTION_UNKNOWN = f'{COAP_HEADER_ID}:Option Unknown' - class CoAPOptionIDs(IntEnum): - IF_MATCH = 1 - URI_HOST = 3 - ETAG = 4 - IF_NONE_MATCH = 5 - OBSERVE = 6 - URI_PORT = 7 - LOCATION_PATH = 8 - URI_PATH = 11 - CONTENT_FORMAT = 12 - MAX_AGE = 14 - URI_QUERY = 15 - ACCEPT = 17 - LOCATION_QUERY = 20 - BLOCK2 = 23 - BLOCK1 = 27 - PROXY_URI = 35 - PROXY_SCHEME = 39 - SIZE1 = 60 + OPTION_IF_MATCH = 1 + OPTION_URI_HOST = 3 + OPTION_ETAG = 4 + OPTION_IF_NONE_MATCH = 5 + OPTION_OBSERVE = 6 + OPTION_URI_PORT = 7 + OPTION_LOCATION_PATH = 8 + OPTION_URI_PATH = 11 + OPTION_CONTENT_FORMAT = 12 + OPTION_MAX_AGE = 14 + OPTION_URI_QUERY = 15 + OPTION_ACCEPT = 17 + OPTION_LOCATION_QUERY = 20 + OPTION_BLOCK2 = 23 + OPTION_BLOCK1 = 27 + OPTION_PROXY_URI = 35 + OPTION_PROXY_SCHEME = 39 + OPTION_SIZE1 = 60 COAP_OPTIONS_NUMBER_TO_NAME = { - CoAPOptionIDs.IF_MATCH: CoAPFields.OPTION_IF_MATCH, - CoAPOptionIDs.URI_HOST: CoAPFields.OPTION_URI_HOST, - CoAPOptionIDs.ETAG: CoAPFields.OPTION_ETAG, - CoAPOptionIDs.IF_NONE_MATCH: CoAPFields.OPTION_IF_NONE_MATCH, - CoAPOptionIDs.OBSERVE: CoAPFields.OPTION_OBSERVE, - CoAPOptionIDs.URI_PORT: CoAPFields.OPTION_URI_PORT, - CoAPOptionIDs.LOCATION_PATH: CoAPFields.OPTION_LOCATION_PATH, - CoAPOptionIDs.URI_PATH: CoAPFields.OPTION_URI_PATH, - CoAPOptionIDs.CONTENT_FORMAT: CoAPFields.OPTION_CONTENT_FORMAT, - CoAPOptionIDs.MAX_AGE: CoAPFields.OPTION_MAX_AGE, - CoAPOptionIDs.URI_QUERY: CoAPFields.OPTION_URI_QUERY, - CoAPOptionIDs.ACCEPT: CoAPFields.OPTION_ACCEPT, - CoAPOptionIDs.LOCATION_QUERY: CoAPFields.OPTION_LOCATION_QUERY, - CoAPOptionIDs.BLOCK1: CoAPFields.OPTION_BLOCK1, - CoAPOptionIDs.PROXY_URI: CoAPFields.OPTION_PROXY_URI, - CoAPOptionIDs.PROXY_SCHEME: CoAPFields.OPTION_PROXY_SCHEME, - CoAPOptionIDs.SIZE1: CoAPFields.OPTION_SIZE1 + CoAPOptionIDs.OPTION_IF_MATCH: CoAPFields.OPTION_IF_MATCH, + CoAPOptionIDs.OPTION_URI_HOST: CoAPFields.OPTION_URI_HOST, + CoAPOptionIDs.OPTION_ETAG: CoAPFields.OPTION_ETAG, + CoAPOptionIDs.OPTION_IF_NONE_MATCH: CoAPFields.OPTION_IF_NONE_MATCH, + CoAPOptionIDs.OPTION_OBSERVE: CoAPFields.OPTION_OBSERVE, + CoAPOptionIDs.OPTION_URI_PORT: CoAPFields.OPTION_URI_PORT, + CoAPOptionIDs.OPTION_LOCATION_PATH: CoAPFields.OPTION_LOCATION_PATH, + CoAPOptionIDs.OPTION_URI_PATH: CoAPFields.OPTION_URI_PATH, + CoAPOptionIDs.OPTION_CONTENT_FORMAT: CoAPFields.OPTION_CONTENT_FORMAT, + CoAPOptionIDs.OPTION_MAX_AGE: CoAPFields.OPTION_MAX_AGE, + CoAPOptionIDs.OPTION_URI_QUERY: CoAPFields.OPTION_URI_QUERY, + CoAPOptionIDs.OPTION_ACCEPT: CoAPFields.OPTION_ACCEPT, + CoAPOptionIDs.OPTION_LOCATION_QUERY: CoAPFields.OPTION_LOCATION_QUERY, + CoAPOptionIDs.OPTION_BLOCK1: CoAPFields.OPTION_BLOCK1, + CoAPOptionIDs.OPTION_PROXY_URI: CoAPFields.OPTION_PROXY_URI, + CoAPOptionIDs.OPTION_PROXY_SCHEME: CoAPFields.OPTION_PROXY_SCHEME, + CoAPOptionIDs.OPTION_SIZE1: CoAPFields.OPTION_SIZE1 } COAP_OPTIONS_NAME_TO_NUMBER = {v:k for k,v in COAP_OPTIONS_NUMBER_TO_NAME.items()} @@ -137,6 +136,7 @@ class CoAPDefinitions(bytes, Enum): class CoAPOptionMode(StrEnum): SYNTACTIC = 'syntactic' SEMANTIC = 'semantic' + LAURENT = 'semantic' class CoAPParser(HeaderParser): @@ -244,34 +244,34 @@ def unparse(self, decompressed_fields: List[Tuple[str, Buffer]]) -> List[Tuple[s # option delta if option_delta < 13: - unparsed_fields.append((CoAPFields.OPTION_DELTA, Buffer(content=option_delta.to_bytes(length=1, byteorder='little'), length=4, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_DELTA, create_target_value(option_delta, length=4))) elif option_delta < 269: - unparsed_fields.append((CoAPFields.OPTION_DELTA, Buffer(content=(13).to_bytes(length=1, byteorder='little'), length=4, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_DELTA, create_target_value(CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS, length=4))) else: - unparsed_fields.append((CoAPFields.OPTION_DELTA, Buffer(content=(14).to_bytes(length=1, byteorder='little'), length=4, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_DELTA, create_target_value(CoAPDefinitions.OPTION_DELTA_EXTENDED_16BITS, length=4))) previous_option_number = option_number # option length option_length_bytes: int = field_value.length//8 if option_length_bytes < 12: - unparsed_fields.append((CoAPFields.OPTION_LENGTH, Buffer(content=option_length_bytes.to_bytes(length=1, byteorder='little'), length=4, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_LENGTH, create_target_value(option_length_bytes, length=4))) elif option_length_bytes < 269: - unparsed_fields.append((CoAPFields.OPTION_LENGTH, Buffer(content=(13).to_bytes(length=1, byteorder='little'), length=4, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_LENGTH, create_target_value(CoAPDefinitions.OPTION_LENGTH_EXTENDED_8BITS, length=4))) else: - unparsed_fields.append((CoAPFields.OPTION_LENGTH, Buffer(content=(14).to_bytes(length=1, byteorder='little'), length=4, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_LENGTH, create_target_value(CoAPDefinitions.OPTION_LENGTH_EXTENDED_16BITS, length=4))) # option delta extended if option_delta > 13 and option_delta < 269: - unparsed_fields.append((CoAPFields.OPTION_DELTA_EXTENDED, Buffer(content=(option_delta-13).to_bytes(length=1, byteorder='little'), length=8, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_DELTA_EXTENDED, create_target_value(option_delta-13, length=8))) elif option_delta > 268: - unparsed_fields.append((CoAPFields.OPTION_DELTA_EXTENDED, Buffer(content=(option_delta-269).to_bytes(length=2, byteorder='little'), length=16, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_DELTA_EXTENDED, create_target_value(option_delta-269, length=8))) # option length extended if option_length_bytes > 11 and option_length_bytes < 269: - unparsed_fields.append((CoAPFields.OPTION_LENGTH_EXTENDED, Buffer(content=(option_length_bytes-13).to_bytes(length=1, byteorder='little'), length=8, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_LENGTH_EXTENDED, create_target_value(option_length_bytes-13, length=8))) elif option_length_bytes > 268: - unparsed_fields.append((CoAPFields.OPTION_LENGTH_EXTENDED, Buffer(content=(option_length_bytes-269).to_bytes(length=2, byteorder='little'), length=16, padding=Padding.LEFT))) + unparsed_fields.append((CoAPFields.OPTION_LENGTH_EXTENDED, create_target_value(option_length_bytes-269, length=8))) # option value unparsed_fields.append((CoAPFields.OPTION_VALUE, field_value)) @@ -339,6 +339,7 @@ def _parse_options(buffer: Buffer, mode:CoAPOptionMode) -> Tuple[List[FieldDescr elif option_length == CoAPDefinitions.OPTION_LENGTH_EXTENDED_16BITS: # option_length_extended: 16 bits + option_length_int = 269 option_length_extended: Buffer = option_bytes[option_offset:option_offset+16] option_length_extended_int: int = option_length_extended.value() option_field_positions[CoAPFields.OPTION_LENGTH_EXTENDED] += 1 @@ -531,6 +532,7 @@ def coap_base_header_template( - MESSAGE_ID - TOKEN (only included if token is provided) """ + # Create target values for each field target_values = { CoAPFields.VERSION: create_target_value(1, length=2), # CoAP version 1 @@ -587,88 +589,261 @@ def coap_base_header_template( return field_descriptors def coap_option_template( - option_delta: Union[bytes, Buffer, int], - option_length: Union[bytes, Buffer, int], + option_name: Union[str, None] = None, + option_delta: Union[bytes, Buffer, TargetValue, List, Dict, int, None] = None, + option_length:Union[bytes, Buffer, TargetValue, List, Dict, int, None] = None, + option_delta_extended: Union[bytes, Buffer, TargetValue, List, Dict, int, None] = None, + option_length_extended: Union[bytes, Buffer, TargetValue, List, Dict, int, None] = None, option_value: Union[bytes, Buffer, int, None] = None, - option_delta_extended: Union[bytes, Buffer, int, None] = None, - option_length_extended: Union[bytes, Buffer, int, None] = None + last_option_name: Union[str, None] = None, + position: int = 1, + direction: DI = DI.BIDIRECTIONAL ) -> List[RuleFieldDescriptor]: """ Rule descriptor template for CoAP option. + + This can be used in two ways: + - If `option_name` is provided, generates descriptors for the Option Delta, Option Length, + Option Delta Extended, Option Length Extended (if needed) and Option Value. + - If `option_length` is provided, `option_length` is assumed to be the length in bytes (int). In that case, + the proper Option Length and Option Length Extended fields are generated and the length of the Option Value + is calculated. + - If `option_length` is *not* provided, `option_value` must be a Buffer or bytes. + The length of the Option Value is calculated from the length of the buffer. + + - If no `option_name` is provided, `option_delta` and `option_length` and `option_value` are expected. + Args: - option_delta: The option delta value - option_length: The option length value + option_name: Optional name of the option. + option_length: Optional, the option length in bytes option_value: The option value - option_delta_extended: Optional extended delta value - option_length_extended: Optional extended length value Returns: List[RuleFieldDescriptor]: List of rule field descriptors for the CoAP option """ - # Create target values for each field - target_values = { - CoAPFields.OPTION_DELTA: create_target_value(option_delta, length=4), - CoAPFields.OPTION_LENGTH: create_target_value(option_length, length=4), - CoAPFields.OPTION_VALUE: create_target_value(option_value) if option_value is not None else None, + option_fields_summary = { + CoAPFields.OPTION_DELTA: {'target_value': None, 'field_length': 4}, + CoAPFields.OPTION_LENGTH: {'target_value': None, 'field_length': 4}, } - - # Calculate option value length in bits - option_value_length = 0 - option_length_extended_length = None - option_delta_extended_length = None - - # Handle extended length - if isinstance(target_values[CoAPFields.OPTION_LENGTH], Buffer): - if target_values[CoAPFields.OPTION_LENGTH].content == CoAPDefinitions.OPTION_LENGTH_EXTENDED_8BITS: - option_value_length = (option_length_extended + 13) * 8 - target_values[CoAPFields.OPTION_LENGTH_EXTENDED] = create_target_value(option_length_extended, length=8) - option_length_extended_length = 8 - elif target_values[CoAPFields.OPTION_LENGTH].content == CoAPDefinitions.OPTION_LENGTH_EXTENDED_16BITS: - option_value_length = (option_length_extended + 269) * 8 - target_values[CoAPFields.OPTION_LENGTH_EXTENDED] = create_target_value(option_length_extended, length=16) - option_length_extended_length = 16 + + if option_name is not None: + # retrieve the option number for calculating the option delta + option_field_id: str = coap_option_name_to_field_id(option_name) + option_number: int = coap_option_field_id_to_number(option_field_id) + if last_option_name is not None: + last_option_field_id: str = coap_option_name_to_field_id(last_option_name) + last_option_number: int = coap_option_field_id_to_number(last_option_field_id) else: - option_value_length = option_length * 8 - - # Handle extended delta - if isinstance(target_values[CoAPFields.OPTION_DELTA], Buffer): - if target_values[CoAPFields.OPTION_DELTA].content == CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS: - target_values[CoAPFields.OPTION_DELTA_EXTENDED] = create_target_value(option_delta_extended, length=8) - option_delta_extended_length = 8 - elif target_values[CoAPFields.OPTION_DELTA].content == CoAPDefinitions.OPTION_DELTA_EXTENDED_16BITS: - target_values[CoAPFields.OPTION_DELTA_EXTENDED] = create_target_value(option_delta_extended, length=16) - option_delta_extended_length = 16 - + last_option_number = 0 + + option_delta: int = option_number - last_option_number + _coap_option_variable_encoding_summary(CoAPFields.OPTION_DELTA, option_delta, option_fields_summary) + + # `option_length` is provided as a number of bytes + if option_length is not None: + # if option_value is not an instance of MatchMapping, Buffer, bytes, use option_length in create_target_value + if not isinstance(option_value, (Buffer, MatchMapping, bytes, list, dict)): + option_value_tv: TargetValue = create_target_value(option_value, option_length*8) + else: + option_value_tv: TargetValue = create_target_value(option_value) + _coap_option_variable_encoding_summary(CoAPFields.OPTION_LENGTH, option_length, option_fields_summary) + option_fields_summary[CoAPFields.OPTION_VALUE] = {} + option_fields_summary[CoAPFields.OPTION_VALUE]['target_value'] = option_value_tv + option_fields_summary[CoAPFields.OPTION_VALUE]['field_length'] = option_length * 8 if isinstance(option_length, int) else 0 + else: + # If `option_length` is *not* provided, `option_value` must be a Buffer or bytes or match_mapping + option_value_tv: TargetValue = create_target_value(option_value) + if isinstance(option_value_tv, Buffer): + _coap_option_variable_encoding_summary(CoAPFields.OPTION_LENGTH, option_value_tv.length//8, option_fields_summary) + option_fields_summary[CoAPFields.OPTION_VALUE] = {} + option_fields_summary[CoAPFields.OPTION_VALUE]['target_value'] = option_value_tv + option_fields_summary[CoAPFields.OPTION_VALUE]['field_length'] = option_value_tv.length + elif isinstance(option_value_tv, MatchMapping): + mapping_values: List[Buffer] = list(option_value_tv.forward.keys()) + option_lengths: Set[int] = set([mv.length//8 for mv in mapping_values]) + _coap_option_variable_encoding_summary(CoAPFields.OPTION_LENGTH, option_lengths, option_fields_summary) + option_fields_summary[CoAPFields.OPTION_VALUE] = {} + option_fields_summary[CoAPFields.OPTION_VALUE]['target_value'] = option_value_tv + option_fields_summary[CoAPFields.OPTION_VALUE]['field_length'] = 0 + else: + raise ValueError(f"option value must be a Buffer or bytes if option length is not provided") + + else: + option_delta_tv = create_target_value(option_delta, length=4) + option_length_tv = create_target_value(option_length, length=4) + option_fields_summary[CoAPFields.OPTION_DELTA]['target_value'] = option_delta_tv + option_fields_summary[CoAPFields.OPTION_LENGTH]['target_value'] = option_length_tv + + if option_delta_extended is not None: + option_delta_extended_length = None + if isinstance(option_delta_tv, Buffer) and option_delta_tv.content == CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS: + option_delta_extended_length = 8 + option_delta_extended_fl = 8 + elif isinstance(option_delta_tv, Buffer) and option_delta_tv.content == CoAPDefinitions.OPTION_DELTA_EXTENDED_16BITS: + option_delta_extended_length = 16 + option_delta_extended_fl = 16 + else: + option_delta_extended_length = None + option_delta_extended_fl = 0 + + option_delta_extended_tv = create_target_value(option_delta_extended, length=option_delta_extended_length) + option_fields_summary[CoAPFields.OPTION_DELTA_EXTENDED]= {} + option_fields_summary[CoAPFields.OPTION_DELTA_EXTENDED]['target_value'] = option_delta_extended_tv + option_fields_summary[CoAPFields.OPTION_DELTA_EXTENDED]['field_length'] = option_delta_extended_fl + + if option_length_extended is not None: + option_length_extended_length = None + if isinstance(option_length_tv, Buffer) and option_delta_tv.content == CoAPDefinitions.OPTION_LENGTH_EXTENDED_8BITS: + option_length_extended_length = 8 + option_length_extended_fl = 8 + elif isinstance(option_delta_tv, Buffer) and option_delta_tv.content == CoAPDefinitions.OPTION_LENGTH_EXTENDED_16BITS: + option_length_extended_length = 16 + option_length_extended_fl = 16 + else: + option_length_extended_length = None + option_length_extended_fl = 0 + option_length_extended_tv = create_target_value(option_length_extended, length=option_length_extended_length) + option_fields_summary[CoAPFields.OPTION_LENGTH_EXTENDED]= {} + option_fields_summary[CoAPFields.OPTION_LENGTH_EXTENDED]['target_value'] = option_length_extended_tv + option_fields_summary[CoAPFields.OPTION_LENGTH_EXTENDED]['field_length'] = option_length_extended_fl + + # determine option_value_fl + if isinstance(option_length_tv, Buffer) and option_length_tv.length == 4: + # option_length assumes a single value, use it for option_value_fl + if option_length_tv.value() < 13: + option_value_fl = option_length_tv.value() * 8 + elif option_length_tv.content == CoAPDefinitions.OPTION_LENGTH_EXTENDED_8BITS and isinstance(option_length_extended_tv, Buffer) and option_length_extended_tv.length == 8: + option_value_fl = (13 + option_length_extended_tv.value()) * 8 + elif option_length_tv.content == CoAPDefinitions.OPTION_LENGTH_EXTENDED_16BITS and isinstance(option_length_extended_tv, Buffer): + option_value_fl = (269 + option_length_extended_tv.value()) * 8 + else: + option_value_fl = 0 + else: + option_value_fl = 0 + + # if option_value is not an instance of MatchMapping, Buffer, bytes, use option_value_fl in create_target_value + if not isinstance(option_value, (Buffer, MatchMapping, bytes, list, dict)): + option_value_tv: TargetValue = create_target_value(option_value, option_value_fl) + else: + option_value_tv: TargetValue = create_target_value(option_value) + + option_fields_summary[CoAPFields.OPTION_VALUE] = {} + option_fields_summary[CoAPFields.OPTION_VALUE]['target_value'] = option_value_tv + option_fields_summary[CoAPFields.OPTION_VALUE]['field_length'] = option_value_fl + # Generate rule field descriptors from option fields field_descriptors = [] - for field in COAP_OPTION_TEMPLATE: - if field.id in target_values: - # For OPTION_VALUE, use the calculated length from option_length and option_length_extended - field_length = field.length - if field.id == CoAPFields.OPTION_VALUE: - field_length = option_value_length - # for OPTION_DELTA_EXTENDED, use field_length = 8 if OPTION_DELTA is OPTION_DELTA_EXTENDED_8BITS - # or field_length = 16 if OPTION_DELTA is OPTION_DELTA_EXTENDED_16BITS - elif field.id == CoAPFields.OPTION_DELTA_EXTENDED and option_delta_extended_length is not None: - field_length = option_delta_extended_length - elif field.id == CoAPFields.OPTION_LENGTH_EXTENDED and option_length_extended_length is not None: - field_length = option_length_extended_length - - matching_operator = select_mo(target_values[field.id], field_length) - compression_decompression_action = select_cda(matching_operator) - field_descriptors.append( - RuleFieldDescriptor( - id=field.id, - length=field_length, - position=field.position, - direction=field.direction, - matching_operator=matching_operator, - compression_decompression_action=compression_decompression_action, - target_value=target_values[field.id] - ) + + for field_id, field_summary in option_fields_summary.items(): + field_length = field_summary['field_length'] + field_target_value = field_summary['target_value'] + + matching_operator = select_mo(field_target_value, field_length) + compression_decompression_action = select_cda(matching_operator) + field_descriptors.append( + RuleFieldDescriptor( + id=field_id, + length=field_length, + position=position, + direction=direction, + matching_operator=matching_operator, + compression_decompression_action=compression_decompression_action, + target_value=field_target_value ) - + ) return field_descriptors +def coap_semantic_option_template(option_name: str, option_value: Union[bytes, int, TargetValue, List[Buffer]], option_length=None, position:Optional[int] = 1) -> List[RuleFieldDescriptor]: + """ + Creates a single RuleFieldDescriptor for a semantic CoAP option. + + Args: + option_name: The name of the CoAP option (e.g., 'uri-path', 'content-format'). + option_value: The option_value of the CoAP option. Can be a bytes object, an integer, a TargetValue, or a list of Buffers. + option_length: The length of the option in *bytes*. If None, it will be inferred from the option_value. + position: The position of the option in the CoAP message. Default is 1. + + Returns: + List[RuleFieldDescriptor]: A list containing one RuleFieldDescriptor for the semantic option. + """ + field_id: str = coap_option_name_to_field_id(option_name) + + target_value = create_target_value(option_value) + if option_length is not None: + field_length = option_length * 8 # Convert bytes to bits + else: + field_length = target_value.length + + mo: MO = select_mo(target_value=target_value, field_length=field_length) + cda: CDA = select_cda(matching_operator=mo, field_id=field_id) + + return [ + RuleFieldDescriptor( + id=field_id, + length=field_length, + position=position, + direction=DI.BIDIRECTIONAL, + matching_operator=mo, + compression_decompression_action=cda, + target_value=target_value + ) + ] + +def coap_option_name_to_field_id(option_name: str) -> str: + if option_name in list(CoAPFields): + return option_name + else: + option_id: str = f"OPTION_{option_name.replace('-', '_').upper()}" + try: + field_id: str = getattr(CoAPFields, option_id) + except KeyError: + raise ValueError(f"Invalid CoAP option name: {option_name}") + return field_id + +def coap_option_field_id_to_number(option_id: str) -> int: + if option_id in list(CoAPFields): + return COAP_OPTIONS_NAME_TO_NUMBER[option_id] + else: + raise ValueError(f"Invalid CoAP option ID: {option_id}") + +def _coap_option_variable_encoding_summary(field: str, value: int, summary:Dict) -> None: + if field == CoAPFields.OPTION_DELTA: + extended_field = CoAPFields.OPTION_DELTA_EXTENDED + elif field == CoAPFields.OPTION_LENGTH: + extended_field = CoAPFields.OPTION_LENGTH_EXTENDED + else: + raise ValueError(f"Invalid field: {field}") + + if isinstance(value, int): + if value < 13: + summary[field]['target_value'] = create_target_value(value, length=4) + elif value < 269: + summary[field]['target_value'] = create_target_value(CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS, length=4) + summary[extended_field] = {} + summary[extended_field]['target_value'] = create_target_value(value-13, length=8) + summary[extended_field]['field_length'] = 8 + else: + summary[field]['target_value'] = create_target_value(CoAPDefinitions.OPTION_DELTA_EXTENDED_16BITS, length=4) + summary[extended_field] = {} + summary[extended_field]['target_value'] = create_target_value(value-269, length=16) + summary[extended_field]['field_length'] = 16 + + elif isinstance(value, list) and all(isinstance(od, int) for od in value): + if all(od < 13 for od in value): + summary[field]['target_value'] = create_target_value(value, length=4) + elif all(od >= 13 and od < 269 for od in value): + summary[field]['target_value'] = create_target_value(CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS, length=4) + summary[extended_field] = {} + summary[extended_field]['target_value'] = create_target_value([od-13 for od in value], length=8) + summary[extended_field]['field_length'] = 8 + elif all(od >=269 for od in value): + summary[field]['target_value'] = create_target_value(CoAPDefinitions.OPTION_LENGTH_EXTENDED_16BITS, length=4) + summary[extended_field] = {} + summary[extended_field]['target_value'] = create_target_value([od-269 for od in value], length=16) + summary[extended_field]['field_length'] = 16 + else: + raise ValueError(f"Invalid option delta combinations (requiring different encodings): {value}") + + REGISTER_PARSER(protocol_id=ProtocolsIDs.COAP, parser_class=CoAPParser) diff --git a/microschc/rfc8724.py b/microschc/rfc8724.py index fb7db5b..c3d0b62 100644 --- a/microschc/rfc8724.py +++ b/microschc/rfc8724.py @@ -28,6 +28,12 @@ def __repr__(self) -> str: repr: str = "{" + ",".join([f"{k}:{v}" for k,v in self.reverse.items()]) + "}" return repr + def __eq__(self, other:object): + if not isinstance(other, MatchMapping): + return False + return self.forward == other.forward and self.reverse == other.reverse + + def json(self, indent=None, separators=None): return json.dumps(self.__json__(), indent=indent, separators=separators) diff --git a/microschc/tools/targetvalue.py b/microschc/tools/targetvalue.py index b7b1775..e4219f4 100644 --- a/microschc/tools/targetvalue.py +++ b/microschc/tools/targetvalue.py @@ -14,6 +14,7 @@ def create_target_value(value, length=None, padding=None) -> TargetValue: - list of (bytes, bytes) or (int, int) or (int, bytes): Creates a MatchMapping from pairs - dict with Buffer keys and values: Creates a MatchMapping from the dictionary - MatchMapping or Buffer: Returns the value unchanged + - None length: For Buffer creation, the bit length (required for int and bytes inputs) padding: For Buffer creation, the padding type (defaults to LEFT padding) @@ -25,7 +26,7 @@ def create_target_value(value, length=None, padding=None) -> TargetValue: ValueError: If required parameters are missing for the input type """ # If already a TargetValue, return as is - if isinstance(value, (Buffer, MatchMapping)): + if isinstance(value, (Buffer, MatchMapping)) or (value is None): return value # Handle integer input - create Buffer diff --git a/tests/protocol/test_coap.py b/tests/protocol/test_coap.py index 45a4291..701e9dd 100644 --- a/tests/protocol/test_coap.py +++ b/tests/protocol/test_coap.py @@ -1,6 +1,7 @@ from typing import List, Tuple -from microschc.protocol.coap import CoAPFields, CoAPOptionMode, CoAPParser, coap_base_header_template, coap_option_template -from microschc.rfc8724 import CDA, DI, MO, FieldDescriptor, HeaderDescriptor +from venv import create +from microschc.protocol.coap import CoAPDefinitions, CoAPFields, CoAPOptionMode, CoAPParser, coap_base_header_template, coap_option_template, coap_semantic_option_template +from microschc.rfc8724 import CDA, DI, MO, FieldDescriptor, HeaderDescriptor, MatchMapping from microschc.binary.buffer import Buffer from microschc.tools.targetvalue import create_target_value @@ -367,7 +368,126 @@ def test_coap_parser_parse_semantic(): assert payload_marker_fd.id == CoAPFields.PAYLOAD_MARKER assert payload_marker_fd.position == 0 assert payload_marker_fd.value == Buffer(content=b'\xff', length=8) + +def test_coap_parser_parse_semantic_lmode(): + """test: CoAP header parser parses CoAP packet + + The packet is made of a CoAP header with following fields: + - id='Version' length=2 position=0 value=b'\x01' + - id='Type' length=2 position=0 value=b'\x00' + - id='Token Length' length=4 position=0 value=b'\x08' + - id='Code' length=8 position=0 value=b'\x02' + - id='message ID' length=16 position=0 value=b'\x84\x99' + - id='Token' length=32 position=0 value=b'\x74\xcd\xe8\xcb\x4e\x8c\x0d\xb7' + + - id='Option Uri-Path' length=16 position=1 value=b'\x72\64' + + - id='Option Content-Format' length=8 position=1 value=b'\x28' + + - id='Option Uri-Query' length=24 position=1 value=b'\x62\x3d\x55' + + - id='Option Uri-Query' length=72 position=2 value=b'\x6c\x77\x6d\x32\x6d\x3d\x31\x2e\x31' + + - id='Option Uri-Query' length=48 position=3 value=b'\x6c\x74\x3d\x33\x30\x30' + + - id='Option Uri-Query' length=120 position=4 value=b'\x65\x70\x3d\x38\x35\x62\x61\x39\x62\x64\x61\x63\x30\x62\x65' + + - id='Option Block1' length=8 position=1 value=b'\x0d' + + - id='Option Size1' length=16 position=1 value=b'\x07\x2b' + + - id='Payload Marker' length=8 position=0 value=b'\xff' + + """ + valid_coap_packet:bytes = bytes(b"\x48\x02\x84\x99\x74\xcd\xe8\xcb\x4e\x8c\x0d\xb7\xb2\x72\x64\x11" \ + b"\x28\x33\x62\x3d\x55\x09\x6c\x77\x6d\x32\x6d\x3d\x31\x2e\x31\x06" \ + b"\x6c\x74\x3d\x33\x30\x30\x0d\x02\x65\x70\x3d\x38\x35\x62\x61\x39" \ + b"\x62\x64\x61\x63\x30\x62\x65\xc1\x0d\xd2\x14\x07\x2b\xff") + + valid_coap_packet_buffer: Buffer = Buffer(content=valid_coap_packet, length=len(valid_coap_packet)*8) + parser:CoAPParser = CoAPParser(interpret_options=CoAPOptionMode.LAURENT) + coap_header_descriptor: HeaderDescriptor = parser.parse(buffer=valid_coap_packet_buffer) + + assert isinstance(coap_header_descriptor, HeaderDescriptor) + assert len(coap_header_descriptor.fields) == 15 + for field in coap_header_descriptor.fields: + assert isinstance(field, FieldDescriptor) + + # Header fields + version_fd = coap_header_descriptor.fields[0] + assert version_fd.id == CoAPFields.VERSION + assert version_fd.position == 0 + assert version_fd.value == Buffer(content=b'\x01', length=2) + + type_fd = coap_header_descriptor.fields[1] + assert type_fd.id == CoAPFields.TYPE + assert type_fd.position == 0 + assert type_fd.value == Buffer(content=b'\x00', length=2) + + token_length_fd = coap_header_descriptor.fields[2] + assert token_length_fd.id == CoAPFields.TOKEN_LENGTH + assert token_length_fd.position == 0 + assert token_length_fd.value == Buffer(content=b'\x08', length=4) + + code_fd = coap_header_descriptor.fields[3] + assert code_fd.id == CoAPFields.CODE + assert code_fd.position == 0 + assert code_fd.value == Buffer(content=b'\x02', length=8) + + message_id_fd = coap_header_descriptor.fields[4] + assert message_id_fd.id == CoAPFields.MESSAGE_ID + assert message_id_fd.position == 0 + assert message_id_fd.value == Buffer(content=b'\x84\x99', length=16) + + token_fd = coap_header_descriptor.fields[5] + assert token_fd.id == CoAPFields.TOKEN + assert token_fd.position == 0 + assert token_fd.value == Buffer(content=b'\x74\xcd\xe8\xcb\x4e\x8c\x0d\xb7', length=64) + + option_uri_path_fd = coap_header_descriptor.fields[6] + assert option_uri_path_fd.id == CoAPFields.OPTION_URI_PATH + assert option_uri_path_fd.position == 1 + assert option_uri_path_fd.value == Buffer(content=b'\x72\x64', length=16) + + option_content_format_fd = coap_header_descriptor.fields[7] + assert option_content_format_fd.id == CoAPFields.OPTION_CONTENT_FORMAT + assert option_content_format_fd.position == 1 + assert option_content_format_fd.value == Buffer(content=b'\x28', length=8) + + option_uri_query_1_fd = coap_header_descriptor.fields[8] + assert option_uri_query_1_fd.id == CoAPFields.OPTION_URI_QUERY + assert option_uri_query_1_fd.position == 1 + assert option_uri_query_1_fd.value == Buffer(content=b'\x62\x3d\x55', length=24) + + option_uri_query_2_fd = coap_header_descriptor.fields[9] + assert option_uri_query_2_fd.id == CoAPFields.OPTION_URI_QUERY + assert option_uri_query_2_fd.position == 2 + assert option_uri_query_2_fd.value == Buffer(content=b'\x6c\x77\x6d\x32\x6d\x3d\x31\x2e\x31', length=72) + + option_uri_query_3_fd = coap_header_descriptor.fields[10] + assert option_uri_query_3_fd.id == CoAPFields.OPTION_URI_QUERY + assert option_uri_query_3_fd.position == 3 + assert option_uri_query_3_fd.value == Buffer(content=b'\x6c\x74\x3d\x33\x30\x30', length=48) + + option_uri_query_4_fd = coap_header_descriptor.fields[11] + assert option_uri_query_4_fd.id == CoAPFields.OPTION_URI_QUERY + assert option_uri_query_4_fd.position == 4 + assert option_uri_query_4_fd.value == Buffer(content=b'\x65\x70\x3d\x38\x35\x62\x61\x39\x62\x64\x61\x63\x30\x62\x65', length=120) + + option_block1_fd = coap_header_descriptor.fields[12] + assert option_block1_fd.id == CoAPFields.OPTION_BLOCK1 + assert option_block1_fd.position == 1 + assert option_block1_fd.value == Buffer(content=b'\x0d', length=8) + option_size1_fd = coap_header_descriptor.fields[13] + assert option_size1_fd.id == CoAPFields.OPTION_SIZE1 + assert option_size1_fd.position == 1 + assert option_size1_fd.value == Buffer(content=b'\x07\x2b', length=16) + + payload_marker_fd = coap_header_descriptor.fields[14] + assert payload_marker_fd.id == CoAPFields.PAYLOAD_MARKER + assert payload_marker_fd.position == 0 + assert payload_marker_fd.value == Buffer(content=b'\xff', length=8) def test_coap_parser_unparse_semantic(): @@ -621,94 +741,369 @@ def test_coap_base_header_template(): # Verify TOKEN field is not present assert not any(f.id == CoAPFields.TOKEN for f in field_descriptors) -def test_coap_option_template(): +def test_coap_semantic_option_template(): + """Test the CoAP semantic option template generation.""" + # Test with basic option (no extended fields) + option_descriptors = coap_semantic_option_template( + option_name='Uri-Path', + option_length=2, # 2 bytes + option_value=b"rd" # "rd" in bytes + ) + assert len(option_descriptors) == 1 + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_URI_PATH + assert field_descriptor_0.length == 16 + assert field_descriptor_0.position == 1 + assert field_descriptor_0.direction == DI.BIDIRECTIONAL + assert field_descriptor_0.matching_operator == MO.EQUAL + assert field_descriptor_0.compression_decompression_action == CDA.NOT_SENT + assert field_descriptor_0.target_value == Buffer(content=b"rd") + +def test_coap_option_template_basic(): """Test the CoAP option template generation.""" # Test with basic option (no extended fields) option_descriptors = coap_option_template( - option_delta=11, # URI-Path option + option_name='Uri-Path', + option_length=2, # Length of "rd" in bytes + option_value=b"rd" # "rd" in bytes + ) + assert len(option_descriptors) == 3 # OPTION_DELTA, OPTION_LENGTH, OPTION_VALUE + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.position == 1 + assert field_descriptor_0.direction == DI.BIDIRECTIONAL + assert field_descriptor_0.matching_operator == MO.EQUAL + assert field_descriptor_0.compression_decompression_action == CDA.NOT_SENT + assert field_descriptor_0.target_value == create_target_value(value=11, length=4) + + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.position == 1 + assert field_descriptor_1.direction == DI.BIDIRECTIONAL + assert field_descriptor_1.matching_operator == MO.EQUAL + assert field_descriptor_1.compression_decompression_action == CDA.NOT_SENT + assert field_descriptor_1.target_value == create_target_value(value=2, length=4) + +def test_coap_option_template_match_mapping(): + """Test the CoAP option template generation with a match mapping.""" + # Test with extended fields and a match mapping + option_descriptors = coap_option_template( + option_name='Uri-Path', + option_length=2, + option_value=[b"rd", b"ra"] + ) + assert len(option_descriptors) == 3 # OPTION_DELTA, OPTION_LENGTH, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.matching_operator == MO.EQUAL + assert field_descriptor_0.compression_decompression_action == CDA.NOT_SENT + assert field_descriptor_0.target_value == create_target_value(value=11, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.matching_operator == MO.EQUAL + assert field_descriptor_1.compression_decompression_action == CDA.NOT_SENT + assert field_descriptor_1.target_value == create_target_value(value=2, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_VALUE + assert field_descriptor_2.length == 16 + assert field_descriptor_2.matching_operator == MO.MATCH_MAPPING + assert field_descriptor_2.compression_decompression_action == CDA.MAPPING_SENT + assert isinstance(field_descriptor_2.target_value, MatchMapping) + assert list(field_descriptor_2.target_value.forward.keys()) == [create_target_value(b"rd"), create_target_value(b"ra")] + +def test_coap_option_template_two_matchmappings(): + """Test the CoAP option semantic template generation with two MatchMappings.""" + # Test with basic option (no extended fields) + option_descriptors = coap_option_template( + option_name='Uri-Path', + option_length=[2,3], # 2 and 3 bytes + option_value=[b"rd", b"rad"] # "rd" and "ra" in bytes + ) + + assert len(option_descriptors) == 3 # OPTION_DELTA, OPTION_LENGTH, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + field_descriptor_0.target_value = create_target_value(11, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value([2,3], length=4) + + +def test_coap_option_template_semantic_basic(): + """Test the CoAP option semantic template generation.""" + # Test with basic option (no extended fields) + option_descriptors = coap_option_template( + option_name='Uri-Path', option_length=2, # 2 bytes option_value=b"rd" # "rd" in bytes ) - # Verify the number of fields assert len(option_descriptors) == 3 # OPTION_DELTA, OPTION_LENGTH, OPTION_VALUE + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(11, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value(2, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_VALUE + assert field_descriptor_2.length == 16 + assert field_descriptor_2.target_value == create_target_value(b"rd", length=16) - # Verify each field's properties - for field in option_descriptors: - assert field.direction == DI.BIDIRECTIONAL - assert field.position == 1 # Options are at position 1 +def test_coap_option_template_semantic_last_option_name(): + """Test the CoAP option semantic template generation.""" + # Test with basic option (no extended fields) + option_descriptors = coap_option_template( + option_name='Uri-Path', + option_length=2, # 2 bytes + option_value=b"rd", # "rd" in bytes + last_option_name='Uri-Path' + ) - if field.id == CoAPFields.OPTION_DELTA: - assert field.length == 4 - assert field.matching_operator == MO.EQUAL - assert field.compression_decompression_action == CDA.NOT_SENT - assert field.target_value == create_target_value(11, length=4) + assert len(option_descriptors) == 3 # OPTION_DELTA, OPTION_LENGTH, OPTION_VALUE + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(0, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value(2, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_VALUE + assert field_descriptor_2.length == 16 + assert field_descriptor_2.target_value == create_target_value(b"rd", length=16) - elif field.id == CoAPFields.OPTION_LENGTH: - assert field.length == 4 - assert field.matching_operator == MO.EQUAL - assert field.compression_decompression_action == CDA.NOT_SENT - assert field.target_value == create_target_value(2, length=4) +def test_coap_option_template_semantic_extended_length(): + """Test the CoAP option semantic template generation.""" + # Test with basic option (no extended fields) + option_descriptors = coap_option_template( + option_name='Uri-Path', + option_length=15, # 15 bytes + option_value=b"a"*15 + ) - elif field.id == CoAPFields.OPTION_VALUE: - assert field.length == 16 # 2 bytes = 16 bits - assert field.matching_operator == MO.EQUAL - assert field.compression_decompression_action == CDA.NOT_SENT - assert field.target_value == create_target_value(b"rd", length=16) - - # Test with extended option delta - extended_delta_descriptors = coap_option_template( - option_delta=13, # Extended delta marker - option_length=1, # 1 byte - option_value=b"x", # Single byte value - option_delta_extended=13 # Extended delta value + assert len(option_descriptors) == 4 # OPTION_DELTA, OPTION_LENGTH, OPTION_LENGTH_EXTENDED, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(11, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value(CoAPDefinitions.OPTION_LENGTH_EXTENDED_8BITS, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_LENGTH_EXTENDED + assert field_descriptor_2.length == 8 + assert field_descriptor_2.target_value == create_target_value(15-13, length=8) + + + field_descriptor_3 = option_descriptors[3] + assert field_descriptor_3.id == CoAPFields.OPTION_VALUE + assert field_descriptor_3.length == 15*8 + assert field_descriptor_3.target_value == create_target_value(b"a"*15) + +def test_coap_option_template_semantic_extended_delta(): + """Test the CoAP option semantic template generation.""" + # Test with basic option (no extended fields) + option_descriptors = coap_option_template( + option_name='Size1', + option_length=2, # 2 bytes + option_value=44 + ) + + assert len(option_descriptors) == 4 # OPTION_DELTA, OPTION_LENGTH, OPTION_LENGTH_EXTENDED, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value(2, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_DELTA_EXTENDED + assert field_descriptor_2.length == 8 + assert field_descriptor_2.target_value == create_target_value(60-13, length=8) + + + field_descriptor_3 = option_descriptors[3] + assert field_descriptor_3.id == CoAPFields.OPTION_VALUE + assert field_descriptor_3.length == 16 + assert field_descriptor_3.target_value == create_target_value(44, length=16) + + +def test_coap_option_template_semantic_extended_delta_extended_length(): + """Test the CoAP option semantic template generation.""" + # Test with basic option (no extended fields) + option_descriptors = coap_option_template( + option_name='Proxy-Uri', + option_length=16, # 16 bytes + option_value=b"a"*16 + ) + + assert len(option_descriptors) == 5 # OPTION_DELTA, OPTION_LENGTH, OPTION_DELTA_EXTENDED OPTION_LENGTH_EXTENDED, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value(CoAPDefinitions.OPTION_LENGTH_EXTENDED_8BITS, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_DELTA_EXTENDED + assert field_descriptor_2.length == 8 + assert field_descriptor_2.target_value == create_target_value(35-13, length=8) + + field_descriptor_3 = option_descriptors[3] + assert field_descriptor_3.id == CoAPFields.OPTION_LENGTH_EXTENDED + assert field_descriptor_3.length == 8 + assert field_descriptor_3.target_value == create_target_value(16-13, length=8) + + field_descriptor_4 = option_descriptors[4] + assert field_descriptor_4.id == CoAPFields.OPTION_VALUE + assert field_descriptor_4.length == 16 * 8 + assert field_descriptor_4.target_value == create_target_value(b"a"*16) + +def test_coap_syntactic_option_template_basic(): + option_descriptors = coap_option_template( + option_delta=b'\x06', + option_length=1, + option_value=Buffer(content=b'\xff') ) - # Verify the number of fields - assert len(extended_delta_descriptors) == 4 # OPTION_DELTA, OPTION_LENGTH, OPTION_DELTA_EXTENDED, OPTION_VALUE - - # Verify extended delta field - delta_extended_field = next(f for f in extended_delta_descriptors if f.id == CoAPFields.OPTION_DELTA_EXTENDED) - assert delta_extended_field.length == 8 - assert delta_extended_field.matching_operator == MO.EQUAL - assert delta_extended_field.compression_decompression_action == CDA.NOT_SENT - assert delta_extended_field.target_value == create_target_value(13, length=8) - - # Test with extended option length - extended_length_descriptors = coap_option_template( - option_delta=1, # Basic delta - option_length=13, # Extended length marker - option_value=b"long value", # Value longer than 12 bytes - option_length_extended=9 # Extended length value + assert len(option_descriptors) == 3 # OPTION_DELTA, OPTION_LENGTH, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(6, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value(1, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_VALUE + assert field_descriptor_2.matching_operator == MO.EQUAL + assert field_descriptor_2.compression_decompression_action == CDA.NOT_SENT + assert field_descriptor_2.length == 8 + assert field_descriptor_2.target_value == Buffer(content=b'\xff', length=8) + +def test_coap_syntactic_option_template_extended_delta(): + option_descriptors = coap_option_template( + option_delta=CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS, + option_delta_extended=1, + option_length=1, + option_value=Buffer(content=b'\xff') ) - # Verify the number of fields - assert len(extended_length_descriptors) == 4 # OPTION_DELTA, OPTION_LENGTH, OPTION_LENGTH_EXTENDED, OPTION_VALUE - - # Verify extended length field - length_extended_field = next(f for f in extended_length_descriptors if f.id == CoAPFields.OPTION_LENGTH_EXTENDED) - assert length_extended_field.length == 8 - assert length_extended_field.matching_operator == MO.EQUAL - assert length_extended_field.compression_decompression_action == CDA.NOT_SENT - assert length_extended_field.target_value == create_target_value(9, length=8) - - # Test with both extended fields - both_extended_descriptors = coap_option_template( - option_delta=14, # Extended delta marker (16 bits) - option_length=14, # Extended length marker (16 bits) - option_value=b"very long value", # Long value - option_delta_extended=269, # Extended delta value - option_length_extended=269 # Extended length value + assert len(option_descriptors) == 4 # OPTION_DELTA, OPTION_LENGTH, OPTION_DELTA_EXTENDED, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(CoAPDefinitions.OPTION_DELTA_EXTENDED_8BITS, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value(1, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_DELTA_EXTENDED + assert field_descriptor_2.matching_operator == MO.EQUAL + assert field_descriptor_2.compression_decompression_action == CDA.NOT_SENT + assert field_descriptor_2.length == 8 + assert field_descriptor_2.target_value == create_target_value(1, length=8) + + field_descriptor_3 = option_descriptors[3] + assert field_descriptor_3.id == CoAPFields.OPTION_VALUE + assert field_descriptor_3.matching_operator == MO.EQUAL + assert field_descriptor_3.compression_decompression_action == CDA.NOT_SENT + assert field_descriptor_3.length == 8 + assert field_descriptor_3.target_value == Buffer(content=b'\xff', length=8) + +def test_coap_syntactic_option_template_msb(): + option_descriptors = coap_option_template( + option_delta=6, + option_length=3, # option_length is longer that option_value in bytes + option_value=Buffer(content=b'\xff') ) - # Verify the number of fields - assert len(both_extended_descriptors) == 5 # All fields including both extended fields + assert len(option_descriptors) == 3 # OPTION_DELTA, OPTION_LENGTH, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(6, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value(3, length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_VALUE + assert field_descriptor_2.matching_operator == MO.MSB + assert field_descriptor_2.compression_decompression_action == CDA.LSB + assert field_descriptor_2.length == 24 + assert field_descriptor_2.target_value == create_target_value(b'\xff', length=8) + + +def test_coap_syntactic_option_template_option_lenght_matchmapping(): + option_descriptors = coap_option_template( + option_delta=b'\x06', + option_length=[1, 2], + option_value=None + ) + + assert len(option_descriptors) == 3 # OPTION_DELTA, OPTION_LENGTH, OPTION_VALUE + + field_descriptor_0 = option_descriptors[0] + assert field_descriptor_0.id == CoAPFields.OPTION_DELTA + assert field_descriptor_0.length == 4 + assert field_descriptor_0.target_value == create_target_value(6, length=4) + + field_descriptor_1 = option_descriptors[1] + assert field_descriptor_1.id == CoAPFields.OPTION_LENGTH + assert field_descriptor_1.length == 4 + assert field_descriptor_1.target_value == create_target_value([1,2], length=4) + + field_descriptor_2 = option_descriptors[2] + assert field_descriptor_2.id == CoAPFields.OPTION_VALUE + assert field_descriptor_2.matching_operator == MO.IGNORE + assert field_descriptor_2.compression_decompression_action == CDA.VALUE_SENT + assert field_descriptor_2.length == 0 + assert field_descriptor_2.target_value == None - # Verify 16-bit extended fields - delta_extended_field = next(f for f in both_extended_descriptors if f.id == CoAPFields.OPTION_DELTA_EXTENDED) - assert delta_extended_field.length == 16 - assert delta_extended_field.target_value == create_target_value(269, length=16) - length_extended_field = next(f for f in both_extended_descriptors if f.id == CoAPFields.OPTION_LENGTH_EXTENDED) - assert length_extended_field.length == 16 - assert length_extended_field.target_value == create_target_value(269, length=16) \ No newline at end of file diff --git a/tests/tools/test_targetvalue.py b/tests/tools/test_targetvalue.py index 65202b8..fb7f7a3 100644 --- a/tests/tools/test_targetvalue.py +++ b/tests/tools/test_targetvalue.py @@ -141,11 +141,6 @@ def test_error_cases(): with pytest.raises(ValueError): create_target_value(42) - # Test unsupported type - with pytest.raises(TypeError): - create_target_value(None) - - # Test invalid mapping values with pytest.raises(TypeError): create_target_value({1: None}, length=8) # None can't be a Buffer