17
17
import org .opensearch .index .Message ;
18
18
import org .opensearch .index .engine .IngestionEngine ;
19
19
20
+ import java .util .Comparator ;
20
21
import java .util .List ;
21
22
import java .util .Locale ;
22
23
import java .util .Objects ;
23
24
import java .util .Set ;
24
- import java .util .concurrent .ArrayBlockingQueue ;
25
- import java .util .concurrent .BlockingQueue ;
26
25
import java .util .concurrent .ExecutorService ;
27
26
import java .util .concurrent .Executors ;
28
27
@@ -47,8 +46,6 @@ public class DefaultStreamPoller implements StreamPoller {
47
46
48
47
private ExecutorService consumerThread ;
49
48
50
- private ExecutorService processorThread ;
51
-
52
49
// start of the batch, inclusive
53
50
private IngestionShardPointer initialBatchStartPointer ;
54
51
private boolean includeBatchStartPointer = false ;
@@ -58,16 +55,14 @@ public class DefaultStreamPoller implements StreamPoller {
58
55
59
56
private Set <IngestionShardPointer > persistedPointers ;
60
57
61
- private BlockingQueue <IngestionShardConsumer .ReadResult <? extends IngestionShardPointer , ? extends Message >> blockingQueue ;
62
-
63
- private MessageProcessorRunnable processorRunnable ;
64
-
65
58
private final CounterMetric totalPolledCount = new CounterMetric ();
66
59
67
60
// A pointer to the max persisted pointer for optimizing the check
68
61
@ Nullable
69
62
private IngestionShardPointer maxPersistedPointer ;
70
63
64
+ private PartitionedBlockingQueueContainer blockingQueueContainer ;
65
+
71
66
public DefaultStreamPoller (
72
67
IngestionShardPointer startPointer ,
73
68
Set <IngestionShardPointer > persistedPointers ,
@@ -78,13 +73,14 @@ public DefaultStreamPoller(
78
73
IngestionErrorStrategy errorStrategy ,
79
74
State initialState ,
80
75
long maxPollSize ,
81
- int pollTimeout
76
+ int pollTimeout ,
77
+ int numProcessorThreads
82
78
) {
83
79
this (
84
80
startPointer ,
85
81
persistedPointers ,
86
82
consumer ,
87
- new MessageProcessorRunnable ( new ArrayBlockingQueue <>( 100 ), ingestionEngine , errorStrategy ),
83
+ new PartitionedBlockingQueueContainer ( numProcessorThreads , consumer . getShardId ( ), ingestionEngine , errorStrategy ),
88
84
resetState ,
89
85
resetValue ,
90
86
errorStrategy ,
@@ -96,7 +92,7 @@ public DefaultStreamPoller(
96
92
IngestionShardPointer startPointer ,
97
93
Set <IngestionShardPointer > persistedPointers ,
98
94
IngestionShardConsumer consumer ,
99
- MessageProcessorRunnable processorRunnable ,
95
+ PartitionedBlockingQueueContainer blockingQueueContainer ,
100
96
ResetState resetState ,
101
97
String resetValue ,
102
98
IngestionErrorStrategy errorStrategy ,
@@ -111,22 +107,13 @@ public DefaultStreamPoller(
111
107
if (!this .persistedPointers .isEmpty ()) {
112
108
maxPersistedPointer = this .persistedPointers .stream ().max (IngestionShardPointer ::compareTo ).get ();
113
109
}
114
- this .processorRunnable = processorRunnable ;
115
- blockingQueue = processorRunnable .getBlockingQueue ();
110
+ this .blockingQueueContainer = blockingQueueContainer ;
116
111
this .consumerThread = Executors .newSingleThreadExecutor (
117
112
r -> new Thread (
118
113
r ,
119
114
String .format (Locale .ROOT , "stream-poller-consumer-%d-%d" , consumer .getShardId (), System .currentTimeMillis ())
120
115
)
121
116
);
122
-
123
- // TODO: allow multiple threads for processing the messages in parallel
124
- this .processorThread = Executors .newSingleThreadExecutor (
125
- r -> new Thread (
126
- r ,
127
- String .format (Locale .ROOT , "stream-poller-processor-%d-%d" , consumer .getShardId (), System .currentTimeMillis ())
128
- )
129
- );
130
117
this .errorStrategy = errorStrategy ;
131
118
}
132
119
@@ -144,7 +131,7 @@ public void start() {
144
131
// when we start, we need to include the batch start pointer in the read for the first read
145
132
includeBatchStartPointer = true ;
146
133
consumerThread .submit (this ::startPoll );
147
- processorThread . submit ( processorRunnable );
134
+ blockingQueueContainer . startProcessorThreads ( );
148
135
}
149
136
150
137
/**
@@ -242,7 +229,7 @@ private void processRecords(List<IngestionShardConsumer.ReadResult<? extends Ing
242
229
continue ;
243
230
}
244
231
totalPolledCount .inc ();
245
- blockingQueue . put (result );
232
+ blockingQueueContainer . add (result );
246
233
247
234
logger .debug (
248
235
"Put message {} with pointer {} to the blocking queue" ,
@@ -322,10 +309,8 @@ public void close() {
322
309
logger .error ("Error in closing the poller of shard {}: {}" , consumer .getShardId (), e );
323
310
}
324
311
}
325
- blockingQueue .clear ();
326
312
consumerThread .shutdown ();
327
- // interrupts the processor
328
- processorThread .shutdownNow ();
313
+ blockingQueueContainer .close ();
329
314
logger .info ("closed the poller of shard {}" , consumer .getShardId ());
330
315
}
331
316
@@ -343,19 +328,23 @@ public boolean isClosed() {
343
328
* Returns the batch start pointer from where the poller can resume in case of shard recovery. The poller and
344
329
* processor are decoupled in this implementation, and hence the latest pointer tracked by the processor acts as the
345
330
* recovery/start point. In case the processor has not started tracking, then the initial batchStartPointer used by
346
- * the poller acts as the start point.
331
+ * the poller acts as the start point. If multiple processor threads are used, the minimum shard pointer across
332
+ * processors indicates the start point.
347
333
*/
348
334
@ Override
349
335
public IngestionShardPointer getBatchStartPointer () {
350
- IngestionShardPointer currentShardPointer = processorRunnable .getCurrentShardPointer ();
351
- return currentShardPointer == null ? initialBatchStartPointer : currentShardPointer ;
336
+ return blockingQueueContainer .getCurrentShardPointers ()
337
+ .stream ()
338
+ .filter (Objects ::nonNull )
339
+ .min (Comparator .naturalOrder ())
340
+ .orElseGet (() -> initialBatchStartPointer );
352
341
}
353
342
354
343
@ Override
355
344
public PollingIngestStats getStats () {
356
345
PollingIngestStats .Builder builder = new PollingIngestStats .Builder ();
357
346
builder .setTotalPolledCount (totalPolledCount .count ());
358
- builder .setTotalProcessedCount (processorRunnable . getStats (). count ());
347
+ builder .setTotalProcessedCount (blockingQueueContainer . getTotalProcessedCount ());
359
348
return builder .build ();
360
349
}
361
350
@@ -371,6 +360,6 @@ public IngestionErrorStrategy getErrorStrategy() {
371
360
@ Override
372
361
public void updateErrorStrategy (IngestionErrorStrategy errorStrategy ) {
373
362
this .errorStrategy = errorStrategy ;
374
- processorRunnable . setErrorStrategy (errorStrategy );
363
+ blockingQueueContainer . updateErrorStrategy (errorStrategy );
375
364
}
376
365
}
0 commit comments