Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/main/java"/>
<classpathentry kind="src" path="src/test/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/rabbitmq-java-client-bin-3.5.4/commons-cli-1.1.jar"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/rabbitmq-java-client-bin-3.5.4/commons-io-1.2.jar"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/rabbitmq-java-client-bin-3.5.4/junit.jar"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/rabbitmq-java-client-bin-3.5.4/rabbitmq-client.jar"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/rabbitmq-java-client-bin-3.5.4/rabbitmq-client-tests.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/spring-amqp-1.1.4.RELEASE.jar/spring-amqp-1.1.4.RELEASE.jar"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/spring-rabbit-1.1.4.RELEASE.jar/spring-rabbit-1.1.4.RELEASE.jar"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/org.springframework.context.jar/org.springframework.context.jar"/>
<classpathentry kind="lib" path="C:/Users/BryzzhinIS/Downloads/spring-beans-3.0.4.RELEASE.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target/
/bin/
17 changes: 17 additions & 0 deletions .project
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>rabbitmq-oracle-stored-procedures</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
12 changes: 12 additions & 0 deletions .settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.5
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.5
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.5
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>2.8.1</version>
<version>3.5.4</version>
</dependency>
</dependencies>
<description>Publish RabbitMQ AMQP notifications from Oracle DB</description>
Expand Down
8 changes: 4 additions & 4 deletions sql/definitions.sql
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
-- FIXME use a schema named 'amqp'
create or replace function amqp_exchange_declare
(brokerId IN number, exchange IN varchar2, exchange_type IN varchar2)
(brokerId IN number, exchange IN varchar2, exchange_type IN varchar2, is_durable IN number)
return NUMBER
as language java
name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpExchangeDeclare(int, java.lang.String, java.lang.String) return int';
name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpExchangeDeclare(int, java.lang.String, java.lang.String, int) return int';

create or replace function amqp_publish
(brokerId IN number, exchange IN varchar2, routingKey IN varchar2, message IN varchar2)
(brokerId IN number, exchange IN varchar2, routingKey IN varchar2, message IN varchar2, xml_string_properties IN varchar2)
return NUMBER
as language java
name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpPublish(int, java.lang.String, java.lang.String, java.lang.String) return int';
name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpPublish(int, java.lang.String, java.lang.String, java.lang.String, java.lang.String) return int';

