Skip to content

Implements http.send builtin #218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ sha1 = { version = "0.10", optional = true }
sha2 = { version = "0.10", optional = true }
sprintf = { version = ">=0.3, <0.5", optional = true }
parse-size = { version = "1", features = ["std"], optional = true }
serde_yaml = { version = "0.9.1", optional = true }
serde_yml = { version = "0.0.12", optional = true }
form_urlencoded = { version = "1", optional = true }
http = { version = "1", optional = true }
reqwest = { version = "0.12", features = ["json"], optional = true }
urlencoding = { version = "2", optional = true }
chrono = { version = "0.4.31", optional = true, default-features = false, features = [
"std",
Expand All @@ -69,6 +71,7 @@ wasmtime = { version = ">=22, <32", default-features = false, features = [
"cranelift",
] }
insta = { version = "1", features = ["yaml"] }
httpmock = "0.7"

[build-dependencies]
# We would like at least this version of rayon, because older versions depend on old rand,
Expand Down Expand Up @@ -103,6 +106,7 @@ fast = ["wasmtime/cranelift", "wasmtime/parallel-compilation"]

rng = ["dep:rand"]
time = ["dep:chrono"]
http = ["dep:http", "dep:reqwest"]

base64url-builtins = ["dep:base64", "dep:hex"]
crypto-digest-builtins = ["dep:digest", "dep:hex"]
Expand All @@ -116,9 +120,10 @@ sprintf-builtins = ["dep:sprintf"]
json-builtins = ["dep:json-patch"]
units-builtins = ["dep:parse-size"]
rand-builtins = ["rng"]
yaml-builtins = ["dep:serde_yaml"]
yaml-builtins = ["dep:serde_yml"]
urlquery-builtins = ["dep:form_urlencoded", "dep:urlencoding"]
time-builtins = ["time", "dep:chrono-tz", "dep:duration-str", "dep:chronoutil"]
http-builtins = ["http", "dep:serde_yml"]

all-crypto-builtins = [
"crypto-digest-builtins",
Expand All @@ -140,6 +145,7 @@ all-builtins = [
"yaml-builtins",
"urlquery-builtins",
"time-builtins",
"http-builtins",
]

[[test]]
Expand Down
2 changes: 2 additions & 0 deletions features.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
loader
cli
rng
http
base64url-builtins
crypto-digest-builtins crypto-md5-builtins
crypto-digest-builtins crypto-sha1-builtins
Expand All @@ -17,5 +18,6 @@ units-builtins
rand-builtins
yaml-builtins
time-builtins
http-builtins
all-crypto-builtins
all-builtins
216 changes: 212 additions & 4 deletions src/builtins/impls/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,218 @@

//! Builtins used to make HTTP request

use anyhow::{bail, Result};
use std::{collections::HashMap, future::Future, pin::Pin, time::Duration};

use anyhow::{Context, Result};
use http;
use serde_json::{self, Map};
use serde_yml;
use tokio::time::sleep;

use crate::{builtins::traits::Builtin, EvaluationContext};

/// This builtin is needed because the wrapper in traits.rs doesn't work when
/// dealing with async+context.
pub struct HttpSendBuiltin {}

impl<C: 'static> Builtin<C> for HttpSendBuiltin
where
C: EvaluationContext,
{
fn call<'a>(
&'a self,
context: &'a mut C,
args: &'a [&'a [u8]],
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, anyhow::Error>> + Send + 'a>> {
Box::pin(async move {
let [opa_req]: [&'a [u8]; 1] = args.try_into().ok().context("invalid arguments")?;

let opa_req: serde_json::Value = serde_json::from_slice(opa_req)
.context(concat!("failed to convert opa_req argument"))?;

let res = send(context, opa_req).await?;
let res = serde_json::to_vec(&res).context("could not serialize result")?;
Ok(res)
})
}
}

/// Returns a HTTP response to the given HTTP request.
#[tracing::instrument(name = "http.send", err)]
pub fn send(request: serde_json::Value) -> Result<serde_json::Value> {
bail!("not implemented");
///
/// Wraps [`internal_send`] to add error handling regarding the `raise_error`
/// field in the OPA request.
#[tracing::instrument(name = "http.send", skip(ctx), err)]
pub async fn send<C: EvaluationContext>(
ctx: &mut C,
opa_req: serde_json::Value,
) -> Result<serde_json::Value> {
let raise_error = opa_req
.get("raise_error")
.and_then(serde_json::Value::as_bool)
.unwrap_or(true);

match internal_send(ctx, opa_req).await {
Ok(resp) => Ok(resp),
Err(e) => {
if raise_error {
Err(e)
} else {
Ok(serde_json::json!({
"status_code": 0,
"error": {
"message": e.to_string(),
},
}))
}
}
}
}

/// Sends a HTTP request and returns the response.
async fn internal_send<C: EvaluationContext>(
ctx: &mut C,
opa_req: serde_json::Value,
) -> Result<serde_json::Value> {
let opa_req = opa_req
.as_object()
.ok_or(anyhow::anyhow!("request must be a JSON object"))?;

let http_req = convert_opa_req_to_http_req(opa_req)?;

let timeout_value = opa_req.get("timeout");

let timeout = if let Some(timeout_value) = timeout_value {
if let Some(timeout_nanos) = timeout_value.as_u64() {
Some(Duration::from_nanos(timeout_nanos))
} else if let Some(timeout_str) = timeout_value.as_str() {
duration_str::parse(timeout_str).ok()
} else {
None
}
} else {
None
};

let enable_redirect = opa_req
.get("enable_redirect")
.and_then(serde_json::Value::as_bool);

let max_retry_attempts = opa_req
.get("max_retry_attempts")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0);

let mut http_resp_res: Result<http::Response<String>> =
Err(anyhow::anyhow!("This shouldnt happen"));

for attempt in 0..=max_retry_attempts {
http_resp_res = ctx
.send_http(http_req.clone(), timeout, enable_redirect)
.await;
if http_resp_res.is_ok() {
break;
}
if max_retry_attempts > 0 {
#[allow(clippy::cast_possible_truncation)]
sleep(Duration::from_millis(500 * 2_u64.pow(attempt as u32))).await;
}
}

match http_resp_res {
Ok(http_resp) => {
let force_json_decode = opa_req
.get("force_json_decode")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
let force_yaml_decode = opa_req
.get("force_yaml_decode")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);

Ok(convert_http_resp_to_opa_resp(
http_resp,
force_json_decode,
force_yaml_decode,
))
}
Err(e) => Err(e),
}
}

/// Converts an OPA request to an HTTP request.
fn convert_opa_req_to_http_req(
opa_req: &Map<String, serde_json::Value>,
) -> Result<http::Request<String>> {
let url = opa_req
.get("url")
.ok_or(anyhow::anyhow!("missing url"))?
.as_str()
.ok_or(anyhow::anyhow!("url must be a string"))?;
let method = opa_req
.get("method")
.ok_or(anyhow::anyhow!("missing method"))?
.as_str()
.map(str::to_uppercase)
.ok_or(anyhow::anyhow!("method must be a string"))?;
let headers = opa_req.get("headers").and_then(|v| v.as_object());

let mut req_builder = http::Request::builder().method(method.as_str()).uri(url);
if let Some(headers) = headers {
for (key, value) in headers {
req_builder = req_builder.header(key, value.to_string());
}
}

let json_req_body = opa_req.get("body");
let http_req = if let Some(json_req_body) = json_req_body {
req_builder.body(json_req_body.to_string())?
} else {
let raw_req_body = opa_req
.get("raw_body")
.map(std::string::ToString::to_string);
req_builder.body(raw_req_body.unwrap_or_default())?
};

Ok(http_req)
}

/// Converts an HTTP response to an OPA response.
fn convert_http_resp_to_opa_resp(
response: http::Response<String>,
force_json_decode: bool,
force_yaml_decode: bool,
) -> serde_json::Value {
let response_headers = response
.headers()
.iter()
.map(|(k, v)| (k.as_str(), v.to_str().unwrap_or("")))
.collect::<HashMap<&str, &str>>();

let mut opa_resp = serde_json::json!({
"status_code": response.status().as_u16(),
"headers": response_headers,
});

let raw_resp_body: &String = response.body();
opa_resp["raw_body"] = serde_json::Value::String(raw_resp_body.clone());

let content_type = response
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap_or_default());

if force_json_decode || content_type == Some("application/json") {
if let Ok(parsed_body) = serde_json::from_str::<serde_json::Value>(raw_resp_body) {
opa_resp["body"] = parsed_body;
}
} else if force_yaml_decode
|| content_type == Some("application/yaml")
|| content_type == Some("application/x-yaml")
{
if let Ok(parsed_body) = serde_yml::from_str::<serde_json::Value>(raw_resp_body) {
opa_resp["body"] = parsed_body;
}
}

opa_resp
}
1 change: 1 addition & 0 deletions src/builtins/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod graph;
pub mod graphql;
#[cfg(feature = "hex-builtins")]
pub mod hex;
#[cfg(feature = "http-builtins")]
pub mod http;
pub mod io;
#[cfg(feature = "json-builtins")]
Expand Down
2 changes: 1 addition & 1 deletion src/builtins/impls/rand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub fn intn<C: EvaluationContext>(ctx: &mut C, str: String, n: i64) -> Result<i6
let cache_key = ("rand", str, n);
if let Some(v) = ctx.cache_get(&cache_key)? {
return Ok(v);
};
}

