Skip to content

Commit 916ce88

Browse files
author
wubingheng
committed
fix containers and optimize converting.
1 parent d42f349 commit 916ce88

11 files changed

+63
-36
lines changed

src/main/java/com/qiniu/common/SuitsException.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ public SuitsException(int statusCode, String error) {
1313
}
1414

1515
public SuitsException(Exception e, int statusCode) {
16-
super(String.join(", ", String.valueOf(statusCode), e.getMessage()));
16+
super(String.join(", ", String.valueOf(statusCode), e.getCause().getMessage()));
1717
this.exception = e;
1818
this.statusCode = statusCode;
1919
}
2020

2121
public SuitsException(Exception e, int statusCode, String error) {
22-
super(String.join(", ", String.valueOf(statusCode), error, e.getMessage()));
22+
super(String.join(", ", String.valueOf(statusCode), error, e.getCause().getMessage()));
2323
this.exception = e;
2424
this.statusCode = statusCode;
2525
}
2626

2727
public SuitsException(SuitsException e, String message) {
28-
super(String.join(", ", e.getMessage(), message));
28+
super(String.join(", ", e.getCause().getMessage(), message));
2929
this.exception = e;
3030
this.statusCode = e.getStatusCode();
3131
}

src/main/java/com/qiniu/datasource/CloudStorageContainer.java

+1
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ public void export(IStorageLister<E> lister, IResultOutput saver, ILineProcess<T
222222
errorLogger.error("process objects: {}", lister.getPrefix(), e);
223223
if (e.response != null) e.response.close();
224224
}
225+
// statistics.addAndGet(convertedList.size());
225226
}
226227
if (hasNext) {
227228
json.addProperty("marker", lister.getMarker());

src/main/java/com/qiniu/datasource/DatasourceActor.java

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.time.LocalDateTime;
1515
import java.util.*;
1616
import java.util.concurrent.*;
17+
import java.util.concurrent.atomic.AtomicLong;
1718

1819
import static com.qiniu.entry.CommonParams.lineFormats;
1920

@@ -41,6 +42,7 @@ public abstract class DatasourceActor {
4142
protected ConcurrentMap<String, IResultOutput> saverMap;
4243
protected ConcurrentMap<String, ILineProcess> processorMap;
4344
protected boolean stopped;
45+
protected AtomicLong statistics;
4446
protected ConcurrentMap<String, String> progressMap;
4547

4648
public DatasourceActor(int unitLen, int threads) throws IOException {
@@ -49,6 +51,7 @@ public DatasourceActor(int unitLen, int threads) throws IOException {
4951
this.threads = threads;
5052
saverMap = new ConcurrentHashMap<>(threads);
5153
processorMap = new ConcurrentHashMap<>(threads);
54+
statistics = new AtomicLong(0);
5255
progressMap = new ConcurrentHashMap<>(threads);
5356
}
5457

src/main/java/com/qiniu/datasource/FileContainer.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ private void setDirectoriesAndMap(Map<String, Map<String, String>> directoriesMa
118118
String end = value == null ? null : value.get("end");
119119
File tempFile = new File(temp);
120120
if (!tempFile.exists()) tempFile = new File(realPath, temp);
121+
directories = new ArrayList<>();
121122
if (tempFile.isDirectory()) directories.add(tempFile);
122123
else throw new IOException(temp + " is not valid directory.");
123124
String forCheckPath = tempFile.getCanonicalPath() + FileUtils.pathSeparator;
@@ -247,6 +248,7 @@ public void export(IFileLister<E, File> lister, IResultOutput saver, ILineProces
247248
errorLogger.error("process objects: {}", lister.getName(), e);
248249
if (e.response != null) e.response.close();
249250
}
251+
statistics.addAndGet(convertedList.size());
250252
}
251253
if (hasNext) {
252254
json.addProperty("start", lister.currentEndFilepath());
@@ -496,6 +498,7 @@ private List<IFileLister<E, File>> checkListerInPool(int cValue, int tiny) {
496498
}
497499
sleep(1000);
498500
count++;
501+
rootLogger.info("finished count: {}.", statistics.get());
499502
}
500503
if (notCheck) return new ArrayList<>();
501504
else return list;
@@ -557,7 +560,10 @@ private void directoriesListing() throws Exception {
557560
list = checkListerInPool(cValue, tiny);
558561
}
559562
}
560-
while (!executorPool.isTerminated()) sleep(1000);
563+
while (!executorPool.isTerminated()) {
564+
sleep(1000);
565+
rootLogger.info("finished count: {}.", statistics.get());
566+
}
561567
}
562568

563569
@Override

src/main/java/com/qiniu/datasource/TextContainer.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ private void setUrisAndMap(Map<String, Map<String, String>> urisMap) throws IOEx
6666
this.urisMap = new HashMap<>(threads);
6767
this.urisMap.putAll(urisMap);
6868
int size = this.urisMap.size();
69+
uris = new ArrayList<>();
6970
Iterator<String> iterator = this.urisMap.keySet().stream().sorted().collect(Collectors.toList()).iterator();
7071
while (iterator.hasNext() && size > 0) {
7172
size--;
@@ -162,12 +163,13 @@ public void export(ITextReader<S> reader, IResultOutput saver, ILineProcess<T> p
162163
if (HttpRespUtils.checkException(e, 2) < -1) throw e;
163164
if (e.response != null) e.response.close();
164165
}
165-
json.addProperty("start", reader.currentEndLine());
166+
statistics.addAndGet(convertedList.size());
167+
lastLine = reader.currentEndLine();
168+
json.addProperty("start", lastLine);
166169
record = json.toString();
167170
progressMap.put(reader.getName(), record);
168171
try { FileUtils.createIfNotExists(procedureLogFile); } catch (IOException ignored) {}
169172
procedureLogger.info("{}-|-{}", reader.getName(), record);
170-
lastLine = reader.currentEndLine();
171173
}
172174
}
173175

