diff --git a/src/main/java/org/phoebus/channelfinder/ChannelRepository.java b/src/main/java/org/phoebus/channelfinder/ChannelRepository.java
index c30b639..a429667 100644
--- a/src/main/java/org/phoebus/channelfinder/ChannelRepository.java
+++ b/src/main/java/org/phoebus/channelfinder/ChannelRepository.java
@@ -31,22 +31,31 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.text.MessageFormat;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import javax.annotation.PreDestroy;
 import org.phoebus.channelfinder.entity.Channel;
 import org.phoebus.channelfinder.entity.Property;
 import org.phoebus.channelfinder.entity.SearchResult;
 import org.phoebus.channelfinder.entity.Tag;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.repository.CrudRepository;
 import org.springframework.http.HttpStatus;
@@ -67,11 +76,18 @@ public class ChannelRepository implements CrudRepository<Channel, String> {
   @Qualifier("indexClient")
   ElasticsearchClient client;
 
+  @Value("${repository.chunk.size:10000}")
+  private int chunkSize;
+
+  // Object mapper to ignore properties we don't want to index
   final ObjectMapper objectMapper =
       new ObjectMapper()
           .addMixIn(Tag.class, Tag.OnlyTag.class)
           .addMixIn(Property.class, Property.OnlyProperty.class);
 
+  private final ExecutorService executor =
+      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+
   /**
    * create a new channel using the given Channel
    *
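
Reviewer note: each batch below is sliced with channels.stream().skip(i).limit(chunkSize), and a sequential skip() discards the prefix element by element, so late chunks re-walk most of the list. A minimal sketch of an equivalent subList-based partition (class and method names hypothetical) that cuts each chunk in constant time; the returned views must be consumed before the source list changes:

    import java.util.ArrayList;
    import java.util.List;

    final class Chunks {
      // Hypothetical helper: O(1) subList views instead of stream().skip(i).limit(n).
      static <T> List<List<T>> partition(List<T> source, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < source.size(); i += chunkSize) {
          chunks.add(source.subList(i, Math.min(i + chunkSize, source.size())));
        }
        return chunks;
      }
    }
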
@@ -110,41 +126,59 @@ public Channel index(Channel channel) {
    * @return the created channels
    */
   public List<Channel> indexAll(List<Channel> channels) {
-    BulkRequest.Builder br = new BulkRequest.Builder();
-
-    for (Channel channel : channels) {
-      br.operations(
-              op ->
-                  op.index(
-                      idx ->
-                          idx.index(esService.getES_CHANNEL_INDEX())
-                              .id(channel.getName())
-                              .document(
-                                  JsonData.of(channel, new JacksonJsonpMapper(objectMapper)))))
-          .refresh(Refresh.True);
+    List<Future<List<Channel>>> futures = new ArrayList<>();
+
+    for (int i = 0; i < channels.size(); i += chunkSize) {
+      List<Channel> chunk = channels.stream().skip(i).limit(chunkSize).toList();
+      futures.add(
+          executor.submit(
+              () -> {
+                BulkRequest.Builder br = new BulkRequest.Builder();
+                for (Channel channel : chunk) {
+                  br.operations(
+                          op ->
+                              op.index(
+                                  idx ->
+                                      idx.index(esService.getES_CHANNEL_INDEX())
+                                          .id(channel.getName())
+                                          .document(
+                                              JsonData.of(
+                                                  channel, new JacksonJsonpMapper(objectMapper)))))
+                      .refresh(Refresh.True);
+                }
+                BulkResponse result;
+                try {
+                  result = client.bulk(br.build());
+                  // Log errors, if any
+                  if (result.errors()) {
+                    logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
+                    for (BulkResponseItem item : result.items()) {
+                      if (item.error() != null) {
+                        logger.log(Level.SEVERE, () -> item.error().reason());
+                      }
+                    }
+                    // TODO cleanup? or throw exception?
+                  } else {
+                    return chunk;
+                  }
+                } catch (IOException e) {
+                  String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, chunk);
+                  logger.log(Level.SEVERE, message, e);
+                  throw new ResponseStatusException(
+                      HttpStatus.INTERNAL_SERVER_ERROR, message, null);
+                }
+                return Collections.emptyList();
+              }));
     }
-
-    BulkResponse result = null;
-    try {
-      result = client.bulk(br.build());
-      // Log errors, if any
-      if (result.errors()) {
-        logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
-        for (BulkResponseItem item : result.items()) {
-          if (item.error() != null) {
-            logger.log(Level.SEVERE, () -> item.error().reason());
-          }
-        }
-        // TODO cleanup? or throw exception?
-      } else {
-        return channels;
+    List<Channel> allIndexed = new ArrayList<>();
+    for (Future<List<Channel>> future : futures) {
+      try {
+        allIndexed.addAll(future.get(10, TimeUnit.MINUTES));
+      } catch (Exception e) {
+        logger.log(Level.SEVERE, "Bulk indexing failed", e);
       }
-    } catch (IOException e) {
-      String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels);
-      logger.log(Level.SEVERE, message, e);
-      throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, message, null);
     }
-    return Collections.emptyList();
+    return allIndexed;
   }
 
   /**
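
Reviewer note: a ResponseStatusException thrown inside a submitted task reaches the collection loop above as the cause of an ExecutionException, where catch (Exception e) only logs it, so a caller of indexAll no longer observes the HTTP 500 the worker raised. A minimal sketch of a drain step that preserves the old failure semantics, assuming that is still the intent (class and method names hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.phoebus.channelfinder.entity.Channel;
    import org.springframework.web.server.ResponseStatusException;

    final class Futures {
      // Hypothetical helper: unwrap ExecutionException so a ResponseStatusException
      // raised in a worker still propagates to the REST layer.
      static List<Channel> drain(List<Future<List<Channel>>> futures) throws InterruptedException {
        List<Channel> all = new ArrayList<>();
        for (Future<List<Channel>> future : futures) {
          try {
            all.addAll(future.get(10, TimeUnit.MINUTES));
          } catch (ExecutionException e) {
            if (e.getCause() instanceof ResponseStatusException) {
              throw (ResponseStatusException) e.getCause();
            }
            throw new IllegalStateException("Bulk indexing failed", e);
          } catch (TimeoutException e) {
            throw new IllegalStateException("Bulk indexing timed out", e);
          }
        }
        return all;
      }
    }
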
@@ -177,7 +211,12 @@ public Channel save(String channelName, Channel channel) {
     return null;
   }
 
-  /** */
+  /**
+   * Saves the given channel entity. This method is required by the CrudRepository interface.
+   *
+   * @param channel the channel entity to save
+   * @return the saved channel entity
+   */
   @Override
   public Channel save(Channel channel) {
     return save(channel.getName(), channel);
   }
@@ -193,66 +232,88 @@ public Channel save(Channel channel) {
   @SuppressWarnings("unchecked")
   @Override
   public <S extends Channel> Iterable<S> saveAll(Iterable<S> channels) {
-    // Create a list of all channel names
-    List<String> ids =
+    List<Future<List<Channel>>> futures = new ArrayList<>();
+    Set<Channel> channelList =
         StreamSupport.stream(channels.spliterator(), false)
-            .map(Channel::getName)
-            .collect(Collectors.toList());
+            .collect(Collectors.toCollection(LinkedHashSet::new));
 
-    try {
+    for (int i = 0; i < channelList.size(); i += chunkSize) {
+      List<Channel> chunk = channelList.stream().skip(i).limit(chunkSize).toList();
+      // Create a list of all channel names
+      Set<String> ids =
+          chunk.stream().map(Channel::getName).collect(Collectors.toCollection(LinkedHashSet::new));
       Map<String, Channel> existingChannels =
           findAllById(ids).stream().collect(Collectors.toMap(Channel::getName, c -> c));
-      BulkRequest.Builder br = new BulkRequest.Builder();
-
-      for (Channel channel : channels) {
-        if (existingChannels.containsKey(channel.getName())) {
-          // merge with existing channel
-          Channel updatedChannel = existingChannels.get(channel.getName());
-          if (channel.getOwner() != null && !channel.getOwner().isEmpty())
-            updatedChannel.setOwner(channel.getOwner());
-          updatedChannel.addProperties(channel.getProperties());
-          updatedChannel.addTags(channel.getTags());
-          br.operations(
-              op ->
-                  op.index(
-                      i ->
-                          i.index(esService.getES_CHANNEL_INDEX())
-                              .id(updatedChannel.getName())
-                              .document(
-                                  JsonData.of(
-                                      updatedChannel, new JacksonJsonpMapper(objectMapper)))));
-        } else {
-          br.operations(
-              op ->
-                  op.index(
-                      i ->
-                          i.index(esService.getES_CHANNEL_INDEX())
-                              .id(channel.getName())
-                              .document(
-                                  JsonData.of(channel, new JacksonJsonpMapper(objectMapper)))));
-        }
-      }
-      BulkResponse result = null;
-      result = client.bulk(br.refresh(Refresh.True).build());
-      // Log errors, if any
-      if (result.errors()) {
-        logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
-        for (BulkResponseItem item : result.items()) {
-          if (item.error() != null) {
-            logger.log(Level.SEVERE, () -> item.error().reason());
-          }
-        }
-        // TODO cleanup? or throw exception?
-      } else {
-        return (Iterable<S>) findAllById(ids);
+      futures.add(
+          executor.submit(
+              () -> {
+                BulkRequest.Builder br = new BulkRequest.Builder();
+                for (Channel channel : chunk) {
+                  if (existingChannels.containsKey(channel.getName())) {
+                    // merge with existing channel
+                    Channel updatedChannel = existingChannels.get(channel.getName());
+                    if (channel.getOwner() != null && !channel.getOwner().isEmpty())
+                      updatedChannel.setOwner(channel.getOwner());
+                    updatedChannel.addProperties(channel.getProperties());
+                    updatedChannel.addTags(channel.getTags());
+                    br.operations(
+                        op ->
+                            op.index(
+                                ch ->
+                                    ch.index(esService.getES_CHANNEL_INDEX())
+                                        .id(updatedChannel.getName())
+                                        .document(
+                                            JsonData.of(
+                                                updatedChannel,
+                                                new JacksonJsonpMapper(objectMapper)))));
+                  } else {
+                    br.operations(
+                        op ->
+                            op.index(
+                                ch ->
+                                    ch.index(esService.getES_CHANNEL_INDEX())
+                                        .id(channel.getName())
+                                        .document(
+                                            JsonData.of(
+                                                channel, new JacksonJsonpMapper(objectMapper)))));
+                  }
+                }
+                BulkResponse result;
+                try {
+                  result = client.bulk(br.refresh(Refresh.True).build());
+                  // Log errors, if any
+                  if (result.errors()) {
+                    logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
+                    for (BulkResponseItem item : result.items()) {
+                      if (item.error() != null) {
+                        logger.log(Level.SEVERE, () -> item.error().reason());
+                      }
+                    }
+                    // TODO cleanup? or throw exception?
+                  } else {
+                    return findAllById(ids);
+                  }
+                } catch (IOException e) {
+                  String message =
+                      MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, chunk);
+                  logger.log(Level.SEVERE, message, e);
+                  throw new ResponseStatusException(
+                      HttpStatus.INTERNAL_SERVER_ERROR, message, null);
+                }
+                return Collections.emptyList();
+              }));
+    }
+
+    List<Channel> allSaved = new ArrayList<>();
+    for (Future<List<Channel>> future : futures) {
+      try {
+        allSaved.addAll(future.get());
+      } catch (Exception e) {
+        logger.log(Level.SEVERE, "Bulk saving failed", e);
       }
-    } catch (IOException e) {
-      String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels);
-      logger.log(Level.SEVERE, message, e);
-      throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, message, null);
     }
-    return null;
+    return (Iterable<S>) allSaved;
   }
 
   /**
@@ -705,4 +766,17 @@ public void deleteAllById(Iterable<? extends String> ids) {
     // TODO Auto-generated method stub
 
   }
+
+  @PreDestroy
+  public void shutdownExecutor() {
+    executor.shutdown();
+    try {
+      if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
 }
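
Reviewer note: end-to-end, the repository now fans one large batch out across the fixed thread pool in chunks of repository.chunk.size. A usage sketch, assuming a Channel(name, owner) constructor and an injected ChannelRepository: with the default chunk size of 10000, indexing 25,000 channels submits three bulk requests (10000, 10000, and 5000 documents), with up to three of them in flight at once:

    // Sketch only: the constructor signature and the channelRepository field are assumptions.
    List<Channel> channels = new ArrayList<>(25_000);
    for (int i = 0; i < 25_000; i++) {
      channels.add(new Channel("sim:chunked:" + i, "admin"));
    }
    List<Channel> indexed = channelRepository.indexAll(channels);
    assert indexed.size() == channels.size();
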
diff --git a/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java b/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java
index 0f98179..3eb4907 100644
--- a/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java
+++ b/src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java
@@ -1,11 +1,14 @@
 package org.phoebus.channelfinder.processors;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Spliterator;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
 import org.phoebus.channelfinder.entity.Channel;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.stereotype.Service;
 
@@ -18,6 +21,9 @@ public class ChannelProcessorService {
 
   @Autowired private TaskExecutor taskExecutor;
 
+  @Value("${processors.chunking.size:10000}")
+  private int chunkSize;
+
   long getProcessorCount() {
     return channelProcessors.size();
   }
@@ -49,7 +55,15 @@ public void sendToProcessors(List<Channel> channels) {
         .forEach(
             channelProcessor -> {
               try {
-                channelProcessor.process(channels);
+                Spliterator<Channel> split = channels.stream().spliterator();
+
+                while (true) {
+                  List<Channel> chunk = new ArrayList<>(chunkSize);
+                  for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
+                  if (chunk.isEmpty()) break;
+                  channelProcessor.process(chunk);
+                }
+
               } catch (Exception e) {
                 logger.log(
                     Level.WARNING,
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index f4a652d..d4db129 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -106,6 +106,11 @@ elasticsearch.query.size = 10000
 # Create the Channel Finder indices if they do not exist
 elasticsearch.create.indices=true
 
+############################## Repository #################################
+
+# Repository chunk size: how many channels to submit to Elasticsearch in one bulk request
+repository.chunk.size = 10000
+
 ############################## Service Info ###############################
 # ChannelFinder version as defined in the pom file
 channelfinder.version=@project.version@
@@ -114,6 +119,9 @@ channelfinder.version=@project.version@
 # DEBUG level will log all requests and responses to and from the REST end points
 logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO
 
+################ Processor ##################################################
+processors.chunking.size=10000
+
 ################ Archiver Appliance Configuration Processor #################
 aa.urls={'default': 'http://localhost:17665'}
 # Comma-separated list of archivers to use if archiver_property_name is null
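
Reviewer note: the while (true) / tryAdvance loop above works, and channels.spliterator() alone would do, since the input is already a List and the stream() hop adds nothing. A minimal sketch of the same chunked dispatch as a reusable helper (class and method names hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.function.Consumer;

    final class ChunkedDispatch {
      // Hypothetical helper: feed items to a processor in fixed-size chunks.
      static <T> void processInChunks(List<T> items, int chunkSize, Consumer<List<T>> processor) {
        Spliterator<T> split = items.spliterator();
        List<T> chunk = new ArrayList<>(chunkSize);
        while (split.tryAdvance(chunk::add)) {
          if (chunk.size() == chunkSize) {
            processor.accept(new ArrayList<>(chunk)); // hand off a copy, then reuse the buffer
            chunk.clear();
          }
        }
        if (!chunk.isEmpty()) {
          processor.accept(chunk); // trailing partial chunk
        }
      }
    }

Calling processInChunks(channels, chunkSize, channelProcessor::process) would replace the inline loop; handing off a copy before clear() keeps a processor from seeing the buffer mutate underneath it.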