Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7e93737
feat: support GraphSAGE
kitalkuyo-gita Nov 17, 2025
3866aa7
enhance: add feature select
kitalkuyo-gita Nov 17, 2025
22edacd
test: add test
kitalkuyo-gita Nov 17, 2025
67c1fb9
enhance: add test case
kitalkuyo-gita Nov 17, 2025
3f22f9f
enhance: add GQL support
kitalkuyo-gita Nov 17, 2025
86b4822
enhance: add cuda device && adjust dimssion
kitalkuyo-gita Nov 26, 2025
c2280b6
chore: add license
kitalkuyo-gita Nov 26, 2025
55e42b6
bugfix: add conda url
kitalkuyo-gita Nov 26, 2025
c8120ee
enhance: add user custom sys python path
kitalkuyo-gita Nov 26, 2025
726fc3a
rerfactor: fill original dimssion
kitalkuyo-gita Nov 26, 2025
5b4dd8a
refactor: update agg collect dimssion
kitalkuyo-gita Nov 26, 2025
f4a87d4
refactor: adjust dimension
kitalkuyo-gita Nov 26, 2025
a5de492
enhance: solve resource lack while boot
kitalkuyo-gita Nov 26, 2025
8de7b49
refactor: cython deps copy
kitalkuyo-gita Nov 26, 2025
bc86864
chore:remove useless code
kitalkuyo-gita Nov 27, 2025
9b6921d
fix: Replace var keyword with explicit type for JDK 8 compatibility
kitalkuyo-gita Mar 6, 2026
fadd0f8
fix: Replace FileWriter constructor with OutputStreamWriter for JDK 8…
kitalkuyo-gita Mar 6, 2026
c4c5480
ci: Install Python dependencies including PyTorch for GraphSAGE tests
kitalkuyo-gita Mar 6, 2026
3c1c656
ci: Trigger CI build to verify Python dependencies installation
kitalkuyo-gita Mar 6, 2026
bbe5900
ci: Install Python dependencies in JDK 11 workflow for GraphSAGE tests
kitalkuyo-gita Mar 7, 2026
0992714
Merge remote-tracking branch 'upstream/master' into issue-677
kitalkuyo-gita Mar 7, 2026
fe761c8
style: Remove unused imports in BuildInSqlFunctionTable to fix checks…
kitalkuyo-gita Mar 7, 2026
2bd227f
fix: Re-add ConnectedComponents to SQL function table registration
kitalkuyo-gita Mar 7, 2026
fe709e6
fix: Add LabelPropagation to SQL function table registration
kitalkuyo-gita Mar 7, 2026
8e4477e
fix: Add Louvain algorithm to SQL function table registration
kitalkuyo-gita Mar 7, 2026
6471ad8
feat: support Python UDF class name parameterization for multi-algori…
kitalkuyo-gita Mar 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/ci-jdk11.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ jobs:
with:
version: "21.7"

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
cache: 'pip'

- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install -r geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/resources/requirements.txt
pip list | grep -i torch

# Current hive connector is incompatible with jdk11, implement 4.0.0+ hive version in later.
- name: Build and Test On JDK 11
run: |
Expand Down
12 changes: 12 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ jobs:
with:
version: "21.7"

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
cache: 'pip'

- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install -r geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/resources/requirements.txt
pip list | grep -i torch

