Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modified content-type to abide by attribute naming conventions for cloudevents #232

Merged
merged 5 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions cloudevents/kafka/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ def to_binary(
)

headers = {}
if event["content-type"]:
headers["content-type"] = event["content-type"].encode("utf-8")
if event["datacontenttype"]:
headers["content-type"] = event["datacontenttype"].encode("utf-8")
for attr, value in event.get_attributes().items():
if attr not in ["data", "partitionkey", "content-type"]:
if attr not in ["data", "partitionkey", "datacontenttype"]:
if value is not None:
headers["ce_{0}".format(attr)] = value.encode("utf-8")

Expand Down Expand Up @@ -126,7 +126,7 @@ def from_binary(
for header, value in message.headers.items():
header = header.lower()
if header == "content-type":
attributes["content-type"] = value.decode()
attributes["datacontenttype"] = value.decode()
elif header.startswith("ce_"):
attributes[header[3:]] = value.decode()

Expand Down Expand Up @@ -189,8 +189,8 @@ def to_structured(
attrs["data"] = data

headers = {}
if "content-type" in attrs:
headers["content-type"] = attrs.pop("content-type").encode("utf-8")
if "datacontenttype" in attrs:
headers["content-type"] = attrs.pop("datacontenttype").encode("utf-8")

try:
value = envelope_marshaller(attrs)
Expand Down Expand Up @@ -255,7 +255,10 @@ def from_structured(
attributes[name] = decoded_value

for header, val in message.headers.items():
attributes[header.lower()] = val.decode()
if header.lower() == "content-type":
attributes["datacontenttype"] = val.decode()
else:
attributes[header.lower()] = val.decode()
if event_type:
result = event_type.create(attributes, data)
else:
Expand Down
14 changes: 7 additions & 7 deletions cloudevents/tests/test_kafka_conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def source_event(self) -> CloudEvent:
"source": "pytest",
"type": "com.pytest.test",
"time": datetime.datetime(2000, 1, 1, 6, 42, 33).isoformat(),
"content-type": "foo",
"datacontenttype": "foo",
"partitionkey": "test_key_123",
},
data=self.expected_data,
Expand Down Expand Up @@ -123,7 +123,7 @@ def test_sets_headers(self, source_event):
assert result.headers["ce_source"] == source_event["source"].encode("utf-8")
assert result.headers["ce_type"] == source_event["type"].encode("utf-8")
assert result.headers["ce_time"] == source_event["time"].encode("utf-8")
assert result.headers["content-type"] == source_event["content-type"].encode(
assert result.headers["content-type"] == source_event["datacontenttype"].encode(
"utf-8"
)
assert "data" not in result.headers
Expand Down Expand Up @@ -163,7 +163,7 @@ def source_binary_bytes_message(self) -> KafkaMessage:
"ce_time": datetime.datetime(2000, 1, 1, 6, 42, 33)
.isoformat()
.encode("utf-8"),
"content-type": "foo".encode("utf-8"),
"datacontenttype": "foo".encode("utf-8"),
},
value=simple_serialize(self.expected_data),
key="test_key_123",
Expand Down Expand Up @@ -205,7 +205,7 @@ def test_sets_attrs_from_headers(self, source_binary_json_message):
assert result["type"] == source_binary_json_message.headers["ce_type"].decode()
assert result["time"] == source_binary_json_message.headers["ce_time"].decode()
assert (
result["content-type"]
result["datacontenttype"]
== source_binary_json_message.headers["content-type"].decode()
)

Expand Down Expand Up @@ -328,7 +328,7 @@ def test_no_key(self, source_event):
def test_sets_headers(self, source_event):
result = to_structured(source_event)
assert len(result.headers) == 1
assert result.headers["content-type"] == source_event["content-type"].encode(
assert result.headers["content-type"] == source_event["datacontenttype"].encode(
"utf-8"
)

Expand Down Expand Up @@ -474,7 +474,7 @@ def test_sets_content_type_default_envelope_unmarshaller(
):
result = from_structured(source_structured_json_message)
assert (
result["content-type"]
result["datacontenttype"]
== source_structured_json_message.headers["content-type"].decode()
)

Expand All @@ -487,7 +487,7 @@ def test_sets_content_type_custom_envelope_unmarshaller(
envelope_unmarshaller=custom_unmarshaller,
)
assert (
result["content-type"]
result["datacontenttype"]
== source_structured_bytes_bytes_message.headers["content-type"].decode()
)

Expand Down
Loading