Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
Better retry
Browse files Browse the repository at this point in the history
  • Loading branch information
tristantarrant committed Feb 15, 2011
1 parent 9440122 commit fc035c1
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 22 deletions.
1 change: 0 additions & 1 deletion README

This file was deleted.

7 changes: 7 additions & 0 deletions README.textile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
h1. Amanuensis: Clustered Infinispan Index Writer using JGroups


h2. What is it ?

Amanuensis attempts to implement a pseudo-distributed Lucene IndexWriter for use with Infinispan's Lucene Directory implementation. It is modelled around Hibernate Search's backend from which it borrows many ideas and bits of code. Index operations are dispatched from slaves to one master (colocated with Infinispan's coordinator) which applies them to the real IndexWriter.
Amanuensis also implements methods for obtaining efficient IndexReader instances which handle
4 changes: 4 additions & 0 deletions TODO.textile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
h1. Things to do

* Robustness (persistent queues)
* Backup support (snapshot/restore indices)
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void configure(IndexWriter writer) {
mergePolicy.setCalibrateSizeByDeletes(calibrateSizeByDeletes);
mergePolicy.setNoCFSRatio(noCFSRatio);
mergePolicy.setUseCompoundDocStore(useCompoundDocStore);
mergePolicy.setUseCompoundFile(useCompoundFile);
mergePolicy.setUseCompoundFile(useCompoundFile);
writer.setMergePolicy(mergePolicy);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
package net.dataforte.infinispan.amanuensis;

public interface OperationDispatcher {
void dispatch(IndexOperations ops) throws IndexerException;
void dispatch(final IndexOperations ops) throws IndexerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

package net.dataforte.infinispan.amanuensis.backend.jgroups;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import net.dataforte.commons.slf4j.LoggerFactory;
import net.dataforte.infinispan.amanuensis.AmanuensisManager;
import net.dataforte.infinispan.amanuensis.IndexOperations;
Expand All @@ -30,43 +34,86 @@
import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.SuspectedException;
import org.jgroups.TimeoutException;
import org.jgroups.blocks.Request;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.mux.MuxMessageDispatcher;
import org.slf4j.Logger;

/**
* This class takes care of dispatching {@link IndexOperations} messages
* from the slaves to the master.
* This class takes care of dispatching {@link IndexOperations} messages from
* the slaves to the master.
*
* @author Tristan Tarrant
*/
public class JGroupsOperationDispatcher implements OperationDispatcher {
private static final Logger log = LoggerFactory.make();
private AmanuensisManager manager;
private MuxMessageDispatcher messageDispatcher;
private RequestOptions requestOptions;
private int maxRetries = 10;
private int minTimeout = 10000;

public JGroupsOperationDispatcher(AmanuensisManager manager, MuxMessageDispatcher messageDispatcher) {
this.manager = manager;
this.messageDispatcher = messageDispatcher;
this.requestOptions = new RequestOptions(Request.GET_ALL, 10000); // We want synchronous, and we can wait for 10 seconds
}

@Override
public void dispatch(IndexOperations ops) throws IndexerException {
Address dest = ((JGroupsAddress)manager.getMasterAddress()).getJGroupsAddress();
Address src = ((JGroupsAddress)manager.getLocalAddress()).getJGroupsAddress();
Message message = new Message(dest, src, ops);
try {
if(log.isTraceEnabled()) {
log.trace("Sending {} to {}", ops.toString(), dest.toString());
}
messageDispatcher.sendMessage(message, RequestOptions.ASYNC);
} catch (SuspectedException e) {
throw new IndexerException(e);
} catch (TimeoutException e) {
throw new IndexerException(e);
}
public RequestOptions getRequestOptions() {
return requestOptions;
}

public void setRequestOptions(RequestOptions requestOptions) {
this.requestOptions = requestOptions;
}

public int getMaxRetries() {
return maxRetries;
}

public void setMaxRetries(int retries) {
this.maxRetries = retries;
}

public int getMinTimeout() {
return minTimeout;
}

public void setMinTimeout(int minTimeout) {
this.minTimeout = minTimeout;
}

@Override
public void dispatch(final IndexOperations ops) throws IndexerException {
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

executor.scheduleWithFixedDelay(new Runnable() {
int retryCount = 0;

@Override
public void run() {
Address dest = ((JGroupsAddress) manager.getMasterAddress()).getJGroupsAddress();
Address src = ((JGroupsAddress) manager.getLocalAddress()).getJGroupsAddress();
Message message = new Message(dest, src, ops);
if (log.isTraceEnabled()) {
log.trace("Sending {} to {}", ops.toString(), dest.toString());
}
try {
messageDispatcher.sendMessage(message, requestOptions);
// No exception was raised, stop the executor
executor.shutdown();
} catch (Exception e) {
++retryCount;
if(log.isDebugEnabled()) {
log.debug("Error while sending {} to {}", ops.toString(), dest.toString());
}
if(retryCount<maxRetries) {
log.warn("Sending operations to {} failed, try #{}", dest.toString(), retryCount);
} else {
log.error("Could not send operations to "+dest.toString()+" after "+maxRetries + "tries, giving up", e);
}
}

}}, 0, minTimeout, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package net.dataforte.infinispan.amanuensis.backend.jgroups;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

public class ExecutorTest {

@Test
public void testExecutor() throws Exception {
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(new Runnable() {
int count = 0;

@Override
public void run() {
// TODO Auto-generated method stub
++count;
System.out.printf("Execution count %d\n", count);
if(count>2) {
executor.shutdown();
}
}

}, 0, 1000, TimeUnit.MILLISECONDS);
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
System.out.println("Completed");
}

}

0 comments on commit fc035c1

Please sign in to comment.