- name: Build and Test On JDK 8
run: mvn -B -e clean test -Pjdk8 -pl !geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector
-Duser.timezone=Asia/Shanghai -Dlog4j.configuration="log4j .rootLogger=WARN, stdout"
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ public class FrameworkConfigKeys implements Serializable {
.noDefaultValue()
.description("infer env conda url");

/**
 * Whether the inference environment uses the system Python interpreter
 * instead of creating a virtual environment. Defaults to {@code false}.
 */
public static final ConfigKey INFER_ENV_USE_SYSTEM_PYTHON = ConfigKeys
.key("geaflow.infer.env.use.system.python")
.defaultValue(false)
.description("use system Python instead of creating virtual environment");

/**
 * Path to the system Python executable, for example {@code /usr/bin/python3}.
 * No default value is defined for this key.
 *
 * <p>NOTE(review): presumably only consulted when
 * {@link #INFER_ENV_USE_SYSTEM_PYTHON} is enabled; confirm against the code
 * that reads this key.
 */
public static final ConfigKey INFER_ENV_SYSTEM_PYTHON_PATH = ConfigKeys
.key("geaflow.infer.env.system.python.path")
.noDefaultValue()
.description("path to system Python executable (e.g., /usr/bin/python3 or /opt/homebrew/bin/python3)");

public static final ConfigKey ASP_ENABLE = ConfigKeys
.key("geaflow.iteration.asp.enable")
.defaultValue(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,37 @@ public AbstractIncVertexCentricComputeAlgo(long iterations, String name) {

public abstract FUNC getIncComputeFunction();

/**
 * Returns the name of the Python transform class this algorithm requires for
 * inference.
 *
 * <p>Subclasses override this method to name the Python UDF class that should
 * be loaded, so that several algorithms with different Python models can run
 * in the same job without competing for a single class name.
 *
 * <p>A non-null return value is meant to take precedence, for this algorithm
 * only, over the global {@code geaflow.infer.env.user.transform.classname}
 * configuration. Returning {@code null} (the default implementation) means the
 * global configuration value applies.
 *
 * <p>NOTE(review): how the returned name is applied, including how blank
 * strings are treated and how the resulting {@code InferContext} is keyed, is
 * decided by the operator that builds the infer context; confirm there.
 *
 * <p>Example:
 * <pre>{@code
 * // Using the default Python UDF specified in the global config:
 * incGraphView.incrementalCompute(new GraphSAGECompute(10, 2));
 *
 * // Explicitly specifying a Python UDF (code-based approach):
 * incGraphView.incrementalCompute(new GraphSAGECompute(10, 2, "GraphSAGETransFormFunction"));
 *
 * // Two algorithms in the same job, each with its own Python UDF:
 * incGraphView.incrementalCompute(new GraphSAGECompute(10, 2, "GraphSAGETransFormFunction"));
 * incGraphView.incrementalCompute(new GCNCompute(64, "GCNTransFormFunction"));
 * }</pre>
 *
 * @return the Python transform class name, or {@code null} to fall back to
 *     the global configuration
 */
public String getPythonTransformClassName() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.geaflow.operator.impl.graph.compute.dynamic;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.geaflow.api.function.iterator.RichIteratorFunction;
import org.apache.geaflow.api.graph.base.algo.AbstractIncVertexCentricComputeAlgo;
Expand All @@ -33,6 +35,7 @@
import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.infer.InferContext;
import org.apache.geaflow.infer.InferContextPool;
import org.apache.geaflow.model.graph.message.DefaultGraphMessage;
import org.apache.geaflow.model.graph.vertex.IVertex;
import org.apache.geaflow.model.record.RecordArgs.GraphRecordNames;
Expand Down Expand Up @@ -164,16 +167,62 @@ class IncGraphInferComputeContextImpl<OUT> extends IncGraphComputeContextImpl im
public IncGraphInferComputeContextImpl() {
if (clientLocal.get() == null) {
try {
inferContext = new InferContext<>(runtimeContext.getConfiguration());
// Build the effective configuration for this algorithm's InferContext.
// If the algorithm declares its own Python transform class name
// (code-based approach), override the global config with it so that
// each algorithm can use an independent Python subprocess.
// This resolves the UDF naming conflict when multiple algorithms
// with different Python models run in the same job.
Configuration inferConfig = buildInferConfig();
inferContext = InferContextPool.getOrCreate(inferConfig);
clientLocal.set(inferContext);
LOGGER.info(
"InferContext obtained from pool for algorithm '{}', "
+ "pythonTransformClass='{}', pool={}",
function.getClass().getSimpleName(),
inferConfig.getString(
FrameworkConfigKeys.INFER_ENV_USER_TRANSFORM_CLASSNAME),
InferContextPool.getStatus());
} catch (Exception e) {
throw new GeaflowRuntimeException(e);
LOGGER.error("Failed to obtain InferContext from pool", e);
throw new GeaflowRuntimeException(
"InferContext initialization failed: " + e.getMessage(), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the change here. clientLocal is a thread-local variable that has the same lifecycle as the graph operator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change itself was made because it was feared that the current inference context (inferContext) could not be obtained from inferpool. clientLocal was made to use the already created inferContext, reusing existing resources.

}
clientLocal.set(inferContext);
} else {
inferContext = clientLocal.get();
}
}

/**
 * Builds the {@link Configuration} used to obtain this algorithm's
 * {@link InferContext}.
 *
 * <p>If
 * {@link AbstractIncVertexCentricComputeAlgo#getPythonTransformClassName()}
 * returns a non-blank value, a copy of the runtime configuration is returned
 * in which {@code geaflow.infer.env.user.transform.classname} is set to that
 * value with surrounding whitespace removed. The runtime configuration itself
 * is not modified.
 *
 * <p>If the algorithm returns {@code null} or a blank string, the runtime
 * configuration is returned as-is, preserving the original behaviour.
 *
 * <p>NOTE(review): the derived configuration is intended to give each
 * algorithm its own pooled {@code InferContext}; this relies on
 * {@link InferContextPool} distinguishing contexts by configuration content.
 * Confirm against the pool implementation.
 *
 * @return the configuration to use when requesting the {@code InferContext}
 */
private Configuration buildInferConfig() {
    String algoClassName = function.getPythonTransformClassName();
    // Normalise once so the blank check and the stored value agree; a padded
    // name would otherwise be handed to the Python side verbatim.
    String transformClassName = algoClassName == null ? "" : algoClassName.trim();
    if (transformClassName.isEmpty()) {
        return runtimeContext.getConfiguration();
    }
    // Copy the global config map and override only the Python transform
    // class name with the algorithm-specific value.
    Map<String, String> overrideMap =
        new HashMap<>(runtimeContext.getConfiguration().getConfigMap());
    overrideMap.put(
        FrameworkConfigKeys.INFER_ENV_USER_TRANSFORM_CLASSNAME.getKey(),
        transformClassName);
    return new Configuration(overrideMap);
}

@Override
public OUT infer(Object... modelInputs) {
try {
Expand All @@ -186,7 +235,9 @@ public OUT infer(Object... modelInputs) {
/**
 * Detaches this compute context from the {@link InferContext} held in
 * {@code clientLocal}.
 *
 * <p>The context is obtained from {@link InferContextPool}, so it is not
 * closed here; only the thread-local reference is cleared. Closing it on
 * detach would shut down a context the pool may still hand out.
 *
 * @throws IOException declared by the overridden signature; this
 *     implementation does not throw it
 */
@Override
public void close() throws IOException {
    if (clientLocal.get() != null) {
        // Do NOT close the InferContext here since it's managed by the pool.
        // The pool handles lifecycle management across the entire job.
        LOGGER.debug("Detaching from pooled InferContext");
        clientLocal.remove();
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,15 @@ public class BuildInSqlFunctionTable extends ListSqlOperatorTable {
.add(GeaFlowFunction.of(IncMinimumSpanningTree.class))
.add(GeaFlowFunction.of(ClosenessCentrality.class))
.add(GeaFlowFunction.of(WeakConnectedComponents.class))
.add(GeaFlowFunction.of(ConnectedComponents.class))
.add(GeaFlowFunction.of(LabelPropagation.class))
.add(GeaFlowFunction.of(Louvain.class))
.add(GeaFlowFunction.of(TriangleCount.class))
.add(GeaFlowFunction.of(ClusterCoefficient.class))
.add(GeaFlowFunction.of(IncWeakConnectedComponents.class))
.add(GeaFlowFunction.of(CommonNeighbors.class))
.add(GeaFlowFunction.of(JaccardSimilarity.class))
.add(GeaFlowFunction.of(IncKHopAlgorithm.class))
.add(GeaFlowFunction.of(LabelPropagation.class))
.add(GeaFlowFunction.of(ConnectedComponents.class))
.add(GeaFlowFunction.of(Louvain.class))
.build();

public BuildInSqlFunctionTable(GQLJavaTypeFactory typeFactory) {
Expand Down
Loading