diff --git a/cloudevents/kafka/conversion.py b/cloudevents/kafka/conversion.py index 832594d1..97c355f2 100644 --- a/cloudevents/kafka/conversion.py +++ b/cloudevents/kafka/conversion.py @@ -87,10 +87,10 @@ def to_binary( ) headers = {} - if event["content-type"]: - headers["content-type"] = event["content-type"].encode("utf-8") + if event["datacontenttype"]: + headers["content-type"] = event["datacontenttype"].encode("utf-8") for attr, value in event.get_attributes().items(): - if attr not in ["data", "partitionkey", "content-type"]: + if attr not in ["data", "partitionkey", "datacontenttype"]: if value is not None: headers["ce_{0}".format(attr)] = value.encode("utf-8") @@ -126,7 +126,7 @@ def from_binary( for header, value in message.headers.items(): header = header.lower() if header == "content-type": - attributes["content-type"] = value.decode() + attributes["datacontenttype"] = value.decode() elif header.startswith("ce_"): attributes[header[3:]] = value.decode() @@ -189,8 +189,8 @@ def to_structured( attrs["data"] = data headers = {} - if "content-type" in attrs: - headers["content-type"] = attrs.pop("content-type").encode("utf-8") + if "datacontenttype" in attrs: + headers["content-type"] = attrs.pop("datacontenttype").encode("utf-8") try: value = envelope_marshaller(attrs) @@ -255,7 +255,10 @@ def from_structured( attributes[name] = decoded_value for header, val in message.headers.items(): - attributes[header.lower()] = val.decode() + if header.lower() == "content-type": + attributes["datacontenttype"] = val.decode() + else: + attributes[header.lower()] = val.decode() if event_type: result = event_type.create(attributes, data) else: diff --git a/cloudevents/tests/test_kafka_conversions.py b/cloudevents/tests/test_kafka_conversions.py index 696e75cb..5580773a 100644 --- a/cloudevents/tests/test_kafka_conversions.py +++ b/cloudevents/tests/test_kafka_conversions.py @@ -59,7 +59,7 @@ def source_event(self) -> CloudEvent: "source": "pytest", "type": "com.pytest.test", "time": datetime.datetime(2000, 1, 1, 6, 42, 33).isoformat(), - "content-type": "foo", + "datacontenttype": "foo", "partitionkey": "test_key_123", }, data=self.expected_data, @@ -123,7 +123,7 @@ def test_sets_headers(self, source_event): assert result.headers["ce_source"] == source_event["source"].encode("utf-8") assert result.headers["ce_type"] == source_event["type"].encode("utf-8") assert result.headers["ce_time"] == source_event["time"].encode("utf-8") - assert result.headers["content-type"] == source_event["content-type"].encode( + assert result.headers["content-type"] == source_event["datacontenttype"].encode( "utf-8" ) assert "data" not in result.headers @@ -163,7 +163,7 @@ def source_binary_bytes_message(self) -> KafkaMessage: "ce_time": datetime.datetime(2000, 1, 1, 6, 42, 33) .isoformat() .encode("utf-8"), - "content-type": "foo".encode("utf-8"), + "datacontenttype": "foo".encode("utf-8"), }, value=simple_serialize(self.expected_data), key="test_key_123", @@ -205,7 +205,7 @@ def test_sets_attrs_from_headers(self, source_binary_json_message): assert result["type"] == source_binary_json_message.headers["ce_type"].decode() assert result["time"] == source_binary_json_message.headers["ce_time"].decode() assert ( - result["content-type"] + result["datacontenttype"] == source_binary_json_message.headers["content-type"].decode() ) @@ -328,7 +328,7 @@ def test_no_key(self, source_event): def test_sets_headers(self, source_event): result = to_structured(source_event) assert len(result.headers) == 1 - assert result.headers["content-type"] == source_event["content-type"].encode( + assert result.headers["content-type"] == source_event["datacontenttype"].encode( "utf-8" ) @@ -474,7 +474,7 @@ def test_sets_content_type_default_envelope_unmarshaller( ): result = from_structured(source_structured_json_message) assert ( - result["content-type"] + result["datacontenttype"] == source_structured_json_message.headers["content-type"].decode() ) @@ -487,7 +487,7 @@ def test_sets_content_type_custom_envelope_unmarshaller( envelope_unmarshaller=custom_unmarshaller, ) assert ( - result["content-type"] + result["datacontenttype"] == source_structured_bytes_bytes_message.headers["content-type"].decode() )