From 2c5ac83d54571a9617dc0276a1169497b9fc083d Mon Sep 17 00:00:00 2001 From: Brizjin Ivan Date: Tue, 1 Sep 2015 10:30:19 +0300 Subject: [PATCH 1/4] add properties as XML string --- sql/definitions.sql | 4 +- sql/tests.sql | 11 ++++ .../zenika/oracle/amqp/RabbitMQPublisher.java | 11 ++-- src/main/java/com/zenika/oracle/amqp/xml.java | 56 +++++++++++++++++++ 4 files changed, 73 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/zenika/oracle/amqp/xml.java diff --git a/sql/definitions.sql b/sql/definitions.sql index bbd8f40..b85385f 100644 --- a/sql/definitions.sql +++ b/sql/definitions.sql @@ -6,10 +6,10 @@ as language java name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpExchangeDeclare(int, java.lang.String, java.lang.String) return int'; create or replace function amqp_publish -(brokerId IN number, exchange IN varchar2, routingKey IN varchar2, message IN varchar2) +(brokerId IN number, exchange IN varchar2, routingKey IN varchar2, message IN varchar2, xml_string_properties IN varchar2) return NUMBER as language java -name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpPublish(int, java.lang.String, java.lang.String, java.lang.String) return int'; +name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpPublish(int, java.lang.String, java.lang.String, java.lang.String, java.lang.String) return int'; create or replace procedure amqp_print_configuration (brokerId IN number) diff --git a/sql/tests.sql b/sql/tests.sql index b9a8a29..16dbdb3 100644 --- a/sql/tests.sql +++ b/sql/tests.sql @@ -10,5 +10,16 @@ insert into broker (BROKER_ID,HOST,PORT,VHOST,USERNAME,PASSWORD) values (1,'192. -- tests select amqp_exchange_declare(1, 'oracle', 'fanout') from dual; select amqp_publish(1, 'oracle', 'key', 'Hello World!') from dual; +select amqp_publish(1, 'oracle', 'key', 'Hello World!' + ,' + 2 + application/vnd.masstransit+json + '||regexp_replace(rawtohex(sys_guid()),'([A-F0-9]{8})([A-F0-9]{4})([A-F0-9]{4})([A-F0-9]{4})([A-F0-9]{12})', '\1-\2-\3-\4-\5') ||' + ') from dual; +select amqp_publish(brokerid => 1, + exchange => 'oracle', + routingkey => 'key', + message => 'Hello World!', + xml_string_properties => '') from dual; call amqp_print_configuration(1); call amqp_probe_servers(1); diff --git a/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java b/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java index 59de8a3..e7259c8 100644 --- a/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java +++ b/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java @@ -113,12 +113,8 @@ public static int amqpExchangeDeclare(int brokerId, String exchange, String type public static int amqpPublish(int brokerId, String exchange, String routingKey, String message) { return amqpPublish(brokerId, exchange, routingKey, message, null); } - - /** - * FIXME timeout intelligently. FIXME test whether we can declare a type conversion for a Map. - */ - public static int amqpPublish(int brokerId, String exchange, String routingKey, String message, - Map properties) { + + public static int amqpPublish(int brokerId, String exchange, String routingKey, String message, String xml_string_properties) { Connection connection = null; Channel channel = null; @@ -128,7 +124,8 @@ public static int amqpPublish(int brokerId, String exchange, String routingKey, channel = connection.createChannel(); // send the message - channel.basicPublish(exchange, routingKey, false, false, null, message.getBytes()); + + channel.basicPublish(exchange, routingKey, false, false, xml.getMapFromXml(xml_string_properties), message.getBytes()); // remember the current broker used state.put(brokerId, connectionState.currentAddress); diff --git a/src/main/java/com/zenika/oracle/amqp/xml.java b/src/main/java/com/zenika/oracle/amqp/xml.java new file mode 100644 index 0000000..b7365b0 --- /dev/null +++ b/src/main/java/com/zenika/oracle/amqp/xml.java @@ -0,0 +1,56 @@ +package com.zenika.oracle.amqp; + +import java.io.IOException; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + +import com.rabbitmq.client.AMQP.BasicProperties; + +public class xml { + public static BasicProperties getMapFromXml(String XmlString) + { + //Map properties = new HashMap(); + BasicProperties.Builder prop_builder = new BasicProperties.Builder(); + try { + DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + InputSource is = new InputSource(); + is.setCharacterStream(new StringReader(XmlString)); + try { + Document doc = db.parse(is); + NodeList nodes = doc.getFirstChild().getChildNodes(); + for (int i = 0; i < nodes.getLength(); i++) { + Element element = (Element) nodes.item(i); + + if (element.getNodeName() == "CONTENTTYPE") prop_builder = prop_builder.contentType (element.getTextContent()); + else if(element.getNodeName() == "DELIVERYMODE") prop_builder = prop_builder.deliveryMode(Integer.parseInt(element.getTextContent())); + else if(element.getNodeName() == "PRIORITY") prop_builder = prop_builder.priority (Integer.parseInt(element.getTextContent())); + else if(element.getNodeName() == "MESSAGEID") prop_builder = prop_builder.messageId (element.getTextContent()); + else if(element.getNodeName() == "HEADERS"){ + NodeList header_nodes = element.getChildNodes(); + Map headers = new HashMap(); + for (int j = 0; j < header_nodes.getLength(); j++) { + Element header = (Element) header_nodes.item(j); + headers.put(header.getNodeName(), header.getTextContent()); + } + prop_builder = prop_builder.headers(headers); + } + } + } catch (SAXException e) { + } catch (IOException e) {} + + } catch (ParserConfigurationException e) { + } + return prop_builder.build(); + } +} From 686b0b3ed976c69bea681741713da776f5054ba4 Mon Sep 17 00:00:00 2001 From: Brizjin Ivan Date: Thu, 3 Sep 2015 14:30:40 +0300 Subject: [PATCH 2/4] exchange declaration with duarability param --- .classpath | 17 ++++++ .gitignore | 1 + .project | 17 ++++++ sql/definitions.sql | 6 +- .../zenika/oracle/amqp/RabbitMQPublisher.java | 19 ++++--- .../oracle/amqp/TestRabbitMQDriver.java | 4 +- .../oracle/amqp/TestRabbitMQDriverTest.java | 57 +++++++++++++++++-- 7 files changed, 105 insertions(+), 16 deletions(-) create mode 100644 .classpath create mode 100644 .project diff --git a/.classpath b/.classpath new file mode 100644 index 0000000..bc8370c --- /dev/null +++ b/.classpath @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + diff --git a/.gitignore b/.gitignore index 2f7896d..ffff5ff 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ target/ +/bin/ diff --git a/.project b/.project new file mode 100644 index 0000000..8bf9fc0 --- /dev/null +++ b/.project @@ -0,0 +1,17 @@ + + + rabbitmq-oracle-stored-procedures + + + + + + org.eclipse.jdt.core.javabuilder + + + + + + org.eclipse.jdt.core.javanature + + diff --git a/sql/definitions.sql b/sql/definitions.sql index b85385f..bc4503a 100644 --- a/sql/definitions.sql +++ b/sql/definitions.sql @@ -1,12 +1,12 @@ -- FIXME use a schema named 'amqp' create or replace function amqp_exchange_declare -(brokerId IN number, exchange IN varchar2, exchange_type IN varchar2) +(brokerId IN number, exchange IN varchar2, exchange_type IN varchar2,is_durable IN boolean default false) return NUMBER as language java -name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpExchangeDeclare(int, java.lang.String, java.lang.String) return int'; +name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpExchangeDeclare(int, java.lang.String, java.lang.String, java.lang.Boolean) return int'; create or replace function amqp_publish -(brokerId IN number, exchange IN varchar2, routingKey IN varchar2, message IN varchar2, xml_string_properties IN varchar2) +(brokerId IN number, exchange IN varchar2, routingKey IN varchar2, message IN varchar2, xml_string_properties IN varchar2 default null) return NUMBER as language java name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpPublish(int, java.lang.String, java.lang.String, java.lang.String, java.lang.String) return int'; diff --git a/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java b/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java index e7259c8..36e237d 100644 --- a/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java +++ b/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java @@ -10,7 +10,8 @@ import java.util.Hashtable; import java.util.Iterator; import java.util.List; -import java.util.Map; +//import java.util.Map; +import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; @@ -87,7 +88,7 @@ public static int amqpExchangeDeclare(int brokerId, String exchange, String type connection.close(CONNECTION_CLOSE_TIMEOUT); } - } catch (IOException e) { + } catch (IOException | TimeoutException e) { e.printStackTrace(); return E_CANNOT_CLOSE; } @@ -124,8 +125,10 @@ public static int amqpPublish(int brokerId, String exchange, String routingKey, channel = connection.createChannel(); // send the message - - channel.basicPublish(exchange, routingKey, false, false, xml.getMapFromXml(xml_string_properties), message.getBytes()); + if(xml_string_properties == null) + channel.basicPublish(exchange, routingKey, false, false, null, message.getBytes()); + else + channel.basicPublish(exchange, routingKey, false, false, xml.getMapFromXml(xml_string_properties), message.getBytes()); // remember the current broker used state.put(brokerId, connectionState.currentAddress); @@ -144,7 +147,7 @@ public static int amqpPublish(int brokerId, String exchange, String routingKey, connection.close(CONNECTION_CLOSE_TIMEOUT); } - } catch (IOException e) { + } catch (IOException | TimeoutException e) { e.printStackTrace(); return E_CANNOT_CLOSE; } @@ -216,7 +219,7 @@ public static void amqpProbeAllServers(int brokerId) { currConnection = openConnection(currFullAddress); System.out.println(currFullAddress + " : SUCCESSFUL"); - } catch (IOException ioe) { + } catch (IOException | TimeoutException ioe) { System.out.println(currFullAddress + " : FAILED (" + ioe.getMessage() + ')'); } finally { @@ -313,7 +316,7 @@ private static Connection openConnection(BrokerConnectionState connectionState) System.err.println("connected to " + currFullAddress); } - } catch (IOException ioe) { + } catch (IOException | TimeoutException ioe) { // we catch SocketTimeoutException if (ENABLE_DEBUG) { System.err.println("cannot connect to " + currFullAddress + " (" + ioe.getMessage() + ')'); @@ -331,7 +334,7 @@ private static Connection openConnection(BrokerConnectionState connectionState) return connection; } - private static Connection openConnection(FullAddress address) throws IOException { + private static Connection openConnection(FullAddress address) throws IOException, TimeoutException { Connection connection = null; if (ENABLE_DEBUG) { diff --git a/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriver.java b/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriver.java index 1de749a..ccefe0e 100644 --- a/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriver.java +++ b/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriver.java @@ -41,8 +41,10 @@ public static void amqpPublish(int brokerId, String exchange, String routingKey, System.out.println("SimpleRabbitMQ.amqpPublish() #1"); try { ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setHost("192.168.56.1"); + connectionFactory.setHost("172.21.10.15"); connectionFactory.setPort(5672); + connectionFactory.setUsername("guest"); + connectionFactory.setPassword("guest"); System.out.println("SimpleRabbitMQ.amqpPublish() #2"); Connection connection = connectionFactory.newConnection(); diff --git a/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriverTest.java b/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriverTest.java index 02ad940..e82c440 100644 --- a/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriverTest.java +++ b/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriverTest.java @@ -1,18 +1,67 @@ package com.zenika.oracle.amqp; -import org.junit.Ignore; +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import org.junit.AfterClass; +//import org.junit.Before; +import org.junit.BeforeClass; +//import org.junit.Ignore; import org.junit.Test; +//import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + /** * * @author Pierre Queinnec */ -@Ignore +//@Ignore public class TestRabbitMQDriverTest { + + static Connection connection; + + @BeforeClass + public static void allTestsStarted() throws IOException, TimeoutException { + + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setHost("172.21.10.15"); + connectionFactory.setPort(5672); + connectionFactory.setUsername("guest"); + connectionFactory.setPassword("guest"); + + connection = connectionFactory.newConnection(); + System.out.println("All tests started"); + } + + @AfterClass + public static void allTestsFineshed() throws IOException, TimeoutException { + connection.close(); + } + + + @Test + public void testConnectionIsOpen() throws IOException, TimeoutException { + assertEquals(connection.isOpen(),true); + } + + @Test + public void testAmqpExchangeDeclare() throws IOException { + Channel channel = connection.createChannel(); + channel.exchangeDeclare("oracle2", "fanout"); + boolean b = true; + channel.exchangeDeclare("oracle3", "fanout",b); + channel.exchangeDeclare("oracle4", "fanout",false); + } @Test - public void testAmqpPublishIntStringStringString() { - TestRabbitMQDriver.amqpPublish(1, "oracle", "test.unit", "Hello World!"); + public void testAmqpPublishIntStringStringString() { + TestRabbitMQDriver.amqpPublish(1, "oracle2", "test.unit", "Hello World!"); + } } From bd164732abf7ee225317f7cadd99a7ed75a32ecc Mon Sep 17 00:00:00 2001 From: Brizjin Ivan Date: Thu, 3 Sep 2015 18:35:31 +0300 Subject: [PATCH 3/4] jre 1.5 and amqp client 3.5.4 --- .settings/org.eclipse.jdt.core.prefs | 12 +++++++ pom.xml | 2 +- sql/definitions.sql | 6 ++-- .../zenika/oracle/amqp/RabbitMQPublisher.java | 31 ++++++++++++++----- 4 files changed, 39 insertions(+), 12 deletions(-) create mode 100644 .settings/org.eclipse.jdt.core.prefs diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..eb1cb4c --- /dev/null +++ b/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,12 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled +org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.5 +org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve +org.eclipse.jdt.core.compiler.compliance=1.5 +org.eclipse.jdt.core.compiler.debug.lineNumber=generate +org.eclipse.jdt.core.compiler.debug.localVariable=generate +org.eclipse.jdt.core.compiler.debug.sourceFile=generate +org.eclipse.jdt.core.compiler.problem.assertIdentifier=error +org.eclipse.jdt.core.compiler.problem.enumIdentifier=error +org.eclipse.jdt.core.compiler.source=1.5 diff --git a/pom.xml b/pom.xml index 474e772..896351d 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ com.rabbitmq amqp-client - 2.8.1 + 3.5.4 Publish RabbitMQ AMQP notifications from Oracle DB diff --git a/sql/definitions.sql b/sql/definitions.sql index bc4503a..b784cc7 100644 --- a/sql/definitions.sql +++ b/sql/definitions.sql @@ -1,12 +1,12 @@ -- FIXME use a schema named 'amqp' create or replace function amqp_exchange_declare -(brokerId IN number, exchange IN varchar2, exchange_type IN varchar2,is_durable IN boolean default false) +(brokerId IN number, exchange IN varchar2, exchange_type IN varchar2, is_durable IN number) return NUMBER as language java -name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpExchangeDeclare(int, java.lang.String, java.lang.String, java.lang.Boolean) return int'; +name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpExchangeDeclare(int, java.lang.String, java.lang.String, int) return int'; create or replace function amqp_publish -(brokerId IN number, exchange IN varchar2, routingKey IN varchar2, message IN varchar2, xml_string_properties IN varchar2 default null) +(brokerId IN number, exchange IN varchar2, routingKey IN varchar2, message IN varchar2, xml_string_properties IN varchar2) return NUMBER as language java name 'com.zenika.oracle.amqp.RabbitMQPublisher.amqpPublish(int, java.lang.String, java.lang.String, java.lang.String, java.lang.String) return int'; diff --git a/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java b/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java index 36e237d..78a7e45 100644 --- a/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java +++ b/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java @@ -62,7 +62,7 @@ public class RabbitMQPublisher { * @see Channel#exchangeDeclare(String, String) * @return an error code, see the source */ - public static int amqpExchangeDeclare(int brokerId, String exchange, String type) { + public static int amqpExchangeDeclare(int brokerId, String exchange, String type, int durable) { // FIXME declare on all brokers for brokerId? Connection connection = null; Channel channel = null; @@ -72,7 +72,10 @@ public static int amqpExchangeDeclare(int brokerId, String exchange, String type channel = connection.createChannel(); // declare the exchange - channel.exchangeDeclare(exchange, type); + if(durable == 1) + channel.exchangeDeclare(exchange, type, true); + else + channel.exchangeDeclare(exchange, type); } catch (IOException ioe) { ioe.printStackTrace(); @@ -88,7 +91,10 @@ public static int amqpExchangeDeclare(int brokerId, String exchange, String type connection.close(CONNECTION_CLOSE_TIMEOUT); } - } catch (IOException | TimeoutException e) { + } catch (TimeoutException e) { + e.printStackTrace(); + return E_CANNOT_CLOSE; + } catch (IOException e) { e.printStackTrace(); return E_CANNOT_CLOSE; } @@ -147,7 +153,10 @@ public static int amqpPublish(int brokerId, String exchange, String routingKey, connection.close(CONNECTION_CLOSE_TIMEOUT); } - } catch (IOException | TimeoutException e) { + } catch (IOException e) { + e.printStackTrace(); + return E_CANNOT_CLOSE; + } catch (TimeoutException e) { e.printStackTrace(); return E_CANNOT_CLOSE; } @@ -218,10 +227,10 @@ public static void amqpProbeAllServers(int brokerId) { try { currConnection = openConnection(currFullAddress); System.out.println(currFullAddress + " : SUCCESSFUL"); - - } catch (IOException | TimeoutException ioe) { + } catch (IOException ioe) { + System.out.println(currFullAddress + " : FAILED (" + ioe.getMessage() + ')'); + } catch (TimeoutException ioe) { System.out.println(currFullAddress + " : FAILED (" + ioe.getMessage() + ')'); - } finally { if (currConnection != null) { try { @@ -316,12 +325,18 @@ private static Connection openConnection(BrokerConnectionState connectionState) System.err.println("connected to " + currFullAddress); } - } catch (IOException | TimeoutException ioe) { + } catch (TimeoutException ioe) { + // we catch SocketTimeoutException + if (ENABLE_DEBUG) { + System.err.println("cannot connect to " + currFullAddress + " (" + ioe.getMessage() + ')'); + } + } catch (IOException ioe) { // we catch SocketTimeoutException if (ENABLE_DEBUG) { System.err.println("cannot connect to " + currFullAddress + " (" + ioe.getMessage() + ')'); } } + } } From dd55e05ed0330d5893e00fde0b8d99b4f266e8bd Mon Sep 17 00:00:00 2001 From: Brizjin Ivan Date: Mon, 7 Sep 2015 14:32:42 +0300 Subject: [PATCH 4/4] make hashing address work add all properties test publish with properties --- sql/tests.sql | 7 ++ .../zenika/oracle/amqp/RabbitMQPublisher.java | 68 +++++++++++-------- src/main/java/com/zenika/oracle/amqp/xml.java | 20 ++++-- .../oracle/amqp/TestRabbitMQDriverTest.java | 57 ++++++++-------- 4 files changed, 92 insertions(+), 60 deletions(-) diff --git a/sql/tests.sql b/sql/tests.sql index 16dbdb3..6e17a2f 100644 --- a/sql/tests.sql +++ b/sql/tests.sql @@ -15,6 +15,13 @@ select amqp_publish(1, 'oracle', 'key', 'Hello World!' 2 application/vnd.masstransit+json '||regexp_replace(rawtohex(sys_guid()),'([A-F0-9]{8})([A-F0-9]{4})([A-F0-9]{4})([A-F0-9]{4})([A-F0-9]{12})', '\1-\2-\3-\4-\5') ||' + + application/vnd.masstransit+json + test + + XML + 10 + iso-8859-5 ') from dual; select amqp_publish(brokerid => 1, exchange => 'oracle', diff --git a/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java b/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java index 78a7e45..e180b91 100644 --- a/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java +++ b/src/main/java/com/zenika/oracle/amqp/RabbitMQPublisher.java @@ -104,29 +104,13 @@ public static int amqpExchangeDeclare(int brokerId, String exchange, String type return EXIT_SUCCESS; } - /** - * Publish an AMQP message to the given exchange. - * - * @param brokerId - * the ID of the broker in the configuration table - * @param exchange - * the name of the AMQP exchange - * @param routingKey - * the AMQP routing key - * @param message - * the payload - * @return an error code, see the source - */ - public static int amqpPublish(int brokerId, String exchange, String routingKey, String message) { - return amqpPublish(brokerId, exchange, routingKey, message, null); - } - public static int amqpPublish(int brokerId, String exchange, String routingKey, String message, String xml_string_properties) { + public static int amqpPublish(BrokerConnectionState connectionState, String exchange, String routingKey, String message, String xml_string_properties) { Connection connection = null; Channel channel = null; try { - BrokerConnectionState connectionState = getConnectionState(brokerId); + //BrokerConnectionState connectionState = getConnectionState(brokerId); connection = openConnection(connectionState); channel = connection.createChannel(); @@ -134,10 +118,7 @@ public static int amqpPublish(int brokerId, String exchange, String routingKey, if(xml_string_properties == null) channel.basicPublish(exchange, routingKey, false, false, null, message.getBytes()); else - channel.basicPublish(exchange, routingKey, false, false, xml.getMapFromXml(xml_string_properties), message.getBytes()); - - // remember the current broker used - state.put(brokerId, connectionState.currentAddress); + channel.basicPublish(exchange, routingKey, false, false, xml.basicPropertiesFromXml(xml_string_properties), message.getBytes()); } catch (IOException ioe) { ioe.printStackTrace(); @@ -165,6 +146,28 @@ public static int amqpPublish(int brokerId, String exchange, String routingKey, // everything went OK return EXIT_SUCCESS; } + + + /** + * Publish an AMQP message to the given exchange. + * + * @param brokerId + * the ID of the broker in the configuration table + * @param exchange + * the name of the AMQP exchange + * @param routingKey + * the AMQP routing key + * @param message + * the payload + * @return an error code, see the source + */ + public static int amqpPublish(int brokerId, String exchange, String routingKey, String message) { + return amqpPublish(brokerId, exchange, routingKey, message, null); + } + + public static int amqpPublish(int brokerId, String exchange, String routingKey, String message, String xml_string_properties) { + return amqpPublish(getConnectionState(brokerId), exchange, routingKey, message, xml_string_properties); + } /** * Print the current configuration for broker definitions. In case there's more than one broker per ID, the active @@ -245,13 +248,25 @@ public static void amqpProbeAllServers(int brokerId) { } } + public static BrokerConnectionState createConnectionState(String host,int port, String vhost, String username, String password){ + BrokerConnectionState connectionState = new BrokerConnectionState(); + connectionState.currentAddress = new FullAddress(new Address(host, port), vhost, username,password); + connectionState.addresses.add(connectionState.currentAddress); + return connectionState; + } + private static BrokerConnectionState getConnectionState(int brokerId) { BrokerConnectionState connectionState = new BrokerConnectionState(); fillAllAdresses(connectionState, brokerId); // fill in the previously used broker instance - connectionState.currentAddress = state.get(brokerId); - + FullAddress currAddress = state.get(brokerId); + if (currAddress == null) { + fillAllAdresses(connectionState, brokerId); + currAddress = state.get(brokerId); + state.put(brokerId, currAddress); + } + connectionState.currentAddress = currAddress; return connectionState; } @@ -274,8 +289,7 @@ private static void fillAllAdresses(BrokerConnectionState connectionState, int b String username = results.getString(4); String password = results.getString(5); - FullAddress currAddress = new FullAddress(new Address(host, port), vhost, username, - password); + FullAddress currAddress = new FullAddress(new Address(host, port), vhost, username, password); connectionState.addresses.add(currAddress); } @@ -375,7 +389,7 @@ private static Connection openConnection(FullAddress address) throws IOException return connection; } - private static class BrokerConnectionState { + protected static class BrokerConnectionState { public List addresses; public FullAddress currentAddress; diff --git a/src/main/java/com/zenika/oracle/amqp/xml.java b/src/main/java/com/zenika/oracle/amqp/xml.java index b7365b0..a4d4832 100644 --- a/src/main/java/com/zenika/oracle/amqp/xml.java +++ b/src/main/java/com/zenika/oracle/amqp/xml.java @@ -18,7 +18,7 @@ import com.rabbitmq.client.AMQP.BasicProperties; public class xml { - public static BasicProperties getMapFromXml(String XmlString) + public static BasicProperties basicPropertiesFromXml(String XmlString) { //Map properties = new HashMap(); BasicProperties.Builder prop_builder = new BasicProperties.Builder(); @@ -31,12 +31,20 @@ public static BasicProperties getMapFromXml(String XmlString) NodeList nodes = doc.getFirstChild().getChildNodes(); for (int i = 0; i < nodes.getLength(); i++) { Element element = (Element) nodes.item(i); + String nodeName = element.getNodeName().toUpperCase(); - if (element.getNodeName() == "CONTENTTYPE") prop_builder = prop_builder.contentType (element.getTextContent()); - else if(element.getNodeName() == "DELIVERYMODE") prop_builder = prop_builder.deliveryMode(Integer.parseInt(element.getTextContent())); - else if(element.getNodeName() == "PRIORITY") prop_builder = prop_builder.priority (Integer.parseInt(element.getTextContent())); - else if(element.getNodeName() == "MESSAGEID") prop_builder = prop_builder.messageId (element.getTextContent()); - else if(element.getNodeName() == "HEADERS"){ + if (nodeName == "CONTENTTYPE") prop_builder = prop_builder.contentType (element.getTextContent()); + else if(nodeName == "DELIVERYMODE") prop_builder = prop_builder.deliveryMode (Integer.parseInt(element.getTextContent())); + else if(nodeName == "PRIORITY") prop_builder = prop_builder.priority (Integer.parseInt(element.getTextContent())); + else if(nodeName == "MESSAGEID") prop_builder = prop_builder.messageId (element.getTextContent()); + else if(nodeName == "APPID") prop_builder = prop_builder.appId (element.getTextContent()); + else if(nodeName == "CLUSTERID") prop_builder = prop_builder.clusterId (element.getTextContent()); + else if(nodeName == "CORRELATIONID") prop_builder = prop_builder.correlationId (element.getTextContent()); + else if(nodeName == "EXPIRATION") prop_builder = prop_builder.expiration (element.getTextContent()); + else if(nodeName == "REPLYTO") prop_builder = prop_builder.replyTo (element.getTextContent()); + //else if(nodeName == "TIMESTAMP") prop_builder = prop_builder.timestamp (element.getTextContent()); + else if(nodeName == "TYPE") prop_builder = prop_builder.type (element.getTextContent()); + else if(nodeName == "HEADERS"){ NodeList header_nodes = element.getChildNodes(); Map headers = new HashMap(); for (int j = 0; j < header_nodes.getLength(); j++) { diff --git a/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriverTest.java b/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriverTest.java index e82c440..5dc19b5 100644 --- a/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriverTest.java +++ b/src/test/java/com/zenika/oracle/amqp/TestRabbitMQDriverTest.java @@ -1,6 +1,6 @@ package com.zenika.oracle.amqp; -import static org.junit.Assert.*; +//import static org.junit.Assert.*; import java.io.IOException; import java.util.concurrent.TimeoutException; @@ -11,10 +11,13 @@ //import org.junit.Ignore; import org.junit.Test; + //import com.rabbitmq.client.AMQP.Exchange.DeclareOk; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; +//import com.rabbitmq.client.Channel; +//import com.rabbitmq.client.Connection; +//import com.rabbitmq.client.ConnectionFactory; +import com.zenika.oracle.amqp.RabbitMQPublisher.BrokerConnectionState; + /** * @@ -23,44 +26,44 @@ //@Ignore public class TestRabbitMQDriverTest { - static Connection connection; + static BrokerConnectionState connectionState; @BeforeClass public static void allTestsStarted() throws IOException, TimeoutException { - - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setHost("172.21.10.15"); - connectionFactory.setPort(5672); - connectionFactory.setUsername("guest"); - connectionFactory.setPassword("guest"); - - connection = connectionFactory.newConnection(); - System.out.println("All tests started"); + connectionState = RabbitMQPublisher.createConnectionState("172.21.10.15", 5672, "/", "guest", "guest"); } @AfterClass public static void allTestsFineshed() throws IOException, TimeoutException { - connection.close(); + } - @Test - public void testConnectionIsOpen() throws IOException, TimeoutException { - assertEquals(connection.isOpen(),true); + public void testAmqpPublishIntStringStringString() { + TestRabbitMQDriver.amqpPublish(1, "oracle2", "test.unit", "Hello World!"); + } @Test - public void testAmqpExchangeDeclare() throws IOException { - Channel channel = connection.createChannel(); - channel.exchangeDeclare("oracle2", "fanout"); - boolean b = true; - channel.exchangeDeclare("oracle3", "fanout",b); - channel.exchangeDeclare("oracle4", "fanout",false); + public void testRabbitPublisher() { + RabbitMQPublisher.amqpPublish(connectionState, "oracle5", "test.unit", "Hello World!", null); + } - + @Test - public void testAmqpPublishIntStringStringString() { - TestRabbitMQDriver.amqpPublish(1, "oracle2", "test.unit", "Hello World!"); + public void testRabbitPropPublisher() { + RabbitMQPublisher.amqpPublish(connectionState, "oracle5", "test.unit", "PROP" + ,"" + + "2" + + "" + + "application/vnd.masstransit+json" + + "test" + + "" + + "1" + + "XML" + + "10" + + "iso-8859-5" + + ""); }