Skip to content

Commit 90b9d78

Browse files
author
wubingheng
committed
fix CommonParams qupload setting and add path config method from LogPath.
1 parent 8551c91 commit 90b9d78

File tree

7 files changed

+124
-70
lines changed

7 files changed

+124
-70
lines changed

src/main/java/com/qiniu/convert/LineToMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public LineToMap(String parseType, String separator, String addKeyPrefix, String
1818
this.lineParser = line -> process(addKeyPrefix, rmKeyPrefix, ConvertingUtils.toPair(line, indexMap, new StringMapPair()));
1919
} else if ("csv".equals(parseType)) {
2020
this.lineParser = line -> process(addKeyPrefix, rmKeyPrefix, ConvertingUtils.toPair(line, ",", indexMap, new StringMapPair()));
21-
} else if ("tab".equals(parseType) || "self".equals(parseType)) {
21+
} else if ("tab".equals(parseType)) {
2222
this.lineParser = line -> process(addKeyPrefix, rmKeyPrefix, ConvertingUtils.toPair(line, separator, indexMap, new StringMapPair()));
2323
} else {
2424
throw new IOException("please check your format for line to map.");

src/main/java/com/qiniu/datasource/CloudStorageContainer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public void export(IStorageLister<E> lister, IResultOutput saver, ILineProcess<T
228228
record = json.toString();
229229
progressMap.put(lister.getPrefix(), record);
230230
try { FileUtils.createIfNotExists(procedureLogFile); } catch (IOException ignored) {}
231-
procedureLogger.info("{}:{}", lister.getPrefix(), record);
231+
procedureLogger.info("{}-|-{}", lister.getPrefix(), record);
232232
}
233233
if (map != null) map.put("start", lister.currentEndKey());
234234
if (stopped) break;
@@ -271,7 +271,7 @@ void listing(IStorageLister<E> lister) {
271271
processorMap.put(orderStr, lineProcessor);
272272
}
273273
export(lister, saver, lineProcessor);
274-
procedureLogger.info("{}:", lister.getPrefix());
274+
procedureLogger.info("{}-|-", lister.getPrefix());
275275
progressMap.remove(lister.getPrefix()); // 只有 export 成功情况下才移除 record
276276
} catch (QiniuException e) {
277277
try { FileUtils.createIfNotExists(errorLogFile); } catch (IOException ignored) {}

src/main/java/com/qiniu/datasource/FileContainer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public void export(IFileLister<E, File> lister, IResultOutput saver, ILineProces
253253
record = json.toString();
254254
progressMap.put(lister.getName(), record);
255255
try { FileUtils.createIfNotExists(procedureLogFile); } catch (IOException ignored) {}
256-
procedureLogger.info("{}:{}", lister.getName(), record);
256+
procedureLogger.info("{}-|-{}", lister.getName(), record);
257257
}
258258
if (stopped) break;
259259
// objects.clear(); 上次其实不能做 clear,会导致 lister 中的列表被清空
@@ -279,7 +279,7 @@ private void listing(IFileLister<E, File> lister) {
279279
processorMap.put(orderStr, lineProcessor);
280280
}
281281
export(lister, saver, lineProcessor);
282-
procedureLogger.info("{}:", lister.getName());
282+
procedureLogger.info("{}-|-", lister.getName());
283283
progressMap.remove(lister.getName()); // 只有 export 成功情况下才移除 record
284284
} catch (QiniuException e) {
285285
try { FileUtils.createIfNotExists(errorLogFile); } catch (IOException ignored) {}

src/main/java/com/qiniu/datasource/TextContainer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public void export(ITextReader<S> reader, IResultOutput saver, ILineProcess<T> p
166166
record = json.toString();
167167
progressMap.put(reader.getName(), record);
168168
try { FileUtils.createIfNotExists(procedureLogFile); } catch (IOException ignored) {}
169-
procedureLogger.info("{}:{}", reader.getName(), record);
169+
procedureLogger.info("{}-|-{}", reader.getName(), record);
170170
lastLine = reader.currentEndLine();
171171
}
172172
}
@@ -187,7 +187,7 @@ private void reading(ITextReader<S> reader) {
187187
processorMap.put(orderStr, lineProcessor);
188188
}
189189
export(reader, saver, lineProcessor);
190-
procedureLogger.info("{}:", reader.getName());
190+
procedureLogger.info("{}-|-", reader.getName());
191191
progressMap.remove(reader.getName()); // 只有 export 成功情况下才移除 record
192192
} catch (QiniuException e) {
193193
try { FileUtils.createIfNotExists(errorLogFile); } catch (IOException ignored) {}

src/main/java/com/qiniu/entry/CommonParams.java

+91-58
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class CommonParams {
4444
private String baiduAccessId;
4545
private String baiduSecretKey;
4646
private String bucket;
47+
private String logFilepath;
4748
private Map<String, Map<String, String>> pathConfigMap;
4849
private List<String> antiDirectories;
4950
private List<String> antiPrefixes;
@@ -164,12 +165,12 @@ public CommonParams(IEntryParam entryParam) throws Exception {
164165
path = entryParam.getValue("path", "");
165166
setSource();
166167
accountInit();
168+
logFilepath = entryParam.getValue("log", null);
167169
if (isStorageSource) {
168170
setAuthKey();
169171
setBucket();
170172
String prefixes = entryParam.getValue("prefixes", null);
171173
setPathConfigMap(entryParam.getValue("prefix-config", ""), prefixes, true, true);
172-
antiPrefixes = Arrays.asList(ParamsUtils.escapeSplit(entryParam.getValue("anti-prefixes", "")));
173174
setPrefixLeft(entryParam.getValue("prefix-left", "false").trim());
174175
setPrefixRight(entryParam.getValue("prefix-right", "false").trim());
175176
} else {
@@ -181,6 +182,7 @@ public CommonParams(IEntryParam entryParam) throws Exception {
181182
String files = entryParam.getValue("files", null);
182183
setPathConfigMap(entryParam.getValue("file-config", ""), files, false, false);
183184
}
185+
antiPrefixes = Arrays.asList(ParamsUtils.escapeSplit(entryParam.getValue("anti-prefixes", "")));
184186
setProcess();
185187
setPrivateType();
186188
regionName = entryParam.getValue("region", "").trim().toLowerCase();
@@ -297,11 +299,14 @@ public CommonParams(Map<String, String> paramsMap) throws Exception {
297299
setSaveSeparator();
298300
break;
299301
case "qupload":
300-
if (!fromLine) mapLine.put("key", entryParam.getValue("key", ""));
301-
String filepath = entryParam.getValue("filepath", "").trim();
302+
String key = entryParam.getValue("key", "");
303+
if (!fromLine) mapLine.put("key", key);
304+
String filepath = entryParam.getValue("filepath", "");
302305
if (!"".equals(filepath)) {
303306
indexMap.put("filepath", "filepath");
304307
mapLine.put("filepath", filepath);
308+
} else if ("".equals(key)) {
309+
throw new IOException("filepath and key shouldn't all be empty, file must be found with them.");
305310
}
306311
break;
307312
case "mime":
@@ -602,9 +607,9 @@ private void setProcess() throws Exception {
602607
}
603608
if ("qupload".equals(process) && "file".equals(entryParam.getValue("parse", "file")) && !"terminal".equals(source)) {
604609
isSelfUpload = true;
605-
String prefixes = entryParam.getValue("directories", null);
606-
setPathConfigMap(entryParam.getValue("directory-config", ""), prefixes, false, true);
607-
antiDirectories = Arrays.asList(ParamsUtils.escapeSplit(entryParam.getValue("anti-directories", "")));
610+
parse = "file"; // 修正 parse 的默认值
611+
String directories = entryParam.getValue("directories", null);
612+
setPathConfigMap(entryParam.getValue("directory-config", ""), directories, false, true);
608613
}
609614
}
610615

@@ -671,63 +676,91 @@ private void setPrivateType() throws IOException {
671676
}
672677
}
673678

674-
private Map<String, String> fromProcedureLog(String logFile) throws IOException {
675-
File file = new File(logFile);
676-
FileReader fileReader = new FileReader(file);
677-
BufferedReader bufferedReader = new BufferedReader(fileReader);
678-
Map<String, String> map = new HashMap<>();
679-
int index;
680-
String line;
681-
while ((line = bufferedReader.readLine()) != null) {
682-
index = line.indexOf(":{");
683-
map.put(line.substring(0, index), line.substring(index));
679+
private void fromProcedureLog(String logFile, boolean withMarker, boolean withEnd) throws IOException {
680+
String lastLine = FileUtils.lastLineOfFile(logFile);
681+
if (lastLine != null && !"".equals(lastLine)) {
682+
try {
683+
JsonObject jsonObject = JsonUtils.toJsonObject(lastLine);
684+
parseConfigMapFromJson(jsonObject, withMarker, withEnd);
685+
} catch (Exception e) {
686+
File file = new File(logFile);
687+
FileReader fileReader = new FileReader(file);
688+
BufferedReader bufferedReader = new BufferedReader(fileReader);
689+
int index;
690+
String line;
691+
String value;
692+
Map<String, String> map = new HashMap<>();
693+
while ((line = bufferedReader.readLine()) != null) {
694+
index = line.indexOf("-|-");
695+
map.put(line.substring(0, index), line.substring(index));
696+
}
697+
for (String key : map.keySet()) {
698+
value = map.get(key);
699+
if (!"".equals(value)) {
700+
pathConfigMap.put(key, JsonUtils.fromJson(value, map.getClass()));
701+
}
702+
}
703+
}
684704
}
685-
return map;
686705
}
687706

688-
private void setPathConfigMap(String jsonConfigPath, String subPaths, boolean withMarker, boolean withEnd) throws Exception {
689-
pathConfigMap = new HashMap<>();
690-
if (jsonConfigPath != null && !"".equals(jsonConfigPath)) {
691-
JsonFile jsonFile = new JsonFile(jsonConfigPath);
692-
JsonObject jsonCfg;
693-
JsonElement markerElement;
694-
JsonElement startElement;
695-
JsonElement endElement;
696-
for (String key : jsonFile.getKeys()) {
697-
Map<String, String> startAndEnd = new HashMap<>();
707+
private void parseConfigMapFromJson(JsonObject jsonObject, boolean withMarker, boolean withEnd) throws IOException {
708+
JsonObject jsonCfg;
709+
JsonElement markerElement;
710+
JsonElement startElement;
711+
JsonElement endElement;
712+
for (String key : jsonObject.keySet()) {
713+
Map<String, String> startAndEnd = new HashMap<>();
698714
// if ("".equals(prefix)) throw new IOException("prefix (prefixes config's element key) can't be empty.");
699-
JsonElement json = jsonFile.getElement(key);
700-
if (json == null || json instanceof JsonNull) {
701-
pathConfigMap.put(key, null);
702-
continue;
703-
}
704-
if (withMarker || withEnd) {
705-
if (!(json instanceof JsonObject)) throw new IOException("the value of key: " + key + " must be json.");
706-
jsonCfg = json.getAsJsonObject();
707-
if (withMarker) {
708-
markerElement = jsonCfg.get("marker");
709-
if (markerElement != null && !(markerElement instanceof JsonNull)) {
710-
startAndEnd.put("marker", markerElement.getAsString());
711-
}
712-
}
713-
startElement = jsonCfg.get("start");
714-
if (startElement != null && !(startElement instanceof JsonNull)) {
715-
startAndEnd.put("start", startElement.getAsString());
715+
JsonElement json = jsonObject.get(key);
716+
if (json == null || json instanceof JsonNull) {
717+
pathConfigMap.put(key, null);
718+
continue;
719+
}
720+
if (withMarker || withEnd) {
721+
if (!(json instanceof JsonObject)) throw new IOException("the value of key: " + key + " must be json.");
722+
jsonCfg = json.getAsJsonObject();
723+
if (withMarker) {
724+
markerElement = jsonCfg.get("marker");
725+
if (markerElement != null && !(markerElement instanceof JsonNull)) {
726+
startAndEnd.put("marker", markerElement.getAsString());
716727
}
717-
if (withEnd) {
718-
endElement = jsonCfg.get("end");
719-
if (endElement != null && !(endElement instanceof JsonNull)) {
720-
startAndEnd.put("end", endElement.getAsString());
721-
}
728+
}
729+
startElement = jsonCfg.get("start");
730+
if (startElement != null && !(startElement instanceof JsonNull)) {
731+
startAndEnd.put("start", startElement.getAsString());
732+
}
733+
if (withEnd) {
734+
endElement = jsonCfg.get("end");
735+
if (endElement != null && !(endElement instanceof JsonNull)) {
736+
startAndEnd.put("end", endElement.getAsString());
722737
}
723-
} else {
724-
startAndEnd.put("start", json.getAsString());
725738
}
726-
pathConfigMap.put(key, startAndEnd);
739+
} else {
740+
startAndEnd.put("start", json.getAsString());
741+
}
742+
pathConfigMap.put(key, startAndEnd);
743+
}
744+
}
745+
746+
private void setPathConfigMap(String jsonConfigPath, String subPaths, boolean withMarker, boolean withEnd) throws Exception {
747+
pathConfigMap = new HashMap<>();
748+
if (logFilepath == null || "".equals(logFilepath)) {
749+
if (jsonConfigPath != null && !"".equals(jsonConfigPath)) {
750+
JsonFile jsonFile = new JsonFile(jsonConfigPath);
751+
parseConfigMapFromJson(jsonFile.getJsonObject(), withMarker, withEnd);
752+
} else if (subPaths != null && !"".equals(subPaths)) {
753+
String[] subPathList = ParamsUtils.escapeSplit(subPaths);
754+
for (String subPath : subPathList) pathConfigMap.put(subPath, null);
755+
}
756+
} else {
757+
if (jsonConfigPath != null && !"".equals(jsonConfigPath)) {
758+
throw new IOException("log and uris can not be used together, please remove prefixes/files/directories if you want use breakpoint with log.");
759+
} else if (subPaths != null && !"".equals(subPaths)) {
760+
throw new IOException("log and json config can not be used together, please remove config path if you want use breakpoint with log.");
761+
} else {
762+
fromProcedureLog(logFilepath, withMarker, withEnd);
727763
}
728-
} else if (subPaths != null && !"".equals(subPaths)) {
729-
String[] subPathList = ParamsUtils.escapeSplit(subPaths);
730-
for (String subPath : subPathList) pathConfigMap.put(subPath, null);
731764
}
732765
}
733766

@@ -991,8 +1024,8 @@ private void setIndexMap() throws IOException {
9911024
if (!indexMap.containsKey("0")) indexMap.put("0", "key");
9921025
}
9931026
}
994-
if (ProcessUtils.needFilepath(process) || "file".equals(parse) || isSelfUpload) {
995-
setIndex(entryParam.getValue("filepath-index", "filepath").trim(), "filepath");
1027+
if (ProcessUtils.needFilepath(process) || "file".equals(parse)) {
1028+
setIndex(entryParam.getValue("filepath-index", fieldIndex ? "filepath" : "").trim(), "filepath");
9961029
// setIndex("parent", "parent");
9971030
}
9981031
if (indexMap.size() == 0) {
@@ -1097,7 +1130,7 @@ private void checkFilterForProcess() throws IOException {
10971130

10981131
private void setUnitLen(String unitLen) throws IOException {
10991132
if (unitLen.startsWith("-")) {
1100-
if (isSelfUpload) this.unitLen = 20;
1133+
if (isSelfUpload) this.unitLen = 3;
11011134
if ("qiniu".equals(source) || "local".equals(source)) this.unitLen = 10000;
11021135
else this.unitLen = 1000;
11031136
} else {

src/main/java/com/qiniu/entry/QSuitsEntry.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -949,8 +949,7 @@ private ILineProcess<Map<String, String>> getCensorResult(Map<String, String> in
949949
: new QueryCensorResult(qiniuAccessKey, qiniuSecretKey, getQiniuConfig(), jobIdIndex, savePath);
950950
}
951951

952-
private ILineProcess<Map<String, String>> getQiniuUploadFile(Map<String, String> indexMap, boolean single)
953-
throws IOException {
952+
private ILineProcess<Map<String, String>> getQiniuUploadFile(Map<String, String> indexMap, boolean single) throws IOException {
954953
String pathIndex = indexMap.containsValue("filepath") ? "filepath" : null;
955954
String parentPath = entryParam.getValue("parent-path", "").trim();
956955
String recorder = entryParam.getValue("record", "false").trim();

src/main/java/com/qiniu/util/FileUtils.java

+25-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package com.qiniu.util;
22

33
import javax.activation.MimetypesFileTypeMap;
4-
import java.io.File;
5-
import java.io.IOException;
6-
import java.io.RandomAccessFile;
4+
import java.io.*;
75
import java.net.FileNameMap;
86
import java.net.URLConnection;
7+
import java.nio.charset.StandardCharsets;
98
import java.nio.file.Files;
109
import java.nio.file.Paths;
1110
import java.util.ArrayList;
@@ -225,4 +224,27 @@ public static void randomModify(String filePath, String oldStr, String newStr) t
225224
}
226225
}
227226
}
227+
228+
public static String lastLineOfFile(String filePath) throws IOException {
229+
RandomAccessFile accessFile = new RandomAccessFile(filePath, "r");
230+
if (accessFile.length() < 1) return null;
231+
byte[] bytes = new byte[1];
232+
long pos = accessFile.length();
233+
if (pos > 2) {
234+
accessFile.seek(--pos);
235+
accessFile.read(bytes);
236+
String s = new String(bytes);
237+
accessFile.seek(--pos);
238+
while (pos > 0 && accessFile.read(bytes) != -1 && "\n".equals(s)) {
239+
accessFile.seek(--pos);
240+
s = new String(bytes);
241+
}
242+
while (pos > 0 && accessFile.read(bytes) != -1 && !"\n".equals(s)) {
243+
accessFile.seek(--pos);
244+
s = new String(bytes);
245+
}
246+
if (pos > 0) accessFile.seek(pos + 2);
247+
}
248+
return new String(accessFile.readLine().getBytes(StandardCharsets.ISO_8859_1));
249+
}
228250
}

0 commit comments

Comments
 (0)