Skip to content

Commit 60edfc9

Browse files
author
吴炳亨
authored Dec 26, 2019
Merge pull request #285 from NigelWu95/dev
Dev
2 parents 34754ba + 0f086ad commit 60edfc9

File tree

8 files changed

+25
-23
lines changed

8 files changed

+25
-23
lines changed
 

‎README.md

+8-2
Original file line number | Diff line number | Diff line change
@@ -392,9 +392,15 @@ java.net.SocketTimeoutException: timeout
392392
5、断点续操作时建议修改下 save-path,便于和上一次保存的结果做区分(7.72 及以下版本中断点参数请和其他参数保持一致放在命令行或配置文件中,7.72 以上
393393
版本无此限制,只要提供断点参数无论是否与其他参数同在命令行或配置文件中均可生效)。
394394

395-
**注意:如果是系统宕机、断电或者强制关机或者进程强行 kill 等情况,无法得到输出的断点文件提示,因此只能通过[<位置记录日志>](#9-程序日志)来查看最后
395+
**注意:
396+
(1)如果是系统宕机、断电或者强制关机或者进程强行 kill 等情况,无法得到输出的断点文件提示,因此只能通过[<位置记录日志>](#9-程序日志)来查看最后
396397
的断点信息,在 8.2.1 版本以上设置了 log 参数可用于启用日志记录的断点,即取出运行路径下 logs 目录中的 procedure[x].log 日志,将该日志文件设置
397-
`-log=<procedure[x].log's path>` 再运行可完成断点续操作。**
398+
`-log=<procedure[x].log's path>` 再运行可完成断点续操作。
399+
(2)如果原任务包含 process 过程(只有数据源读取不包含 process 操作可以不考虑该问题),执行断点操作时,由于断点日志粒度按照 unit-len 来记录的,
400+
当 unit-len 比较大(默认一般都是 1000 以上,甚至是 10000)时,可能存在记录的断点比实际 process 的进度要滞后很多条记录,因此对于一些不希望存在数
401+
据重复执行的 process,如 qupload/syncupload/mirror/fetch/asyncfetch 等,数据重复执行会影响效率和增加流量消耗,那么建议操作断点时设置一些
402+
check 参数,如 check=stat(参考:[过滤和检查](docs/datamigration.md#过滤和检查)),或者对于其他操作有这个需求的话可以自行对 process 的
403+
[结果输出文件](#关于持久化文件名)进行检查,查看执行到的数据行位置并对断点设置的 json 文件进行调整。**
398404

399405
### 11 分布式任务方案
400406
对于不同账号或空间可以直接在不同的机器上执行任务,对于单个空间资源数量太大无法在合适条件下使用单台机器完成作业时,可分机器进行作业,如对一个空间列举完

‎pom.xml

+1-1
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.qiniu</groupId>
88
<artifactId>qsuits</artifactId>
9-
<version>8.3.8</version>
9+
<version>8.3.9</version>
1010
<name>qsuits</name>
1111
<description>qiniu-suits is a efficient tools for qiniu api implemented by java8.</description>
1212
<url>https://github.com/NigelWu95/qiniu-suits-java</url>

‎src/main/java/com/qiniu/datasource/TextContainer.java

+3-1
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,7 @@ public void export(ITextReader reader, IResultOutput saver, ILineProcess<T> proc
131131
Map<String, String> map = urisMap.get(reader.getName());
132132
JsonObject json = map != null ? JsonUtils.toJsonObject(map) : (lastLine != null ? new JsonObject() : null);
133133
while (lastLine != null) {
134+
if (stopped) break;
134135
if (LocalDateTime.now(DatetimeUtils.clock_Default).isAfter(pauseDateTime)) {
135136
synchronized (object) {
136137
object.wait();
@@ -146,7 +147,6 @@ public void export(ITextReader reader, IResultOutput saver, ILineProcess<T> proc
146147
if (retry == 0) throw e;
147148
}
148149
}
149-
statistics.addAndGet(srcList.size());
150150
convertedList = converter.convertToVList(srcList);
151151
if (converter.errorSize() > 0) saver.writeError(converter.errorLines(), false);
152152
if (stringConverter != null) {
@@ -166,6 +166,8 @@ public void export(ITextReader reader, IResultOutput saver, ILineProcess<T> proc
166166
if (e.response != null) e.response.close();
167167
}
168168
}
169+
statistics.addAndGet(srcList.size());
170+
if (stopped) break;
169171
lastLine = reader.currentEndLine();
170172
json.addProperty("start", lastLine);
171173
recordLister(reader.getName(), json.toString());

‎src/main/java/com/qiniu/datasource/TextFileContainer.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ private List<ITextReader> splitSingleFile(File file) throws IOException {
6969
if (linesNumber < threads * 2) {
7070
return new ArrayList<ITextReader>(){{ add(new TextFileReader(file, null, unitLen)); }};
7171
}
72-
long avgLines = linesNumber / threads;
72+
long avgLines = (linesNumber + threads - 1) / threads;
7373
long avgSize = avgLines * lineSize;
7474
RandomAccessFile[] accessFiles = new RandomAccessFile[threads];
7575
accessFiles[0] = new RandomAccessFile(file, "r");

‎src/main/java/com/qiniu/datasource/TextFileRandomReader.java

+1-4
Original file line number | Diff line number | Diff line change
@@ -69,10 +69,7 @@ public List<String> readLines() throws IOException {
6969
}
7070
} else {
7171
while (true) {
72-
if (srcList.size() >= limit) {
73-
line = null;
74-
break;
75-
}
72+
if (srcList.size() >= limit) break;
7673
try {
7774
// 避免文件过大,行数过多,使用 lines() 的 stream 方式直接转换可能会导致内存泄漏,故使用 readLine() 的方式
7875
originalLine = accessFile.readLine();

‎src/main/java/com/qiniu/entry/CommonParams.java

+9-12
Original file line number | Diff line number | Diff line change
@@ -1142,14 +1142,14 @@ private void checkFilterForProcess() throws IOException {
11421142
private void setUnitLen() throws IOException {
11431143
String unitLen = entryParam.getValue("unit-len", "-1").trim();
11441144
if (unitLen.startsWith("-")) {
1145-
if (isSelfUpload) this.unitLen = 3;
1146-
if ("qiniu".equals(source) || "local".equals(source)) this.unitLen = 10000;
1145+
if (isSelfUpload) this.unitLen = 20;
1146+
else if ("qiniu".equals(source)) this.unitLen = 10000;
11471147
else this.unitLen = 1000;
11481148
} else {
11491149
ParamsUtils.checked(unitLen, "unit-len", "\\d+");
11501150
this.unitLen = Integer.parseInt(unitLen);
11511151
if (isSelfUpload && this.unitLen > 100) {
1152-
throw new IOException("file upload shouldn't has too big unit-len, suggest setting unit-len smaller than 100.");
1152+
throw new IOException("file upload shouldn't have too big unit-len, suggest to set unit-len smaller than 100.");
11531153
}
11541154
}
11551155
}
@@ -1243,20 +1243,17 @@ private void setSavePath() throws IOException {
12431243
} else {
12441244
File file = new File(savePath);
12451245
File[] files = file.listFiles();
1246-
boolean isOk = false;
1246+
boolean isOk;
12471247
if (files != null && files.length > 0) {
1248-
for (File file1 : files) {
1249-
if (file1.length() > 0) {
1250-
if (file1.getName().startsWith(source) || (!"".equals(process) && file1.getName().startsWith(process))) {
1251-
isOk = true;
1252-
break;
1253-
}
1254-
}
1248+
if ("".equals(process)) {
1249+
isOk = Arrays.asList(files).parallelStream().anyMatch(f -> f.getName().startsWith(source));
1250+
} else {
1251+
isOk = Arrays.asList(files).parallelStream().anyMatch(f -> f.getName().startsWith(source) || f.getName().startsWith(process));
12551252
}
12561253
if (isOk) {
12571254
if (pathConfigMap == null || pathConfigMap.size() <= 0) {
12581255
throw new IOException(String.format("please change the save-path \"%s\", " +
1259-
"because there are last listed files, for not cover them.", savePath));
1256+
"because there are remained files from last job, for not cover them.", savePath));
12601257
}
12611258
} else {
12621259
throw new IOException(String.format("please change save-path \"%s\" because it's not empty.", savePath));

‎src/main/java/com/qiniu/process/Base.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -171,7 +171,7 @@ protected List<T> parseBatchResult(List<T> processList, String result) throws Ex
171171
* @throws IOException 处理失败可能抛出的异常
172172
*/
173173
private void batchProcess(List<T> lineList, int batchSize, int retryTimes) throws IOException {
174-
int times = lineList.size()/batchSize + 1;
174+
int times = (lineList.size() + batchSize - 1) / batchSize;
175175
List<T> processList;
176176
int retry;
177177
String message = null;

‎version.properties

+1-1
Original file line number | Diff line number | Diff line change
@@ -1 +1 @@
1-
version=8.3.8
1+
version=8.3.9

0 commit comments

Comments
 (0)