Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 143 additions & 27 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,88 @@ fn try_merge_collector_schemas(
schema1: &CollectorSchema,
schema2: &CollectorSchema,
) -> Result<CollectorSchema> {
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
let schema1_fields = &schema1.fields;
let schema2_fields = &schema2.fields;

// Create a map from field name to index in schema1
let field_map: HashMap<FieldName, usize> = schema1_fields
.iter()
.enumerate()
.map(|(i, f)| (f.name.clone(), i))
.collect();

let mut output_fields = Vec::new();
let mut next_field_id_1 = 0;
let mut next_field_id_2 = 0;

for (idx, field) in schema2_fields.iter().enumerate() {
if let Some(&idx1) = field_map.get(&field.name) {
if idx1 < next_field_id_1 {
api_bail!(
"Common fields are expected to have consistent order across different `collect()` calls, but got different orders between fields '{}' and '{}'",
field.name,
schema1_fields[next_field_id_1 - 1].name
);
}
// Add intervening fields from schema1
for i in next_field_id_1..idx1 {
output_fields.push(schema1_fields[i].clone());
}
// Add intervening fields from schema2
for i in next_field_id_2..idx {
output_fields.push(schema2_fields[i].clone());
}
// Merge the field
let merged_type =
try_make_common_value_type(&schema1_fields[idx1].value_type, &field.value_type)?;
output_fields.push(FieldSchema {
name: field.name.clone(),
value_type: merged_type,
description: None,
});
next_field_id_1 = idx1 + 1;
next_field_id_2 = idx + 1;
// Fields not in schema1 and not UUID are added at the end
}
}

// Add remaining fields from schema1
for i in next_field_id_1..schema1_fields.len() {
output_fields.push(schema1_fields[i].clone());
}

// Add remaining fields from schema2
for i in next_field_id_2..schema2_fields.len() {
output_fields.push(schema2_fields[i].clone());
}

// Handle auto_uuid_field_idx
let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
(Some(idx1), Some(idx2)) => {
let name1 = &schema1_fields[idx1].name;
let name2 = &schema2_fields[idx2].name;
if name1 == name2 {
// Find the position of the auto_uuid field in the merged output
output_fields.iter().position(|f| &f.name == name1)
} else {
api_bail!(
"Generated UUID fields must have the same name across different `collect()` calls, got different names: '{}' vs '{}'",
name1,
name2
);
}
}
(Some(_), None) | (None, Some(_)) => {
api_bail!(
"The generated UUID field, once present for one `collect()`, must be consistently present for other `collect()` calls for the same collector"
);
}
(None, None) => None,
};

Ok(CollectorSchema {
fields,
auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
schema1.auto_uuid_field_idx
} else {
None
},
fields: output_fields,
auto_uuid_field_idx,
})
}

