24
24
import com .spectralogic .ds3client .models .bulk .Node ;
25
25
import com .spectralogic .ds3client .models .bulk .Objects ;
26
26
import com .spectralogic .ds3client .serializer .XmlProcessingException ;
27
+ import org .slf4j .Logger ;
28
+ import org .slf4j .LoggerFactory ;
27
29
28
30
import java .io .IOException ;
29
31
import java .security .SignatureException ;
33
35
import java .util .concurrent .Executors ;
34
36
35
37
class ChunkTransferrer {
38
+ private final static Logger LOG = LoggerFactory .getLogger (ChunkTransferrer .class );
36
39
private final ItemTransferrer itemTransferrer ;
37
40
private final Ds3Client mainClient ;
38
41
private final JobPartTracker partTracker ;
@@ -57,18 +60,24 @@ public void transferChunks(
57
60
final Iterable <Node > nodes ,
58
61
final Iterable <Objects > chunks )
59
62
throws SignatureException , IOException , XmlProcessingException {
63
+ LOG .debug ("Getting ready to process chunks" );
60
64
final Map <UUID , Node > nodeMap = buildNodeMap (nodes );
65
+ LOG .debug ("Starting executor service" );
61
66
final ListeningExecutorService executor = MoreExecutors .listeningDecorator (Executors .newFixedThreadPool (maxParallelRequests ));
67
+ LOG .debug ("Executor service started" );
62
68
try {
63
69
final List <ListenableFuture <?>> tasks = new ArrayList <>();
64
70
for (final Objects chunk : chunks ) {
71
+ LOG .debug ("Processing parts for chunk: " + chunk .getChunkId ().toString ());
65
72
final Ds3Client client = mainClient .newForNode (nodeMap .get (chunk .getNodeId ()));
66
73
for (final BulkObject ds3Object : chunk ) {
67
74
final ObjectPart part = new ObjectPart (ds3Object .getOffset (), ds3Object .getLength ());
68
75
if (this .partTracker .containsPart (ds3Object .getName (), part )) {
76
+ LOG .debug ("Adding " + ds3Object .getName () + " to executor for processing" );
69
77
tasks .add (executor .submit (new Callable <Object >() {
70
78
@ Override
71
79
public Object call () throws Exception {
80
+ LOG .debug ("Processing " + ds3Object .getName ());
72
81
ChunkTransferrer .this .itemTransferrer .transferItem (client , ds3Object );
73
82
ChunkTransferrer .this .partTracker .completePart (ds3Object .getName (), part );
74
83
return null ;
@@ -79,6 +88,7 @@ public Object call() throws Exception {
79
88
}
80
89
executeWithExceptionHandling (tasks );
81
90
} finally {
91
+ LOG .debug ("Shutting down executor" );
82
92
executor .shutdown ();
83
93
}
84
94
}
0 commit comments