Skip to content

Commit eb9f898

Browse files
committed
fix: updated kafka/conversion.py and test cases to check for valid attributes
Signed-off-by: vivjd <[email protected]>
1 parent ac8255f commit eb9f898

File tree

2 files changed

+11
-8
lines changed

2 files changed

+11
-8
lines changed

cloudevents/kafka/conversion.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def from_binary(
126126
for header, value in message.headers.items():
127127
header = header.lower()
128128
if header == "content-type":
129-
attributes["content-type"] = value.decode()
129+
attributes["datacontenttype"] = value.decode()
130130
elif header.startswith("ce_"):
131131
attributes[header[3:]] = value.decode()
132132

@@ -255,7 +255,10 @@ def from_structured(
255255
attributes[name] = decoded_value
256256

257257
for header, val in message.headers.items():
258-
attributes[header.lower()] = val.decode()
258+
if header.lower() == "content-type":
259+
attributes["datacontenttype"] = val.decode()
260+
else:
261+
attributes[header.lower()] = val.decode()
259262
if event_type:
260263
result = event_type.create(attributes, data)
261264
else:

cloudevents/tests/test_kafka_conversions.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def test_sets_headers(self, source_event):
123123
assert result.headers["ce_source"] == source_event["source"].encode("utf-8")
124124
assert result.headers["ce_type"] == source_event["type"].encode("utf-8")
125125
assert result.headers["ce_time"] == source_event["time"].encode("utf-8")
126-
assert result.headers["content-type"] == source_event["content-type"].encode(
126+
assert result.headers["content-type"] == source_event["datacontenttype"].encode(
127127
"utf-8"
128128
)
129129
assert "data" not in result.headers
@@ -163,7 +163,7 @@ def source_binary_bytes_message(self) -> KafkaMessage:
163163
"ce_time": datetime.datetime(2000, 1, 1, 6, 42, 33)
164164
.isoformat()
165165
.encode("utf-8"),
166-
"content-type": "foo".encode("utf-8"),
166+
"datacontenttype": "foo".encode("utf-8"),
167167
},
168168
value=simple_serialize(self.expected_data),
169169
key="test_key_123",
@@ -205,7 +205,7 @@ def test_sets_attrs_from_headers(self, source_binary_json_message):
205205
assert result["type"] == source_binary_json_message.headers["ce_type"].decode()
206206
assert result["time"] == source_binary_json_message.headers["ce_time"].decode()
207207
assert (
208-
result["content-type"]
208+
result["datacontenttype"]
209209
== source_binary_json_message.headers["content-type"].decode()
210210
)
211211

@@ -328,7 +328,7 @@ def test_no_key(self, source_event):
328328
def test_sets_headers(self, source_event):
329329
result = to_structured(source_event)
330330
assert len(result.headers) == 1
331-
assert result.headers["content-type"] == source_event["content-type"].encode(
331+
assert result.headers["content-type"] == source_event["datacontenttype"].encode(
332332
"utf-8"
333333
)
334334

@@ -474,7 +474,7 @@ def test_sets_content_type_default_envelope_unmarshaller(
474474
):
475475
result = from_structured(source_structured_json_message)
476476
assert (
477-
result["content-type"]
477+
result["datacontenttype"]
478478
== source_structured_json_message.headers["content-type"].decode()
479479
)
480480

@@ -487,7 +487,7 @@ def test_sets_content_type_custom_envelope_unmarshaller(
487487
envelope_unmarshaller=custom_unmarshaller,
488488
)
489489
assert (
490-
result["content-type"]
490+
result["datacontenttype"]
491491
== source_structured_bytes_bytes_message.headers["content-type"].decode()
492492
)
493493

0 commit comments

Comments
 (0)