diff --git a/Cargo.toml b/Cargo.toml index 60148afa..8d0e1e21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"] axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"] nats = ["nats-lib"] +fe2o3-amqp = ["fe2o3-amqp-types"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -57,6 +58,7 @@ axum-lib = { version = "^0.5", optional = true , package="axum"} http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } nats-lib = { version = "0.21.0", optional = true, package = "nats" } +fe2o3-amqp-types = { version = "0.5.1", optional = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" @@ -79,3 +81,4 @@ mockito = "0.25.1" tokio = { version = "^1.0", features = ["full"] } mime = "0.3" tower = { version = "0.4", features = ["util"] } +fe2o3-amqp = { version = "0.6.1" } diff --git a/example-projects/fe2o3-amqp-example/Cargo.toml b/example-projects/fe2o3-amqp-example/Cargo.toml new file mode 100644 index 00000000..ae8d8b46 --- /dev/null +++ b/example-projects/fe2o3-amqp-example/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "fe2o3-amqp-example" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cloudevents-sdk = { path = "../..", features = ["fe2o3-amqp"] } +fe2o3-amqp = "0.5.1" +tokio = { version = "1", features = ["macros", "net", "rt", "rt-multi-thread"] } +serde_json = "1" \ No newline at end of file diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs new file mode 100644 index 00000000..5ee6f50f --- /dev/null +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -0,0 +1,109 @@ +//! AMQP 1.0 binding example +//! +//! You need a running AMQP 1.0 broker to try out this example. +//! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis + +use cloudevents::{ + binding::fe2o3_amqp::{EventMessage}, message::MessageDeserializer, Event, EventBuilder, + EventBuilderV10, AttributesReader, event::ExtensionValue, +}; +use fe2o3_amqp::{Connection, Receiver, Sender, Session}; +use serde_json::{json, from_slice, from_str}; + +type BoxError = Box; +type Result = std::result::Result; + +const EXAMPLE_TYPE: &str = "example.test"; +const EXAMPLE_SOURCE: &str = "localhost"; +const EXTENSION_NAME: &str = "ext-name"; +const EXTENSION_VALUE: &str = "AMQP"; + +async fn send_binary_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { + let event = EventBuilderV10::new() + .id(i.to_string()) + .ty(EXAMPLE_TYPE) + .source(EXAMPLE_SOURCE) + .extension(EXTENSION_NAME, EXTENSION_VALUE) + .data("application/json", value) + .build()?; + let event_message = EventMessage::from_binary_event(event)?; + sender.send(event_message).await?.accepted_or("not accepted")?; + Ok(()) +} + +async fn send_structured_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { + let event = EventBuilderV10::new() + .id(i.to_string()) + .ty(EXAMPLE_TYPE) + .source(EXAMPLE_SOURCE) + .extension(EXTENSION_NAME, EXTENSION_VALUE) + .data("application/json", value) + .build()?; + let event_message = EventMessage::from_structured_event(event)?; + sender.send(event_message).await?.accepted_or("not accepted")?; + Ok(()) +} + +async fn recv_event(receiver: &mut Receiver) -> Result { + let delivery = receiver.recv().await?; + receiver.accept(&delivery).await?; + + let event_message = EventMessage::from(delivery.into_message()); + let event = MessageDeserializer::into_event(event_message)?; + Ok(event) +} + +fn convert_data_into_json_value(data: &cloudevents::Data) -> Result { + let value = match data { + cloudevents::Data::Binary(bytes) => from_slice(bytes)?, + cloudevents::Data::String(s) => from_str(s)?, + cloudevents::Data::Json(value) => value.clone(), + }; + Ok(value) +} + +#[tokio::main] +async fn main() { + let mut connection = + Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") + .await + .unwrap(); + let mut session = Session::begin(&mut connection).await.unwrap(); + let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); + let mut receiver = Receiver::attach(&mut session, "receiver", "q1") + .await + .unwrap(); + + let expected = json!({"hello": "world"}); + + // Binary content mode + send_binary_event(&mut sender, 1, expected.clone()).await.unwrap(); + let event = recv_event(&mut receiver).await.unwrap(); + let value = convert_data_into_json_value(event.data().unwrap()).unwrap(); + assert_eq!(event.id(), "1"); + assert_eq!(event.ty(), EXAMPLE_TYPE); + assert_eq!(event.source(), EXAMPLE_SOURCE); + match event.extension(EXTENSION_NAME).unwrap() { + ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE), + _ => panic!("Expect a String"), + } + assert_eq!(value, expected); + + // Structured content mode + send_structured_event(&mut sender, 2, expected.clone()).await.unwrap(); + let event = recv_event(&mut receiver).await.unwrap(); + let value = convert_data_into_json_value(event.data().unwrap()).unwrap(); + assert_eq!(event.id(), "2"); + assert_eq!(event.ty(), EXAMPLE_TYPE); + assert_eq!(event.source(), EXAMPLE_SOURCE); + match event.extension(EXTENSION_NAME).unwrap() { + ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE), + _ => panic!("Expect a String"), + } + assert_eq!(value, expected); + + sender.close().await.unwrap(); + receiver.close().await.unwrap(); + session.end().await.unwrap(); + connection.close().await.unwrap(); +} diff --git a/src/binding/fe2o3_amqp/constants.rs b/src/binding/fe2o3_amqp/constants.rs new file mode 100644 index 00000000..dddf1c07 --- /dev/null +++ b/src/binding/fe2o3_amqp/constants.rs @@ -0,0 +1,24 @@ +// Required +pub(super) const ID: &str = "id"; +pub(super) const SOURCE: &str = "source"; +pub(super) const SPECVERSION: &str = "specversion"; +pub(super) const TYPE: &str = "type"; + +// Optional +pub(super) const DATACONTENTTYPE: &str = "datacontenttype"; +pub(super) const DATASCHEMA: &str = "dataschema"; +pub(super) const SUBJECT: &str = "subject"; +pub(super) const TIME: &str = "time"; + +pub(super) mod prefixed { + // Required + pub const ID: &str = "cloudEvents:id"; + pub const SOURCE: &str = "cloudEvents:source"; + pub const SPECVERSION: &str = "cloudEvents:specversion"; + pub const TYPE: &str = "cloudEvents:type"; + + // Optional + pub const DATASCHEMA: &str = "cloudEvents:dataschema"; + pub const SUBJECT: &str = "cloudEvents:subject"; + pub const TIME: &str = "cloudEvents:time"; +} diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs new file mode 100644 index 00000000..3590e56c --- /dev/null +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -0,0 +1,104 @@ +use std::convert::TryFrom; + +use fe2o3_amqp_types::primitives::{SimpleValue, Symbol}; + +use crate::{ + binding::CLOUDEVENTS_JSON_HEADER, + event::SpecVersion, + message::{ + BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, + MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, + }, +}; + +use super::{ + constants::{prefixed, DATACONTENTTYPE}, + EventMessage, ATTRIBUTE_PREFIX, +}; + +impl BinaryDeserializer for EventMessage { + fn deserialize_binary>( + mut self, + mut serializer: V, + ) -> Result { + use fe2o3_amqp_types::messaging::Body; + + // specversion + let spec_version = { + let value = self + .application_properties + .as_mut() + .ok_or(Error::WrongEncoding {})? + .remove(prefixed::SPECVERSION) + .ok_or(Error::WrongEncoding {}) + .map(|val| match val { + SimpleValue::String(s) => Ok(s), + _ => Err(Error::WrongEncoding {}), + })??; + SpecVersion::try_from(&value[..])? + }; + serializer = serializer.set_spec_version(spec_version.clone())?; + + // datacontenttype + serializer = match self.content_type { + Some(Symbol(content_type)) => serializer + .set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?, + None => serializer, + }; + + // remaining attributes + let attributes = spec_version.attribute_names(); + + if let Some(application_properties) = self.application_properties { + for (key, value) in application_properties.0.into_iter() { + if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { + if attributes.contains(&key) { + let value = MessageAttributeValue::try_from((key, value))?; + serializer = serializer.set_attribute(key, value)?; + } else { + let value = MessageAttributeValue::try_from(value)?; + serializer = serializer.set_extension(key, value)?; + } + } + } + } + + match self.body { + Body::Data(data) => { + let bytes = data.0.into_vec(); + serializer.end_with_data(bytes) + } + Body::Empty => serializer.end(), + Body::Sequence(_) | Body::Value(_) => Err(Error::WrongEncoding {}), + } + } +} + +impl StructuredDeserializer for EventMessage { + fn deserialize_structured>( + self, + serializer: V, + ) -> Result { + use fe2o3_amqp_types::messaging::Body; + let bytes = match self.body { + Body::Data(data) => data.0.into_vec(), + Body::Empty => vec![], + Body::Sequence(_) | Body::Value(_) => return Err(Error::WrongEncoding {}), + }; + serializer.set_structured_event(bytes) + } +} + +impl MessageDeserializer for EventMessage { + fn encoding(&self) -> Encoding { + match self + .content_type + .as_ref() + .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) + { + Some(true) => Encoding::STRUCTURED, + Some(false) => Encoding::BINARY, + None => Encoding::UNKNOWN, + } + } +} diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs new file mode 100644 index 00000000..c696b06d --- /dev/null +++ b/src/binding/fe2o3_amqp/mod.rs @@ -0,0 +1,360 @@ +//! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with +//! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receive CloudEvents +//! +//! To send CloudEvents +//! +//! ```rust +//! use serde_json::json; +//! use fe2o3_amqp::{Connection, Sender, Session}; +//! use cloudevents::{ +//! EventBuilder, EventBuilderV10, +//! binding::fe2o3_amqp::{EventMessage, AmqpMessage} +//! }; +//! +//! // You need a running AMQP 1.0 broker to try out this example. +//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis +//! +//! # async fn send_event() { +//! let mut connection = +//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") +//! .await +//! .unwrap(); +//! let mut session = Session::begin(&mut connection).await.unwrap(); +//! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); +//! +//! let event = EventBuilderV10::new() +//! .id("1") +//! .ty("example.test") +//! .source("localhost") +//! .extension("ext-name", "AMQP") +//! .data("application/json", json!({"hello": "world"})) +//! .build() +//! .unwrap(); +//! +//! let event_message = EventMessage::from_binary_event(event).unwrap(); +//! let message = AmqpMessage::from(event_message); +//! sender.send(message).await.unwrap() +//! .accepted_or("not accepted").unwrap(); +//! +//! sender.close().await.unwrap(); +//! session.end().await.unwrap(); +//! connection.close().await.unwrap(); +//! # } +//! ``` +//! +//! To receiver CloudEvents +//! +//! ```rust +//! use fe2o3_amqp::{Connection, Receiver, Session}; +//! use cloudevents::{ +//! EventBuilderV10, message::MessageDeserializer, +//! binding::fe2o3_amqp::{EventMessage, AmqpMessage} +//! }; +//! +//! // You need a running AMQP 1.0 broker to try out this example. +//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis +//! +//! # async fn receive_event() { +//! let mut connection = +//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") +//! .await +//! .unwrap(); +//! let mut session = Session::begin(&mut connection).await.unwrap(); +//! let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap(); +//! +//! let delivery = receiver.recv().await.unwrap(); +//! receiver.accept(&delivery).await.unwrap(); +//! +//! let message: AmqpMessage = delivery.into_message(); +//! let event_message = EventMessage::from(message); +//! let event = MessageDeserializer::into_event(event_message).unwrap(); +//! +//! receiver.close().await.unwrap(); +//! session.end().await.unwrap(); +//! connection.close().await.unwrap(); +//! # } +//! ``` + +use std::convert::TryFrom; + +use chrono::{TimeZone, Utc}; +use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Message, Properties}; +use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; + +use crate::event::AttributeValue; +use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer}; +use crate::Event; + +use self::constants::{ + prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE, +}; + +const ATTRIBUTE_PREFIX: &str = "cloudEvents:"; + +pub mod deserializer; +pub mod serializer; + +mod constants; + +/// Type alias for an AMQP 1.0 message +/// +/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of +/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For +/// convenience, this type alias chooses `Value` as the value of the generic parameter +pub type AmqpMessage = Message; + +/// Type alias for an AMQP 1.0 Body +/// +/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of +/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For +/// convenience, this type alias chooses `Value` as the value of the generic parameter +pub type AmqpBody = Body; + +/// This struct contains the necessary fields required for AMQP 1.0 binding. +/// It provides conversion between [`Event`] and [`AmqpMessage`] +/// +/// # Examples +/// +/// ## [`Event`] -> [`AmqpMessage`] in binary content mode +/// +/// ```rust +/// use serde_json::json; +/// use fe2o3_amqp_types::messaging::Message; +/// use cloudevents::{EventBuilder, EventBuilderV10, binding::fe2o3_amqp::EventMessage}; +/// +/// let event = EventBuilderV10::new() +/// .id("1") +/// .ty("example.test") +/// .source("localhost") +/// .extension("ext-name", "AMQP") +/// .data("application/json", json!({"hello": "world"})) +/// .build() +/// .unwrap(); +/// let event_message = EventMessage::from_binary_event(event).unwrap(); +/// let amqp_message = Message::from(event_message); +/// ``` +/// +/// ## [`Event`] -> [`AmqpMessage`] in structured content mode +/// +/// ```rust +/// use serde_json::json; +/// use fe2o3_amqp_types::messaging::Message; +/// use cloudevents::{EventBuilder, EventBuilderV10, binding::fe2o3_amqp::EventMessage}; +/// +/// let event = EventBuilderV10::new() +/// .id("1") +/// .ty("example.test") +/// .source("localhost") +/// .extension("ext-name", "AMQP") +/// .data("application/json", json!({"hello": "world"})) +/// .build() +/// .unwrap(); +/// let event_message = EventMessage::from_structured_event(event).unwrap(); +/// let amqp_message = Message::from(event_message); +/// ``` +/// +/// ## [`AmqpMessage`] -> [`Event`] +/// +/// ```rust +/// use fe2o3_amqp::Receiver; +/// use cloudevents::{ +/// message::MessageDeserializer, +/// binding::fe2o3_amqp::{AmqpMessage, EventMessage} +/// }; +/// +/// # async fn receive_event(receiver: &mut Receiver) { +/// let delivery = receiver.recv().await.unwrap(); +/// receiver.accept(&delivery).await.unwrap(); +/// let amqp_message: AmqpMessage = delivery.into_message(); +/// let event_message = EventMessage::from(amqp_message); +/// let event = MessageDeserializer::into_event(event_message).unwrap(); +/// # } +/// ``` +pub struct EventMessage { + pub content_type: Option, + pub application_properties: Option, + pub body: AmqpBody, +} + +impl EventMessage { + fn new() -> Self { + Self { + content_type: None, + application_properties: None, + body: Body::Empty, + } + } + + /// Create an [`EventMessage`] from an event using a binary serializer + pub fn from_binary_event(event: Event) -> Result { + BinaryDeserializer::deserialize_binary(event, Self::new()) + } + + /// Create an [`EventMessage`] from an event using a structured serializer + pub fn from_structured_event(event: Event) -> Result { + StructuredDeserializer::deserialize_structured(event, Self::new()) + } +} + +impl From for AmqpMessage { + fn from(event: EventMessage) -> Self { + let properties = Properties { + content_type: event.content_type, + ..Default::default() + }; + Message { + header: None, + delivery_annotations: None, + message_annotations: None, + properties: Some(properties), + application_properties: event.application_properties, + body: event.body, + footer: None, + } + } +} + +impl From for EventMessage { + fn from(message: AmqpMessage) -> Self { + let content_type = message.properties.and_then(|p| p.content_type); + Self { + content_type, + application_properties: message.application_properties, + body: message.body, + } + } +} + +impl<'a> From> for SimpleValue { + fn from(value: AttributeValue) -> Self { + match value { + AttributeValue::SpecVersion(spec_ver) => { + SimpleValue::String(String::from(spec_ver.as_str())) + } + AttributeValue::String(s) => SimpleValue::String(String::from(s)), + AttributeValue::URI(uri) => SimpleValue::String(String::from(uri.as_str())), + AttributeValue::URIRef(uri) => SimpleValue::String(uri.clone()), + AttributeValue::Boolean(val) => SimpleValue::Bool(*val), + AttributeValue::Integer(val) => SimpleValue::Long(*val), + AttributeValue::Time(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + SimpleValue::Timestamp(timestamp) + } + } + } +} + +impl<'a> From> for Value { + fn from(value: AttributeValue) -> Self { + match value { + AttributeValue::SpecVersion(spec_ver) => Value::String(String::from(spec_ver.as_str())), + AttributeValue::String(s) => Value::String(String::from(s)), + AttributeValue::URI(uri) => Value::String(String::from(uri.as_str())), + AttributeValue::URIRef(uri) => Value::String(uri.clone()), + AttributeValue::Boolean(val) => Value::Bool(*val), + AttributeValue::Integer(val) => Value::Long(*val), + AttributeValue::Time(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + Value::Timestamp(timestamp) + } + } + } +} + +impl From for SimpleValue { + fn from(value: MessageAttributeValue) -> Self { + match value { + MessageAttributeValue::String(s) => SimpleValue::String(s), + MessageAttributeValue::Uri(uri) => SimpleValue::String(String::from(uri.as_str())), + MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri), + MessageAttributeValue::Boolean(val) => SimpleValue::Bool(val), + MessageAttributeValue::Integer(val) => SimpleValue::Long(val), + MessageAttributeValue::DateTime(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + SimpleValue::Timestamp(timestamp) + } + MessageAttributeValue::Binary(val) => SimpleValue::Binary(Binary::from(val)), + } + } +} + +impl From for Value { + fn from(value: MessageAttributeValue) -> Self { + match value { + MessageAttributeValue::String(s) => Value::String(s), + MessageAttributeValue::Uri(uri) => Value::String(String::from(uri.as_str())), + MessageAttributeValue::UriRef(uri) => Value::String(uri), + MessageAttributeValue::Boolean(val) => Value::Bool(val), + MessageAttributeValue::Integer(val) => Value::Long(val), + MessageAttributeValue::DateTime(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + Value::Timestamp(timestamp) + } + MessageAttributeValue::Binary(val) => Value::Binary(Binary::from(val)), + } + } +} + +impl TryFrom for MessageAttributeValue { + type Error = Error; + + fn try_from(value: SimpleValue) -> Result { + match value { + SimpleValue::Bool(val) => Ok(MessageAttributeValue::Boolean(val)), + SimpleValue::Long(val) => Ok(MessageAttributeValue::Integer(val)), + SimpleValue::Timestamp(val) => { + let datetime = Utc.timestamp_millis(val.into_inner()); + Ok(MessageAttributeValue::DateTime(datetime)) + } + SimpleValue::Binary(val) => Ok(MessageAttributeValue::Binary(val.into_vec())), + SimpleValue::String(val) => Ok(MessageAttributeValue::String(val)), + _ => Err(Error::WrongEncoding {}), + } + } +} + +impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue { + type Error = Error; + + fn try_from((key, value): (&'a str, SimpleValue)) -> Result { + match key { + // String + ID | prefixed::ID + // String + | SPECVERSION | prefixed::SPECVERSION + // String + | TYPE | prefixed::TYPE + // String + | DATACONTENTTYPE + // String + | SUBJECT | prefixed::SUBJECT => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; + Ok(MessageAttributeValue::String(val)) + }, + // URI-reference + SOURCE | prefixed::SOURCE => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; + Ok(MessageAttributeValue::UriRef(val)) + }, + // URI + DATASCHEMA | prefixed::DATASCHEMA => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding { })?; + let url_val = url::Url::parse(&val)?; + Ok(MessageAttributeValue::Uri(url_val)) + } + // Timestamp + TIME | prefixed::TIME => { + let val = Timestamp::try_from(value).map_err(|_| Error::WrongEncoding { })?; + let datetime = Utc.timestamp_millis(val.into_inner()); + Ok(MessageAttributeValue::DateTime(datetime)) + } + _ => { + MessageAttributeValue::try_from(value) + } + } + } +} diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs new file mode 100644 index 00000000..66051d42 --- /dev/null +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -0,0 +1,88 @@ +use fe2o3_amqp_types::messaging::{ApplicationProperties, Data as AmqpData}; +use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol}; + +use crate::binding::header_prefix; +use crate::message::StructuredSerializer; +use crate::{ + event::SpecVersion, + message::{BinarySerializer, Error, MessageAttributeValue}, +}; + +use super::constants::DATACONTENTTYPE; +use super::{AmqpBody, EventMessage, ATTRIBUTE_PREFIX}; + +impl BinarySerializer for EventMessage { + fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { + let key = String::from("cloudEvents:specversion"); + let value = String::from(spec_version.as_str()); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, SimpleValue::from(value)); + Ok(self) + } + + fn set_attribute( + mut self, + name: &str, + value: MessageAttributeValue, + ) -> crate::message::Result { + // For the binary mode, the AMQP content-type property field value maps directly to the + // CloudEvents datacontenttype attribute. + // + // All CloudEvents attributes with exception of datacontenttype MUST be individually mapped + // to and from the AMQP application-properties section. + if name == DATACONTENTTYPE { + self.content_type = match value { + MessageAttributeValue::String(s) => Some(Symbol::from(s)), + _ => return Err(Error::WrongEncoding {}), + } + } else { + // CloudEvent attributes are prefixed with "cloudEvents:" for use in the + // application-properties section + let key = header_prefix(ATTRIBUTE_PREFIX, name); + let value = SimpleValue::from(value); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, value); + } + + Ok(self) + } + + // Extension attributes are always serialized according to binding rules like standard + // attributes. However this specification does not prevent an extension from copying event + // attribute values to other parts of a message, in order to interact with non-CloudEvents + // systems that also process the message. Extension specifications that do this SHOULD specify + // how receivers are to interpret messages if the copied values differ from the cloud-event + // serialized values. + fn set_extension( + mut self, + name: &str, + value: MessageAttributeValue, + ) -> crate::message::Result { + let key = header_prefix(ATTRIBUTE_PREFIX, name); + let value = SimpleValue::from(value); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, value); + Ok(self) + } + + fn end_with_data(mut self, bytes: Vec) -> crate::message::Result { + let data = Binary::from(bytes); + self.body = AmqpBody::Data(AmqpData(data)); + Ok(self) + } + + fn end(self) -> crate::message::Result { + Ok(self) + } +} + +impl StructuredSerializer for EventMessage { + fn set_structured_event(mut self, bytes: Vec) -> crate::message::Result { + self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); + self.body = AmqpBody::Data(AmqpData(Binary::from(bytes))); + Ok(self) + } +} diff --git a/src/binding/mod.rs b/src/binding/mod.rs index abb0388a..9c2736f4 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -6,6 +6,9 @@ pub mod actix; #[cfg_attr(docsrs, doc(cfg(feature = "axum")))] #[cfg(feature = "axum")] pub mod axum; +#[cfg_attr(docsrs, doc(cfg(feature = "fe2o3-amqp")))] +#[cfg(feature = "fe2o3-amqp")] +pub mod fe2o3_amqp; #[cfg_attr( docsrs,