Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

Commit

Permalink
Initial import of Amanuensis
Browse files Browse the repository at this point in the history
  • Loading branch information
tristantarrant committed Jan 11, 2011
0 parents commit ec4d034
Show file tree
Hide file tree
Showing 25 changed files with 1,029 additions and 0 deletions.
1 change: 1 addition & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Amanuensis: Clustered Infinispan Index Writer using JGroups
57 changes: 57 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>dataforte-parent</artifactId>
<groupId>net.dataforte</groupId>
<version>6</version>
</parent>
<groupId>net.dataforte.infinispan</groupId>
<artifactId>amanuensis</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Infinispan Amanuensis Lucene Indexer</name>

<properties>
<version.lucene>3.0.3</version.lucene>
<version.infinispan>4.2.0.FINAL</version.infinispan>
<version.slf4j>1.6.1</version.slf4j>
</properties>

<scm>
<connection>scm:git:http://github.com/tristantarrant/infinispan-index-writer.git</connection>
<developerConnection>scm:git:ssh://[email protected]/tristantarrant/infinispan-index-writer.git</developerConnection>
<url>http://github.com/tristantarrant/infinispan-index-writer</url>
</scm>

<issueManagement>
<url>http://github.com/tristantarrant/infinispan-index-writer/issues</url>
</issueManagement>

<dependencies>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
<version>${version.infinispan}</version>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-lucene-directory</artifactId>
<version>${version.infinispan}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${version.slf4j}</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${version.lucene}</version>
</dependency>
<dependency>
<groupId>net.dataforte</groupId>
<artifactId>dataforte-commons</artifactId>
<version>0.0.4</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package net.dataforte.infinispan.amanuensis;

import java.awt.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

import net.dataforte.commons.collections.Computable;
import net.dataforte.commons.collections.Memoizer;
import net.dataforte.commons.slf4j.LoggerFactory;
import net.dataforte.infinispan.amanuensis.backend.jgroups.JGroupsOperationDispatcher;
import net.dataforte.infinispan.amanuensis.backend.jgroups.JGroupsOperationProcessor;
import net.dataforte.infinispan.amanuensis.backend.lucene.LuceneOperationDispatcher;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.SimpleAnalyzer;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.lucene.InfinispanDirectory;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.slf4j.Logger;

public class AmanuensisManager {
private static final Logger log = LoggerFactory.make();
private static final short INFINISPAN_INDEX_WRITER_SCOPE_ID = 1234;
private static final Analyzer SIMPLE_ANALYZER = new SimpleAnalyzer();
private final EmbeddedCacheManager cacheManager;
private ConcurrentMap<String, InfinispanDirectory> directoryMap = new ConcurrentHashMap<String, InfinispanDirectory>();
private Memoizer<String, InfinispanIndexWriter> writerMap;
private JGroupsOperationProcessor remoteOperationProcessor;
private OperationDispatcher remoteOperationDispatcher;
private OperationDispatcher localOperationDispatcher;
private Analyzer analyzer = SIMPLE_ANALYZER;

/**
* Constructs an {@link AmanuensisManager} using the specified
* {@link EmbeddedCacheManager}
*
* @param cacheManager
*/
public AmanuensisManager(EmbeddedCacheManager cacheManager) {
if (cacheManager.getStatus()!=ComponentStatus.RUNNING) {
throw new IllegalStateException("Cache is not running");
}
this.cacheManager = cacheManager;
this.writerMap = new Memoizer<String, InfinispanIndexWriter>(new InfinispanIndexWriterMemoizer());
this.remoteOperationProcessor = new JGroupsOperationProcessor(this, INFINISPAN_INDEX_WRITER_SCOPE_ID);
this.remoteOperationDispatcher = new JGroupsOperationDispatcher(this, this.remoteOperationProcessor.getDispatcher());
this.localOperationDispatcher = new LuceneOperationDispatcher(this);
}

public Analyzer getAnalyzer() {
return analyzer;
}

public void setAnalyzer(Analyzer analyzer) {
this.analyzer = analyzer;
}

public void close() {
this.remoteOperationProcessor.close();
}

/**
* Retrieves (or initializes) an instance of InfinispanIndexWriter for the
* specified directory
*
* @param directory
* @return
*/
public InfinispanIndexWriter getIndexWriter(InfinispanDirectory directory) throws IndexerException {
if (directory.getIndexName() == null) {
throw new IndexerException("InfinispanDirectory must not have a null indexName");
}
try {
directoryMap.putIfAbsent(directory.getIndexName(), directory);
return writerMap.compute(directory.getIndexName());
} catch (Exception e) {
log.error("Could not obtain an IndexWriter");
throw new IndexerException(e);
}
}

public InfinispanDirectory getDirectoryByIndexName(String indexName) {
return directoryMap.get(indexName);
}

public Address getMasterAddress() {
return cacheManager.getCoordinator();
}

public Address getLocalAddress() {
return cacheManager.getAddress();
}

/**
* Dispatches the message to the appropriate destination depending on the
* role of this node.
*
* @param indexOperations
* @throws IndexerException
*/
public void dispatchOperations(IndexOperations indexOperations) throws IndexerException {
if (cacheManager.isCoordinator()) {
// process the messages locally
this.localOperationDispatcher.dispatch(indexOperations);
} else {
// send them to the remote
this.remoteOperationDispatcher.dispatch(indexOperations);
}
}

private class InfinispanIndexWriterMemoizer implements Computable<String, InfinispanIndexWriter> {
@Override
public InfinispanIndexWriter compute(String indexName) throws InterruptedException, ExecutionException {
InfinispanIndexWriter iw = new InfinispanIndexWriter(AmanuensisManager.this, AmanuensisManager.this.directoryMap.get(indexName));
return iw;
}

}

}
131 changes: 131 additions & 0 deletions src/main/java/net/dataforte/infinispan/amanuensis/ExecutorContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package net.dataforte.infinispan.amanuensis;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import net.dataforte.commons.slf4j.LoggerFactory;
import net.dataforte.infinispan.amanuensis.backend.lucene.LuceneOperationExecutorFactory;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexWriter;
import org.infinispan.lucene.InfinispanDirectory;
import org.slf4j.Logger;

