Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1463d64
Create auto-pr-review.yml (#1)
imbajin Jun 12, 2025
528f12e
Merge branch 'apache:master' into master
imbajin Jul 16, 2025
3503b08
Merge pull request #2 from apache/master
imbajin Nov 26, 2025
9683b47
Merge pull request #6 from apache/master
imbajin Jan 16, 2026
d3fa860
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 16, 2026
cd92e05
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 16, 2026
80d1544
[feat] Adjusted descriptions in the Loader
kenssa4eedfd Jan 17, 2026
d37cb4e
[feat] Add validator for parallelThreads
kenssa4eedfd Jan 17, 2026
ca92b2a
[feat] Update the default value of parallelThreads
kenssa4eedfd Jan 17, 2026
8a487f5
[feat] Update the default value of parallelThreads
kenssa4eedfd Jan 17, 2026
0b965c6
[feat] Add validator for maxConnections and maxConnectionsPerRoute
kenssa4eedfd Jan 17, 2026
a432785
[feat] Update the default value of parallelThreads
kenssa4eedfd Jan 17, 2026
171a988
[feat] Update the default value of parallelThreads
kenssa4eedfd Jan 17, 2026
f062624
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 19, 2026
866a4a0
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 19, 2026
ceaed28
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 19, 2026
e39f503
[feat] Adjusted several default parameters and descriptions in the Lo…
kenssa4eedfd Jan 19, 2026
eda716e
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
71644b6
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
78fa442
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
ee496dd
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
0379118
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
572cc94
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
a54eb64
Update loader-ci.yml
kenssa4eedfd Jan 20, 2026
537445c
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
6f663c7
Merge remote-tracking branch 'origin/loader-update' into loader-update
kenssa4eedfd Jan 20, 2026
18e83cf
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
dddfa40
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
ed816e3
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
1831893
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
588adad
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
f60d5a1
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
3706a3b
[feat] Add unit tests
kenssa4eedfd Jan 20, 2026
c328a01
[feat] Standardize log levels and streamline parameter descriptions.
kenssa4eedfd Jan 21, 2026
146bdf4
Update required review count and collaborators
imbajin Jan 21, 2026
1af9a32
Merge pull request #9 from hugegraph/imbajin-patch-1
imbajin Jan 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/auto-pr-review.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: "Auto PR Commenter"

on:
pull_request_target:
types: [opened]

jobs:
add-review-comment:
runs-on: ubuntu-latest
permissions:
pull-requests: write
steps:
- name: Add review comment
uses: peter-evans/create-or-update-comment@v4
with:
issue-number: ${{ github.event.pull_request.number }}
body: |
@codecov-ai-reviewer review
Original file line number Diff line number Diff line change
Expand Up @@ -662,24 +662,21 @@ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
}

private void loadStructs(List<InputStruct> structs) {
int parallelCount = this.context.options().parallelCount;
int parallelThreads = this.context.options().parallelThreads;
if (structs.size() == 0) {
return;
}
if (parallelCount <= 0) {
parallelCount = Math.min(structs.size(), Runtime.getRuntime().availableProcessors() * 2);
}

boolean scatter = this.context.options().scatterSources;

LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
parallelCount, structs.size(), this.context.options().startFile,
parallelThreads, structs.size(), this.context.options().startFile,
this.context.options().endFile,
scatter ? "scatter" : "sequential");

