Skip to content

Commit 94c0c08

Browse files
committed
Dedup impl
Signed-off-by: Ryan Levick <[email protected]>
1 parent 2cdc5a1 commit 94c0c08

File tree

3 files changed

+71
-50
lines changed

3 files changed

+71
-50
lines changed

crates/factor-outbound-http/src/lib.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,66 @@ impl InstanceState {
142142

143143
impl SelfInstanceBuilder for InstanceState {}
144144

145+
/// Helper module for acquiring permits from the outbound connections semaphore.
146+
///
147+
/// This is used by the outbound HTTP implementations to limit concurrent outbound connections.
148+
mod concurrent_outbound_connections {
149+
use super::*;
150+
151+
/// Acquires a semaphore permit for the given interface, if a semaphore is configured.
152+
pub async fn acquire_semaphore<'a>(
153+
interface: &str,
154+
semaphore: &'a Option<Arc<Semaphore>>,
155+
) -> Option<tokio::sync::SemaphorePermit<'a>> {
156+
let s = semaphore.as_ref()?;
157+
acquire(interface, || s.try_acquire(), async || s.acquire().await).await
158+
}
159+
160+
/// Acquires an owned semaphore permit for the given interface, if a semaphore is configured.
161+
pub async fn acquire_owned_semaphore(
162+
interface: &str,
163+
semaphore: &Option<Arc<Semaphore>>,
164+
) -> Option<tokio::sync::OwnedSemaphorePermit> {
165+
let s = semaphore.as_ref()?;
166+
acquire(
167+
interface,
168+
|| s.clone().try_acquire_owned(),
169+
async || s.clone().acquire_owned().await,
170+
)
171+
.await
172+
}
173+
174+
/// Helper function to acquire a semaphore permit, either immediately or by waiting.
175+
///
176+
/// Allows getting either a borrowed or owned permit.
177+
async fn acquire<T>(
178+
interface: &str,
179+
try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
180+
acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
181+
) -> Option<T> {
182+
// Try to acquire a permit without waiting first
183+
// Keep track of whether we had to wait for metrics purposes.
184+
let mut waited = false;
185+
let permit = match try_acquire() {
186+
Ok(p) => Ok(p),
187+
// No available permits right now; wait for one
188+
Err(tokio::sync::TryAcquireError::NoPermits) => {
189+
waited = true;
190+
acquire().await.map_err(|_| ())
191+
}
192+
Err(_) => Err(()),
193+
};
194+
if permit.is_ok() {
195+
spin_telemetry::monotonic_counter!(
196+
outbound_http.acquired_permits = 1,
197+
interface = interface,
198+
waited = waited
199+
);
200+
}
201+
permit.ok()
202+
}
203+
}
204+
145205
pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
146206
pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
147207

crates/factor-outbound-http/src/spin.rs

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -104,31 +104,11 @@ impl spin_http::Host for crate::InstanceState {
104104
// If we're limiting concurrent outbound requests, acquire a permit
105105
// Note: since we don't have access to the underlying connection, we can only
106106
// limit the number of concurrent requests, not connections.
107-
let permit = match &self.concurrent_outbound_connections_semaphore {
108-
Some(s) => {
109-
// Try to acquire a permit without waiting first
110-
// Keep track of whether we had to wait for metrics purposes.
111-
let mut waited = false;
112-
let permit = match s.try_acquire() {
113-
Ok(p) => Ok(p),
114-
// No available permits right now; wait for one
115-
Err(tokio::sync::TryAcquireError::NoPermits) => {
116-
waited = true;
117-
s.acquire().await.map_err(|_| ())
118-
}
119-
Err(_) => Err(()),
120-
};
121-
if permit.is_ok() {
122-
spin_telemetry::monotonic_counter!(
123-
outbound_http.acquired_permits = 1,
124-
interface = "spin",
125-
waited = waited
126-
);
127-
}
128-
permit.ok()
129-
}
130-
None => None,
131-
};
107+
let permit = crate::concurrent_outbound_connections::acquire_semaphore(
108+
"spin",
109+
&self.concurrent_outbound_connections_semaphore,
110+
)
111+
.await;
132112
let resp = client.execute(req).await.map_err(log_reqwest_error)?;
133113
drop(permit);
134114

crates/factor-outbound-http/src/wasi.rs

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -590,31 +590,12 @@ impl ConnectOptions {
590590
crate::remove_blocked_addrs(&self.blocked_networks, &mut socket_addrs)?;
591591

592592
// If we're limiting concurrent outbound requests, acquire a permit
593-
let permit = match &self.concurrent_outbound_connections_semaphore {
594-
Some(s) => {
595-
// Try to acquire a permit without waiting first
596-
// Keep track of whether we had to wait for metrics purposes.
597-
let mut waited = false;
598-
let permit = match s.clone().try_acquire_owned() {
599-
Ok(p) => Ok(p),
600-
// No available permits right now; wait for one
601-
Err(tokio::sync::TryAcquireError::NoPermits) => {
602-
waited = true;
603-
s.clone().acquire_owned().await.map_err(|_| ())
604-
}
605-
Err(_) => Err(()),
606-
};
607-
if permit.is_ok() {
608-
spin_telemetry::monotonic_counter!(
609-
outbound_http.acquired_permits = 1,
610-
interface = "wasi",
611-
waited = waited
612-
);
613-
}
614-
permit.ok()
615-
}
616-
None => None,
617-
};
593+
594+
let permit = crate::concurrent_outbound_connections::acquire_owned_semaphore(
595+
"wasi",
596+
&self.concurrent_outbound_connections_semaphore,
597+
)
598+
.await;
618599

619600
let stream = timeout(self.connect_timeout, TcpStream::connect(&*socket_addrs))
620601
.await

0 commit comments

Comments
 (0)