Skip to content

Commit 3ba0c9c

Browse files
committed
Implements http.send builtin
1 parent 0fbbd1b commit 3ba0c9c

File tree

11 files changed

+501
-21
lines changed

11 files changed

+501
-21
lines changed

Cargo.toml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ sha1 = { version = "0.10", optional = true }
4949
sha2 = { version = "0.10", optional = true }
5050
sprintf = { version = ">=0.3, <0.5", optional = true }
5151
parse-size = { version = "1", features = ["std"], optional = true }
52-
serde_yaml = { version = "0.9.1", optional = true }
52+
serde_yml = { version = "0.0.12", optional = true }
5353
form_urlencoded = { version = "1", optional = true }
54+
http = { version = "1", optional = true }
55+
reqwest = { version = "0.12", features = ["json"], optional = true }
5456
urlencoding = { version = "2", optional = true }
5557
chrono = { version = "0.4.31", optional = true, default-features = false, features = [
5658
"std",
@@ -69,6 +71,7 @@ wasmtime = { version = ">=22, <32", default-features = false, features = [
6971
"cranelift",
7072
] }
7173
insta = { version = "1", features = ["yaml"] }
74+
httpmock = "0.7"
7275

7376
[build-dependencies]
7477
# We would like at least this version of rayon, because older versions depend on old rand,
@@ -103,6 +106,7 @@ fast = ["wasmtime/cranelift", "wasmtime/parallel-compilation"]
103106

104107
rng = ["dep:rand"]
105108
time = ["dep:chrono"]
109+
http = ["dep:http", "dep:reqwest"]
106110

107111
base64url-builtins = ["dep:base64", "dep:hex"]
108112
crypto-digest-builtins = ["dep:digest", "dep:hex"]
@@ -116,9 +120,10 @@ sprintf-builtins = ["dep:sprintf"]
116120
json-builtins = ["dep:json-patch"]
117121
units-builtins = ["dep:parse-size"]
118122
rand-builtins = ["rng"]
119-
yaml-builtins = ["dep:serde_yaml"]
123+
yaml-builtins = ["dep:serde_yml"]
120124
urlquery-builtins = ["dep:form_urlencoded", "dep:urlencoding"]
121125
time-builtins = ["time", "dep:chrono-tz", "dep:duration-str", "dep:chronoutil"]
126+
http-builtins = ["http", "dep:serde_yml"]
122127

123128
all-crypto-builtins = [
124129
"crypto-digest-builtins",
@@ -140,6 +145,7 @@ all-builtins = [
140145
"yaml-builtins",
141146
"urlquery-builtins",
142147
"time-builtins",
148+
"http-builtins",
143149
]
144150

145151
[[test]]

features.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
loader
33
cli
44
rng
5+
http
56
base64url-builtins
67
crypto-digest-builtins crypto-md5-builtins
78
crypto-digest-builtins crypto-sha1-builtins
@@ -17,5 +18,6 @@ units-builtins
1718
rand-builtins
1819
yaml-builtins
1920
time-builtins
21+
http-builtins
2022
all-crypto-builtins
2123
all-builtins

src/builtins/impls/http.rs

Lines changed: 203 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,209 @@
1414

1515
//! Builtins used to make HTTP request
1616
17-
use anyhow::{bail, Result};
17+
use std::{collections::HashMap, future::Future, pin::Pin, time::Duration};
18+
19+
use anyhow::{Context, Result};
20+
use http;
21+
use serde_json::{self, Map};
22+
use serde_yml;
23+
use tokio::time::sleep;
24+
25+
use crate::{builtins::traits::Builtin, EvaluationContext};
26+
27+
// This is needed because the wrapper in traits.rs doesn't work when dealing
28+
// with async+context
29+
pub struct HttpSendBuiltin {}
30+
31+
impl<C: 'static> Builtin<C> for HttpSendBuiltin
32+
where
33+
C: EvaluationContext,
34+
{
35+
fn call<'a>(
36+
&'a self,
37+
context: &'a mut C,
38+
args: &'a [&'a [u8]],
39+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, anyhow::Error>> + Send + 'a>> {
40+
Box::pin(async move {
41+
let [opa_req]: [&'a [u8]; 1] = args.try_into().ok().context("invalid arguments")?;
42+
43+
let opa_req: serde_json::Value = serde_json::from_slice(opa_req)
44+
.context(concat!("failed to convert opa_req argument"))?;
45+
46+
let res = send(context, opa_req).await?;
47+
let res = serde_json::to_vec(&res).context("could not serialize result")?;
48+
Ok(res)
49+
})
50+
}
51+
}
1852

1953
/// Returns a HTTP response to the given HTTP request.
20-
#[tracing::instrument(name = "http.send", err)]
21-
pub fn send(request: serde_json::Value) -> Result<serde_json::Value> {
22-
bail!("not implemented");
54+
#[tracing::instrument(name = "http.send", skip(ctx), err)]
55+
pub async fn send<C: EvaluationContext>(
56+
ctx: &mut C,
57+
opa_req: serde_json::Value,
58+
) -> Result<serde_json::Value> {
59+
let raise_error = opa_req
60+
.get("raise_error")
61+
.and_then(serde_json::Value::as_bool)
62+
.unwrap_or(true);
63+
64+
match internal_send(ctx, opa_req).await {
65+
Ok(resp) => Ok(resp),
66+
Err(e) => {
67+
if raise_error {
68+
Err(e)
69+
} else {
70+
Ok(serde_json::json!({
71+
"status_code": 0,
72+
"error": {
73+
"message": e.to_string(),
74+
},
75+
}))
76+
}
77+
}
78+
}
79+
}
80+
81+
async fn internal_send<C: EvaluationContext>(
82+
ctx: &mut C,
83+
opa_req: serde_json::Value,
84+
) -> Result<serde_json::Value> {
85+
let opa_req = opa_req
86+
.as_object()
87+
.ok_or(anyhow::anyhow!("request must be a JSON object"))?;
88+
89+
let http_req = convert_opa_req_to_http_req(opa_req)?;
90+
91+
let timeout_value = opa_req.get("timeout");
92+
93+
let timeout = if let Some(timeout_value) = timeout_value {
94+
if let Some(timeout_nanos) = timeout_value.as_u64() {
95+
Some(Duration::from_nanos(timeout_nanos))
96+
} else if let Some(timeout_str) = timeout_value.as_str() {
97+
duration_str::parse(timeout_str).ok()
98+
} else {
99+
None
100+
}
101+
} else {
102+
None
103+
};
104+
105+
let enable_redirect = opa_req
106+
.get("enable_redirect")
107+
.and_then(serde_json::Value::as_bool);
108+
109+
let max_retry_attempts = opa_req
110+
.get("max_retry_attempts")
111+
.and_then(serde_json::Value::as_u64)
112+
.unwrap_or(0);
113+
114+
let mut http_resp_res: Result<http::Response<String>> =
115+
Err(anyhow::anyhow!("This shouldnt happen"));
116+
117+
for attempt in 0..=max_retry_attempts {
118+
http_resp_res = ctx
119+
.send_http(http_req.clone(), timeout, enable_redirect)
120+
.await;
121+
if http_resp_res.is_ok() {
122+
break;
123+
}
124+
if max_retry_attempts > 0 {
125+
sleep(Duration::from_millis(500 * 2_u64.pow(attempt as u32))).await;
126+
}
127+
}
128+
129+
match http_resp_res {
130+
Ok(http_resp) => {
131+
let force_json_decode = opa_req
132+
.get("force_json_decode")
133+
.and_then(serde_json::Value::as_bool)
134+
.unwrap_or(false);
135+
let force_yaml_decode = opa_req
136+
.get("force_yaml_decode")
137+
.and_then(serde_json::Value::as_bool)
138+
.unwrap_or(false);
139+
140+
Ok(convert_http_resp_to_opa_resp(
141+
http_resp,
142+
force_json_decode,
143+
force_yaml_decode,
144+
)?)
145+
}
146+
Err(e) => Err(e),
147+
}
148+
}
149+
150+
fn convert_opa_req_to_http_req(
151+
opa_req: &Map<String, serde_json::Value>,
152+
) -> Result<http::Request<String>> {
153+
let url = opa_req
154+
.get("url")
155+
.ok_or(anyhow::anyhow!("missing url"))?
156+
.as_str()
157+
.ok_or(anyhow::anyhow!("url must be a string"))?;
158+
let method = opa_req
159+
.get("method")
160+
.ok_or(anyhow::anyhow!("missing method"))?
161+
.as_str()
162+
.map(str::to_uppercase)
163+
.ok_or(anyhow::anyhow!("method must be a string"))?;
164+
let headers = opa_req.get("headers").and_then(|v| v.as_object());
165+
166+
let mut req_builder = http::Request::builder().method(method.as_str()).uri(url);
167+
if let Some(headers) = headers {
168+
for (key, value) in headers {
169+
req_builder = req_builder.header(key, value.to_string());
170+
}
171+
}
172+
173+
let json_req_body = opa_req.get("body");
174+
let http_req = if let Some(json_req_body) = json_req_body {
175+
req_builder.body(json_req_body.to_string())?
176+
} else {
177+
let raw_req_body = opa_req
178+
.get("raw_body")
179+
.map(std::string::ToString::to_string);
180+
req_builder.body(raw_req_body.unwrap_or_default())?
181+
};
182+
183+
Ok(http_req)
184+
}
185+
186+
fn convert_http_resp_to_opa_resp(
187+
response: http::Response<String>,
188+
force_json_decode: bool,
189+
force_yaml_decode: bool,
190+
) -> Result<serde_json::Value> {
191+
let response_headers = response
192+
.headers()
193+
.iter()
194+
.map(|(k, v)| (k.as_str(), v.to_str().unwrap_or("")))
195+
.collect::<HashMap<&str, &str>>();
196+
197+
let mut opa_resp = serde_json::json!({
198+
"status_code": response.status().as_u16(),
199+
"headers": response_headers,
200+
});
201+
202+
let raw_resp_body: &String = response.body();
203+
opa_resp["raw_body"] = serde_json::Value::String(raw_resp_body.clone());
204+
205+
let content_type = response
206+
.headers()
207+
.get("content-type")
208+
.map(|v| v.to_str().unwrap_or_default());
209+
210+
if force_json_decode || content_type == Some("application/json") {
211+
opa_resp["body"] = serde_json::from_str::<serde_json::Value>(raw_resp_body)
212+
.map_err(|e| anyhow::anyhow!("failed to parse JSON body: {}", e))?;
213+
} else if force_yaml_decode
214+
|| content_type == Some("application/yaml")
215+
|| content_type == Some("application/x-yaml")
216+
{
217+
opa_resp["body"] = serde_yml::from_str::<serde_json::Value>(raw_resp_body)
218+
.map_err(|e| anyhow::anyhow!("failed to parse YAML body: {}", e))?;
219+
}
220+
221+
Ok(opa_resp)
23222
}

src/builtins/impls/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub mod graph;
2727
pub mod graphql;
2828
#[cfg(feature = "hex-builtins")]
2929
pub mod hex;
30+
#[cfg(feature = "http-builtins")]
3031
pub mod http;
3132
pub mod io;
3233
#[cfg(feature = "json-builtins")]

src/builtins/impls/rand.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub fn intn<C: EvaluationContext>(ctx: &mut C, str: String, n: i64) -> Result<i6
3535
let cache_key = ("rand", str, n);
3636
if let Some(v) = ctx.cache_get(&cache_key)? {
3737
return Ok(v);
38-
};
38+
}
3939

4040
let mut rng = ctx.get_rng();
4141
let val = rng.gen_range(0..n);

src/builtins/impls/yaml.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,25 @@
1515
//! Builtins parse and serialize YAML documents
1616
1717
use anyhow::Result;
18-
use serde_yaml;
18+
use serde_yml;
1919

2020
/// Verifies the input string is a valid YAML document.
2121
#[tracing::instrument(name = "yaml.is_valid")]
2222
pub fn is_valid(x: String) -> bool {
23-
let parse: Result<serde_yaml::Value, _> = serde_yaml::from_str(&x);
23+
let parse: Result<serde_yml::Value, _> = serde_yml::from_str(&x);
2424
parse.is_ok()
2525
}
2626

2727
/// Serializes the input term to YAML.
2828
#[tracing::instrument(name = "yaml.marshal", err)]
29-
pub fn marshal(x: serde_yaml::Value) -> Result<String> {
30-
let parse: String = serde_yaml::to_string(&x)?;
29+
pub fn marshal(x: serde_yml::Value) -> Result<String> {
30+
let parse: String = serde_yml::to_string(&x)?;
3131
Ok(parse)
3232
}
3333

3434
/// Deserializes the input string.
3535
#[tracing::instrument(name = "yaml.unmarshal", err)]
3636
pub fn unmarshal(x: String) -> Result<serde_json::Value> {
37-
let parse: serde_json::Value = serde_yaml::from_str(&x)?;
37+
let parse: serde_json::Value = serde_yml::from_str(&x)?;
3838
Ok(parse)
3939
}

src/builtins/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ pub fn resolve<C: EvaluationContext>(name: &str) -> Result<Box<dyn Builtin<C>>>
8080
#[cfg(feature = "hex-builtins")]
8181
"hex.encode" => Ok(self::impls::hex::encode.wrap()),
8282

83-
"http.send" => Ok(self::impls::http::send.wrap()),
83+
#[cfg(feature = "http-builtins")]
84+
"http.send" => Ok(Box::new(self::impls::http::HttpSendBuiltin {})),
85+
8486
"indexof_n" => Ok(self::impls::indexof_n.wrap()),
8587
"io.jwt.decode" => Ok(self::impls::io::jwt::decode.wrap()),
8688
"io.jwt.decode_verify" => Ok(self::impls::io::jwt::decode_verify.wrap()),

src/builtins/traits.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,4 +329,56 @@ mod tests {
329329
let result = uppercase.call(&mut ctx, &args[..]).await.unwrap();
330330
assert_eq!(result, b"\"HELLO\"");
331331
}
332+
333+
// async fn async_context_uppercase_not_ok<C: EvaluationContext>(
334+
// ctx: &mut C,
335+
// s: String,
336+
// ) -> String {
337+
// s.to_uppercase()
338+
// }
339+
340+
// #[tokio::test]
341+
// async fn builtins_call_async_result_context() {
342+
// let mut ctx = DefaultContext::default();
343+
// let fct = Box::new(WrappedBuiltin {
344+
// func: async_context_uppercase_not_ok,
345+
// _marker: PhantomData,
346+
// });
347+
// let args = [b"\"hello\"" as &[u8]];
348+
// let result = fct.call(&mut ctx, &args[..]).await.unwrap();
349+
// assert_eq!(result, b"\"HELLO\"");
350+
// }
351+
352+
// fn async_context_uppercase_desugar_simple_ok<C: EvaluationContext>(
353+
// ctx: &mut C,
354+
// s: String,
355+
// ) -> impl Future<Output = String> {
356+
// async move { s.to_uppercase() }
357+
// }
358+
359+
// fn async_context_uppercase_desugar_not_ok<C: EvaluationContext>(
360+
// ctx: &mut C,
361+
// s: String,
362+
// ) -> impl Future<Output = String> + use<'_, C> {
363+
// async move {
364+
// let req = http::Request::builder()
365+
// .uri("https://example.com")
366+
// .body(s)
367+
// .unwrap();
368+
// let res = ctx.send_http(req).await.unwrap();
369+
// res.body().to_string()
370+
// }
371+
// }
372+
373+
// #[tokio::test]
374+
// async fn builtins_call_async_result_context_desugar() {
375+
// let mut ctx = DefaultContext::default();
376+
// let fct = Box::new(WrappedBuiltin {
377+
// func: async_context_uppercase_desugar_simple_ok,
378+
// _marker: PhantomData,
379+
// });
380+
// let args = [b"\"hello\"" as &[u8]];
381+
// let result = fct.call(&mut ctx, &args[..]).await.unwrap();
382+
// assert_eq!(result, b"\"HELLO\"");
383+
// }
332384
}

0 commit comments

Comments
 (0)