ExecutorService loadService = null;
try {
loadService = ExecutorUtil.newFixedThreadPool(parallelCount, "loader");
loadService = ExecutorUtil.newFixedThreadPool(parallelThreads, "loader");
List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
List<CompletableFuture<Void>> loadTasks = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public final class LoadOptions implements Cloneable {
public static final String HTTPS_SCHEMA = "https";
public static final String HTTP_SCHEMA = "http";
private static final int CPUS = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_MAX_CONNECTIONS = CPUS * 4;
private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = CPUS * 2;
private static final int MINIMUM_REQUIRED_ARGS = 3;

@Parameter(names = {"-f", "--file"}, required = true, arity = 1,
Expand Down Expand Up @@ -156,7 +158,9 @@ public final class LoadOptions implements Cloneable {

@Parameter(names = {"--batch-insert-threads"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The number of threads to execute batch insert")
description = "The number of threads to execute batch insert. " +
"If max-conn/max-conn-per-route keep defaults, " +
"they may be auto-adjusted based on this value")
public int batchInsertThreads = CPUS;

@Parameter(names = {"--single-insert-threads"}, arity = 1,
Expand All @@ -165,21 +169,30 @@ public final class LoadOptions implements Cloneable {
public int singleInsertThreads = 8;

@Parameter(names = {"--max-conn"}, arity = 1,
description = "Max number of HTTP connections to server")
public int maxConnections = CPUS * 4;
validateWith = {PositiveValidator.class},
description = "Max number of HTTP connections to server. " +
"If left as default and batch-insert-threads is " +
"set, this may be auto-adjusted")
public int maxConnections = DEFAULT_MAX_CONNECTIONS;

@Parameter(names = {"--max-conn-per-route"}, arity = 1,
description = "Max number of HTTP connections to each route")
public int maxConnectionsPerRoute = CPUS * 2;
validateWith = {PositiveValidator.class},
description = "Max number of HTTP connections to each route. " +
"If left as default and batch-insert-threads is " +
"set, this may be auto-adjusted")
public int maxConnectionsPerRoute = DEFAULT_MAX_CONNECTIONS_PER_ROUTE;

@Parameter(names = {"--batch-size"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The number of lines in each submit")
public int batchSize = 500;

@Parameter(names = {"--parallel-count"}, arity = 1,
description = "The number of parallel read pipelines")
public int parallelCount = 1;
@Parameter(names = {"--parallel-count", "--parser-threads"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The number of parallel read pipelines. " +
"Default: auto max(2, cpu/2). " +
"Must be >= 1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 默认并行线程数变更需要性能测试验证

从固定值 1 变更为 Math.max(2, CPUS/2) 是一个显著的性能相关变更。

建议

  1. 需要在不同CPU配置(2核、4核、8核、16核+)的机器上进行性能测试
  2. 验证在不同数据规模下的表现(小数据集 vs 大数据集)
  3. 在PR描述的"Verifying these changes"章节补充性能测试结果
  4. 考虑是否需要设置一个上限值,避免在高核心数机器上创建过多线程

测试建议

# 测试不同并行度下的性能
for i in 1 2 4 8; do
  time hugegraph-loader --parser-threads $i ...
done

public int parallelThreads = Math.max(2, CPUS/2);

@Parameter(names = {"--start-file"}, arity = 1,
description = "start file index for partial loading")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.hugegraph.loader.util.Printer;
import org.slf4j.Logger;

import org.apache.hugegraph.loader.builder.Record;
Expand Down Expand Up @@ -137,6 +138,9 @@ public void shutdown() {

public void submitBatch(InputStruct struct, ElementMapping mapping,
List<Record> batch) {
if (this.context.stopped()) {
return;
}
long start = System.currentTimeMillis();
try {
this.batchSemaphore.acquire();
Expand All @@ -152,13 +156,12 @@ public void submitBatch(InputStruct struct, ElementMapping mapping,
CompletableFuture.runAsync(task, this.batchService).whenComplete(
(r, e) -> {
if (e != null) {
LOG.warn("Batch insert {} error, try single insert",
mapping.type(), e);
// The time of single insert is counted separately
this.submitInSingle(struct, mapping, batch);
} else {
summary.metrics(struct).minusFlighting(batch.size());
LOG.error("Batch insert {} error, interrupting import", mapping.type(), e);
this.context.stopLoading();
Printer.printError("Batch insert %s failed, stop loading, Please check the logs",
mapping.type().string());
}
summary.metrics(struct).minusFlighting(batch.size());

this.batchSemaphore.release();
long end = System.currentTimeMillis();
Expand Down
Loading