create or replace procedure amqp_print_configuration
(brokerId IN number)
Expand Down
18 changes: 18 additions & 0 deletions sql/tests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,23 @@ insert into broker (BROKER_ID,HOST,PORT,VHOST,USERNAME,PASSWORD) values (1,'192.
-- tests
select amqp_exchange_declare(1, 'oracle', 'fanout') from dual;
select amqp_publish(1, 'oracle', 'key', 'Hello World!') from dual;
select amqp_publish(1, 'oracle', 'key', 'Hello World!'
,'<properities>
<DELIVERYMODE>2</DELIVERYMODE>
<CONTENTTYPE>application/vnd.masstransit+json</CONTENTTYPE>
<MESSAGEID>'||regexp_replace(rawtohex(sys_guid()),'([A-F0-9]{8})([A-F0-9]{4})([A-F0-9]{4})([A-F0-9]{4})([A-F0-9]{12})', '\1-\2-\3-\4-\5') ||'</MESSAGEID>
<HEADEARS>
<content_type>application/vnd.masstransit+json</content_type>
<head1>test</head1>
</HEADEARS>
<CONTENTTYPE>XML</CONTENTTYPE>
<PRIORITY>10</PRIORITY>
<CONTENTENCODING>iso-8859-5</CONTENTENCODING>
</properities>') from dual;
select amqp_publish(brokerid => 1,
exchange => 'oracle',
routingkey => 'key',
message => 'Hello World!',
xml_string_properties => '<properities></properities>') from dual;
call amqp_print_configuration(1);
call amqp_probe_servers(1);
105 changes: 67 additions & 38 deletions src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
//import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class RabbitMQPublisher {
* @see Channel#exchangeDeclare(String, String)
* @return an error code, see the source
*/
public static int amqpExchangeDeclare(int brokerId, String exchange, String type) {
public static int amqpExchangeDeclare(int brokerId, String exchange, String type, int durable) {
// FIXME declare on all brokers for brokerId?
Connection connection = null;
Channel channel = null;
Expand All @@ -71,7 +72,10 @@ public static int amqpExchangeDeclare(int brokerId, String exchange, String type
channel = connection.createChannel();

// declare the exchange
channel.exchangeDeclare(exchange, type);
if(durable == 1)
channel.exchangeDeclare(exchange, type, true);
else
channel.exchangeDeclare(exchange, type);

} catch (IOException ioe) {
ioe.printStackTrace();
Expand All @@ -87,6 +91,9 @@ public static int amqpExchangeDeclare(int brokerId, String exchange, String type
connection.close(CONNECTION_CLOSE_TIMEOUT);
}

} catch (TimeoutException e) {
e.printStackTrace();
return E_CANNOT_CLOSE;
} catch (IOException e) {
e.printStackTrace();
return E_CANNOT_CLOSE;
Expand All @@ -97,41 +104,21 @@ public static int amqpExchangeDeclare(int brokerId, String exchange, String type
return EXIT_SUCCESS;
}

/**
* Publish an AMQP message to the given exchange.
*
* @param brokerId
* the ID of the broker in the configuration table
* @param exchange
* the name of the AMQP exchange
* @param routingKey
* the AMQP routing key
* @param message
* the payload
* @return an error code, see the source
*/
public static int amqpPublish(int brokerId, String exchange, String routingKey, String message) {
return amqpPublish(brokerId, exchange, routingKey, message, null);
}

/**
* FIXME timeout intelligently. FIXME test whether we can declare a type conversion for a Map.
*/
public static int amqpPublish(int brokerId, String exchange, String routingKey, String message,
Map<String, String> properties) {

public static int amqpPublish(BrokerConnectionState connectionState, String exchange, String routingKey, String message, String xml_string_properties) {

Connection connection = null;
Channel channel = null;
try {
BrokerConnectionState connectionState = getConnectionState(brokerId);
//BrokerConnectionState connectionState = getConnectionState(brokerId);
connection = openConnection(connectionState);
channel = connection.createChannel();

// send the message
channel.basicPublish(exchange, routingKey, false, false, null, message.getBytes());

// remember the current broker used
state.put(brokerId, connectionState.currentAddress);
if(xml_string_properties == null)
channel.basicPublish(exchange, routingKey, false, false, null, message.getBytes());
else
channel.basicPublish(exchange, routingKey, false, false, xml.basicPropertiesFromXml(xml_string_properties), message.getBytes());

} catch (IOException ioe) {
ioe.printStackTrace();
Expand All @@ -150,12 +137,37 @@ public static int amqpPublish(int brokerId, String exchange, String routingKey,
} catch (IOException e) {
e.printStackTrace();
return E_CANNOT_CLOSE;
} catch (TimeoutException e) {
e.printStackTrace();
return E_CANNOT_CLOSE;
}
}

// everything went OK
return EXIT_SUCCESS;
}


/**
* Publish an AMQP message to the given exchange.
*
* @param brokerId
* the ID of the broker in the configuration table
* @param exchange
* the name of the AMQP exchange
* @param routingKey
* the AMQP routing key
* @param message
* the payload
* @return an error code, see the source
*/
public static int amqpPublish(int brokerId, String exchange, String routingKey, String message) {
return amqpPublish(brokerId, exchange, routingKey, message, null);
}

public static int amqpPublish(int brokerId, String exchange, String routingKey, String message, String xml_string_properties) {
return amqpPublish(getConnectionState(brokerId), exchange, routingKey, message, xml_string_properties);
}

/**
* Print the current configuration for broker definitions. In case there's more than one broker per ID, the active
Expand Down Expand Up @@ -218,10 +230,10 @@ public static void amqpProbeAllServers(int brokerId) {
try {
currConnection = openConnection(currFullAddress);
System.out.println(currFullAddress + " : SUCCESSFUL");

} catch (IOException ioe) {
System.out.println(currFullAddress + " : FAILED (" + ioe.getMessage() + ')');

} catch (TimeoutException ioe) {
System.out.println(currFullAddress + " : FAILED (" + ioe.getMessage() + ')');
} finally {
if (currConnection != null) {
try {
Expand All @@ -236,13 +248,25 @@ public static void amqpProbeAllServers(int brokerId) {
}
}

public static BrokerConnectionState createConnectionState(String host,int port, String vhost, String username, String password){
BrokerConnectionState connectionState = new BrokerConnectionState();
connectionState.currentAddress = new FullAddress(new Address(host, port), vhost, username,password);
connectionState.addresses.add(connectionState.currentAddress);
return connectionState;
}

private static BrokerConnectionState getConnectionState(int brokerId) {
BrokerConnectionState connectionState = new BrokerConnectionState();
fillAllAdresses(connectionState, brokerId);

// fill in the previously used broker instance
connectionState.currentAddress = state.get(brokerId);

FullAddress currAddress = state.get(brokerId);
if (currAddress == null) {
fillAllAdresses(connectionState, brokerId);
currAddress = state.get(brokerId);
state.put(brokerId, currAddress);
}
connectionState.currentAddress = currAddress;
return connectionState;
}

Expand All @@ -265,8 +289,7 @@ private static void fillAllAdresses(BrokerConnectionState connectionState, int b
String username = results.getString(4);
String password = results.getString(5);

FullAddress currAddress = new FullAddress(new Address(host, port), vhost, username,
password);
FullAddress currAddress = new FullAddress(new Address(host, port), vhost, username, password);
connectionState.addresses.add(currAddress);
}

Expand Down Expand Up @@ -316,12 +339,18 @@ private static Connection openConnection(BrokerConnectionState connectionState)
System.err.println("connected to " + currFullAddress);
}

} catch (TimeoutException ioe) {
// we catch SocketTimeoutException
if (ENABLE_DEBUG) {
System.err.println("cannot connect to " + currFullAddress + " (" + ioe.getMessage() + ')');
}
} catch (IOException ioe) {
// we catch SocketTimeoutException
if (ENABLE_DEBUG) {
System.err.println("cannot connect to " + currFullAddress + " (" + ioe.getMessage() + ')');
}
}

}
}

Expand All @@ -334,7 +363,7 @@ private static Connection openConnection(BrokerConnectionState connectionState)
return connection;
}

private static Connection openConnection(FullAddress address) throws IOException {
private static Connection openConnection(FullAddress address) throws IOException, TimeoutException {
Connection connection = null;

if (ENABLE_DEBUG) {
Expand All @@ -360,7 +389,7 @@ private static Connection openConnection(FullAddress address) throws IOException
return connection;
}

private static class BrokerConnectionState {
protected static class BrokerConnectionState {
public List<FullAddress> addresses;
public FullAddress currentAddress;

Expand Down
64 changes: 64 additions & 0 deletions src/main/java/com/zenika/oracle/amqp/xml.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.zenika.oracle.amqp;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

import com.rabbitmq.client.AMQP.BasicProperties;

public class xml {
public static BasicProperties basicPropertiesFromXml(String XmlString)
{
//Map<String, String> properties = new HashMap<String,String>();
BasicProperties.Builder prop_builder = new BasicProperties.Builder();
try {
DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(XmlString));
try {
Document doc = db.parse(is);
NodeList nodes = doc.getFirstChild().getChildNodes();
for (int i = 0; i < nodes.getLength(); i++) {
Element element = (Element) nodes.item(i);
String nodeName = element.getNodeName().toUpperCase();

if (nodeName == "CONTENTTYPE") prop_builder = prop_builder.contentType (element.getTextContent());
else if(nodeName == "DELIVERYMODE") prop_builder = prop_builder.deliveryMode (Integer.parseInt(element.getTextContent()));
else if(nodeName == "PRIORITY") prop_builder = prop_builder.priority (Integer.parseInt(element.getTextContent()));
else if(nodeName == "MESSAGEID") prop_builder = prop_builder.messageId (element.getTextContent());
else if(nodeName == "APPID") prop_builder = prop_builder.appId (element.getTextContent());
else if(nodeName == "CLUSTERID") prop_builder = prop_builder.clusterId (element.getTextContent());
else if(nodeName == "CORRELATIONID") prop_builder = prop_builder.correlationId (element.getTextContent());
else if(nodeName == "EXPIRATION") prop_builder = prop_builder.expiration (element.getTextContent());
else if(nodeName == "REPLYTO") prop_builder = prop_builder.replyTo (element.getTextContent());
//else if(nodeName == "TIMESTAMP") prop_builder = prop_builder.timestamp (element.getTextContent());
else if(nodeName == "TYPE") prop_builder = prop_builder.type (element.getTextContent());
else if(nodeName == "HEADERS"){
NodeList header_nodes = element.getChildNodes();
Map<String, Object> headers = new HashMap<String,Object>();
for (int j = 0; j < header_nodes.getLength(); j++) {
Element header = (Element) header_nodes.item(j);
headers.put(header.getNodeName(), header.getTextContent());
}
prop_builder = prop_builder.headers(headers);
}
}
} catch (SAXException e) {
} catch (IOException e) {}

} catch (ParserConfigurationException e) {
}
return prop_builder.build();
}
}
4 changes: 3 additions & 1 deletion src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ public static void amqpPublish(int brokerId, String exchange, String routingKey,
System.out.println("SimpleRabbitMQ.amqpPublish() #1");
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.56.1");
connectionFactory.setHost("172.21.10.15");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

System.out.println("SimpleRabbitMQ.amqpPublish() #2");
Connection connection = connectionFactory.newConnection();
Expand Down
Loading