public class ExecutorContext {
private static final Logger log = LoggerFactory.make();
private static final String THREAD_GROUP_PREFIX = "Amanuensis: ";
private static final int QUEUE_MAX_LENGTH = 1000;
private static final IndexWriter.MaxFieldLength MAX_FIELD_LENGTH = new IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH);
private final ExecutorService executor;
private LuceneOperationExecutorFactory operationExecutorFactory;
private final InfinispanDirectory directory;
private IndexWriter writer;
private Analyzer analyzer;

public ExecutorContext(InfinispanDirectory directory, Analyzer analyzer) {
this.executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(QUEUE_MAX_LENGTH), new ExecutorThreadFactory("IndexWriter"), new BlockingPolicy());
this.directory = directory;
this.analyzer = analyzer;
this.operationExecutorFactory = new LuceneOperationExecutorFactory();
}

public ExecutorService getExecutor() {
return executor;
}

public LuceneOperationExecutorFactory getOperationExecutorFactory() {
return operationExecutorFactory;
}

public void setOperationExecutorFactory(LuceneOperationExecutorFactory operationExecutorFactory) {
this.operationExecutorFactory = operationExecutorFactory;
}

public InfinispanDirectory getDirectory() {
return directory;
}

public synchronized IndexWriter getWriter() throws IndexerException {
if (writer != null)
return writer;
try {
writer = new IndexWriter(directory, analyzer, true, MAX_FIELD_LENGTH);
} catch (IOException e) {
writer = null;
throw new IndexerException("Error while creating writer for index " + directory.getIndexName(), e);
}
return writer;
}

public synchronized void commit() throws IndexerException {
if (writer != null) {
try {
writer.commit();
} catch (IOException e) {
throw new IndexerException("Error while committing writer for index " + directory.getIndexName(), e);
}
}
}

public synchronized void close() {
IndexWriter w = writer;
writer = null;
if (w != null) {
try {
w.close();
} catch (IOException e) {
log.error("Error while closing writer for index " + directory.getIndexName(), e);
}
}
}

public synchronized void forceUnlock() {
try {
try {
close();
} finally {
IndexWriter.unlock(this.directory);
}
} catch (Exception e) {
log.warn("Error while unlocking writer for index " + directory.getIndexName(), e);
}
}

private static class ExecutorThreadFactory implements ThreadFactory {

final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

ExecutorThreadFactory(String groupname) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = THREAD_GROUP_PREFIX + groupname + "-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
return t;
}

}

public static class BlockingPolicy implements RejectedExecutionHandler {
public BlockingPolicy() {
}

public void rejectedExecution(Runnable r, ThreadPoolExecutor exec) {
try {
exec.getQueue().put(r);
} catch (InterruptedException e) {
log.error("Work discarded, thread was interrupted while waiting for space to schedule: {}", r);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package net.dataforte.infinispan.amanuensis;

import java.io.Serializable;

/**
* This class indicates an operation to be performed on an index
*
* @author Tristan Tarrant
*/
public abstract class IndexOperation implements Serializable {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package net.dataforte.infinispan.amanuensis;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
* This class implements the message that is sent
* from the slaves to the master. It contains the name
* of the index on which to operate and an ordered list of
* operations to apply to that index
*
* @author Tristan Tarrant
*/
public class IndexOperations implements Serializable {
final String indexName;
List<IndexOperation> operations = new ArrayList<IndexOperation>();

public IndexOperations(String indexName) {
this.indexName = indexName;
}

public IndexOperations(String indexName, List<IndexOperation> operations) {
this.indexName = indexName;
addOperations(operations);
}

public IndexOperations(String indexName, IndexOperation... operations) {
this.indexName = indexName;
addOperations(operations);
}

public void addOperations(List<IndexOperation> operations) {
this.operations.addAll(operations);
}

public void addOperations(IndexOperation... ops) {
for (IndexOperation op : ops) {
operations.add(op);
}
}

public String getIndexName() {
return indexName;
}

public List<IndexOperation> getOperations() {
return operations;
}

@Override
public String toString() {
return "IndexOperations [indexName=" + indexName + ", operations=" + operations + "]";
}
}
Loading

0 comments on commit ec4d034

Please sign in to comment.