Skip to content

Commit 77d959d

Browse files
committed
feat: collector automatically merges and aligns multiple collect() calls with different schemas
1 parent 4577f0c commit 77d959d

File tree

3 files changed

+95
-29
lines changed

3 files changed

+95
-29
lines changed

src/builder/analyzer.rs

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -255,14 +255,54 @@ fn try_merge_collector_schemas(
255255
schema1: &CollectorSchema,
256256
schema2: &CollectorSchema,
257257
) -> Result<CollectorSchema> {
258-
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
258+
// Union all fields from both schemas
259+
let mut field_map: HashMap<FieldName, EnrichedValueType> = HashMap::new();
260+
261+
// Add fields from schema1
262+
for field in &schema1.fields {
263+
field_map.insert(field.name.clone(), field.value_type.clone());
264+
}
265+
266+
// Merge fields from schema2
267+
for field in &schema2.fields {
268+
if let Some(existing_type) = field_map.get(&field.name) {
269+
// Try to merge types if they differ
270+
let merged_type = try_make_common_value_type(existing_type, &field.value_type)?;
271+
field_map.insert(field.name.clone(), merged_type);
272+
} else {
273+
field_map.insert(field.name.clone(), field.value_type.clone());
274+
}
275+
}
276+
277+
// Sort fields by name for consistent ordering
278+
let mut fields: Vec<FieldSchema> = field_map
279+
.into_iter()
280+
.map(|(name, value_type)| FieldSchema {
281+
name,
282+
value_type,
283+
description: None,
284+
})
285+
.collect();
286+
fields.sort_by(|a, b| a.name.cmp(&b.name));
287+
288+
// Handle auto_uuid_field_idx
289+
let auto_uuid_field_idx = match (schema1.auto_uuid_field_idx, schema2.auto_uuid_field_idx) {
290+
(Some(idx1), Some(idx2)) => {
291+
let name1 = &schema1.fields[idx1].name;
292+
let name2 = &schema2.fields[idx2].name;
293+
if name1 == name2 {
294+
// Find the new index of the auto_uuid field
295+
fields.iter().position(|f| &f.name == name1)
296+
} else {
297+
None // Different auto_uuid fields, disable
298+
}
299+
}
300+
_ => None, // If either doesn't have it, or both don't, disable
301+
};
302+
259303
Ok(CollectorSchema {
260304
fields,
261-
auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
262-
schema1.auto_uuid_field_idx
263-
} else {
264-
None
265-
},
305+
auto_uuid_field_idx,
266306
})
267307
}
268308

@@ -803,16 +843,29 @@ impl AnalyzerContext {
803843
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?;
804844
let has_auto_uuid_field = op.auto_uuid_field.is_some();
805845
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
846+
let input_field_names = fields_schema.iter().map(|f| f.name.clone()).collect();
847+
let collector_ref = add_collector(
848+
&op.scope_name,
849+
op.collector_name.clone(),
850+
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
851+
op_scope,
852+
)?;
853+
// Get the merged collector schema after adding
854+
let collector_schema: Arc<CollectorSchema> = {
855+
let scope = find_scope(&op.scope_name, op_scope)?.1;
856+
let states = scope.states.lock().unwrap();
857+
let collector = states.collectors
858+
.get(&op.collector_name)
859+
.unwrap();
860+
collector.schema.clone()
861+
};
806862
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
807863
name: reactive_op.name.clone(),
808864
has_auto_uuid_field,
809865
input: struct_mapping,
810-
collector_ref: add_collector(
811-
&op.scope_name,
812-
op.collector_name.clone(),
813-
CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
814-
op_scope,
815-
)?,
866+
input_field_names,
867+
collector_schema,
868+
collector_ref,
816869
fingerprinter,
817870
});
818871
async move { Ok(collect_op) }.boxed()

src/builder/plan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::base::schema::FieldSchema;
2+
use crate::base::spec::FieldName;
23
use crate::prelude::*;
34

45
use crate::ops::interface::*;
@@ -90,6 +91,8 @@ pub struct AnalyzedCollectOp {
9091
pub name: String,
9192
pub has_auto_uuid_field: bool,
9293
pub input: AnalyzedStructMapping,
94+
pub input_field_names: Vec<FieldName>,
95+
pub collector_schema: Arc<schema::CollectorSchema>,
9396
pub collector_ref: AnalyzedCollectorReference,
9497
/// Fingerprinter of the collector's schema. Used to decide when to reuse auto-generated UUIDs.
9598
pub fingerprinter: Fingerprinter,

src/execution/evaluator.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -483,26 +483,36 @@ async fn evaluate_op_scope(
483483
}
484484

485485
AnalyzedReactiveOp::Collect(op) => {
486-
let mut field_values = Vec::with_capacity(
487-
op.input.fields.len() + if op.has_auto_uuid_field { 1 } else { 0 },
488-
);
489-
let field_values_iter = assemble_input_values(&op.input.fields, scoped_entries);
490-
if op.has_auto_uuid_field {
491-
field_values.push(value::Value::Null);
492-
field_values.extend(field_values_iter);
493-
let uuid = memory.next_uuid(
494-
op.fingerprinter
495-
.clone()
496-
.with(&field_values[1..])?
497-
.into_fingerprint(),
498-
)?;
499-
field_values[0] = value::Value::Basic(value::BasicValue::Uuid(uuid));
500-
} else {
501-
field_values.extend(field_values_iter);
502-
};
503486
let collector_entry = scoped_entries
504487
.headn(op.collector_ref.scope_up_level as usize)
505488
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;
489+
490+
// Assemble input values
491+
let input_values: Vec<value::Value> = assemble_input_values(&op.input.fields, scoped_entries).collect();
492+
493+
// Create field_values vector for all fields in the merged schema
494+
let mut field_values: Vec<value::Value> = vec![value::Value::Null; op.collector_schema.fields.len()];
495+
496+
// Map input fields to their positions in the merged schema
497+
for (i, field_name) in op.input_field_names.iter().enumerate() {
498+
if let Some(pos) = op.collector_schema.fields.iter().position(|f| &f.name == field_name) {
499+
field_values[pos] = input_values[i].clone();
500+
}
501+
}
502+
503+
// Handle auto_uuid_field
504+
if op.has_auto_uuid_field {
505+
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
506+
let uuid = memory.next_uuid(
507+
op.fingerprinter
508+
.clone()
509+
.with(&field_values.iter().enumerate().filter(|(i, _)| *i != uuid_idx).map(|(_, v)| v).collect::<Vec<_>>())?
510+
.into_fingerprint(),
511+
)?;
512+
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
513+
}
514+
}
515+
506516
{
507517
let mut collected_records = collector_entry.collected_values
508518
[op.collector_ref.local.collector_idx as usize]

0 commit comments

Comments
 (0)