Skip to content

Commit 1c7dcfb

Browse files
authored
Merge pull request #187 from ChannelFinder/chunking
Chunking
2 parents f2562da + 9c30a77 commit 1c7dcfb

File tree

3 files changed

+184
-87
lines changed

3 files changed

+184
-87
lines changed

src/main/java/org/phoebus/channelfinder/ChannelRepository.java

Lines changed: 160 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,31 @@
3131
import com.fasterxml.jackson.databind.ObjectMapper;
3232
import java.io.IOException;
3333
import java.text.MessageFormat;
34+
import java.util.ArrayList;
3435
import java.util.Collections;
3536
import java.util.Comparator;
3637
import java.util.HashSet;
38+
import java.util.LinkedHashSet;
3739
import java.util.List;
3840
import java.util.Map;
3941
import java.util.Optional;
42+
import java.util.Set;
43+
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.Executors;
45+
import java.util.concurrent.Future;
46+
import java.util.concurrent.TimeUnit;
4047
import java.util.logging.Level;
4148
import java.util.logging.Logger;
4249
import java.util.stream.Collectors;
4350
import java.util.stream.StreamSupport;
51+
import javax.annotation.PreDestroy;
4452
import org.phoebus.channelfinder.entity.Channel;
4553
import org.phoebus.channelfinder.entity.Property;
4654
import org.phoebus.channelfinder.entity.SearchResult;
4755
import org.phoebus.channelfinder.entity.Tag;
4856
import org.springframework.beans.factory.annotation.Autowired;
4957
import org.springframework.beans.factory.annotation.Qualifier;
58+
import org.springframework.beans.factory.annotation.Value;
5059
import org.springframework.context.annotation.Configuration;
5160
import org.springframework.data.repository.CrudRepository;
5261
import org.springframework.http.HttpStatus;
@@ -67,11 +76,18 @@ public class ChannelRepository implements CrudRepository<Channel, String> {
6776
@Qualifier("indexClient")
6877
ElasticsearchClient client;
6978

79+
@Value("${repository.chunk.size:10000}")
80+
private int chunkSize;
81+
82+
// Object mapper to ignore properties we don't want to index
7083
final ObjectMapper objectMapper =
7184
new ObjectMapper()
7285
.addMixIn(Tag.class, Tag.OnlyTag.class)
7386
.addMixIn(Property.class, Property.OnlyProperty.class);
7487

88+
private final ExecutorService executor =
89+
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
90+
7591
/**
7692
* create a new channel using the given Channel
7793
*
@@ -110,41 +126,59 @@ public Channel index(Channel channel) {
110126
* @return the created channels
111127
*/
112128
public List<Channel> indexAll(List<Channel> channels) {
113-
BulkRequest.Builder br = new BulkRequest.Builder();
114-
115-
for (Channel channel : channels) {
116-
br.operations(
117-
op ->
118-
op.index(
119-
idx ->
120-
idx.index(esService.getES_CHANNEL_INDEX())
121-
.id(channel.getName())
122-
.document(
123-
JsonData.of(channel, new JacksonJsonpMapper(objectMapper)))))
124-
.refresh(Refresh.True);
129+
List<Future<List<Channel>>> futures = new ArrayList<>();
130+
131+
for (int i = 0; i < channels.size(); i += chunkSize) {
132+
List<Channel> chunk = channels.stream().skip(i).limit(chunkSize).toList();
133+
futures.add(
134+
executor.submit(
135+
() -> {
136+
BulkRequest.Builder br = new BulkRequest.Builder();
137+
for (Channel channel : chunk) {
138+
br.operations(
139+
op ->
140+
op.index(
141+
idx ->
142+
idx.index(esService.getES_CHANNEL_INDEX())
143+
.id(channel.getName())
144+
.document(
145+
JsonData.of(
146+
channel, new JacksonJsonpMapper(objectMapper)))))
147+
.refresh(Refresh.True);
148+
}
149+
BulkResponse result;
150+
try {
151+
result = client.bulk(br.build());
152+
// Log errors, if any
153+
if (result.errors()) {
154+
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
155+
for (BulkResponseItem item : result.items()) {
156+
if (item.error() != null) {
157+
logger.log(Level.SEVERE, () -> item.error().reason());
158+
}
159+
}
160+
// TODO cleanup? or throw exception?
161+
} else {
162+
return chunk;
163+
}
164+
} catch (IOException e) {
165+
String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, chunk);
166+
logger.log(Level.SEVERE, message, e);
167+
throw new ResponseStatusException(
168+
HttpStatus.INTERNAL_SERVER_ERROR, message, null);
169+
}
170+
return Collections.emptyList();
171+
}));
125172
}
126-
127-
BulkResponse result = null;
128-
try {
129-
result = client.bulk(br.build());
130-
// Log errors, if any
131-
if (result.errors()) {
132-
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
133-
for (BulkResponseItem item : result.items()) {
134-
if (item.error() != null) {
135-
logger.log(Level.SEVERE, () -> item.error().reason());
136-
}
137-
}
138-
// TODO cleanup? or throw exception?
139-
} else {
140-
return channels;
173+
List<Channel> allIndexed = new ArrayList<>();
174+
for (Future<List<Channel>> future : futures) {
175+
try {
176+
allIndexed.addAll(future.get(10, TimeUnit.MINUTES));
177+
} catch (Exception e) {
178+
logger.log(Level.SEVERE, "Bulk indexing failed", e);
141179
}
142-
} catch (IOException e) {
143-
String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels);
144-
logger.log(Level.SEVERE, message, e);
145-
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, message, null);
146180
}
147-
return Collections.emptyList();
181+
return allIndexed;
148182
}
149183

150184
/**
@@ -177,7 +211,12 @@ public Channel save(String channelName, Channel channel) {
177211
return null;
178212
}
179213

180-
/** */
214+
/**
215+
* Saves the given channel entity. This method is required by the CrudRepository interface.
216+
*
217+
* @param channel the channel entity to save
218+
* @return the saved channel entity
219+
*/
181220
@Override
182221
public Channel save(Channel channel) {
183222
return save(channel.getName(), channel);
@@ -193,66 +232,88 @@ public Channel save(Channel channel) {
193232
@SuppressWarnings("unchecked")
194233
@Override
195234
public <S extends Channel> Iterable<S> saveAll(Iterable<S> channels) {
196-
// Create a list of all channel names
197-
List<String> ids =
235+
List<Future<List<Channel>>> futures = new ArrayList<>();
236+
Set<Channel> channelList =
198237
StreamSupport.stream(channels.spliterator(), false)
199-
.map(Channel::getName)
200-
.collect(Collectors.toList());
238+
.collect(Collectors.toCollection(LinkedHashSet::new));
201239

202-
try {
240+
for (int i = 0; i < channelList.size(); i += chunkSize) {
241+
List<Channel> chunk = channelList.stream().skip(i).limit(chunkSize).toList();
242+
// Create a list of all channel names
243+
Set<String> ids =
244+
chunk.stream().map(Channel::getName).collect(Collectors.toCollection(LinkedHashSet::new));
203245
Map<String, Channel> existingChannels =
204246
findAllById(ids).stream().collect(Collectors.toMap(Channel::getName, c -> c));
205247

206-
BulkRequest.Builder br = new BulkRequest.Builder();
207-
208-
for (Channel channel : channels) {
209-
if (existingChannels.containsKey(channel.getName())) {
210-
// merge with existing channel
211-
Channel updatedChannel = existingChannels.get(channel.getName());
212-
if (channel.getOwner() != null && !channel.getOwner().isEmpty())
213-
updatedChannel.setOwner(channel.getOwner());
214-
updatedChannel.addProperties(channel.getProperties());
215-
updatedChannel.addTags(channel.getTags());
216-
br.operations(
217-
op ->
218-
op.index(
219-
i ->
220-
i.index(esService.getES_CHANNEL_INDEX())
221-
.id(updatedChannel.getName())
222-
.document(
223-
JsonData.of(
224-
updatedChannel, new JacksonJsonpMapper(objectMapper)))));
225-
} else {
226-
br.operations(
227-
op ->
228-
op.index(
229-
i ->
230-
i.index(esService.getES_CHANNEL_INDEX())
231-
.id(channel.getName())
232-
.document(
233-
JsonData.of(channel, new JacksonJsonpMapper(objectMapper)))));
234-
}
235-
}
236-
BulkResponse result = null;
237-
result = client.bulk(br.refresh(Refresh.True).build());
238-
// Log errors, if any
239-
if (result.errors()) {
240-
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
241-
for (BulkResponseItem item : result.items()) {
242-
if (item.error() != null) {
243-
logger.log(Level.SEVERE, () -> item.error().reason());
244-
}
245-
}
246-
// TODO cleanup? or throw exception?
247-
} else {
248-
return (Iterable<S>) findAllById(ids);
248+
futures.add(
249+
executor.submit(
250+
() -> {
251+
BulkRequest.Builder br = new BulkRequest.Builder();
252+
for (Channel channel : chunk) {
253+
if (existingChannels.containsKey(channel.getName())) {
254+
// merge with existing channel
255+
Channel updatedChannel = existingChannels.get(channel.getName());
256+
if (channel.getOwner() != null && !channel.getOwner().isEmpty())
257+
updatedChannel.setOwner(channel.getOwner());
258+
updatedChannel.addProperties(channel.getProperties());
259+
updatedChannel.addTags(channel.getTags());
260+
br.operations(
261+
op ->
262+
op.index(
263+
ch ->
264+
ch.index(esService.getES_CHANNEL_INDEX())
265+
.id(updatedChannel.getName())
266+
.document(
267+
JsonData.of(
268+
updatedChannel,
269+
new JacksonJsonpMapper(objectMapper)))));
270+
} else {
271+
br.operations(
272+
op ->
273+
op.index(
274+
ch ->
275+
ch.index(esService.getES_CHANNEL_INDEX())
276+
.id(channel.getName())
277+
.document(
278+
JsonData.of(
279+
channel, new JacksonJsonpMapper(objectMapper)))));
280+
}
281+
}
282+
BulkResponse result;
283+
try {
284+
result = client.bulk(br.refresh(Refresh.True).build());
285+
// Log errors, if any
286+
if (result.errors()) {
287+
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
288+
for (BulkResponseItem item : result.items()) {
289+
if (item.error() != null) {
290+
logger.log(Level.SEVERE, () -> item.error().reason());
291+
}
292+
}
293+
// TODO cleanup? or throw exception?
294+
} else {
295+
return findAllById(ids);
296+
}
297+
} catch (IOException e) {
298+
String message =
299+
MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels);
300+
logger.log(Level.SEVERE, message, e);
301+
throw new ResponseStatusException(
302+
HttpStatus.INTERNAL_SERVER_ERROR, message, null);
303+
}
304+
return Collections.emptyList();
305+
}));
306+
}
307+
308+
List<Channel> allSaved = new ArrayList<>();
309+
for (Future<List<Channel>> future : futures) {
310+
try {
311+
allSaved.addAll(future.get());
312+
} catch (Exception e) {
313+
logger.log(Level.SEVERE, "Bulk saving failed", e);
249314
}
250-
} catch (IOException e) {
251-
String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels);
252-
logger.log(Level.SEVERE, message, e);
253-
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, message, null);
254315
}
255-
return null;
316+
return (Iterable<S>) allSaved;
256317
}
257318

258319
/**
@@ -705,4 +766,17 @@ public void deleteAllById(Iterable<? extends String> ids) {
705766
// TODO Auto-generated method stub
706767

707768
}
769+
770+
@PreDestroy
771+
public void shutdownExecutor() {
772+
executor.shutdown();
773+
try {
774+
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
775+
executor.shutdownNow();
776+
}
777+
} catch (InterruptedException e) {
778+
executor.shutdownNow();
779+
Thread.currentThread().interrupt();
780+
}
781+
}
708782
}

src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package org.phoebus.channelfinder.processors;
22

3+
import java.util.ArrayList;
34
import java.util.List;
5+
import java.util.Spliterator;
46
import java.util.logging.Level;
57
import java.util.logging.Logger;
68
import java.util.stream.Collectors;
79
import org.phoebus.channelfinder.entity.Channel;
810
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.beans.factory.annotation.Value;
912
import org.springframework.core.task.TaskExecutor;
1013
import org.springframework.stereotype.Service;
1114

@@ -18,6 +21,9 @@ public class ChannelProcessorService {
1821

1922
@Autowired private TaskExecutor taskExecutor;
2023

24+
@Value("${processors.chunking.size:10000}")
25+
private int chunkSize;
26+
2127
long getProcessorCount() {
2228
return channelProcessors.size();
2329
}
@@ -49,7 +55,16 @@ public void sendToProcessors(List<Channel> channels) {
4955
.forEach(
5056
channelProcessor -> {
5157
try {
52-
channelProcessor.process(channels);
58+
Spliterator<Channel> split = channels.stream().spliterator();
59+
60+
while (true) {
61+
List<Channel> chunk = new ArrayList<>(chunkSize);
62+
for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
63+
;
64+
if (chunk.isEmpty()) break;
65+
channelProcessor.process(chunk);
66+
}
67+
5368
} catch (Exception e) {
5469
logger.log(
5570
Level.WARNING,

src/main/resources/application.properties

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ elasticsearch.query.size = 10000
106106
# Create the Channel Finder indices if they do not exist
107107
elasticsearch.create.indices=true
108108

109+
############################## Repository #################################
110+
111+
# Repository chunk size, how many channels to submit to elastic at once
112+
repository.chunk.size = 10000
113+
109114
############################## Service Info ###############################
110115
# ChannelFinder version as defined in the pom file
111116
channelfinder.version=@project.version@
@@ -114,6 +119,9 @@ channelfinder.version=@project.version@
114119
# DEBUG level will log all requests and responses to and from the REST end points
115120
logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO
116121

122+
################ Processor ##################################################
123+
processors.chunking.size=10000
124+
117125
################ Archiver Appliance Configuration Processor #################
118126
aa.urls={'default': 'http://localhost:17665'}
119127
# Comma-separated list of archivers to use if archiver_property_name is null

0 commit comments

Comments
 (0)