Skip to content

Commit 3a898f0

Browse files
author
吴炳亨
authored
Merge pull request #273 from NigelWu95/dev
Dev
2 parents a151b65 + 1163a74 commit 3a898f0

14 files changed

+228
-67
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.qiniu</groupId>
88
<artifactId>qsuits</artifactId>
9-
<version>8.1.0</version>
9+
<version>8.2.1</version>
1010
<name>qsuits</name>
1111
<description>qiniu-suits is a efficient tools for qiniu api implemented by java8.</description>
1212
<url>https://github.com/NigelWu95/qiniu-suits-java</url>

src/main/java/com/qiniu/common/SuitsException.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ public SuitsException(int statusCode, String error) {
1313
}
1414

1515
public SuitsException(Exception e, int statusCode) {
16-
super(String.join(", ", String.valueOf(statusCode), e.getMessage()));
16+
super(String.join(", ", String.valueOf(statusCode), e.getCause().getMessage()));
1717
this.exception = e;
1818
this.statusCode = statusCode;
1919
}
2020

2121
public SuitsException(Exception e, int statusCode, String error) {
22-
super(String.join(", ", String.valueOf(statusCode), error, e.getMessage()));
22+
super(String.join(", ", String.valueOf(statusCode), error, e.getCause().getMessage()));
2323
this.exception = e;
2424
this.statusCode = statusCode;
2525
}
2626

2727
public SuitsException(SuitsException e, String message) {
28-
super(String.join(", ", e.getMessage(), message));
28+
super(String.join(", ", e.getCause().getMessage(), message));
2929
this.exception = e;
3030
this.statusCode = e.getStatusCode();
3131
}