@@ -233,11 +235,11 @@ public void export() throws Exception {
233235
readers = getReaders(FileUtils.convertToRealPath(path));
234236
} else {
235237
if (hasAntiPrefixes) {
236-
uris.parallelStream().filter(this::checkPrefix).forEach(this::recordListerByUri);
238+
uris = uris.parallelStream().filter(this::checkPrefix).peek(this::recordListerByUri).collect(Collectors.toList());
237239
} else {
238240
uris.parallelStream().forEach(this::recordListerByUri);
239241
}
240-
readers = uris.parallelStream().filter(this::checkPrefix).map(uri -> {
242+
readers = uris.parallelStream().map(uri -> {
241243
try {
242244
return generateReader(uri);
243245
} catch (IOException e) {
@@ -252,6 +254,11 @@ public void export() throws Exception {
252254
} else {
253255
executorPool = Executors.newFixedThreadPool(threads);
254256
readers.parallelStream().forEach(this::reading);
257+
executorPool.shutdown();
258+
while (!executorPool.isTerminated()) {
259+
sleep(1000);
260+
rootLogger.info("finished count: {}.", statistics.get());
261+
}
255262
rootLogger.info("{} finished, results in {}.", info, savePath);
256263
}
257264
endAction();

src/main/java/com/qiniu/datasource/TextFileContainer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,12 @@ protected List<ITextReader<File>> getReaders(String path) throws IOException {
9494
} else {
9595
throw new IOException("");
9696
}
97-
return files.parallelStream().map(file1 -> {
97+
return files.parallelStream().map(pFile -> {
9898
try {
99-
return new TextFileReader(file, null, unitLen);
99+
return new TextFileReader(pFile, null, unitLen);
100100
} catch (IOException e) {
101101
e.printStackTrace();
102+
errorLogger.error("generate lister failed by {}\t{}", pFile.getPath(), urisMap.get(pFile.getPath()), e);
102103
return null;
103104
}
104105
}).filter(Objects::nonNull).peek(reader -> recordListerByUri(reader.getName())).collect(Collectors.toList());

src/main/java/com/qiniu/datasource/UpYosContainer.java

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.concurrent.Executors;
2323
import java.util.concurrent.Future;
2424
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.concurrent.atomic.AtomicLong;
2625
import java.util.concurrent.locks.Lock;
2726
import java.util.concurrent.locks.ReentrantLock;
2827
import java.util.stream.Collectors;

src/main/java/com/qiniu/entry/CommonParams.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ private void parseConfigMapFromJson(JsonObject jsonObject, boolean withMarker, b
717717
pathConfigMap.put(key, null);
718718
continue;
719719
}
720-
if (withMarker || withEnd) {
720+
// if (withMarker || withEnd) {
721721
if (!(json instanceof JsonObject)) throw new IOException("the value of key: " + key + " must be json.");
722722
jsonCfg = json.getAsJsonObject();
723723
if (withMarker) {
@@ -736,9 +736,9 @@ private void parseConfigMapFromJson(JsonObject jsonObject, boolean withMarker, b
736736
startAndEnd.put("end", endElement.getAsString());
737737
}
738738
}
739-
} else {
740-
startAndEnd.put("start", json.getAsString());
741-
}
739+
// } else {
740+
// startAndEnd.put("start", json.getAsString());
741+
// }
742742
pathConfigMap.put(key, startAndEnd);
743743
}
744744
}
@@ -1206,9 +1206,11 @@ private void setSavePath() throws IOException {
12061206
boolean isOk = false;
12071207
if (files != null && files.length > 0) {
12081208
for (File file1 : files) {
1209-
if (file1.getName().startsWith(source) && file1.length() > 0) {
1210-
isOk = true;
1211-
break;
1209+
if (file1.length() > 0) {
1210+
if (file1.getName().startsWith(source) || (!"".equals(process) && file1.getName().startsWith(process))) {
1211+
isOk = true;
1212+
break;
1213+
}
12121214
}
12131215
}
12141216
if (isOk) {

src/main/java/com/qiniu/persistence/FileSaveMapper.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ public void changePrefixAndSuffix(String prefix, String suffix) {
4343
this.prefix = String.join("", prefix, "_");
4444
}
4545
if (suffix != null && !"".equals(suffix)) {
46-
this.suffix = String.join("", suffix, "_");
46+
this.suffix = String.join("", "_", suffix);
4747
}
48-
if (writerMap.size() == 0) {
48+
if (!writerMap.containsKey("success")) {
4949
for (String targetWriter : "success,error".split(",")) preAddWriter(targetWriter);
5050
}
5151
}

src/main/java/com/qiniu/util/ConvertingUtils.java

+21-14
Original file line numberDiff line numberDiff line change
@@ -327,20 +327,25 @@ public static <T> T toPair(BosObjectSummary bosObject, Map<String, String> index
327327

328328
public static <T> T toPair(JsonObject json, Map<String, String> indexMap, KeyValuePair<String, T> pair) throws IOException {
329329
if (json == null) throw new IOException("empty jsonObject.");
330+
String field;
330331
JsonElement jsonElement;
331332
for (String index : indexMap.keySet()) {
333+
field = indexMap.get(index);
332334
jsonElement = json.get(index);
333335
// JsonUtils.toString(null) 和 JsonUtils.toString(JsonNull.INSTANCE) 均为 null
334-
if (jsonElement instanceof JsonPrimitive) {
335-
JsonPrimitive primitive = jsonElement.getAsJsonPrimitive();
336-
if (primitive.isBoolean()) pair.put(indexMap.get(index), JsonUtils.fromJson(jsonElement, Boolean.class));
337-
else if (primitive.isNumber()) pair.put(indexMap.get(index), JsonUtils.fromJson(jsonElement, Long.class));
338-
else pair.put(indexMap.get(index), jsonElement.toString());
339-
} else {
336+
if (longFields.contains(field)) pair.put(field, jsonElement.getAsLong());
337+
else if (intFields.contains(field)) pair.put(field, jsonElement.getAsInt());
338+
// else if (jsonElement instanceof JsonPrimitive) {
339+
// JsonPrimitive primitive = jsonElement.getAsJsonPrimitive();
340+
// if (primitive.isBoolean()) pair.put(indexMap.get(index), JsonUtils.fromJson(jsonElement, Boolean.class));
341+
// else if (primitive.isNumber()) pair.put(indexMap.get(index), JsonUtils.fromJson(jsonElement, Long.class));
342+
// else pair.put(indexMap.get(index), jsonElement.toString());
343+
// }
344+
else {
340345
try {
341-
pair.put(indexMap.get(index), JsonUtils.toString(jsonElement));
346+
pair.put(field, JsonUtils.toString(jsonElement));
342347
} catch (JsonSyntaxException e) {
343-
pair.put(indexMap.get(index), String.valueOf(jsonElement));
348+
pair.put(field, String.valueOf(jsonElement));
344349
}
345350
}
346351
}
@@ -562,14 +567,16 @@ public static <T> T toPair(JsonObject json, List<String> fields, KeyValuePair<St
562567
JsonElement value;
563568
for (String field : fields) {
564569
value = json.get(field);
570+
if (value == null) continue;
565571
if (longFields.contains(field)) pair.put(field, value.getAsLong());
566572
else if (intFields.contains(field)) pair.put(field, value.getAsInt());
567-
else if (value instanceof JsonPrimitive) {
568-
JsonPrimitive primitive = value.getAsJsonPrimitive();
569-
if (primitive.isBoolean()) pair.put(field, JsonUtils.fromJson(value, Boolean.class));
570-
else if (primitive.isNumber()) pair.put(field, JsonUtils.fromJson(value, Long.class));
571-
else pair.put(field, String.valueOf(value));
572-
} else {
573+
// else if (value instanceof JsonPrimitive) {
574+
// JsonPrimitive primitive = value.getAsJsonPrimitive();
575+
// if (primitive.isBoolean()) pair.put(field, JsonUtils.fromJson(value, Boolean.class));
576+
// else if (primitive.isNumber()) pair.put(field, JsonUtils.fromJson(value, Long.class));
577+
// else pair.put(field, String.valueOf(value));
578+
// }
579+
else {
573580
try {
574581
pair.put(field, JsonUtils.toString(value));
575582
} catch (JsonSyntaxException e) {

src/main/java/com/qiniu/util/JsonUtils.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ public static String toString(JsonElement jsonElement) {
5050

5151
public static String toJsonWithoutUrlEscape(Object srcObject) {
5252
return escapeGson.toJson(srcObject)
53-
.replace(":\"{\\\"", ":{\"")
54-
.replace("\\\"}\",", "\"},")
53+
.replace("\"{}\"", "{}")
54+
.replace("\"{\\\"", "{\"")
55+
.replace("\\\"}\"", "\"}")
5556
.replace("\\\":\\\"", "\":\"")
5657
.replace("\\\\", "\\");
5758
}

0 commit comments

Comments
 (0)