diff --git a/src/compiler/expression/function_call.rs b/src/compiler/expression/function_call.rs index 5e3a9db68f..1305b2ecfc 100644 --- a/src/compiler/expression/function_call.rs +++ b/src/compiler/expression/function_call.rs @@ -306,17 +306,41 @@ impl<'a> Builder<'a> { // // We set "bar" (index 0) to return bytes, and "baz" (index 1) to return an // integer. + // + // If one of the arguments is dependant on a closure, the parameters initial + // type is assigned here and it's recomputed from the compiled closure. for (index, input_var) in input.variables.clone().into_iter().enumerate() { let call_ident = &variables[index]; let type_def = target.type_info(state).result; - let (type_def, value) = match input_var.kind { + let mut var_kind = input_var.kind; + if let VariableKind::Closure(vk) = var_kind { + var_kind = vk.into(); + } + + let (type_def, value) = match var_kind { + // A closure variable kind is not possible here but we need to + // satisfy all variants with a match arm. + VariableKind::Closure(_) => { + panic!("got unexpected variable kind") + } + + // The variable kind is expected to be equal to the kind of a + // specified parameter of the closure. + VariableKind::Parameter(keyword) => { + let expr = list + .arguments + .get(keyword) + .expect("parameter should exist"); + (expr.type_info(state).result, expr.resolve_constant(state)) + } + // The variable kind is expected to be exactly // the kind provided by the closure definition. VariableKind::Exact(kind) => (kind.into(), None), // The variable kind is expected to be equal to - // the ind of the target of the closure. + // the kind of the target of the closure. VariableKind::Target => ( target.type_info(state).result, target.resolve_constant(state), @@ -500,7 +524,36 @@ impl<'a> Builder<'a> { // TODO: This assumes the closure will run exactly once, which is incorrect. // see: https://github.com/vectordotdev/vector/issues/13782 - let block = closure_block.expect("closure must contain block"); + let (block_span, (block, mut block_type_def)) = + closure_block.expect("closure must contain block").take(); + + let mut closure_dependent_variables = input + .variables + .iter() + .enumerate() + .filter_map(|(index, input_var)| match input_var.kind { + VariableKind::Closure(_) => Some(&variables[index]), + _ => None, + }) + .peekable(); + + // If any of the arugments are dependant on the closure, union the initial type with the + // returned type of the closure and then recompute the closures type. + if closure_dependent_variables.peek().is_some() { + let block_kind = block_type_def + .kind() + .union(block_type_def.returns().clone()); + + closure_dependent_variables.for_each(|ident| { + let details = state + .local + .variable_mut(ident) + .expect("state must contain a closure dependant argument"); + details.type_def = details.type_def.kind().union(block_kind.clone()).into(); + }); + + block_type_def = block.apply_type_info(state); + } // At this point, we've compiled the block, so we can remove the // closure variables from the compiler's local environment. @@ -513,8 +566,6 @@ impl<'a> Builder<'a> { } }); - let (block_span, (block, block_type_def)) = block.take(); - let closure_fallible = block_type_def.is_fallible(); // Check the type definition of the resulting block.This needs to match diff --git a/src/compiler/function/closure.rs b/src/compiler/function/closure.rs index 93384938fd..98359b516b 100644 --- a/src/compiler/function/closure.rs +++ b/src/compiler/function/closure.rs @@ -1,7 +1,6 @@ use std::collections::BTreeMap; use crate::compiler::{ - state::RuntimeState, value::{Kind, VrlValueConvert}, Context, ExpressionError, }; @@ -80,6 +79,16 @@ pub enum VariableKind { /// variable the closure takes will be a `Kind::bytes()`. Exact(Kind), + /// The variable [`Kind`] is inferred from a parameter of the closure. + /// + /// For example, `VariableKind::Parameter('initial')` is equivalent to `VariableKind::Target` + /// where the [`Input`] `parameter_keyword` is "initial". + Parameter(&'static str), + + /// The variable [`Kind`] is inferred from the closure's output. The inner + /// value is the type used initially when determining the closure's output. + Closure(ClosureInitialKind), + /// The variable [`Kind`] is inferred from the target of the closure. Target, @@ -94,6 +103,38 @@ pub enum VariableKind { TargetInnerKey, } +impl From for VariableKind { + fn from(initial_kind: ClosureInitialKind) -> Self { + match initial_kind { + ClosureInitialKind::Exact(kind) => VariableKind::Exact(kind), + ClosureInitialKind::Parameter(parameter) => VariableKind::Parameter(parameter), + ClosureInitialKind::Target => VariableKind::Target, + ClosureInitialKind::TargetInnerValue => VariableKind::TargetInnerValue, + ClosureInitialKind::TargetInnerKey => VariableKind::TargetInnerKey, + } + } +} + +/// If a [`Variable`] is inferring its [`Value`] kind from a closure (see +/// [`VariableKind::Closure`]), this is the initial value used for the variable. +#[derive(Debug, Clone)] +pub enum ClosureInitialKind { + /// Equivalent to [`VariableKind::Exact`] + Exact(Kind), + + /// Equivalent to [`VariableKind::Parameter`] + Parameter(&'static str), + + /// Equivalent to [`VariableKind::Target`] + Target, + + /// Equivalent to [`VariableKind::TargetInnerValue`] + TargetInnerValue, + + /// Equivalent to [`VariableKind::TargetInnerKey`] + TargetInnerKey, +} + /// The output type required by the closure block. #[derive(Debug, Clone)] pub enum Output { @@ -141,9 +182,124 @@ impl Output { } } +enum SwapSpace<'a> { + Owned(Vec>), + Borrowed(&'a mut [Option]), +} + +impl<'a> SwapSpace<'a> { + fn as_mut_slice(&mut self) -> &mut [Option] { + match self { + SwapSpace::Owned(v) => v.as_mut_slice(), + SwapSpace::Borrowed(s) => s, + } + } +} + +#[must_use] +pub struct FluentRunnerInterface<'a, 'b, T> { + parent: &'a FluentRunner<'a, T>, + swap_space: SwapSpace<'b>, +} + +impl<'a, 'b, T> FluentRunnerInterface<'a, 'b, T> +where + T: Fn(&mut Context) -> Result, +{ + fn new(parent: &'a FluentRunner<'a, T>, swap_space: Option<&'b mut [Option]>) -> Self { + let swap_space = if let Some(s) = swap_space { + SwapSpace::Borrowed(s) + } else { + let mut swap_space = Vec::new(); + swap_space.resize_with(parent.variables.len(), Default::default); + SwapSpace::Owned(swap_space) + }; + + Self { parent, swap_space } + } + + /// Adds a new parameter to the runner. The `index` corresponds with the index of the supplied + /// variables. + pub fn parameter(mut self, ctx: &mut Context, index: usize, value: Value) -> Self { + self.parent + .parameter(self.swap_space.as_mut_slice(), ctx, index, value); + self + } + + /// Run the closure to completion, given the supplied parameters, and the runtime context. + pub fn run(mut self, ctx: &mut Context) -> Result { + self.parent.run(self.swap_space.as_mut_slice(), ctx) + } +} + +pub struct FluentRunner<'a, T> { + variables: &'a [Ident], + runner: T, +} + +impl<'a, T> FluentRunner<'a, T> +where + T: Fn(&mut Context) -> Result, +{ + pub fn new(variables: &'a [Ident], runner: T) -> Self { + Self { variables, runner } + } + + /// Creates a new [`FluentRunnerInterface`] with a temporary swap space equal in size to the + /// number of provided variables. + /// + /// This is useful when a closure is expected to only run once. + pub fn with_tmp_swap_space(&'a self) -> FluentRunnerInterface<'a, 'a, T> { + FluentRunnerInterface::new(self, None) + } + + /// Creates a new [`FluentRunnerInterface`] with a supplied swap space. + /// + /// This is useful for repeating closures that need the same sized swap space. + pub fn with_swap_space<'b>( + &'a self, + swap_space: &'b mut [Option], + ) -> FluentRunnerInterface<'a, 'b, T> { + FluentRunnerInterface::new(self, Some(swap_space)) + } + + fn parameter( + &self, + swap_space: &mut [Option], + ctx: &mut Context, + index: usize, + value: Value, + ) { + let ident = self.variables.get(index).filter(|i| !i.is_empty()).cloned(); + + if let Some(swap) = swap_space.get_mut(index) { + *swap = ident.and_then(|ident| ctx.state_mut().swap_variable(ident, value)) + } + } + + fn run( + &self, + swap_space: &mut [Option], + ctx: &mut Context, + ) -> Result { + let value = (self.runner)(ctx)?; + let state = ctx.state_mut(); + + for (old_value, ident) in swap_space.iter().zip(self.variables) { + match old_value { + Some(value) => { + state.insert_variable(ident.clone(), value.clone()); + } + None => state.remove_variable(ident), + } + } + + Ok(value) + } +} + pub struct Runner<'a, T> { - pub(crate) variables: &'a [Ident], - pub(crate) runner: T, + inner_runner: FluentRunner<'a, T>, } impl<'a, T> Runner<'a, T> @@ -151,7 +307,8 @@ where T: Fn(&mut Context) -> Result, { pub fn new(variables: &'a [Ident], runner: T) -> Self { - Self { variables, runner } + let inner_runner = FluentRunner::new(variables, runner); + Self { inner_runner } } /// Run the closure to completion, given the provided key/value pair, and @@ -167,26 +324,20 @@ where ) -> Result { // TODO: we need to allow `LocalEnv` to take a mutable reference to // values, instead of owning them. - let cloned_key = key.to_owned(); - let cloned_value = value.clone(); - - let key_ident = self.ident(0); - let value_ident = self.ident(1); - - let old_key = insert(ctx.state_mut(), key_ident, cloned_key.into()); - let old_value = insert(ctx.state_mut(), value_ident, cloned_value); - - let result = match (self.runner)(ctx) { + let mut swap_space: [Option; 2] = [None, None]; + + let result = match self + .inner_runner + .with_swap_space(&mut swap_space) + .parameter(ctx, 0, key.to_owned().into()) + .parameter(ctx, 1, value.clone()) + .run(ctx) + { Ok(value) | Err(ExpressionError::Return { value, .. }) => Ok(value), err @ Err(_) => err, }; - let value = result?; - - cleanup(ctx.state_mut(), key_ident, old_key); - cleanup(ctx.state_mut(), value_ident, old_value); - - Ok(value) + result } /// Run the closure to completion, given the provided index/value pair, and @@ -202,20 +353,13 @@ where ) -> Result { // TODO: we need to allow `LocalEnv` to take a mutable reference to // values, instead of owning them. - let cloned_value = value.clone(); - - let index_ident = self.ident(0); - let value_ident = self.ident(1); - - let old_index = insert(ctx.state_mut(), index_ident, index.into()); - let old_value = insert(ctx.state_mut(), value_ident, cloned_value); - - let value = (self.runner)(ctx)?; - - cleanup(ctx.state_mut(), index_ident, old_index); - cleanup(ctx.state_mut(), value_ident, old_value); + let mut swap_space: [Option; 2] = [None, None]; - Ok(value) + self.inner_runner + .with_swap_space(&mut swap_space) + .parameter(ctx, 0, index.into()) + .parameter(ctx, 1, value.clone()) + .run(ctx) } /// Run the closure to completion, given the provided key, and the runtime @@ -228,13 +372,15 @@ where pub fn map_key(&self, ctx: &mut Context, key: &mut KeyString) -> Result<(), ExpressionError> { // TODO: we need to allow `LocalEnv` to take a mutable reference to // values, instead of owning them. - let cloned_key = key.clone(); - let ident = self.ident(0); - let old_key = insert(ctx.state_mut(), ident, cloned_key.into()); - - *key = (self.runner)(ctx)?.try_bytes_utf8_lossy()?.into(); + let mut swap_space: [Option; 1] = [None]; - cleanup(ctx.state_mut(), ident, old_key); + *key = self + .inner_runner + .with_swap_space(&mut swap_space) + .parameter(ctx, 0, key.clone().into()) + .run(ctx)? + .try_bytes_utf8_lossy()? + .into(); Ok(()) } @@ -249,34 +395,14 @@ where pub fn map_value(&self, ctx: &mut Context, value: &mut Value) -> Result<(), ExpressionError> { // TODO: we need to allow `LocalEnv` to take a mutable reference to // values, instead of owning them. - let cloned_value = value.clone(); - let ident = self.ident(0); - let old_value = insert(ctx.state_mut(), ident, cloned_value); - - *value = (self.runner)(ctx)?; + let mut swap_space: [Option; 1] = [None]; - cleanup(ctx.state_mut(), ident, old_value); + *value = self + .inner_runner + .with_swap_space(&mut swap_space) + .parameter(ctx, 0, value.clone()) + .run(ctx)?; Ok(()) } - - fn ident(&self, index: usize) -> Option<&Ident> { - self.variables - .get(index) - .and_then(|v| (!v.is_empty()).then_some(v)) - } -} - -fn insert(state: &mut RuntimeState, ident: Option<&Ident>, data: Value) -> Option { - ident.and_then(|ident| state.swap_variable(ident.clone(), data)) -} - -fn cleanup(state: &mut RuntimeState, ident: Option<&Ident>, data: Option) { - match (ident, data) { - (Some(ident), Some(value)) => { - state.insert_variable(ident.clone(), value); - } - (Some(ident), None) => state.remove_variable(ident), - _ => {} - } } diff --git a/src/compiler/state.rs b/src/compiler/state.rs index 33fe7bab46..db3246f0d3 100644 --- a/src/compiler/state.rs +++ b/src/compiler/state.rs @@ -62,6 +62,10 @@ impl LocalEnv { self.bindings.get(ident) } + pub(crate) fn variable_mut(&mut self, ident: &Ident) -> Option<&mut Details> { + self.bindings.get_mut(ident) + } + pub(crate) fn insert_variable(&mut self, ident: Ident, details: Details) { self.bindings.insert(ident, details); } diff --git a/src/stdlib/fold.rs b/src/stdlib/fold.rs new file mode 100644 index 0000000000..aa612348c5 --- /dev/null +++ b/src/stdlib/fold.rs @@ -0,0 +1,156 @@ +use crate::compiler::prelude::*; + +fn fold( + value: Value, + initial_value: Value, + ctx: &mut Context, + runner: closure::FluentRunner, +) -> Resolved +where + T: Fn(&mut Context) -> Resolved, +{ + let mut swap_space: [Option; 3] = [None, None, None]; + match value { + Value::Object(object) => { + object + .into_iter() + .try_fold(initial_value, |accum, (key, value)| { + runner + .with_swap_space(&mut swap_space) + .parameter(ctx, 0, accum) + .parameter(ctx, 1, key.into()) + .parameter(ctx, 2, value) + .run(ctx) + }) + } + + Value::Array(array) => { + array + .into_iter() + .enumerate() + .try_fold(initial_value, |accum, (index, value)| { + runner + .with_swap_space(&mut swap_space) + .parameter(ctx, 0, accum) + .parameter(ctx, 1, index.into()) + .parameter(ctx, 2, value) + .run(ctx) + }) + } + + _ => Err("function requires collection types as input".into()), + } +} + +#[derive(Clone, Copy, Debug)] +pub struct Fold; + +impl Function for Fold { + fn identifier(&self) -> &'static str { + "fold" + } + + fn parameters(&self) -> &'static [Parameter] { + &[ + Parameter { + keyword: "value", + kind: kind::OBJECT | kind::ARRAY, + required: true, + }, + Parameter { + keyword: "initial_value", + kind: kind::ANY, + required: true, + }, + ] + } + + fn examples(&self) -> &'static [Example] { + &[ + Example { + title: "fold array", + source: r"fold([false, false, true], false) -> |accum, _index, value| { value && accum }", + result: Ok("false"), + }, + Example { + title: "fold object", + source: r#"fold({"first_key": 0, "second_key": 1}, 0) -> |accum, key, _value| { strlen(key) + accum }"#, + result: Ok("19"), + }, + ] + } + + fn compile( + &self, + _state: &state::TypeState, + _ctx: &mut FunctionCompileContext, + arguments: ArgumentList, + ) -> Compiled { + let value = arguments.required("value"); + let initial_value = arguments.required("initial_value"); + let closure = arguments.required_closure()?; + + Ok(FoldFn { + value, + initial_value, + closure, + } + .as_expr()) + } + + fn closure(&self) -> Option { + use closure::{ClosureInitialKind, Definition, Input, Output, Variable, VariableKind}; + + Some(Definition { + inputs: vec![Input { + parameter_keyword: "value", + kind: Kind::object(Collection::any()).or_array(Collection::any()), + variables: vec![ + Variable { + kind: VariableKind::Closure(ClosureInitialKind::Parameter("initial_value")), + }, + Variable { + kind: VariableKind::TargetInnerKey, + }, + Variable { + kind: VariableKind::TargetInnerValue, + }, + ], + output: Output::Kind(Kind::any()), + example: Example { + title: "fold array", + source: r"fold([15, 40, 35], 20) -> |accum, _index, value| { if value > accum { value } else { accum } }", + result: Ok(r"40"), + }, + }], + is_iterator: true, + }) + } +} + +#[derive(Debug, Clone)] +struct FoldFn { + value: Box, + initial_value: Box, + closure: FunctionClosure, +} + +impl FunctionExpression for FoldFn { + fn resolve(&self, ctx: &mut Context) -> ExpressionResult { + let value = self.value.resolve(ctx)?; + let initial_value = self.initial_value.resolve(ctx)?; + + let FunctionClosure { + variables, + block, + block_type_def: _, + } = &self.closure; + let runner = closure::FluentRunner::new(variables, |ctx| block.resolve(ctx)); + + fold(value, initial_value, ctx, runner) + } + + fn type_def(&self, _ctx: &state::TypeState) -> TypeDef { + self.closure.block_type_def.kind().clone().into() + } +} diff --git a/src/stdlib/mod.rs b/src/stdlib/mod.rs index 8bbe20d791..f1c2396e54 100644 --- a/src/stdlib/mod.rs +++ b/src/stdlib/mod.rs @@ -84,6 +84,7 @@ cfg_if::cfg_if! { mod flatten; mod float; mod floor; + mod fold; mod for_each; mod format_int; mod format_number; @@ -270,6 +271,7 @@ cfg_if::cfg_if! { pub use flatten::Flatten; pub use float::Float; pub use floor::Floor; + pub use fold::Fold; pub use for_each::ForEach; pub use format_int::FormatInt; pub use format_number::FormatNumber; @@ -456,6 +458,7 @@ pub fn all() -> Vec> { Box::new(Flatten), Box::new(Float), Box::new(Floor), + Box::new(Fold), Box::new(ForEach), Box::new(FormatInt), Box::new(FormatNumber),