src/main/java/com/qiniu/datasource/CloudStorageContainer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ public void export(IStorageLister<E> lister, IResultOutput saver, ILineProcess<T
222222
errorLogger.error("process objects: {}", lister.getPrefix(), e);
223223
if (e.response != null) e.response.close();
224224
}
225+
// statistics.addAndGet(convertedList.size());
225226
}
226227
if (hasNext) {
227228
json.addProperty("marker", lister.getMarker());
@@ -411,7 +412,7 @@ private List<IStorageLister<E>> filteredListerByPrefixes(Stream<String> prefixes
411412
return prefixesLister;
412413
}
413414

414-
private void processNodeLister(IStorageLister<E> lister) {
415+
void processNodeLister(IStorageLister<E> lister) {
415416
if (lister.currents().size() > 0 || lister.hasNext()) {
416417
executorPool.execute(() -> listing(lister));
417418
} else {

src/main/java/com/qiniu/datasource/DatasourceActor.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.time.LocalDateTime;
1515
import java.util.*;
1616
import java.util.concurrent.*;
17+
import java.util.concurrent.atomic.AtomicLong;
1718

1819
import static com.qiniu.entry.CommonParams.lineFormats;
1920

@@ -41,6 +42,7 @@ public abstract class DatasourceActor {
4142
protected ConcurrentMap<String, IResultOutput> saverMap;
4243
protected ConcurrentMap<String, ILineProcess> processorMap;
4344
protected boolean stopped;
45+
protected AtomicLong statistics;
4446
protected ConcurrentMap<String, String> progressMap;
4547

4648
public DatasourceActor(int unitLen, int threads) throws IOException {
@@ -49,6 +51,7 @@ public DatasourceActor(int unitLen, int threads) throws IOException {
4951
this.threads = threads;
5052
saverMap = new ConcurrentHashMap<>(threads);
5153
processorMap = new ConcurrentHashMap<>(threads);
54+
statistics = new AtomicLong(0);
5255
progressMap = new ConcurrentHashMap<>(threads);
5356
}
5457

@@ -71,7 +74,7 @@ public void setRetryTimes(int retryTimes) {
7174

7275
void recordLister(String key, String record) {
7376
try { FileUtils.createIfNotExists(procedureLogFile); } catch (IOException ignored) {}
74-
procedureLogger.info("{}:{}", key, record);
77+
procedureLogger.info("{}-|-{}", key, record);
7578
progressMap.put(key, record);
7679
}
7780

src/main/java/com/qiniu/datasource/FileContainer.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ private void setDirectoriesAndMap(Map<String, Map<String, String>> directoriesMa
118118
String end = value == null ? null : value.get("end");
119119
File tempFile = new File(temp);
120120
if (!tempFile.exists()) tempFile = new File(realPath, temp);
121+
directories = new ArrayList<>();
121122
if (tempFile.isDirectory()) directories.add(tempFile);
122123
else throw new IOException(temp + " is not valid directory.");
123124
String forCheckPath = tempFile.getCanonicalPath() + FileUtils.pathSeparator;
@@ -247,6 +248,7 @@ public void export(IFileLister<E, File> lister, IResultOutput saver, ILineProces
247248
errorLogger.error("process objects: {}", lister.getName(), e);
248249
if (e.response != null) e.response.close();
249250
}
251+
statistics.addAndGet(convertedList.size());
250252
}
251253
if (hasNext) {
252254
json.addProperty("start", lister.currentEndFilepath());
@@ -372,7 +374,7 @@ private List<File> loopForFutures(List<Future<IFileLister<E, File>>> futures) th
372374
}
373375

374376
private Lock lock = new ReentrantLock();
375-
private AtomicInteger integer = new AtomicInteger(threads);
377+
private AtomicInteger integer = new AtomicInteger(0);
376378

377379
private List<File> listForNextIteratively(List<File> directories) throws Exception {
378380
List<Future<IFileLister<E, File>>> futures = new ArrayList<>();
@@ -411,7 +413,7 @@ private List<File> listForNextIteratively(List<File> directories) throws Excepti
411413
}
412414
} catch (Exception e) {
413415
try { FileUtils.createIfNotExists(errorLogFile); } catch (IOException ignored) {}
414-
errorLogger.error("excute lister failed", e);
416+
errorLogger.error("execute lister failed", e);
415417
} finally {
416418
lock.unlock();
417419
}
@@ -496,6 +498,7 @@ private List<IFileLister<E, File>> checkListerInPool(int cValue, int tiny) {
496498
}
497499
sleep(1000);
498500
count++;
501+
rootLogger.info("finished count: {}.", statistics.get());
499502
}
500503
if (notCheck) return new ArrayList<>();
501504
else return list;
@@ -506,7 +509,7 @@ private void directoriesListing() throws Exception {
506509
// directories = directories.parallelStream().map(this::directoriesFromLister).filter(Objects::nonNull)
507510
// .reduce((list1, list2) -> { list1.addAll(list2); return list1; }).orElse(null);
508511
// }
509-
while (directories != null && directories.size() > 0) directories = listForNextIteratively(directories);
512+
while (directories.size() > 0) directories = listForNextIteratively(directories);
510513
executorPool.shutdown();
511514
if (threads > 1) {
512515
int cValue = threads >= 10 ? threads / 2 : 3;
@@ -557,7 +560,10 @@ private void directoriesListing() throws Exception {
557560
list = checkListerInPool(cValue, tiny);
558561
}
559562
}
560-
while (!executorPool.isTerminated()) sleep(1000);
563+
while (!executorPool.isTerminated()) {
564+
sleep(1000);
565+
rootLogger.info("finished count: {}.", statistics.get());
566+
}
561567
}
562568

563569
@Override

src/main/java/com/qiniu/datasource/TextContainer.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ private void setUrisAndMap(Map<String, Map<String, String>> urisMap) throws IOEx
6666
this.urisMap = new HashMap<>(threads);
6767
this.urisMap.putAll(urisMap);
6868
int size = this.urisMap.size();
69+
uris = new ArrayList<>();
6970
Iterator<String> iterator = this.urisMap.keySet().stream().sorted().collect(Collectors.toList()).iterator();
7071
while (iterator.hasNext() && size > 0) {
7172
size--;
@@ -162,12 +163,13 @@ public void export(ITextReader<S> reader, IResultOutput saver, ILineProcess<T> p
162163
if (HttpRespUtils.checkException(e, 2) < -1) throw e;
163164
if (e.response != null) e.response.close();
164165
}
165-
json.addProperty("start", reader.currentEndLine());
166+
statistics.addAndGet(convertedList.size());
167+
lastLine = reader.currentEndLine();
168+
json.addProperty("start", lastLine);
166169
record = json.toString();
167170
progressMap.put(reader.getName(), record);
168171
try { FileUtils.createIfNotExists(procedureLogFile); } catch (IOException ignored) {}
169172
procedureLogger.info("{}-|-{}", reader.getName(), record);
170-
lastLine = reader.currentEndLine();
171173
}
172174
}
173175

@@ -233,11 +235,11 @@ public void export() throws Exception {
233235
readers = getReaders(FileUtils.convertToRealPath(path));
234236
} else {
235237
if (hasAntiPrefixes) {
236-
uris.parallelStream().filter(this::checkPrefix).forEach(this::recordListerByUri);
238+
uris = uris.parallelStream().filter(this::checkPrefix).peek(this::recordListerByUri).collect(Collectors.toList());
237239
} else {
238240
uris.parallelStream().forEach(this::recordListerByUri);
239241
}
240-
readers = uris.parallelStream().filter(this::checkPrefix).map(uri -> {
242+
readers = uris.parallelStream().map(uri -> {
241243
try {
242244
return generateReader(uri);
243245
} catch (IOException e) {
@@ -252,6 +254,11 @@ public void export() throws Exception {
252254
} else {
253255
executorPool = Executors.newFixedThreadPool(threads);
254256
readers.parallelStream().forEach(this::reading);
257+
executorPool.shutdown();
258+
while (!executorPool.isTerminated()) {
259+
sleep(1000);
260+
rootLogger.info("finished count: {}.", statistics.get());
261+
}
255262
rootLogger.info("{} finished, results in {}.", info, savePath);
256263
}
257264
endAction();

src/main/java/com/qiniu/datasource/TextFileContainer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,12 @@ protected List<ITextReader<File>> getReaders(String path) throws IOException {
9494
} else {
9595
throw new IOException("");
9696
}
97-
return files.parallelStream().map(file1 -> {
97+
return files.parallelStream().map(pFile -> {
9898
try {
99-
return new TextFileReader(file, null, unitLen);
99+
return new TextFileReader(pFile, null, unitLen);
100100
} catch (IOException e) {
101101
e.printStackTrace();
102+
errorLogger.error("generate lister failed by {}\t{}", pFile.getPath(), urisMap.get(pFile.getPath()), e);
102103
return null;
103104
}
104105
}).filter(Objects::nonNull).peek(reader -> recordListerByUri(reader.getName())).collect(Collectors.toList());

0 commit comments

Comments
 (0)