feat: support merge for Distribution #15296

Closed
wants to merge 3 commits

Conversation

xudong963 (Member):

Which issue does this PR close?

Rationale for this change

See issue #15290

What changes are included in this PR?

Are these changes tested?

I'll add them after we reach consensus on the approach.

Are there any user-facing changes?

No

@github-actions github-actions bot added the logical-expr Logical plan and expressions label Mar 18, 2025
alamb (Contributor) left a comment:

Thanks for this @xudong963 -- it is an interesting proposal

I left some comments

I think eventually it would be nice to add some tests for this code

let range_b = b.range()?;

// Determine data type and create combined range
let combined_range = if range_a.is_unbounded() || range_b.is_unbounded() {
xudong963 (Member, Author) replied on Mar 19, 2025:

Great. One concern is that Interval::union only works with intervals of the same data type.

It seems we could loosen that requirement so that, for example, Int64 with Int32, or integer with float, can also be unioned.
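
For illustration, a minimal standalone sketch of the coercion idea (plain Rust, not the actual Interval API; the function and types here are hypothetical):

// Standalone sketch, not the DataFusion Interval API: loosen the same-type
// requirement by coercing both intervals to a common, wider type before
// taking the union.
fn union_loosely(a: (i32, i32), b: (f64, f64)) -> (f64, f64) {
    // Int32 widens losslessly to Float64, so use Float64 as the common type.
    let (a_lo, a_hi) = (a.0 as f64, a.1 as f64);
    // Conventional interval union: min of the lower bounds, max of the uppers.
    (a_lo.min(b.0), a_hi.max(b.1))
}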

Interval::try_new(combined_lower, combined_upper)?
};

// Calculate weights for the mixture distribution

What does "mixture distribution" mean in this context?

It seems like this code weights the input distributions by the number of distinct values (cardinality), which seems not right. For example, if we have two inputs:

  1. 1M rows, 3 distinct values
  2. 10 rows, 10 distinct values

I think this code is going to assume the mean is close to the second input even though there are only 10 values.

xudong963 (Member, Author):

Your point is correct.

IMO, the best way to compute the weights would be based on the count of each interval, but those counts are unknown.

After thinking about it, I have a new idea: maybe we can use the variance to approximate the weights, on the grounds that lower variance generally indicates more samples:

let (weight_a, weight_b) = {
    // Lower variance generally indicates more samples
    let var_a = self.variance()?.cast_to(&DataType::Float64)?;
    let var_b = other.variance()?.cast_to(&DataType::Float64)?;
    
    match (var_a, var_b) {
        (ScalarValue::Float64(Some(va)), ScalarValue::Float64(Some(vb))) => {
            // Weighting inversely by variance (with safeguards against division by zero)
            let va_safe = va.max(f64::EPSILON);
            let vb_safe = vb.max(f64::EPSILON);
            let wa = 1.0 / va_safe;
            let wb = 1.0 / vb_safe;
            let total = wa + wb;
            (wa / total, wb / total)
        }
        _ => (0.5, 0.5)  // Fall back to equal weights
    }
};
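
For reference, assuming the two weights sum to one, they would feed into the standard mixture formulas for the combined mean and variance (a standalone sketch, not the PR code):

// Standard mixture moments given weights w_a + w_b = 1: combine the second
// moments E[X^2] = Var + mean^2 with the weights, then re-center on the
// mixture mean.
fn mixture_moments(
    w_a: f64, mean_a: f64, var_a: f64,
    w_b: f64, mean_b: f64, var_b: f64,
) -> (f64, f64) {
    let mean = w_a * mean_a + w_b * mean_b;
    let ex2 = w_a * (var_a + mean_a * mean_a) + w_b * (var_b + mean_b * mean_b);
    (mean, ex2 - mean * mean)
}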

xudong963 (Member, Author):

Also pinging @kosiew -- do you have any thoughts on the new way to compute the weights?

xudong963 (Member, Author):

I think eventually it would be nice to add some tests for this code

Yes, as the ticket description says: I'll add them after we reach consensus on the approach.

xudong963 (Member, Author):

FYI @berkaysynnada @ozankabak

berkaysynnada (Contributor) left a comment:

Thank you @xudong963 for working on this new feature. I understand what you are trying to do, and don’t want to block your progress, but we need to ground this function in probability theory. Unfortunately I don’t see a clear justification for it. To help move this toward the finish line:

Do you know any use cases where this method would be especially useful? If so, maybe we can study one of those cases in more detail. That could help us understand the real need and guide us toward a more solid algorithm.

Here are a couple of theoretical approaches that might be easier to reason about:
(These were suggested with LLM help, so we should double-check the terminology and theory)

a) Mixture Model (Weighted Average of PDFs)
This is a method for combining different probability distributions.
Suppose p1(x) and p2(x) are two PDFs and we give them equal weight (0.5). The combined PDF would be:
pmix(x) = 0.5 * p1(x) + 0.5 * p2(x)
This creates a probabilistic blend of the two. The result is still a valid PDF (non-negative and integrates to 1).

b) Product of PDFs (Bayesian-style Fusion)
This is used when we want to combine independent sources of evidence.
pproduct(x) ∝ p1(x) * p2(x)
To make it a proper PDF, we normalize it:
pproduct(x) = [p1(x) * p2(x)] / Z, where Z = ∫ p1(x) * p2(x) dx
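
For concreteness, a small standalone sketch of both combinations applied to PDFs discretized over a common grid with spacing dx (plain Rust, not a proposed API):

// (a) Mixture: pmix(x) = w * p1(x) + (1 - w) * p2(x); still a valid PDF.
fn mixture(p1: &[f64], p2: &[f64], w: f64) -> Vec<f64> {
    p1.iter().zip(p2).map(|(a, b)| w * a + (1.0 - w) * b).collect()
}

// (b) Product: pproduct(x) = p1(x) * p2(x) / Z, with Z approximated by a Riemann sum.
fn product(p1: &[f64], p2: &[f64], dx: f64) -> Vec<f64> {
    let unnormalized: Vec<f64> = p1.iter().zip(p2).map(|(a, b)| a * b).collect();
    let z: f64 = unnormalized.iter().sum::<f64>() * dx;
    unnormalized.iter().map(|v| v / z).collect()
}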

Let me know what you think

///
/// - The resulting mean, median, and variance are approximations of the mixture
/// distribution parameters. They are calculated using weighted averages based on
/// the input distributions. Users should not make definitive assumptions based on these values.

I'm afraid we cannot allow such approximations. This whole Distribution context was implemented to represent these uncertainties; if we also allow uncertainties here, things become more complicated.

xudong963 (Member, Author):

I agree, this will result in larger uncertainties.

// Always use Float64 for intermediate calculations to avoid truncation
// I assume that the target type is always numeric
// Todo: maybe we can keep all `ScalarValue` as `Float64` in `Distribution`?
let calc_type = DataType::Float64;
berkaysynnada (Contributor) commented on Mar 21, 2025:

Why float? Decimals have higher precision. We've thought about this a lot, and relaxing the data type during computations or when representing intermediate or final results is not a good approach. Rather than presuming a target type, we should rely on the data type of the original quantity and its standard coercions.

ozankabak (Contributor):

This API, as it currently stands, does not seem to make sense. It seems to make the assumption that outcomes (i.e. individual items in the range) of the Distributions are equally likely, which is not necessarily the case.

We can only merge two statistical objects in certain special circumstances. For example, if we have a statistical object that tracks sample averages along with counts, we can merge two instances of them. Our distributions are not merge-able quantities in this sense. They are mixable (with a given weight), but not merge-able.

One of the follow-ups we previously discussed was adding a HistogramDistribution object that tracks bins and ranges. These objects will be merge-able. Therefore, we should start off by adding a HistogramDistribution object first. Then, we can add a merge API to that object.

If you think we should have a mix API for the general Distribution object, we can add it too. Such an API will need to include a mixing weight in its signature.
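
To illustrate the distinction, here is a minimal standalone example (hypothetical types, not an existing API) of a statistic that is merge-able in this sense, because the sample counts provide the weights:

// A (count, mean) pair for a single quantity can be merged exactly; a generic
// Distribution carries no such counts, which is why it is only mixable.
#[derive(Clone, Copy)]
struct MeanCount {
    count: u64,
    mean: f64,
}

impl MeanCount {
    fn merge(self, other: MeanCount) -> MeanCount {
        let count = self.count + other.count;
        if count == 0 {
            return MeanCount { count: 0, mean: 0.0 };
        }
        // Weight each mean by its sample count so the result equals the mean
        // of the concatenated samples.
        let mean = (self.mean * self.count as f64 + other.mean * other.count as f64)
            / count as f64;
        MeanCount { count, mean }
    }
}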

xudong963 (Member, Author):

Do you know any use cases where this method would be especially useful? If so, maybe we can study one of those cases in more detail. That could help us understand the real need and guide us toward a more solid algorithm.

Yes, we're considering restarting that work, and given that Precision will be replaced with Distribution, I opened this proposal to discuss how to do merge for Distribution.

a) Mixture Model (Weighted Average of PDFs)
This is a method for combining different probability distributions.
Suppose p1(x) and p2(x) are two PDFs and we give them equal weight (0.5). The combined PDF would be:
pmix(x) = 0.5 * p1(x) + 0.5 * p2(x)
This creates a probabilistic blend of the two. The result is still a valid PDF (non-negative and integrates to 1).

It seems that my current approach is close to the mixture model.

xudong963 (Member, Author):

We can only merge two statistical objects in certain special circumstances. For example, if we have a statistical object that tracks sample averages along with counts, we can merge two instances of them. Our distributions are not merge-able quantities in this sense. They are mixable (with a given weight), but not merge-able.

I was confusing merge and mix. After reviewing the information: "merge" suggests combining datasets that maintain their original properties, but what's implemented here is actually closer to a weighted mixture of probability distributions. Do I understand correctly?

One of the follow-ups we previously discussed was adding a HistogramDistribution object that tracks bins and ranges. These objects will be merge-able. Therefore, we should start off by adding a HistogramDistribution object first. Then, we can add a merge API to that object.

Yes, I agree. HistogramDistribution is merge-able. Does it look like this?

pub struct HistogramDistribution {
    bins: Vec<Interval>,     // The bin boundaries
    counts: Vec<u64>,        // Frequency in each bin
    total_count: u64,        // Sum of all bin counts
    range: Interval,         // Overall range covered by the histogram
}

If you think we should have a mix API for the general Distribution object, we can add it too. Such an API will need to include a mixing weight in its signature.

This is my use case: https://github.com/apache/datafusion/pull/13296/files#diff-8d786f45bc2d5bf629754a119ed6fa7998dcff7faacd954c45945b7047b87fa1R498 -- merging the file statistics across the whole file group. I'm still thinking about whether a mix API can satisfy my requirement.

ozankabak (Contributor):

I was confusing merge and mix. After reviewing the information: "merge" suggests combining datasets that maintain their original properties, but what's implemented here is actually closer to a weighted mixture of probability distributions. Do I understand correctly?

Right -- merge coalesces partial information about a single quantity, while mix models a probabilistic selection between two quantities. Your use case seems to fall into the first category. Use cases for mixture arise when modeling things like filters that depend on composite expressions involving random functions, etc.

Yes, I agree. HistogramDistribution is merge-able. Does it look like this?

pub struct HistogramDistribution {
    bins: Vec<Interval>,     // The bin boundaries
    counts: Vec<u64>,        // Frequency in each bin
    total_count: u64,        // Sum of all bin counts
    range: Interval,         // Overall range covered by the histogram
}

I haven't thought about it in detail, but this seems reasonable. We'd probably want an attribute specifying the maximum number of bins one can have, because many operations (including merge) will tend to increase the number of bins unless special care is taken to coalesce when necessary. The total_count attribute is derivable from counts, so we may not want to store it, for normalization/consistency reasons. Same goes for range; it can be constructed from bins in O(1) time.
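
A hedged, standalone sketch of that shape (field names and types are illustrative, not a settled API): store only the bins plus a bin cap, and derive the rest on demand.

use std::num::NonZeroUsize;

pub struct HistogramDistribution {
    bins: Vec<(f64, f64, u64)>, // (lower, upper, count) per bin, sorted by bounds
    max_bins: NonZeroUsize,     // cap that merge/coalesce operations must respect
}

impl HistogramDistribution {
    // total_count is derivable from the per-bin counts.
    fn total_count(&self) -> u64 {
        self.bins.iter().map(|(_, _, c)| c).sum()
    }

    // The overall range is recoverable in O(1) from the first and last bin.
    fn range(&self) -> Option<(f64, f64)> {
        match (self.bins.first(), self.bins.last()) {
            (Some((lo, _, _)), Some((_, hi, _))) => Some((*lo, *hi)),
            _ => None,
        }
    }
}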

xudong963 (Member, Author) commented on Mar 21, 2025:

The total_count attribute is derivable from counts, so we may not want to store it, for normalization/consistency reasons. Same goes for range; it can be constructed from bins in O(1) time.

Yes, we don't need to store them.

pub struct HistogramDistribution {
    bins: Vec<HistogramBin>,
}

pub struct HistogramBin {
    upper: ScalarValue,
    count: u64,
    // Maybe other fields, such as ndv
}

How do we plan to generate the HistogramDistribution?

Let's assume we can get the exact min/max from the Parquet file: https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/file_format.rs#L827. Should we generate the HistogramDistribution based on the min/max, or do we have alternative ways?

I saw @cj-zhukov's work here: https://github.com/synnada-ai/datafusion-upstream/pull/63/files#diff-a8919cf6209fb777550056cdd7decca3e6ed94370a2821a9395763fdd6271967R796-R811. If we know the exact min/max, we generate a UniformDistribution for min/max, i.e. an interval with the same bounds. Going further, can we generate the HistogramDistribution based on the UniformDistribution?

ozankabak (Contributor):

The most likely way we will end up with HistogramDistributions will be via sampling. We can also leverage statistics in file metadata if a file format stores this information. AFAICT Parquet doesn't store histogram information.

If your use case is specific to Parquet files and you can't do sampling, what we can do is add an optional num_samples field to GenericDistribution. This way, you can merge two GenericDistribution objects if both have a value for num_samples. In such a scenario, you can update the mean, variance and range fields with appropriate formulas and add the num_samples fields. The median value will always be set to None; it is not merge-able.

In an expression tree, any num_samples information of children GenericDistribution will combine multiplicatively (due to the independence assumption). When a GenericDistribution combines with another Distribution, the information will be lost and set to None for the resulting GenericDistribution.

For example, the resulting GenericDistribution for 2 * x will preserve num_samples (w.r.t. that of x), but the same for x + y will be num_samples_x * num_samples_y.
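
A hedged sketch of how such a merge could look (the names here are assumptions, not the actual GenericDistribution API); two summaries carrying sample counts merge exactly for mean and variance, while the median stays None:

struct SampledSummary {
    num_samples: u64,
    mean: f64,
    variance: f64,
}

fn merge(a: &SampledSummary, b: &SampledSummary) -> SampledSummary {
    let (na, nb) = (a.num_samples as f64, b.num_samples as f64);
    let n = na + nb;
    debug_assert!(n > 0.0, "at least one input must carry samples");
    // Sample-count-weighted mean of the concatenated samples.
    let mean = (na * a.mean + nb * b.mean) / n;
    // Combine second moments (E[X^2] = Var + mean^2) with sample-count weights,
    // then re-center on the merged mean.
    let ex2 = (na * (a.variance + a.mean * a.mean)
        + nb * (b.variance + b.mean * b.mean))
        / n;
    SampledSummary {
        num_samples: a.num_samples + b.num_samples,
        mean,
        variance: ex2 - mean * mean,
    }
}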

xudong963 closed this on Mar 22, 2025
xudong963 (Member, Author) commented on Mar 22, 2025:

Thanks for your suggestions!! @alamb @ozankabak @berkaysynnada and @kosiew

I'll continue this work after the "Migrate to Distribution from Precision" effort is done. Once it's done, the new statistics framework will be relatively stable and we can see how Distribution is integrated into the DataFusion core; that will definitely help me work on the new statistics framework more efficiently.

Again, I sincerely appreciate that you took time to review and discuss ❤️
