246 changes: 160 additions & 86 deletions src/main/java/org/phoebus/channelfinder/ChannelRepository.java
@@ -31,22 +31,31 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.PreDestroy;
import org.phoebus.channelfinder.entity.Channel;
import org.phoebus.channelfinder.entity.Property;
import org.phoebus.channelfinder.entity.SearchResult;
import org.phoebus.channelfinder.entity.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.repository.CrudRepository;
import org.springframework.http.HttpStatus;
@@ -67,11 +76,18 @@ public class ChannelRepository implements CrudRepository<Channel, String> {
@Qualifier("indexClient")
ElasticsearchClient client;

@Value("${repository.chunk.size:10000}")
private int chunkSize;

// Object mapper to ignore properties we don't want to index
final ObjectMapper objectMapper =
new ObjectMapper()
.addMixIn(Tag.class, Tag.OnlyTag.class)
.addMixIn(Property.class, Property.OnlyProperty.class);

private final ExecutorService executor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

/**
* create a new channel using the given Channel
*
@@ -110,41 +126,59 @@ public Channel index(Channel channel) {
* @return the created channels
*/
public List<Channel> indexAll(List<Channel> channels) {
BulkRequest.Builder br = new BulkRequest.Builder();

for (Channel channel : channels) {
br.operations(
op ->
op.index(
idx ->
idx.index(esService.getES_CHANNEL_INDEX())
.id(channel.getName())
.document(
JsonData.of(channel, new JacksonJsonpMapper(objectMapper)))))
.refresh(Refresh.True);
List<Future<List<Channel>>> futures = new ArrayList<>();

for (int i = 0; i < channels.size(); i += chunkSize) {
List<Channel> chunk = channels.stream().skip(i).limit(chunkSize).toList();
futures.add(
executor.submit(
() -> {
BulkRequest.Builder br = new BulkRequest.Builder();
for (Channel channel : chunk) {
br.operations(
op ->
op.index(
idx ->
idx.index(esService.getES_CHANNEL_INDEX())
.id(channel.getName())
.document(
JsonData.of(
channel, new JacksonJsonpMapper(objectMapper)))))
.refresh(Refresh.True);
}
BulkResponse result;
try {
result = client.bulk(br.build());
// Log errors, if any
if (result.errors()) {
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
for (BulkResponseItem item : result.items()) {
if (item.error() != null) {
logger.log(Level.SEVERE, () -> item.error().reason());
}
}
// TODO cleanup? or throw exception?
} else {
return chunk;
}
} catch (IOException e) {
String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, chunk);
logger.log(Level.SEVERE, message, e);
throw new ResponseStatusException(
HttpStatus.INTERNAL_SERVER_ERROR, message, null);
}
return Collections.emptyList();
}));
}

BulkResponse result = null;
try {
result = client.bulk(br.build());
// Log errors, if any
if (result.errors()) {
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
for (BulkResponseItem item : result.items()) {
if (item.error() != null) {
logger.log(Level.SEVERE, () -> item.error().reason());
}
}
// TODO cleanup? or throw exception?
} else {
return channels;
List<Channel> allIndexed = new ArrayList<>();
for (Future<List<Channel>> future : futures) {
try {
allIndexed.addAll(future.get(10, TimeUnit.MINUTES));
} catch (Exception e) {
logger.log(Level.SEVERE, "Bulk indexing failed", e);
}
} catch (IOException e) {
String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels);
logger.log(Level.SEVERE, message, e);
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, message, null);
}
return Collections.emptyList();
return allIndexed;
}
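
Aside: the indexAll change above partitions the input into fixed-size chunks and submits one bulk request per chunk to the shared pool, collecting a Future per chunk. A minimal, runnable sketch of that submit-then-collect pattern, with stand-in names (ChunkedSubmitSketch, the string items, and the println in place of the Elasticsearch bulk call are illustrative, not ChannelFinder API):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedSubmitSketch {
  private static final int CHUNK_SIZE = 3; // analogue of repository.chunk.size

  public static void main(String[] args) throws Exception {
    ExecutorService executor =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    List<String> items = List.of("a", "b", "c", "d", "e", "f", "g");

    // Submit one task per chunk; each task stands in for one bulk request.
    List<Future<List<String>>> futures = new ArrayList<>();
    for (int i = 0; i < items.size(); i += CHUNK_SIZE) {
      List<String> chunk = items.subList(i, Math.min(i + CHUNK_SIZE, items.size()));
      futures.add(
          executor.submit(
              () -> {
                System.out.println("bulk-indexing " + chunk); // client.bulk(...) goes here
                return chunk; // the PR returns the chunk on success, an empty list on bulk errors
              }));
    }

    // Collect results in submission order; the PR bounds each wait at 10 minutes.
    List<String> indexed = new ArrayList<>();
    for (Future<List<String>> future : futures) {
      indexed.addAll(future.get());
    }
    executor.shutdown();
    System.out.println("indexed " + indexed.size() + " of " + items.size());
  }
}

One Future per chunk keeps failures isolated: a chunk whose bulk response reports errors contributes an empty list instead of aborting the remaining chunks.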

/**
@@ -177,7 +211,12 @@ public Channel save(String channelName, Channel channel) {
return null;
}

/** */
/**
* Saves the given channel entity. This method is required by the CrudRepository interface.
*
* @param channel the channel entity to save
* @return the saved channel entity
*/
@Override
public Channel save(Channel channel) {
return save(channel.getName(), channel);
@@ -193,66 +232,88 @@ public Channel save(Channel channel) {
@SuppressWarnings("unchecked")
@Override
public <S extends Channel> Iterable<S> saveAll(Iterable<S> channels) {
// Create a list of all channel names
List<String> ids =
List<Future<List<Channel>>> futures = new ArrayList<>();
Set<Channel> channelList =
StreamSupport.stream(channels.spliterator(), false)
.map(Channel::getName)
.collect(Collectors.toList());
.collect(Collectors.toCollection(LinkedHashSet::new));

try {
for (int i = 0; i < channelList.size(); i += chunkSize) {
List<Channel> chunk = channelList.stream().skip(i).limit(chunkSize).toList();
// Create a list of all channel names
Set<String> ids =
chunk.stream().map(Channel::getName).collect(Collectors.toCollection(LinkedHashSet::new));
Map<String, Channel> existingChannels =
findAllById(ids).stream().collect(Collectors.toMap(Channel::getName, c -> c));

BulkRequest.Builder br = new BulkRequest.Builder();

for (Channel channel : channels) {
if (existingChannels.containsKey(channel.getName())) {
// merge with existing channel
Channel updatedChannel = existingChannels.get(channel.getName());
if (channel.getOwner() != null && !channel.getOwner().isEmpty())
updatedChannel.setOwner(channel.getOwner());
updatedChannel.addProperties(channel.getProperties());
updatedChannel.addTags(channel.getTags());
br.operations(
op ->
op.index(
i ->
i.index(esService.getES_CHANNEL_INDEX())
.id(updatedChannel.getName())
.document(
JsonData.of(
updatedChannel, new JacksonJsonpMapper(objectMapper)))));
} else {
br.operations(
op ->
op.index(
i ->
i.index(esService.getES_CHANNEL_INDEX())
.id(channel.getName())
.document(
JsonData.of(channel, new JacksonJsonpMapper(objectMapper)))));
}
}
BulkResponse result = null;
result = client.bulk(br.refresh(Refresh.True).build());
// Log errors, if any
if (result.errors()) {
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
for (BulkResponseItem item : result.items()) {
if (item.error() != null) {
logger.log(Level.SEVERE, () -> item.error().reason());
}
}
// TODO cleanup? or throw exception?
} else {
return (Iterable<S>) findAllById(ids);
futures.add(
executor.submit(
() -> {
BulkRequest.Builder br = new BulkRequest.Builder();
for (Channel channel : chunk) {
if (existingChannels.containsKey(channel.getName())) {
// merge with existing channel
Channel updatedChannel = existingChannels.get(channel.getName());
if (channel.getOwner() != null && !channel.getOwner().isEmpty())
updatedChannel.setOwner(channel.getOwner());
updatedChannel.addProperties(channel.getProperties());
updatedChannel.addTags(channel.getTags());
br.operations(
op ->
op.index(
ch ->
ch.index(esService.getES_CHANNEL_INDEX())
.id(updatedChannel.getName())
.document(
JsonData.of(
updatedChannel,
new JacksonJsonpMapper(objectMapper)))));
} else {
br.operations(
op ->
op.index(
ch ->
ch.index(esService.getES_CHANNEL_INDEX())
.id(channel.getName())
.document(
JsonData.of(
channel, new JacksonJsonpMapper(objectMapper)))));
}
}
BulkResponse result;
try {
result = client.bulk(br.refresh(Refresh.True).build());
// Log errors, if any
if (result.errors()) {
logger.log(Level.SEVERE, TextUtil.BULK_HAD_ERRORS);
for (BulkResponseItem item : result.items()) {
if (item.error() != null) {
logger.log(Level.SEVERE, () -> item.error().reason());
}
}
// TODO cleanup? or throw exception?
} else {
return findAllById(ids);
}
} catch (IOException e) {
String message =
MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels);
logger.log(Level.SEVERE, message, e);
throw new ResponseStatusException(
HttpStatus.INTERNAL_SERVER_ERROR, message, null);
}
return Collections.emptyList();
}));
}

List<Channel> allSaved = new ArrayList<>();
for (Future<List<Channel>> future : futures) {
try {
allSaved.addAll(future.get());
} catch (Exception e) {
logger.log(Level.SEVERE, "Bulk saving failed", e);
}
} catch (IOException e) {
String message = MessageFormat.format(TextUtil.FAILED_TO_INDEX_CHANNELS, channels);
logger.log(Level.SEVERE, message, e);
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, message, null);
}
return null;
return (Iterable<S>) allSaved;
}
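
The subtle part of saveAll is the merge branch: when a channel already exists, a non-empty incoming owner replaces the stored one, while properties and tags accumulate rather than being overwritten. A stripped-down sketch of just that decision, using a stand-in Channel class (the real entity has richer Property and Tag types):

import java.util.LinkedHashSet;
import java.util.Set;

public class MergeSketch {
  // Minimal stand-in for the Channel entity; illustrative only.
  static class Channel {
    final String name;
    String owner;
    final Set<String> properties = new LinkedHashSet<>();
    final Set<String> tags = new LinkedHashSet<>();

    Channel(String name, String owner) {
      this.name = name;
      this.owner = owner;
    }
  }

  // Mirrors the merge branch in saveAll: owner is replaced only when the
  // incoming value is non-empty; properties and tags are added, not replaced.
  static Channel merge(Channel existing, Channel incoming) {
    if (incoming.owner != null && !incoming.owner.isEmpty()) {
      existing.owner = incoming.owner;
    }
    existing.properties.addAll(incoming.properties);
    existing.tags.addAll(incoming.tags);
    return existing;
  }

  public static void main(String[] args) {
    Channel stored = new Channel("sim://noise", "alice");
    stored.tags.add("archived");

    Channel update = new Channel("sim://noise", ""); // empty owner: alice is kept
    update.tags.add("alarm");

    Channel merged = merge(stored, update);
    System.out.println(merged.owner + " " + merged.tags); // alice [archived, alarm]
  }
}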

/**
@@ -705,4 +766,17 @@ public void deleteAllById(Iterable<? extends String> ids) {
// TODO Auto-generated method stub

}

@PreDestroy
public void shutdownExecutor() {
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
src/main/java/org/phoebus/channelfinder/processors/ChannelProcessorService.java
@@ -1,11 +1,14 @@
package org.phoebus.channelfinder.processors;

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.phoebus.channelfinder.entity.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Service;

@@ -18,6 +21,9 @@ public class ChannelProcessorService {

@Autowired private TaskExecutor taskExecutor;

@Value("${processors.chunking.size:10000}")
private int chunkSize;

long getProcessorCount() {
return channelProcessors.size();
}
@@ -49,7 +55,16 @@ public void sendToProcessors(List<Channel> channels) {
.forEach(
channelProcessor -> {
try {
channelProcessor.process(channels);
Spliterator<Channel> split = channels.stream().spliterator();

while (true) {
List<Channel> chunk = new ArrayList<>(chunkSize);
for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
if (chunk.isEmpty()) break;
channelProcessor.process(chunk);
}

} catch (Exception e) {
logger.log(
Level.WARNING,
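
The loop added above drains the channel list in fixed-size batches via a Spliterator: tryAdvance hands over one element at a time and returns false once the source is exhausted, so the last batch is simply shorter. The same loop isolated as a runnable sketch (names and sizes are illustrative; calling spliterator() on the list directly is equivalent to stream().spliterator() here):

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.IntStream;

public class SpliteratorChunkSketch {
  public static void main(String[] args) {
    int chunkSize = 4; // analogue of processors.chunking.size
    List<Integer> channels = IntStream.rangeClosed(1, 10).boxed().toList();

    Spliterator<Integer> split = channels.spliterator();
    while (true) {
      List<Integer> chunk = new ArrayList<>(chunkSize);
      // Pull up to chunkSize elements; tryAdvance returns false when the source is drained.
      for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {}
      if (chunk.isEmpty()) break;
      System.out.println("process(" + chunk + ")"); // stand-in for channelProcessor.process(chunk)
    }
    // prints [1, 2, 3, 4], then [5, 6, 7, 8], then [9, 10]
  }
}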
8 changes: 8 additions & 0 deletions src/main/resources/application.properties
@@ -106,6 +106,11 @@ elasticsearch.query.size = 10000
# Create the Channel Finder indices if they do not exist
elasticsearch.create.indices=true

############################## Repository #################################

# Repository chunk size: how many channels to submit to Elasticsearch in a single bulk request
repository.chunk.size = 10000
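# Like any Spring Boot property, the default above can be overridden per
# deployment rather than by editing this file, e.g. on the command line
# (jar name illustrative):
#   java -jar ChannelFinder.jar --repository.chunk.size=5000
# or through the environment: REPOSITORY_CHUNK_SIZE=5000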

############################## Service Info ###############################
# ChannelFinder version as defined in the pom file
[email protected]@
@@ -114,6 +119,9 @@ channelfinder.version=@project.version@
# DEBUG level will log all requests and responses to and from the REST end points
logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO

################ Processor ##################################################
processors.chunking.size=10000

################ Archiver Appliance Configuration Processor #################
aa.urls={'default': 'http://localhost:17665'}
# Comma-separated list of archivers to use if archiver_property_name is null