feat: implement GraphSAGE algorithm based on GeaFlow inference framework #679
Open: kitalkuyo-gita wants to merge 26 commits into apache:master from kitalkuyo-gita:issue-677
Commits
7e93737 feat: support GraphSAGE (kitalkuyo-gita)
3866aa7 enhance: add feature select (kitalkuyo-gita)
22edacd test: add test (kitalkuyo-gita)
67c1fb9 enhance: add test case (kitalkuyo-gita)
3f22f9f enhance: add GQL support (kitalkuyo-gita)
86b4822 enhance: add cuda device && adjust dimssion (kitalkuyo-gita)
c2280b6 chore: add license (kitalkuyo-gita)
55e42b6 bugfix: add conda url (kitalkuyo-gita)
c8120ee enhance: add user custom sys python path (kitalkuyo-gita)
726fc3a rerfactor: fill original dimssion (kitalkuyo-gita)
5b4dd8a refactor: update agg collect dimssion (kitalkuyo-gita)
f4a87d4 refactor: adjust dimension (kitalkuyo-gita)
a5de492 enhance: solve resource lack while boot (kitalkuyo-gita)
8de7b49 refactor: cython deps copy (kitalkuyo-gita)
bc86864 chore:remove useless code (kitalkuyo-gita)
9b6921d fix: Replace var keyword with explicit type for JDK 8 compatibility (kitalkuyo-gita)
fadd0f8 fix: Replace FileWriter constructor with OutputStreamWriter for JDK 8… (kitalkuyo-gita)
c4c5480 ci: Install Python dependencies including PyTorch for GraphSAGE tests (kitalkuyo-gita)
3c1c656 ci: Trigger CI build to verify Python dependencies installation (kitalkuyo-gita)
bbe5900 ci: Install Python dependencies in JDK 11 workflow for GraphSAGE tests (kitalkuyo-gita)
0992714 Merge remote-tracking branch 'upstream/master' into issue-677 (kitalkuyo-gita)
fe761c8 style: Remove unused imports in BuildInSqlFunctionTable to fix checks… (kitalkuyo-gita)
2bd227f fix: Re-add ConnectedComponents to SQL function table registration (kitalkuyo-gita)
fe709e6 fix: Add LabelPropagation to SQL function table registration (kitalkuyo-gita)
8e4477e fix: Add Louvain algorithm to SQL function table registration (kitalkuyo-gita)
6471ad8 feat: support Python UDF class name parameterization for multi-algori… (kitalkuyo-gita)
225 changes: 225 additions & 0 deletions
...w-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/udf/graph/FeatureReducer.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,225 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.geaflow.dsl.udf.graph; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| /** | ||
| * Feature reducer for selecting important feature dimensions to reduce transmission overhead. | ||
| * | ||
| * <p>This class implements feature selection by keeping only the most important dimensions | ||
| * from the full feature vector. This significantly reduces the amount of data transferred | ||
| * between Java and Python processes, improving performance for large feature vectors. | ||
| * | ||
| * <p>Usage: | ||
| * <pre> | ||
| * // Select first 64 dimensions | ||
| * int[] selectedDims = new int[64]; | ||
| * for (int i = 0; i < 64; i++) { | ||
| * selectedDims[i] = i; | ||
| * } | ||
| * FeatureReducer reducer = new FeatureReducer(selectedDims); | ||
| * double[] reduced = reducer.reduceFeatures(fullFeatures); | ||
| * </pre> | ||
| * | ||
| * <p>Benefits: | ||
| * - Reduces memory usage for feature storage | ||
| * - Reduces network/IO overhead in Java-Python communication | ||
| * - Improves inference speed by processing smaller feature vectors | ||
| * - Maintains model accuracy if important dimensions are selected correctly | ||
| */ | ||
| public class FeatureReducer { | ||
|
|
||
| private final int[] selectedDimensions; | ||
|
|
||
| /** | ||
| * Creates a feature reducer with specified dimension indices. | ||
| * | ||
| * @param selectedDimensions Array of dimension indices to keep. | ||
| * Indices must be non-negative and valid for the full feature vector. | ||
| * Duplicate indices are allowed but not recommended. | ||
| * @throws IllegalArgumentException if the array is null or empty, or contains a negative index | ||
| */ | ||
| public FeatureReducer(int[] selectedDimensions) { | ||
| if (selectedDimensions == null || selectedDimensions.length == 0) { | ||
| throw new IllegalArgumentException( | ||
| "Selected dimensions array cannot be null or empty"); | ||
| } | ||
| for (int dim : selectedDimensions) { | ||
| if (dim < 0) { | ||
| throw new IllegalArgumentException( | ||
| "Dimension index must be non-negative, got: " + dim); | ||
| } | ||
| } | ||
| this.selectedDimensions = selectedDimensions.clone(); // Defensive copy | ||
| } | ||
|
|
||
| /** | ||
| * Reduces a full feature vector to selected dimensions. | ||
| * | ||
| * @param fullFeatures The complete feature vector | ||
| * @return Reduced feature vector containing only selected dimensions | ||
| * @throws IllegalArgumentException if fullFeatures is null or too short | ||
| */ | ||
| public double[] reduceFeatures(double[] fullFeatures) { | ||
| if (fullFeatures == null) { | ||
| throw new IllegalArgumentException("Full features array cannot be null"); | ||
| } | ||
|
|
||
| double[] reducedFeatures = new double[selectedDimensions.length]; | ||
| int maxDim = getMaxDimension(); | ||
|
|
||
| if (maxDim >= fullFeatures.length) { | ||
| throw new IllegalArgumentException( | ||
| String.format("Feature vector length (%d) is too short for selected dimensions (required length: at least %d)", | ||
| fullFeatures.length, maxDim + 1)); | ||
| } | ||
|
|
||
| for (int i = 0; i < selectedDimensions.length; i++) { | ||
| int dimIndex = selectedDimensions[i]; | ||
| reducedFeatures[i] = fullFeatures[dimIndex]; | ||
| } | ||
|
|
||
| return reducedFeatures; | ||
| } | ||
|
|
||
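| /** | ||
| * Reduces a feature list to selected dimensions. | ||
| * | ||
| * <p>Null elements in the list are treated as 0.0. | ||
| * | ||
| * @param fullFeatures The complete feature list | ||
| * @return Reduced feature array containing only selected dimensions | ||
| * @throws IllegalArgumentException if fullFeatures is null or too short | ||
| */ | ||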
| public double[] reduceFeatures(List<Double> fullFeatures) { | ||
| if (fullFeatures == null) { | ||
| throw new IllegalArgumentException("Full features list cannot be null"); | ||
| } | ||
|
|
||
| double[] fullArray = new double[fullFeatures.size()]; | ||
| for (int i = 0; i < fullFeatures.size(); i++) { | ||
| Double value = fullFeatures.get(i); | ||
| fullArray[i] = value != null ? value : 0.0; | ||
| } | ||
|
|
||
| return reduceFeatures(fullArray); | ||
| } | ||
|
|
||
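| /** | ||
| * Reduces multiple feature vectors in batch. | ||
| * | ||
| * @param fullFeaturesList List of full feature vectors | ||
| * @return Array of reduced feature vectors, in the same order as the input | ||
| * @throws IllegalArgumentException if the list is null, or any vector is null or too short | ||
| */ | ||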
| public double[][] reduceFeaturesBatch(List<double[]> fullFeaturesList) { | ||
| if (fullFeaturesList == null) { | ||
| throw new IllegalArgumentException("Full features list cannot be null"); | ||
| } | ||
|
|
||
| double[][] reducedFeatures = new double[fullFeaturesList.size()][]; | ||
| for (int i = 0; i < fullFeaturesList.size(); i++) { | ||
| reducedFeatures[i] = reduceFeatures(fullFeaturesList.get(i)); | ||
| } | ||
|
|
||
| return reducedFeatures; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the maximum dimension index in the selected dimensions. | ||
| * | ||
| * @return Maximum dimension index | ||
| */ | ||
| private int getMaxDimension() { | ||
| int max = selectedDimensions[0]; | ||
| for (int dim : selectedDimensions) { | ||
| if (dim > max) { | ||
| max = dim; | ||
| } | ||
| } | ||
| return max; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the number of selected dimensions. | ||
| * | ||
| * @return Number of dimensions in the reduced feature vector | ||
| */ | ||
| public int getReducedDimension() { | ||
| return selectedDimensions.length; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the selected dimension indices. | ||
| * | ||
| * @return Copy of the selected dimension indices array | ||
| */ | ||
| public int[] getSelectedDimensions() { | ||
| return selectedDimensions.clone(); // Defensive copy | ||
| } | ||
|
|
||
| /** | ||
| * Creates a feature reducer that selects the first N dimensions. | ||
| * | ||
| * <p>This is a convenience method for the common case of selecting | ||
| * the first N dimensions from a feature vector. | ||
| * | ||
| * @param numDimensions Number of dimensions to select from the beginning | ||
| * @return FeatureReducer instance | ||
| */ | ||
| public static FeatureReducer selectFirst(int numDimensions) { | ||
| if (numDimensions <= 0) { | ||
| throw new IllegalArgumentException( | ||
| "Number of dimensions must be positive, got: " + numDimensions); | ||
| } | ||
|
|
||
| int[] dims = new int[numDimensions]; | ||
| for (int i = 0; i < numDimensions; i++) { | ||
| dims[i] = i; | ||
| } | ||
|
|
||
| return new FeatureReducer(dims); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a feature reducer that selects evenly spaced dimensions. | ||
| * | ||
| * <p>This method selects dimensions at regular intervals, which can be useful | ||
| * for uniform sampling across the feature space. | ||
| * | ||
| * @param numDimensions Number of dimensions to select | ||
| * @param totalDimensions Total number of dimensions in the full feature vector | ||
| * @return FeatureReducer instance | ||
| */ | ||
| public static FeatureReducer selectEvenlySpaced(int numDimensions, int totalDimensions) { | ||
| if (numDimensions <= 0) { | ||
| throw new IllegalArgumentException( | ||
| "Number of dimensions must be positive, got: " + numDimensions); | ||
| } | ||
| if (totalDimensions <= 0) { | ||
| throw new IllegalArgumentException( | ||
| "Total dimensions must be positive, got: " + totalDimensions); | ||
| } | ||
| if (numDimensions > totalDimensions) { | ||
| throw new IllegalArgumentException( | ||
| String.format("Cannot select %d dimensions from %d total dimensions", | ||
| numDimensions, totalDimensions)); | ||
| } | ||
|
|
||
| int[] dims = new int[numDimensions]; | ||
| double step = (double) totalDimensions / numDimensions; | ||
| for (int i = 0; i < numDimensions; i++) { | ||
| dims[i] = (int) Math.floor(i * step); | ||
| } | ||
|
|
||
| return new FeatureReducer(dims); | ||
| } | ||
| } | ||
|
|
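To make the index arithmetic in selectEvenlySpaced and the copy loop in reduceFeatures easier to review, here is a minimal standalone sketch of the same two steps. The class name EvenSpacingSketch and its method names are hypothetical and not part of this PR; the logic is copied from the diff above, with the argument checks condensed.

```java
// Standalone sketch; EvenSpacingSketch is a hypothetical name, not part of the PR.
final class EvenSpacingSketch {

    // Same computation as selectEvenlySpaced in the diff: index i maps to floor(i * total / num).
    static int[] evenlySpacedIndices(int numDimensions, int totalDimensions) {
        if (numDimensions <= 0 || totalDimensions <= 0 || numDimensions > totalDimensions) {
            throw new IllegalArgumentException(
                "Cannot select " + numDimensions + " dimensions from " + totalDimensions);
        }
        int[] dims = new int[numDimensions];
        double step = (double) totalDimensions / numDimensions;
        for (int i = 0; i < numDimensions; i++) {
            dims[i] = (int) Math.floor(i * step);
        }
        return dims;
    }

    // Same copy loop as reduceFeatures: keep only the selected positions, in order.
    static double[] reduce(double[] fullFeatures, int[] dims) {
        double[] reduced = new double[dims.length];
        for (int i = 0; i < dims.length; i++) {
            reduced[i] = fullFeatures[dims[i]];
        }
        return reduced;
    }

    public static void main(String[] args) {
        int[] dims = evenlySpacedIndices(4, 10);
        double[] full = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
        System.out.println(java.util.Arrays.toString(dims));               // [0, 2, 5, 7]
        System.out.println(java.util.Arrays.toString(reduce(full, dims))); // [10.0, 12.0, 15.0, 17.0]
    }
}
```

With 4 of 10 dimensions the step is 2.5, so the selected indices are 0, 2, 5, 7. The last index is always below totalDimensions because i * step is strictly less than totalDimensions for every i below numDimensions.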
I don't understand the change here. clientLocal is a thread-local variable that has the same lifecycle as the graph operator.
The change was made out of concern that the current inference context (inferContext) might not be obtainable from inferpool. clientLocal now uses the inferContext that has already been created, so existing resources are reused.
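For readers following this thread, the pattern being discussed could look roughly like the sketch below. This is only an illustration under assumptions: the changed code is not visible here, and StubInferContext, StubOperator, open, context and close are hypothetical stand-ins, not GeaFlow APIs. It shows a thread-local holder that is handed an already created context and only creates a new one if nothing was set.

```java
// Hypothetical stand-ins for illustration; none of these names are GeaFlow APIs.
final class ThreadLocalReuseSketch {

    // Counts how many contexts were constructed, so reuse can be observed.
    static final java.util.concurrent.atomic.AtomicInteger CREATED =
        new java.util.concurrent.atomic.AtomicInteger();

    static final class StubInferContext {
        final int id = CREATED.incrementAndGet();
    }

    static final class StubOperator {
        // Thread-local holder that lives as long as the operator instance.
        private final ThreadLocal<StubInferContext> clientLocal = new ThreadLocal<>();

        // Hand the operator a context that already exists instead of creating another one.
        void open(StubInferContext existing) {
            clientLocal.set(existing);
        }

        // Returns the context for the current thread, creating one only if none was set.
        StubInferContext context() {
            StubInferContext ctx = clientLocal.get();
            if (ctx == null) {
                ctx = new StubInferContext();
                clientLocal.set(ctx);
            }
            return ctx;
        }

        // Clear the thread-local when the operator is closed.
        void close() {
            clientLocal.remove();
        }
    }
}
```

In this sketch, calling open with an existing context means context() returns that same instance on the same thread and no second context is constructed. Whether that matches the lifecycle the reviewer describes depends on where the real code sets and clears clientLocal.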