Skip to content

Commit

Permalink
Removed builders from eventhubs; removed impl Into<String> from event…
Browse files Browse the repository at this point in the history
…hubs and AMQP; Fixed time conversion issue with 1-1-0001 (#1892)

* Removed builders from eventhubs; removed impl Into<String> from eventhubs and AMQP; Fixed time conversion issue with 1-1-0001

* Get, remove, contains_key don't require symmetrical eq
  • Loading branch information
LarryOsterman authored Nov 8, 2024
1 parent 817434d commit 9908ea7
Show file tree
Hide file tree
Showing 30 changed files with 723 additions and 956 deletions.
23 changes: 12 additions & 11 deletions sdk/core/azure_core_amqp/src/cbs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
// cspell: words amqp sasl
// cspell: words amqp sasl sastoken

use azure_core::error::Result;
use std::fmt::Debug;

use super::session::AmqpSession;

Expand Down Expand Up @@ -37,9 +36,10 @@ pub trait AmqpClaimsBasedSecurityApis {
///
/// # Parameters
///
/// - `path`: A `String` reference representing the AMQP path to be authorized.
/// - `secret`: An implementor of `Into<String>` representing the secret used for authorization. This is typically a JSON Web token.
/// - `expires_on`: A `time::OffsetDateTime` representing the expiration time of the authorization.
/// - `path`: A string representing the AMQP path to be authorized.
/// - `token_type`: An optional string representing the type of token used for authorization. This is either "servicebus.windows.net:sastoken" or "jwt". If it is not supplied, "jwt" is assumed.
/// - `secret`: A string representing the secret used for authorization. This is typically a JSON Web token.
/// - `expires_on`: The expiration time of the authorization.
///
/// # Returns
///
Expand All @@ -49,13 +49,13 @@ pub trait AmqpClaimsBasedSecurityApis {
///
fn authorize_path(
&self,
path: impl Into<String> + Debug,
secret: impl Into<String>,
path: String,
token_type: Option<String>,
secret: String,
expires_on: time::OffsetDateTime,
) -> impl std::future::Future<Output = Result<()>>;
}

#[derive(Debug)]
pub struct AmqpClaimsBasedSecurity<'a> {
implementation: CbsImplementation<'a>,
}
Expand All @@ -71,12 +71,13 @@ impl<'a> AmqpClaimsBasedSecurity<'a> {
impl<'a> AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity<'a> {
async fn authorize_path(
&self,
path: impl Into<String> + Debug,
secret: impl Into<String>,
path: String,
token_type: Option<String>,
secret: String,
expires_on: time::OffsetDateTime,
) -> Result<()> {
self.implementation
.authorize_path(path, secret, expires_on)
.authorize_path(path, token_type, secret, expires_on)
.await
}
async fn attach(&self) -> Result<()> {
Expand Down
38 changes: 19 additions & 19 deletions sdk/core/azure_core_amqp/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,23 @@ impl AmqpConnectionOptions {
pub fn channel_max(&self) -> Option<u16> {
self.channel_max
}
pub fn idle_timeout(&self) -> Option<Duration> {
self.idle_timeout
pub fn idle_timeout(&self) -> Option<&Duration> {
self.idle_timeout.as_ref()
}
pub fn outgoing_locales(&self) -> Option<Vec<String>> {
self.outgoing_locales.clone()
pub fn outgoing_locales(&self) -> Option<&Vec<String>> {
self.outgoing_locales.as_ref()
}
pub fn incoming_locales(&self) -> Option<Vec<String>> {
self.incoming_locales.clone()
pub fn incoming_locales(&self) -> Option<&Vec<String>> {
self.incoming_locales.as_ref()
}
pub fn offered_capabilities(&self) -> Option<Vec<AmqpSymbol>> {
self.offered_capabilities.clone()
pub fn offered_capabilities(&self) -> Option<&Vec<AmqpSymbol>> {
self.offered_capabilities.as_ref()
}
pub fn desired_capabilities(&self) -> Option<Vec<AmqpSymbol>> {
self.desired_capabilities.clone()
pub fn desired_capabilities(&self) -> Option<&Vec<AmqpSymbol>> {
self.desired_capabilities.as_ref()
}
pub fn properties(&self) -> Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>> {
self.properties.clone()
pub fn properties(&self) -> Option<&AmqpOrderedMap<AmqpSymbol, AmqpValue>> {
self.properties.as_ref()
}
pub fn buffer_size(&self) -> Option<usize> {
self.buffer_size
Expand All @@ -62,28 +62,28 @@ impl AmqpConnectionOptions {
pub trait AmqpConnectionApis {
fn open(
&self,
name: impl Into<String>,
name: String,
url: Url,
options: Option<AmqpConnectionOptions>,
) -> impl std::future::Future<Output = Result<()>>;
fn close(&self) -> impl std::future::Future<Output = Result<()>>;
fn close_with_error(
&self,
condition: impl Into<AmqpSymbol>,
condition: AmqpSymbol,
description: Option<String>,
info: Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> impl std::future::Future<Output = Result<()>>;
}

#[derive(Debug, Default)]
#[derive(Default)]
pub struct AmqpConnection {
pub(crate) implementation: ConnectionImplementation,
}

impl AmqpConnectionApis for AmqpConnection {
fn open(
&self,
name: impl Into<String>,
name: String,
url: Url,
options: Option<AmqpConnectionOptions>,
) -> impl std::future::Future<Output = Result<()>> {
Expand All @@ -94,7 +94,7 @@ impl AmqpConnectionApis for AmqpConnection {
}
fn close_with_error(
&self,
condition: impl Into<AmqpSymbol>,
condition: AmqpSymbol,
description: Option<String>,
info: Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> impl std::future::Future<Output = Result<()>> {
Expand Down Expand Up @@ -156,8 +156,8 @@ pub mod builders {
}
pub fn with_properties<K, V>(mut self, properties: impl Into<AmqpOrderedMap<K, V>>) -> Self
where
K: Into<AmqpSymbol> + Debug + Default + PartialEq,
V: Into<AmqpValue> + Debug + Default,
K: Into<AmqpSymbol> + Debug + Clone + PartialEq,
V: Into<AmqpValue> + Debug + Clone,
{
let properties_map: AmqpOrderedMap<K, V> = properties.into();
let properties_map = properties_map
Expand Down
14 changes: 7 additions & 7 deletions sdk/core/azure_core_amqp/src/fe2o3/cbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use azure_core::error::Result;
use fe2o3_amqp_cbs::token::CbsToken;
use fe2o3_amqp_types::primitives::Timestamp;
use std::borrow::BorrowMut;
use std::{fmt::Debug, sync::OnceLock};
use std::sync::OnceLock;
use tracing::{debug, trace};

#[derive(Debug)]
pub(crate) struct Fe2o3ClaimsBasedSecurity<'a> {
cbs: OnceLock<Mutex<fe2o3_amqp_cbs::client::CbsClient>>,
session: &'a AmqpSession,
Expand Down Expand Up @@ -67,8 +66,9 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {

async fn authorize_path(
&self,
path: impl Into<String> + Debug,
secret: impl Into<String>,
path: String,
token_type: Option<String>,
secret: String,
expires_at: time::OffsetDateTime,
) -> Result<()> {
trace!(
Expand All @@ -77,8 +77,8 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
expires_at
);
let cbs_token = CbsToken::new(
secret.into(),
"jwt",
secret,
token_type.unwrap_or("jwt".to_string()),
Some(Timestamp::from(
expires_at
.to_offset(time::UtcOffset::UTC)
Expand All @@ -103,7 +103,7 @@ impl AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'_> {
.lock()
.await
.borrow_mut()
.put_token(path.into(), cbs_token)
.put_token(path, cbs_token)
.await
.map_err(AmqpManagement::from)?;
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions sdk/core/azure_core_amqp/src/fe2o3/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Drop for Fe2o3AmqpConnection {
impl AmqpConnectionApis for Fe2o3AmqpConnection {
async fn open(
&self,
id: impl Into<String>,
id: String,
url: Url,
options: Option<AmqpConnectionOptions>,
) -> Result<()> {
Expand Down Expand Up @@ -133,7 +133,7 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
}
async fn close_with_error(
&self,
condition: impl Into<AmqpSymbol>,
condition: AmqpSymbol,
description: Option<String>,
info: Option<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> Result<()> {
Expand All @@ -152,7 +152,7 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection {
.borrow_mut()
.close_with_error(fe2o3_amqp::types::definitions::Error::new(
fe2o3_amqp::types::definitions::ErrorCondition::Custom(
fe2o3_amqp_types::primitives::Symbol::from(condition.into()),
fe2o3_amqp_types::primitives::Symbol::from(condition),
),
description,
info.map(|i| i.into()),
Expand Down
11 changes: 5 additions & 6 deletions sdk/core/azure_core_amqp/src/fe2o3/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
session::AmqpSession,
value::{AmqpOrderedMap, AmqpValue},
};

use async_std::sync::Mutex;
use azure_core::{
credentials::AccessToken,
Expand Down Expand Up @@ -40,15 +39,15 @@ impl Drop for Fe2o3AmqpManagement {
impl Fe2o3AmqpManagement {
pub fn new(
session: AmqpSession,
client_node_name: impl Into<String>,
client_node_name: String,
access_token: AccessToken,
) -> Result<Self> {
// Session::get() returns a clone of the underlying session handle.
let session = session.implementation.get()?;

Ok(Self {
access_token,
client_node_name: client_node_name.into(),
client_node_name,
session,
management: OnceLock::new(),
})
Expand Down Expand Up @@ -85,7 +84,7 @@ impl AmqpManagementApis for Fe2o3AmqpManagement {

async fn call(
&self,
operation_type: impl Into<String>,
operation_type: String,
application_properties: AmqpOrderedMap<String, AmqpValue>,
) -> Result<AmqpOrderedMap<String, AmqpValue>> {
let mut management = self
Expand Down Expand Up @@ -122,12 +121,12 @@ struct WithApplicationPropertiesRequest<'a> {

impl<'a> WithApplicationPropertiesRequest<'a> {
pub fn new(
entity_type: impl Into<String>,
entity_type: String,
access_token: &'a AccessToken,
application_properties: AmqpOrderedMap<String, AmqpValue>,
) -> Self {
Self {
entity_type: entity_type.into(),
entity_type,
access_token,
application_properties,
}
Expand Down
22 changes: 11 additions & 11 deletions sdk/core/azure_core_amqp/src/fe2o3/messaging/message_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl From<fe2o3_amqp_types::messaging::ApplicationProperties>
fn from(application_properties: fe2o3_amqp_types::messaging::ApplicationProperties) -> Self {
let mut properties = AmqpOrderedMap::<String, AmqpValue>::new();
for (key, value) in application_properties.0 {
properties.insert(key, value);
properties.insert(key, value.into());
}
AmqpApplicationProperties(properties)
}
Expand Down Expand Up @@ -278,7 +278,7 @@ impl From<fe2o3_amqp_types::messaging::Annotations> for AmqpAnnotations {
fn from(annotations: fe2o3_amqp_types::messaging::Annotations) -> Self {
let mut amqp_annotations = AmqpOrderedMap::<AmqpAnnotationKey, AmqpValue>::new();
for (key, value) in annotations {
amqp_annotations.insert(key, value);
amqp_annotations.insert(key.into(), value.into());
}
AmqpAnnotations(amqp_annotations)
}
Expand Down Expand Up @@ -363,11 +363,11 @@ impl From<fe2o3_amqp_types::messaging::Properties> for AmqpMessageProperties {
}
if let Some(content_type) = properties.content_type {
amqp_message_properties_builder =
amqp_message_properties_builder.with_content_type(content_type);
amqp_message_properties_builder.with_content_type(content_type.into());
}
if let Some(content_encoding) = properties.content_encoding {
amqp_message_properties_builder =
amqp_message_properties_builder.with_content_encoding(content_encoding);
amqp_message_properties_builder.with_content_encoding(content_encoding.into());
}
if let Some(absolute_expiry_time) = properties.absolute_expiry_time {
amqp_message_properties_builder =
Expand Down Expand Up @@ -480,17 +480,17 @@ fn test_properties_conversion() {

let properties = AmqpMessageProperties::builder()
.with_absolute_expiry_time(time_now)
.with_content_encoding("content_encoding")
.with_content_type("content_type")
.with_content_encoding(crate::value::AmqpSymbol("content_encoding".to_string()))
.with_content_type(crate::value::AmqpSymbol("content_type".to_string()))
.with_correlation_id("correlation_id")
.with_creation_time(time_now)
.with_group_id("group_id")
.with_group_id("group_id".to_string())
.with_group_sequence(3)
.with_message_id("test")
.with_reply_to("reply_to")
.with_reply_to_group_id("reply_to_group_id")
.with_subject("subject")
.with_to("to")
.with_reply_to("reply_to".to_string())
.with_reply_to_group_id("reply_to_group_id".to_string())
.with_subject("subject".to_string())
.with_to("to".to_string())
.with_user_id(vec![1, 2, 3])
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn message_source_conversion_fe2o3_amqp() {
#[test]
fn message_source_conversion_amqp_fe2o3() {
let amqp_source = AmqpSource::builder()
.with_address("test")
.with_address("test".to_string())
.with_durable(TerminusDurability::UnsettledState)
.with_expiry_policy(TerminusExpiryPolicy::SessionEnd)
.with_timeout(95)
Expand Down
14 changes: 7 additions & 7 deletions sdk/core/azure_core_amqp/src/fe2o3/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ mod tests {
std::time::UNIX_EPOCH + std::time::Duration::from_millis(timestamp as u64);

let amqp_message = AmqpMessage::builder()
.add_application_property("abc", "23 skiddoo")
.add_application_property("What?", 29.5)
.add_application_property("abc".to_string(), "23 skiddoo")
.add_application_property("What?".to_string(), 29.5)
.with_body(AmqpValue::from("hello"))
.with_properties(
AmqpMessageProperties::builder()
Expand All @@ -531,13 +531,13 @@ mod tests {
.with_content_type(AmqpSymbol::from("text/plain"))
.with_correlation_id("abc")
.with_creation_time(timestamp)
.with_group_id(AmqpSymbol::from("group"))
.with_group_id("group".to_string())
.with_group_sequence(5)
.with_message_id("message")
.with_reply_to(AmqpSymbol::from("reply"))
.with_reply_to_group_id(AmqpSymbol::from("reply_group"))
.with_subject(AmqpSymbol::from("subject"))
.with_to(AmqpSymbol::from("to"))
.with_reply_to("reply".to_string())
.with_reply_to_group_id("reply_group".to_string())
.with_subject("subject".to_string())
.with_to("to".to_string())
.with_user_id(vec![39, 20, 54])
.build(),
)
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/azure_core_amqp/src/fe2o3/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::borrow::BorrowMut;
use std::sync::{Arc, OnceLock};
use tracing::trace;

#[derive(Debug, Default)]
#[derive(Default)]
pub(crate) struct Fe2o3AmqpReceiver {
receiver: OnceLock<Arc<Mutex<fe2o3_amqp::Receiver>>>,
}
Expand Down Expand Up @@ -84,7 +84,6 @@ impl AmqpReceiverApis for Fe2o3AmqpReceiver {
.max_message_size())
}

#[tracing::instrument]
async fn receive(&self) -> Result<AmqpMessage> {
let mut receiver = self
.receiver
Expand Down
Loading

0 comments on commit 9908ea7

Please sign in to comment.