Expand Down Expand Up @@ -704,11 +778,14 @@ impl AnalyzerContext {
op_scope: &Arc<OpScope>,
reactive_op: &NamedSpec<ReactiveOpSpec>,
) -> Result<BoxFuture<'static, Result<AnalyzedReactiveOp>>> {
let result_fut = match &reactive_op.spec {
let op_scope_clone = op_scope.clone();
let reactive_op_clone = reactive_op.clone();
let reactive_op_name = reactive_op.name.clone();
let result_fut = match reactive_op_clone.spec {
ReactiveOpSpec::Transform(op) => {
let input_field_schemas =
analyze_input_fields(&op.inputs, op_scope).with_context(|| {
format!("Preparing inputs for transform op: {}", reactive_op.name)
format!("Preparing inputs for transform op: {}", reactive_op_name)
})?;
let spec = serde_json::Value::Object(op.op.spec.clone());

Expand All @@ -725,8 +802,8 @@ impl AnalyzerContext {
.with(&output_enriched_type.without_attrs())?;
let output_type = output_enriched_type.typ.clone();
let output =
op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?;
let op_name = reactive_op.name.clone();
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?;
let op_name = reactive_op_name.clone();
async move {
trace!("Start building executor for transform op `{op_name}`");
let executor = executor.await.with_context(|| {
Expand Down Expand Up @@ -777,10 +854,10 @@ impl AnalyzerContext {
.lock()
.unwrap()
.sub_scopes
.insert(reactive_op.name.clone(), Arc::new(sub_op_scope_schema));
.insert(reactive_op_name.clone(), Arc::new(sub_op_scope_schema));
analyzed_op_scope_fut
};
let op_name = reactive_op.name.clone();
let op_name = reactive_op_name.clone();

let concur_control_options =
foreach_op.execution_options.get_concur_control_options();
Expand All @@ -800,22 +877,61 @@ impl AnalyzerContext {
}

ReactiveOpSpec::Collect(op) => {
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?;
let (struct_mapping, fields_schema) =
analyze_struct_mapping(&op.input, &op_scope_clone)?;
let has_auto_uuid_field = op.auto_uuid_field.is_some();
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
name: reactive_op.name.clone(),
has_auto_uuid_field,
input: struct_mapping,
collector_ref: add_collector(
&op.scope_name,
op.collector_name.clone(),
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
op_scope,
)?,
fingerprinter,
});
async move { Ok(collect_op) }.boxed()
let input_field_names: Vec<FieldName> =
fields_schema.iter().map(|f| f.name.clone()).collect();
let collector_ref = add_collector(
&op.scope_name,
op.collector_name.clone(),
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
&op_scope_clone,
)?;
async move {
// Get the merged collector schema after adding
let collector_schema: Arc<CollectorSchema> = {
let scope = find_scope(&op.scope_name, &op_scope_clone)?.1;
let states = scope.states.lock().unwrap();
let collector = states.collectors.get(&op.collector_name).unwrap();
collector.schema.clone()
};

// Pre-compute field index mappings for efficient evaluation
let field_name_to_index: HashMap<&FieldName, usize> = collector_schema
.fields
.iter()
.enumerate()
.map(|(i, f)| (&f.name, i))
.collect();
let mut field_index_mapping: HashMap<usize, usize> = HashMap::new();
for (input_idx, field_name) in input_field_names.iter().enumerate() {
let collector_idx = field_name_to_index
.get(field_name)
.copied()
.ok_or_else(|| {
anyhow!(
"field `{}` not found in merged collector schema",
field_name
)
})?;
field_index_mapping.insert(collector_idx, input_idx);
}

let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
name: reactive_op_name,
has_auto_uuid_field,
input: struct_mapping,
input_field_names,
collector_schema,
collector_ref,
field_index_mapping,
fingerprinter,
});
Ok(collect_op)
}
.boxed()
}
};
Ok(result_fut)
Expand Down
7 changes: 7 additions & 0 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::base::schema::FieldSchema;
use crate::base::spec::FieldName;
use crate::prelude::*;

use std::collections::HashMap;

use crate::ops::interface::*;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};

Expand Down Expand Up @@ -90,7 +93,11 @@ pub struct AnalyzedCollectOp {
pub name: String,
pub has_auto_uuid_field: bool,
pub input: AnalyzedStructMapping,
pub input_field_names: Vec<FieldName>,
pub collector_schema: Arc<schema::CollectorSchema>,
pub collector_ref: AnalyzedCollectorReference,
/// Pre-computed mapping from collector field index to input field index.
pub field_index_mapping: HashMap<usize, usize>,
/// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
pub fingerprinter: Fingerprinter,
}
Expand Down
35 changes: 35 additions & 0 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,41 @@ async fn evaluate_op_scope(
let collector_entry = scoped_entries
.headn(op.collector_ref.scope_up_level as usize)
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;

// Assemble input values
let input_values: Vec<value::Value> =
assemble_input_values(&op.input.fields, scoped_entries)
.collect::<Result<Vec<_>>>()?;

// Create field_values vector for all fields in the merged schema
let mut field_values: Vec<value::Value> =
vec![value::Value::Null; op.collector_schema.fields.len()];

// Use pre-computed field index mappings for O(1) field placement
for (&collector_idx, &input_idx) in op.field_index_mapping.iter() {
field_values[collector_idx] = input_values[input_idx].clone();
}

// Handle auto_uuid_field (assumed to be at position 0 for efficiency)
if op.has_auto_uuid_field {
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
let uuid = memory.next_uuid(
op.fingerprinter
.clone()
.with(
&field_values
.iter()
.enumerate()
.filter(|(i, _)| *i != uuid_idx)
.map(|(_, v)| v)
.collect::<Vec<_>>(),
)?
.into_fingerprint(),
)?;
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
}
}

{
let mut collected_records = collector_entry.collected_values
[op.collector_ref.local.collector_idx as usize]
Expand Down
Loading