Skip to content

Commit 0f834ac

Browse files
committed
Implementation of http.send built-in
This is largely a derived work on top of [matrix-org/rust-opa-wasm!218](matrix-org#218) which was authored by [@MatMaul](https://github.com/MatMaul). Signed-off-by: Konstantin A. Olkhovskiy <[email protected]>
1 parent 0f49a25 commit 0f834ac

File tree

9 files changed

+417
-15
lines changed

9 files changed

+417
-15
lines changed

Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ chrono = { version = "0.4.31", optional = true, default-features = false, featur
5959
chrono-tz = { version = ">=0.6, <0.11.0", optional = true }
6060
chronoutil = { version = "0.2", optional = true }
6161
duration-str = { version = ">=0.11, <0.16", optional = true, default-features = false }
62+
http = { version = "1.3.1", optional = true }
63+
reqwest = { version = "0.12", features = ["json"], optional = true } # used for tests
6264

6365
[dev-dependencies.tokio]
6466
version = "1.5"
@@ -69,6 +71,7 @@ wasmtime = { version = ">=22, <34", default-features = false, features = [
6971
"cranelift",
7072
] }
7173
insta = { version = "1", features = ["yaml"] }
74+
httpmock = "0.7.0"
7275

7376
[build-dependencies]
7477
# We would like at least this version of rayon, because older versions depend on old rand,
@@ -103,6 +106,7 @@ fast = ["wasmtime/cranelift", "wasmtime/parallel-compilation"]
103106

104107
rng = ["dep:rand"]
105108
time = ["dep:chrono"]
109+
http = ["dep:http"]
106110

107111
base64url-builtins = ["dep:base64", "dep:hex"]
108112
crypto-digest-builtins = ["dep:digest", "dep:hex"]
@@ -117,6 +121,7 @@ json-builtins = ["dep:json-patch"]
117121
units-builtins = ["dep:parse-size"]
118122
rand-builtins = ["rng"]
119123
yaml-builtins = ["dep:serde_yaml"]
124+
http-builtins = ["http", "dep:serde_yaml", "dep:duration-str"]
120125
urlquery-builtins = ["dep:form_urlencoded", "dep:urlencoding"]
121126
time-builtins = ["time", "dep:chrono-tz", "dep:duration-str", "dep:chronoutil"]
122127

@@ -138,13 +143,16 @@ all-builtins = [
138143
"sprintf-builtins",
139144
"units-builtins",
140145
"yaml-builtins",
146+
"http-builtins",
141147
"urlquery-builtins",
142148
"time-builtins",
143149
]
144150

151+
testing = ["http", "dep:reqwest"]
152+
145153
[[test]]
146154
name = "smoke_test"
147-
required-features = ["loader"]
155+
required-features = ["loader", "http-builtins", "testing"]
148156

149157
[[bin]]
150158
name = "opa-eval"

features.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
loader
33
cli
44
rng
5+
http
56
base64url-builtins
67
crypto-digest-builtins crypto-md5-builtins
78
crypto-digest-builtins crypto-sha1-builtins
@@ -16,6 +17,8 @@ json-builtins
1617
units-builtins
1718
rand-builtins
1819
yaml-builtins
20+
http-builtins
1921
time-builtins
2022
all-crypto-builtins
2123
all-builtins
24+
testing

src/builtins/impls/http.rs

Lines changed: 203 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,209 @@
1414

1515
//! Builtins used to make HTTP request
1616
17-
use anyhow::{bail, Result};
17+
use std::{collections::HashMap, future::Future, pin::Pin, time::Duration};
18+
19+
use anyhow::{Context, Result};
20+
use serde_json::{self, Map};
21+
use serde_yaml;
22+
use tokio::time::sleep;
23+
24+
use crate::{builtins::traits::Builtin, EvaluationContext};
25+
26+
/// This builtin is needed because the wrapper in traits.rs doesn't work when
27+
/// dealing with async+context.
28+
pub struct HttpSendBuiltin;
29+
30+
impl<C: 'static> Builtin<C> for HttpSendBuiltin
31+
where
32+
C: EvaluationContext,
33+
{
34+
fn call<'a>(
35+
&'a self,
36+
context: &'a mut C,
37+
args: &'a [&'a [u8]],
38+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, anyhow::Error>> + Send + 'a>> {
39+
Box::pin(async move {
40+
let [opa_req]: [&'a [u8]; 1] = args.try_into().ok().context("invalid arguments")?;
41+
let opa_req: serde_json::Value =
42+
serde_json::from_slice(opa_req).context("failed to convert opa_req argument")?;
43+
let res = send(context, opa_req).await?;
44+
let res = serde_json::to_vec(&res).context("could not serialize result")?;
45+
Ok(res)
46+
})
47+
}
48+
}
1849

1950
/// Returns a HTTP response to the given HTTP request.
20-
#[tracing::instrument(name = "http.send", err)]
21-
pub fn send(request: serde_json::Value) -> Result<serde_json::Value> {
22-
bail!("not implemented");
51+
///
52+
/// Wraps [`internal_send`] to add error handling regarding the `raise_error`
53+
/// field in the OPA request.
54+
#[tracing::instrument(name = "http.send", skip(ctx), err)]
55+
pub async fn send<C: EvaluationContext>(
56+
ctx: &mut C,
57+
opa_req: serde_json::Value,
58+
) -> Result<serde_json::Value> {
59+
let raise_error = opa_req
60+
.get("raise_error")
61+
.and_then(serde_json::Value::as_bool)
62+
.unwrap_or(true);
63+
64+
match internal_send(ctx, opa_req).await {
65+
Ok(resp) => Ok(resp),
66+
Err(e) => {
67+
if raise_error {
68+
Err(e)
69+
} else {
70+
Ok(serde_json::json!({
71+
"status_code": 0,
72+
"error": { "message": e.to_string() },
73+
}))
74+
}
75+
}
76+
}
77+
}
78+
79+
/// Sends a HTTP request and returns the response.
80+
async fn internal_send<C: EvaluationContext>(
81+
ctx: &mut C,
82+
opa_req: serde_json::Value,
83+
) -> Result<serde_json::Value> {
84+
let opa_req = opa_req
85+
.as_object()
86+
.ok_or_else(|| anyhow::anyhow!("request must be a JSON object"))?;
87+
88+
let http_req = convert_opa_req_to_http_req(opa_req)?;
89+
90+
let timeout_value = opa_req.get("timeout");
91+
let timeout = if let Some(timeout_value) = timeout_value {
92+
if let Some(timeout_nanos) = timeout_value.as_u64() {
93+
Some(Duration::from_nanos(timeout_nanos))
94+
} else if let Some(timeout_str) = timeout_value.as_str() {
95+
duration_str::parse(timeout_str).ok()
96+
} else {
97+
None
98+
}
99+
} else {
100+
None
101+
};
102+
103+
let enable_redirect = opa_req
104+
.get("enable_redirect")
105+
.and_then(serde_json::Value::as_bool);
106+
107+
let max_retry_attempts = opa_req
108+
.get("max_retry_attempts")
109+
.and_then(serde_json::Value::as_u64)
110+
.unwrap_or(0);
111+
112+
let mut http_resp_res: Result<http::Response<String>> = Err(anyhow::anyhow!("unreachable"));
113+
114+
for attempt in 0..=max_retry_attempts {
115+
http_resp_res = ctx
116+
.send_http(http_req.clone(), timeout, enable_redirect)
117+
.await;
118+
if http_resp_res.is_ok() {
119+
break;
120+
}
121+
if max_retry_attempts > 0 {
122+
#[allow(clippy::cast_possible_truncation)]
123+
sleep(Duration::from_millis(500 * 2_u64.pow(attempt as u32))).await;
124+
}
125+
}
126+
127+
let http_resp = http_resp_res?;
128+
129+
let force_json_decode = opa_req
130+
.get("force_json_decode")
131+
.and_then(serde_json::Value::as_bool)
132+
.unwrap_or(false);
133+
let force_yaml_decode = opa_req
134+
.get("force_yaml_decode")
135+
.and_then(serde_json::Value::as_bool)
136+
.unwrap_or(false);
137+
138+
Ok(convert_http_resp_to_opa_resp(
139+
http_resp,
140+
force_json_decode,
141+
force_yaml_decode,
142+
))
143+
}
144+
145+
/// Converts an OPA request to an HTTP request.
146+
fn convert_opa_req_to_http_req(
147+
opa_req: &Map<String, serde_json::Value>,
148+
) -> Result<http::Request<String>> {
149+
let url = opa_req
150+
.get("url")
151+
.ok_or_else(|| anyhow::anyhow!("missing url"))?
152+
.as_str()
153+
.ok_or_else(|| anyhow::anyhow!("url must be a string"))?;
154+
let method = opa_req
155+
.get("method")
156+
.ok_or_else(|| anyhow::anyhow!("missing method"))?
157+
.as_str()
158+
.map(str::to_uppercase)
159+
.ok_or_else(|| anyhow::anyhow!("method must be a string"))?;
160+
let headers = opa_req.get("headers").and_then(|v| v.as_object());
161+
162+
let mut req_builder = http::Request::builder().method(method.as_str()).uri(url);
163+
if let Some(headers) = headers {
164+
for (key, value) in headers {
165+
req_builder = req_builder.header(key, value.to_string());
166+
}
167+
}
168+
169+
let json_req_body = opa_req.get("body");
170+
let http_req = if let Some(json_req_body) = json_req_body {
171+
req_builder.body(json_req_body.to_string())?
172+
} else {
173+
let raw_req_body = opa_req
174+
.get("raw_body")
175+
.map(ToString::to_string)
176+
.unwrap_or_default();
177+
req_builder.body(raw_req_body)?
178+
};
179+
180+
Ok(http_req)
181+
}
182+
183+
/// Converts an HTTP response to an OPA response.
184+
fn convert_http_resp_to_opa_resp(
185+
response: http::Response<String>,
186+
force_json_decode: bool,
187+
force_yaml_decode: bool,
188+
) -> serde_json::Value {
189+
let response_headers = response
190+
.headers()
191+
.iter()
192+
.map(|(k, v)| (k.as_str(), v.to_str().unwrap_or("")))
193+
.collect::<HashMap<_, _>>();
194+
195+
let mut opa_resp = serde_json::json!({
196+
"status_code": response.status().as_u16(),
197+
"headers": response_headers,
198+
});
199+
200+
let raw_resp_body = response.body().clone();
201+
opa_resp["raw_body"] = serde_json::Value::String(raw_resp_body.clone());
202+
203+
let content_type = response
204+
.headers()
205+
.get("content-type")
206+
.map(|v| v.to_str().unwrap_or_default());
207+
208+
if force_json_decode || content_type == Some("application/json") {
209+
if let Ok(parsed_body) = serde_json::from_str::<serde_json::Value>(&raw_resp_body) {
210+
opa_resp["body"] = parsed_body;
211+
}
212+
} else if force_yaml_decode
213+
|| content_type == Some("application/yaml")
214+
|| content_type == Some("application/x-yaml")
215+
{
216+
if let Ok(parsed_body) = serde_yaml::from_str::<serde_json::Value>(&raw_resp_body) {
217+
opa_resp["body"] = parsed_body;
218+
}
219+
}
220+
221+
opa_resp
23222
}

src/builtins/impls/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub mod graph;
2727
pub mod graphql;
2828
#[cfg(feature = "hex-builtins")]
2929
pub mod hex;
30+
#[cfg(feature = "http-builtins")]
3031
pub mod http;
3132
pub mod io;
3233
#[cfg(feature = "json-builtins")]

src/builtins/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ pub fn resolve<C: EvaluationContext>(name: &str) -> Result<Box<dyn Builtin<C>>>
8080
#[cfg(feature = "hex-builtins")]
8181
"hex.encode" => Ok(self::impls::hex::encode.wrap()),
8282

83-
"http.send" => Ok(self::impls::http::send.wrap()),
83+
#[cfg(feature = "http-builtins")]
84+
"http.send" => Ok(Box::new(self::impls::http::HttpSendBuiltin)),
8485
"indexof_n" => Ok(self::impls::indexof_n.wrap()),
8586
"io.jwt.decode" => Ok(self::impls::io::jwt::decode.wrap()),
8687
"io.jwt.decode_verify" => Ok(self::impls::io::jwt::decode_verify.wrap()),

0 commit comments

Comments
 (0)