let mut rng = ctx.get_rng();
let val = rng.gen_range(0..n);
Expand Down
10 changes: 5 additions & 5 deletions src/builtins/impls/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@
//! Builtins parse and serialize YAML documents

use anyhow::Result;
use serde_yaml;
use serde_yml;

/// Verifies the input string is a valid YAML document.
#[tracing::instrument(name = "yaml.is_valid")]
pub fn is_valid(x: String) -> bool {
let parse: Result<serde_yaml::Value, _> = serde_yaml::from_str(&x);
let parse: Result<serde_yml::Value, _> = serde_yml::from_str(&x);
parse.is_ok()
}

/// Serializes the input term to YAML.
#[tracing::instrument(name = "yaml.marshal", err)]
pub fn marshal(x: serde_yaml::Value) -> Result<String> {
let parse: String = serde_yaml::to_string(&x)?;
pub fn marshal(x: serde_yml::Value) -> Result<String> {
let parse: String = serde_yml::to_string(&x)?;
Ok(parse)
}

/// Deserializes the input string.
#[tracing::instrument(name = "yaml.unmarshal", err)]
pub fn unmarshal(x: String) -> Result<serde_json::Value> {
let parse: serde_json::Value = serde_yaml::from_str(&x)?;
let parse: serde_json::Value = serde_yml::from_str(&x)?;
Ok(parse)
}
4 changes: 3 additions & 1 deletion src/builtins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ pub fn resolve<C: EvaluationContext>(name: &str) -> Result<Box<dyn Builtin<C>>>
#[cfg(feature = "hex-builtins")]
"hex.encode" => Ok(self::impls::hex::encode.wrap()),

"http.send" => Ok(self::impls::http::send.wrap()),
#[cfg(feature = "http-builtins")]
"http.send" => Ok(Box::new(self::impls::http::HttpSendBuiltin {})),

"indexof_n" => Ok(self::impls::indexof_n.wrap()),
"io.jwt.decode" => Ok(self::impls::io::jwt::decode.wrap()),
"io.jwt.decode_verify" => Ok(self::impls::io::jwt::decode_verify.wrap()),
Expand Down
Loading