From fc035c19d3a2a65de8f355d84ad4bed25da84ea4 Mon Sep 17 00:00:00 2001 From: Tristan Tarrant Date: Tue, 15 Feb 2011 11:45:22 +0100 Subject: [PATCH] Better retry --- README | 1 - README.textile | 7 ++ TODO.textile | 4 + .../amanuensis/DefaultWriterConfigurator.java | 2 +- .../amanuensis/OperationDispatcher.java | 2 +- .../jgroups/JGroupsOperationDispatcher.java | 85 ++++++++++++++----- .../backend/jgroups/ExecutorTest.java | 32 +++++++ 7 files changed, 111 insertions(+), 22 deletions(-) delete mode 100644 README create mode 100644 README.textile create mode 100644 TODO.textile create mode 100644 src/test/java/net/dataforte/infinispan/amanuensis/backend/jgroups/ExecutorTest.java diff --git a/README b/README deleted file mode 100644 index cd3b628..0000000 --- a/README +++ /dev/null @@ -1 +0,0 @@ -Amanuensis: Clustered Infinispan Index Writer using JGroups diff --git a/README.textile b/README.textile new file mode 100644 index 0000000..9cf6275 --- /dev/null +++ b/README.textile @@ -0,0 +1,7 @@ +h1. Amanuensis: Clustered Infinispan Index Writer using JGroups + + +h2. What is it ? + +Amanuensis attempts to implement a pseudo-distributed Lucene IndexWriter for use with Infinispan's Lucene Directory implementation. It is modelled around Hibernate Search's backend from which it borrows many ideas and bits of code. Index operations are dispatched from slaves to one master (colocated with Infinispan's coordinator) which applies them to the real IndexWriter. +Amanuensis also implements methods for obtaining efficient IndexReader instances which handle diff --git a/TODO.textile b/TODO.textile new file mode 100644 index 0000000..2f7aba1 --- /dev/null +++ b/TODO.textile @@ -0,0 +1,4 @@ +h1. Things to do + +* Robustness (persistent queues) +* Backup support (snapshot/restore indices) diff --git a/src/main/java/net/dataforte/infinispan/amanuensis/DefaultWriterConfigurator.java b/src/main/java/net/dataforte/infinispan/amanuensis/DefaultWriterConfigurator.java index 1cf759c..4c85f03 100644 --- a/src/main/java/net/dataforte/infinispan/amanuensis/DefaultWriterConfigurator.java +++ b/src/main/java/net/dataforte/infinispan/amanuensis/DefaultWriterConfigurator.java @@ -109,7 +109,7 @@ public void configure(IndexWriter writer) { mergePolicy.setCalibrateSizeByDeletes(calibrateSizeByDeletes); mergePolicy.setNoCFSRatio(noCFSRatio); mergePolicy.setUseCompoundDocStore(useCompoundDocStore); - mergePolicy.setUseCompoundFile(useCompoundFile); + mergePolicy.setUseCompoundFile(useCompoundFile); writer.setMergePolicy(mergePolicy); } diff --git a/src/main/java/net/dataforte/infinispan/amanuensis/OperationDispatcher.java b/src/main/java/net/dataforte/infinispan/amanuensis/OperationDispatcher.java index 4d9ab28..681402b 100644 --- a/src/main/java/net/dataforte/infinispan/amanuensis/OperationDispatcher.java +++ b/src/main/java/net/dataforte/infinispan/amanuensis/OperationDispatcher.java @@ -22,5 +22,5 @@ package net.dataforte.infinispan.amanuensis; public interface OperationDispatcher { - void dispatch(IndexOperations ops) throws IndexerException; + void dispatch(final IndexOperations ops) throws IndexerException; } diff --git a/src/main/java/net/dataforte/infinispan/amanuensis/backend/jgroups/JGroupsOperationDispatcher.java b/src/main/java/net/dataforte/infinispan/amanuensis/backend/jgroups/JGroupsOperationDispatcher.java index 1f12687..194e79a 100644 --- a/src/main/java/net/dataforte/infinispan/amanuensis/backend/jgroups/JGroupsOperationDispatcher.java +++ b/src/main/java/net/dataforte/infinispan/amanuensis/backend/jgroups/JGroupsOperationDispatcher.java @@ -21,6 +21,10 @@ package net.dataforte.infinispan.amanuensis.backend.jgroups; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import net.dataforte.commons.slf4j.LoggerFactory; import net.dataforte.infinispan.amanuensis.AmanuensisManager; import net.dataforte.infinispan.amanuensis.IndexOperations; @@ -30,15 +34,14 @@ import org.infinispan.remoting.transport.jgroups.JGroupsAddress; import org.jgroups.Address; import org.jgroups.Message; -import org.jgroups.SuspectedException; -import org.jgroups.TimeoutException; +import org.jgroups.blocks.Request; import org.jgroups.blocks.RequestOptions; import org.jgroups.blocks.mux.MuxMessageDispatcher; import org.slf4j.Logger; /** - * This class takes care of dispatching {@link IndexOperations} messages - * from the slaves to the master. + * This class takes care of dispatching {@link IndexOperations} messages from + * the slaves to the master. * * @author Tristan Tarrant */ @@ -46,27 +49,71 @@ public class JGroupsOperationDispatcher implements OperationDispatcher { private static final Logger log = LoggerFactory.make(); private AmanuensisManager manager; private MuxMessageDispatcher messageDispatcher; + private RequestOptions requestOptions; + private int maxRetries = 10; + private int minTimeout = 10000; public JGroupsOperationDispatcher(AmanuensisManager manager, MuxMessageDispatcher messageDispatcher) { this.manager = manager; this.messageDispatcher = messageDispatcher; + this.requestOptions = new RequestOptions(Request.GET_ALL, 10000); // We want synchronous, and we can wait for 10 seconds } - @Override - public void dispatch(IndexOperations ops) throws IndexerException { - Address dest = ((JGroupsAddress)manager.getMasterAddress()).getJGroupsAddress(); - Address src = ((JGroupsAddress)manager.getLocalAddress()).getJGroupsAddress(); - Message message = new Message(dest, src, ops); - try { - if(log.isTraceEnabled()) { - log.trace("Sending {} to {}", ops.toString(), dest.toString()); - } - messageDispatcher.sendMessage(message, RequestOptions.ASYNC); - } catch (SuspectedException e) { - throw new IndexerException(e); - } catch (TimeoutException e) { - throw new IndexerException(e); - } + public RequestOptions getRequestOptions() { + return requestOptions; + } + + public void setRequestOptions(RequestOptions requestOptions) { + this.requestOptions = requestOptions; + } + + public int getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(int retries) { + this.maxRetries = retries; } + public int getMinTimeout() { + return minTimeout; + } + + public void setMinTimeout(int minTimeout) { + this.minTimeout = minTimeout; + } + + @Override + public void dispatch(final IndexOperations ops) throws IndexerException { + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + + executor.scheduleWithFixedDelay(new Runnable() { + int retryCount = 0; + + @Override + public void run() { + Address dest = ((JGroupsAddress) manager.getMasterAddress()).getJGroupsAddress(); + Address src = ((JGroupsAddress) manager.getLocalAddress()).getJGroupsAddress(); + Message message = new Message(dest, src, ops); + if (log.isTraceEnabled()) { + log.trace("Sending {} to {}", ops.toString(), dest.toString()); + } + try { + messageDispatcher.sendMessage(message, requestOptions); + // No exception was raised, stop the executor + executor.shutdown(); + } catch (Exception e) { + ++retryCount; + if(log.isDebugEnabled()) { + log.debug("Error while sending {} to {}", ops.toString(), dest.toString()); + } + if(retryCount2) { + executor.shutdown(); + } + } + + }, 0, 1000, TimeUnit.MILLISECONDS); + executor.awaitTermination(5000, TimeUnit.MILLISECONDS); + System.out.println("Completed"); + } + +}