
Commit 10a3bf7

Fixing broken feeder mode, update to ES 1.3.1
Feeder mode was broken for two reasons: wrong classpath handling in the example shell scripts, and the move of the river state into the cluster state. The cluster state cannot be used from a TransportClient, so a workaround was introduced so that the JDBC plugin can also start inside a TransportClient. More comments were added to the example shell scripts to clarify how to run feeder mode in a JVM outside of an Elasticsearch node. Updated to Elasticsearch 1.3.1. An NPE when getting the river state should be fixed. Dropped the unused dependency on Hamcrest matchers.
1 parent 130dc51 commit 10a3bf7

18 files changed, +565 -62 lines changed

bin/feeder.in.sh

+26
@@ -0,0 +1,26 @@
+
+# Configuration for the Elasticsearch JDBC feeder mechanism
+
+# Java home
+
+# for Mac OS X
+#JAVA_HOME=$(/usr/libexec/java_home -v 1.7*)
+JAVA_HOME=$(/usr/libexec/java_home -v 1.8*)
+# for Linux
+#JAVA_HOME="/etc/alternatives/java"
+
+# Elasticsearch home
+ES_HOME=~es/elasticsearch-1.3.0
+
+# Elasticsearch plugins folder where the "jdbc" plugin is installed
+ES_PATH_PLUGINS=${ES_HOME}/plugins
+
+# Classpath for loading the JDBC plugin from an external Java execution, without other plugins.
+#
+# The classpath is very similar to the Elasticsearch classpath, but it must follow these rules:
+# - first, the elasticsearch*.jar in the Elasticsearch "lib" folder
+# - then the other jars in the Elasticsearch "lib" folder
+# - the plugins/jdbc folder, for log4j.properties (or log4j2.xml)
+# - the plugins/jdbc jars (the plugin jar and the JDBC driver jars)
+# - nothing else, in particular no other (server-side) plugins
+ES_JDBC_CLASSPATH=${ES_HOME}/lib/elasticsearch\*:${ES_HOME}/lib/\*:${ES_PATH_PLUGINS}/jdbc:${ES_PATH_PLUGINS}/jdbc/\*

bin/feeder/mysql/create.sh

+7-5
@@ -1,8 +1,10 @@
 #!/bin/sh
 
-java="/usr/bin/java"
-#java="/Library/Java/JavaVirtualMachines/jdk1.8.0.jdk/Contents/Home/bin/java"
-#java="/usr/java/jdk1.8.0/bin/java"
+# This example shows two concurrent feeds from a MySQL database (concurrency = 2).
+# It is possible to connect to many databases in parallel and fetch data for Elasticsearch.
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+. ${DIR}/../../feeder.in.sh
 
 echo '
 {
@@ -39,7 +41,7 @@ echo '
 }
 ]
 }
-' | ${java} \
-  -cp $(pwd):$(pwd)/\*:$(pwd)/../../lib/\* \
+' | ${JAVA_HOME}/bin/java \
+  -cp ${ES_JDBC_CLASSPATH} \
   org.xbib.elasticsearch.plugin.feeder.Runner \
   org.xbib.elasticsearch.plugin.feeder.jdbc.JDBCFeeder
File renamed without changes.

bin/feeder/mysql/geo.sh

+39-12
@@ -1,19 +1,24 @@
 #!/bin/sh
 
-# a complete minimalistic geo "push" example for MySQL geo -> Elasticsearch geo search
+# This example shows a complete minimalistic geo push & search example for MySQL -> Elasticsearch
 
+# - install Elasticsearch
+# - run Elasticsearch
 # - install MySQL in /usr/local/mysql
-# - start MySQL on localhost:3306 (default)
-# - prepare a 'test' database in MySQL
-# - create empty user '' with empty password ''
-# - execute SQL in "geo.dump" /usr/local/mysql/bin/mysql test < src/test/resources/geo.dump
-# - then run this script from $ES_HOME/plugins/jdbc: bash bin/feeder/mysql/geo.sh
+# - start MySQL on localhost:3306
+# - as the MySQL root admin, prepare a 'geo' database in MySQL:
+#   CREATE DATABASE geo
+# - as the MySQL root admin, create an empty user '' with empty password '':
+#   GRANT ALL PRIVILEGES ON geo.* TO ''@'localhost' IDENTIFIED BY '';
+# - execute the SQL in geo.dump:
+#   /usr/local/mysql/bin/mysql geo < ./bin/feeder/mysql/geo.dump
+# - then run this script:
+#   ./bin/feeder/mysql/geo.sh
 
-curl -XDELETE 'localhost:9200/myjdbc'
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+. ${DIR}/../../feeder.in.sh
 
-java="/usr/bin/java"
-#java="/Library/Java/JavaVirtualMachines/jdk1.8.0.jdk/Contents/Home/bin/java"
-#java="/usr/java/jdk1.8.0/bin/java"
+curl -XDELETE 'localhost:9200/myjdbc'
 
 echo '
 {
@@ -46,8 +51,8 @@ echo '
 }
 }
 }
-' | ${java} \
-  -cp $(pwd):$(pwd)/\*:$(pwd)/../../lib/\* \
+' | ${JAVA_HOME}/bin/java \
+  -cp ${ES_JDBC_CLASSPATH} \
   org.xbib.elasticsearch.plugin.feeder.Runner \
   org.xbib.elasticsearch.plugin.feeder.jdbc.JDBCFeeder
 
@@ -73,3 +78,25 @@ curl -XPOST 'localhost:9200/myjdbc/_search?pretty' -d '
 }
 }
 }'
+
+# Expected result:
+# {"_shards":{"total":2,"successful":1,"failed":0}}{
+#   "took" : 117,
+#   "timed_out" : false,
+#   "_shards" : {
+#     "total" : 1,
+#     "successful" : 1,
+#     "failed" : 0
+#   },
+#   "hits" : {
+#     "total" : 1,
+#     "max_score" : 1.0,
+#     "hits" : [ {
+#       "_index" : "myjdbc",
+#       "_type" : "mytype",
+#       "_id" : "Dom",
+#       "_score" : 1.0,
+#       "_source":{"city":"Köln","zip":"50667","address":"Domkloster 4","location":{"lat":50.9406645,"lon":6.9599115}}
+#     } ]
+#   }
+# }

bin/feeder/oracle/create.sh

+7-5
@@ -1,8 +1,10 @@
 #!/bin/sh
 
-java="/usr/bin/java"
-#java="/Library/Java/JavaVirtualMachines/jdk1.8.0.jdk/Contents/Home/bin/java"
-#java="/usr/java/jdk1.8.0/bin/java"
+# This example is a template for connecting to Oracle in feeder mode.
+# The JDBC URL and SQL must be replaced with working ones.
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+. ${DIR}/../../feeder.in.sh
 
 echo '
 {
@@ -24,7 +26,7 @@ echo '
 }
 }
 }
-' | ${java} \
-  -cp $(pwd):$(pwd)/\*:$(pwd)/../../lib/\* \
+' | ${JAVA_HOME}/bin/java \
+  -cp ${ES_JDBC_CLASSPATH} \
   org.xbib.elasticsearch.plugin.feeder.Runner \
   org.xbib.elasticsearch.plugin.feeder.jdbc.JDBCFeeder

pom.xml

+6-15
@@ -7,12 +7,12 @@
 
     <groupId>org.xbib.elasticsearch.plugin</groupId>
    <artifactId>elasticsearch-river-jdbc</artifactId>
-    <version>1.3.0.1</version>
+    <version>1.3.0.2</version>
 
    <packaging>jar</packaging>
 
    <name>elasticsearch-river-jdbc</name>
-    <description>JDBC River for ElasticSearch</description>
+    <description>JDBC River for Elasticsearch</description>
 
    <url>http://github.com/jprante/elasticsearch-river-jdbc</url>
 
@@ -68,12 +68,11 @@
        </repository>
    </repositories>
 
-
    <properties>
        <github.global.server>github</github.global.server>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <java.compile.version>1.7</java.compile.version>
-        <elasticsearch.version>1.3.0</elasticsearch.version>
+        <java.compiler.version>1.7</java.compiler.version>
+        <elasticsearch.version>1.3.1</elasticsearch.version>
    </properties>
 
    <dependencies>
@@ -92,14 +91,6 @@
            <scope>test</scope>
        </dependency>
 
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
-            <version>1.1</version>
-            <type>jar</type>
-            <scope>test</scope>
-        </dependency>
-
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
@@ -145,8 +136,8 @@
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
-                    <source>${java.compile.version}</source>
-                    <target>${java.compile.version}</target>
+                    <source>${java.compiler.version}</source>
+                    <target>${java.compiler.version}</target>
                    <encoding>UTF-8</encoding>
                    <optimize>true</optimize>
                    <showDeprecation>true</showDeprecation>
src/main/assemblies/plugin.xml

+1
@@ -36,6 +36,7 @@
         <directory>${project.basedir}/src/test/resources</directory>
         <outputDirectory>/</outputDirectory>
         <includes>
+            <include>log4j.properties</include>
             <include>log4j2.xml</include>
         </includes>
     </fileSet>

src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/delete/TransportDeleteRiverStateAction.java

+5-3
@@ -7,21 +7,22 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.inject.Injector;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.xbib.elasticsearch.action.river.jdbc.state.RiverStateService;
 
 public class TransportDeleteRiverStateAction extends TransportMasterNodeOperationAction<DeleteRiverStateRequest, DeleteRiverStateResponse> {
 
-    private final RiverStateService riverStateService;
+    private final Injector injector;
 
     @Inject
     public TransportDeleteRiverStateAction(Settings settings, ThreadPool threadPool,
                                            ClusterService clusterService, TransportService transportService,
-                                           RiverStateService riverStateService) {
+                                           Injector injector) {
         super(settings, DeleteRiverStateAction.NAME, transportService, clusterService, threadPool);
-        this.riverStateService = riverStateService;
+        this.injector = injector;
     }
 
     @Override
@@ -41,6 +42,7 @@ protected DeleteRiverStateResponse newResponse() {
 
     @Override
     protected void masterOperation(DeleteRiverStateRequest request, ClusterState state, final ActionListener<DeleteRiverStateResponse> listener) throws ElasticsearchException {
+        RiverStateService riverStateService = injector.getInstance(RiverStateService.class);
         riverStateService.unregisterRiver(new RiverStateService.UnregisterRiverStateRequest("delete_river_state[" + request.getRiverName() + "]", request.getRiverName())
                 .masterNodeTimeout(request.masterNodeTimeout())
                 .ackTimeout(request.ackTimeout()), new ActionListener<ClusterStateUpdateResponse>() {
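The put action below gets the same treatment. As a side note, here is a minimal sketch of the lazy-lookup pattern used in this change, assuming (per the commit message) that RiverStateService is only bound on server-side nodes and therefore cannot be constructor-injected when the actions are instantiated inside a TransportClient; the class name is illustrative, not part of the plugin:

import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.xbib.elasticsearch.action.river.jdbc.state.RiverStateService;

// Illustrative sketch: defer the RiverStateService lookup until the operation
// actually runs, so merely constructing the action in a TransportClient
// (where the service is not bound) does not fail.
public class LazyRiverStateLookup {

    private final Injector injector;

    @Inject
    public LazyRiverStateLookup(Injector injector) {
        this.injector = injector; // the Injector itself is always injectable
    }

    public RiverStateService riverStateService() {
        // resolved on demand, i.e. on the master node where the binding exists
        return injector.getInstance(RiverStateService.class);
    }
}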

src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/get/GetRiverStateResponse.java

+3
@@ -46,6 +46,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
+        getRiverStateRequest = new GetRiverStateRequest();
+        getRiverStateRequest.readFrom(in);
         int len = in.readInt();
         ImmutableList.Builder<RiverState> builder = ImmutableList.builder();
         for (int i = 0; i < len; i++) {
@@ -59,6 +61,7 @@ public void readFrom(StreamInput in) throws IOException {
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
+        getRiverStateRequest.writeTo(out);
         out.writeInt(states.size());
         for (RiverState rs : states) {
             rs.writeTo(out);
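For context on the NPE mentioned in the commit message: before this change the nested request was never serialized, so after transport the deserialized response carried a null getRiverStateRequest. A small sketch of the symmetric readFrom/writeTo pattern the fix follows, using Elasticsearch's Streamable interface; the class name is illustrative, not part of the plugin:

import java.io.IOException;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

// Illustrative sketch: a nested Streamable must be written in writeTo and
// re-created plus read in readFrom, otherwise it is null on the receiving side.
public class NestedStreamableSketch implements Streamable {

    private GetRiverStateRequest getRiverStateRequest = new GetRiverStateRequest();

    @Override
    public void readFrom(StreamInput in) throws IOException {
        getRiverStateRequest = new GetRiverStateRequest(); // re-create before reading
        getRiverStateRequest.readFrom(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        getRiverStateRequest.writeTo(out); // must mirror readFrom exactly
    }
}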

src/main/java/org/xbib/elasticsearch/action/river/jdbc/state/put/TransportPutRiverStateAction.java

+5-3
@@ -7,21 +7,22 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.inject.Injector;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.xbib.elasticsearch.action.river.jdbc.state.RiverStateService;
 
 public class TransportPutRiverStateAction extends TransportMasterNodeOperationAction<PutRiverStateRequest, PutRiverStateResponse> {
 
-    private final RiverStateService riverStateService;
+    private final Injector injector;
 
     @Inject
     public TransportPutRiverStateAction(Settings settings, ThreadPool threadPool,
                                         ClusterService clusterService, TransportService transportService,
-                                        RiverStateService riverStateService) {
+                                        Injector injector) {
         super(settings, PutRiverStateAction.NAME, transportService, clusterService, threadPool);
-        this.riverStateService = riverStateService;
+        this.injector = injector;
     }
 
     @Override
@@ -41,6 +42,7 @@ protected PutRiverStateResponse newResponse() {
 
     @Override
     protected void masterOperation(PutRiverStateRequest request, ClusterState state, final ActionListener<PutRiverStateResponse> listener) throws ElasticsearchException {
+        RiverStateService riverStateService = injector.getInstance(RiverStateService.class);
         riverStateService.registerRiver(new RiverStateService.RegisterRiverStateRequest("put_river_state[" + request.getRiverName() + "]", request.getRiverName(), request.getRiverType())
                 .riverState(request.getRiverState())
                 .masterNodeTimeout(request.masterNodeTimeout())

src/main/java/org/xbib/elasticsearch/plugin/feeder/AbstractFeeder.java

+17
@@ -14,6 +14,7 @@
 import org.xbib.elasticsearch.action.river.jdbc.state.RiverState;
 import org.xbib.elasticsearch.support.client.Ingest;
 import org.xbib.elasticsearch.support.client.node.NodeClient;
+import org.xbib.io.URIUtil;
 import org.xbib.pipeline.AbstractPipeline;
 import org.xbib.pipeline.Pipeline;
 import org.xbib.pipeline.PipelineException;
@@ -25,6 +26,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -410,4 +412,19 @@ public InputStream getDefaultMapping(String index, String type) {
 
     public abstract void executeTask(Map<String, Object> map) throws Exception;
 
+    public Settings clientSettings(URI connectionSpec) {
+        return settingsBuilder()
+                .put("name", "feeder") // prevents lookup of names.txt (we don't ship it) and marks this node as "feeder"; see also the module-load skipping in JDBCRiverPlugin
+                .put("network.server", false) // this is not a server
+                .put("node.client", true) // this is an Elasticsearch client
+                .put("cluster.name", URIUtil.parseQueryString(connectionSpec).get("es.cluster.name")) // the specified remote ES cluster
+                .put("client.transport.sniff", false) // we do not sniff (should be configurable)
+                .put("client.transport.ignore_cluster_name", false) // respect the cluster name setting
+                .put("client.transport.ping_timeout", "30s") // large ping timeout (should not be required)
+                .put("client.transport.nodes_sampler_interval", "30s") // only relevant for sniff sampling
+                .put("path.plugins", ".dontexist") // pointing to a non-existing folder disables loading of site plugins
+                // we do not need to change classpath settings when using the "feeder" name trick
+                .build();
+    }
+
 }
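A rough sketch of how settings in this spirit are consumed on the Elasticsearch 1.3.x side when a TransportClient is built from them; the BulkTransportClient internals are not part of this diff, and the host, port, and cluster name below are placeholder assumptions:

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class FeederClientSketch {

    public static void main(String[] args) {
        // settings in the spirit of AbstractFeeder.clientSettings(); values are placeholders
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("name", "feeder")                // lets JDBCRiverPlugin skip the server-side river state module
                .put("node.client", true)
                .put("client.transport.sniff", false)
                .put("client.transport.ignore_cluster_name", false)
                .put("cluster.name", "elasticsearch") // would come from the es.cluster.name URI parameter
                .build();

        TransportClient client = new TransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
        try {
            System.out.println("connected nodes: " + client.connectedNodes());
        } finally {
            client.close();
        }
    }
}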

src/main/java/org/xbib/elasticsearch/plugin/feeder/jdbc/JDBCFeeder.java

+5-3
@@ -111,12 +111,14 @@ public Feeder<T, R, P> beforeRun() throws IOException {
                     Runtime.getRuntime().availableProcessors());
             ByteSizeValue maxvolume = settings.getAsBytesSize("maxbulkvolume", ByteSizeValue.parseBytesSizeValue("10m"));
             TimeValue maxrequestwait = settings.getAsTime("maxrequestwait", TimeValue.timeValueSeconds(60));
-            ingest = new BulkTransportClient();
+            BulkTransportClient ingest = new BulkTransportClient();
+            URI connSpec = URI.create(settings.get("elasticsearch"));
             ingest.maxActionsPerBulkRequest(maxbulkactions)
                     .maxConcurrentBulkRequests(maxconcurrentbulkrequests)
                     .maxVolumePerBulkRequest(maxvolume)
-                    .maxRequestWait(maxrequestwait);
-            ingest.newClient(URI.create(settings.get("elasticsearch")));
+                    .maxRequestWait(maxrequestwait)
+                    .newClient(connSpec, clientSettings(connSpec));
+            this.ingest = ingest;
         }
         // create queue
         super.beforeRun();
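The connection spec handed to newClient comes from the feeder's "elasticsearch" setting; judging from clientSettings above, it is expected to carry the cluster name as an es.cluster.name query parameter. A hedged sketch of that parsing with plain java.net.URI; the es:// scheme, host, and port are assumptions, and the plugin itself uses org.xbib.io.URIUtil for this:

import java.net.URI;

public class ConnectionSpecSketch {

    public static void main(String[] args) {
        // assumed URI form; the actual scheme, host, and port depend on the feeder definition
        URI connectionSpec = URI.create("es://localhost:9300?es.cluster.name=elasticsearch");

        String clusterName = null;
        for (String pair : connectionSpec.getQuery().split("&")) {
            String[] kv = pair.split("=", 2);
            if ("es.cluster.name".equals(kv[0])) {
                clusterName = kv.length > 1 ? kv[1] : null;
            }
        }

        System.out.println("host = " + connectionSpec.getHost()); // localhost
        System.out.println("port = " + connectionSpec.getPort()); // 9300
        System.out.println("cluster.name = " + clusterName);      // elasticsearch
    }
}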

src/main/java/org/xbib/elasticsearch/plugin/river/jdbc/JDBCRiverPlugin.java

+9-2
@@ -3,6 +3,7 @@
 import org.elasticsearch.action.ActionModule;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.plugins.AbstractPlugin;
 import org.elasticsearch.rest.RestModule;
 import org.elasticsearch.river.RiversModule;
@@ -24,8 +25,11 @@
 
 public class JDBCRiverPlugin extends AbstractPlugin {
 
+    private final Settings settings;
+
     @Inject
-    public JDBCRiverPlugin() {
+    public JDBCRiverPlugin(Settings settings) {
+        this.settings = settings;
     }
 
     @Override
@@ -43,7 +47,10 @@ public String description() {
     @Override
     public Collection<Class<? extends Module>> modules() {
         Collection<Class<? extends Module>> modules = newArrayList();
-        modules.add(RiverStateModule.class);
+        // in "feeder" node mode, skip initializing the server-side-only river state module
+        if (!"feeder".equals(settings.get("name"))) {
+            modules.add(RiverStateModule.class);
+        }
         return modules;
     }
src/main/java/org/xbib/elasticsearch/support/client/bulk/BulkTransportClient.java

-9
@@ -96,15 +96,6 @@ public BulkTransportClient newClient(Client client) {
         return this.newClient(findURI());
     }
 
-    /**
-     * Create a new client
-     *
-     * @return this client
-     */
-    public BulkTransportClient newClient() {
-        return this.newClient(findURI());
-    }
-
     /**
      * Create new client
      * The URI describes host and port of the node the client should connect to,
