Skip to content

Commit 1014a24

Browse files
author
sasa
committed
Added C++ code examplesfor all tutorials
1 parent fda508e commit 1014a24

15 files changed

+695
-0
lines changed

cpp/CMakeLists.txt

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
set(CMAKE_CXX_STANDARD 11)
2+
3+
add_executable(send send.cpp)
4+
target_link_libraries(send rabbitmq)
5+
6+
add_executable(receive receive.cpp)
7+
target_link_libraries(receive rabbitmq)
8+
9+
add_executable(new_task new_task.cpp)
10+
target_link_libraries(new_task rabbitmq)
11+
12+
add_executable(worker worker.cpp)
13+
target_link_libraries(worker rabbitmq)
14+
15+
add_executable(emit_log emit_log.cpp)
16+
target_link_libraries(emit_log rabbitmq)
17+
18+
add_executable(receive_logs receive_logs.cpp)
19+
target_link_libraries(receive_logs rabbitmq)
20+
21+
add_executable(emit_log_direct emit_log_direct.cpp)
22+
target_link_libraries(emit_log_direct rabbitmq)
23+
24+
add_executable(receive_logs_direct receive_logs_direct.cpp)
25+
target_link_libraries(receive_logs_direct rabbitmq)
26+
27+
add_executable(emit_log_topic emit_log_topic.cpp)
28+
target_link_libraries(emit_log_topic rabbitmq)
29+
30+
add_executable(receive_logs_topic receive_logs_topic.cpp)
31+
target_link_libraries(receive_logs_topic rabbitmq)
32+
33+
add_executable(rpc_server rpc_server.cpp)
34+
target_link_libraries(rpc_server rabbitmq)
35+
36+
add_executable(rpc_client rpc_client.cpp)
37+
target_link_libraries(rpc_client rabbitmq)
38+
39+
add_executable(publisher_confirms publisher_confirms.cpp)
40+
target_link_libraries(publisher_confirms rabbitmq)

cpp/README.md

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# C++ code for RabbitMQ tutorials
2+
3+
Here you can find the C++ code examples from [RabbitMQ
4+
tutorials](https://www.rabbitmq.com/getstarted.html).
5+
6+
To successfully use the examples you will need a RabbitMQ node running locally.
7+
8+
## Requirements
9+
Examples use [rabbitmq-c library](https://github.com/alanxz/rabbitmq-c) that must be installed first.
10+
You can build the examples via CMakeFile.txt.
11+
Examples are tested on CentOS 7, RabbitMQ 3.10.0, Erlang 23.3.1 and built with Visual Studio Code 1.73.1

cpp/emit_log.cpp

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#include <string.h>
2+
#include <iostream>
3+
#include <sstream>
4+
#include <iterator>
5+
6+
#include <amqp.h>
7+
#include <amqp_tcp_socket.h>
8+
9+
int main(int argc, char const *const *argv)
10+
{
11+
amqp_connection_state_t conn = amqp_new_connection();
12+
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
13+
amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT);
14+
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
15+
const amqp_channel_t KChannel = 1;
16+
amqp_channel_open(conn, KChannel);
17+
18+
amqp_bytes_t exchangeName(amqp_cstring_bytes("logs"));
19+
amqp_exchange_declare(conn, KChannel, exchangeName, amqp_cstring_bytes("fanout"),
20+
false, false, false, false, amqp_empty_table);
21+
22+
std::string message("info: Hello World!");
23+
if (argc > 1)
24+
{
25+
std::stringstream s;
26+
copy(&argv[1], &argv[argc], std::ostream_iterator<const char*>(s, " "));
27+
message = s.str();
28+
}
29+
30+
amqp_basic_publish(conn, KChannel, exchangeName, amqp_empty_bytes, false, false, nullptr, amqp_cstring_bytes(message.c_str()));
31+
std::cout << " [x] Sent " << message << std::endl;
32+
33+
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
34+
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
35+
amqp_destroy_connection(conn);
36+
return 0;
37+
}

cpp/emit_log_direct.cpp

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#include <string.h>
2+
#include <iostream>
3+
#include <sstream>
4+
#include <iterator>
5+
6+
#include <amqp.h>
7+
#include <amqp_tcp_socket.h>
8+
9+
int main(int argc, char const *const *argv)
10+
{
11+
amqp_connection_state_t conn = amqp_new_connection();
12+
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
13+
amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT);
14+
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
15+
const amqp_channel_t KChannel = 1;
16+
amqp_channel_open(conn, KChannel);
17+
18+
amqp_bytes_t exchangeName(amqp_cstring_bytes("direct_logs"));
19+
amqp_exchange_declare(conn, KChannel, exchangeName, amqp_cstring_bytes("direct"),
20+
false, false, false, false, amqp_empty_table);
21+
22+
std::string severity = argc > 1 ? argv[1] : "info";
23+
std::string message(" Hello World!");
24+
if (argc > 2)
25+
{
26+
std::stringstream s;
27+
copy(&argv[2], &argv[argc], std::ostream_iterator<const char*>(s, " "));
28+
message = s.str();
29+
}
30+
31+
amqp_basic_publish(conn, KChannel, exchangeName, amqp_cstring_bytes(severity.c_str()), false, false, nullptr, amqp_cstring_bytes(message.c_str()));
32+
std::cout << " [x] Sent " << severity << ":" << message << std::endl;
33+
34+
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
35+
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
36+
amqp_destroy_connection(conn);
37+
return 0;
38+
}

cpp/emit_log_topic.cpp

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#include <string.h>
2+
#include <iostream>
3+
#include <sstream>
4+
#include <iterator>
5+
6+
#include <amqp.h>
7+
#include <amqp_tcp_socket.h>
8+
9+
int main(int argc, char const *const *argv)
10+
{
11+
amqp_connection_state_t conn = amqp_new_connection();
12+
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
13+
amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT);
14+
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
15+
const amqp_channel_t KChannel = 1;
16+
amqp_channel_open(conn, KChannel);
17+
18+
amqp_bytes_t exchangeName(amqp_cstring_bytes("topic_logs"));
19+
amqp_exchange_declare(conn, KChannel, exchangeName, amqp_cstring_bytes("topic"),
20+
false, false, false, false, amqp_empty_table);
21+
22+
std::string routing_key = argc > 2 ? argv[1] : "anonymous.info";
23+
std::string message(" Hello World!");
24+
if (argc > 2)
25+
{
26+
std::stringstream s;
27+
copy(&argv[2], &argv[argc], std::ostream_iterator<const char*>(s, " "));
28+
message = s.str();
29+
}
30+
31+
amqp_basic_publish(conn, KChannel, exchangeName, amqp_cstring_bytes(routing_key.c_str()), false, false, nullptr, amqp_cstring_bytes(message.c_str()));
32+
std::cout << " [x] Sent " << routing_key << ":" << message << std::endl;
33+
34+
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
35+
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
36+
amqp_destroy_connection(conn);
37+
return 0;
38+
}

cpp/new_task.cpp

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#include <string.h>
2+
#include <iostream>
3+
#include <sstream>
4+
#include <iterator>
5+
6+
#include <amqp.h>
7+
#include <amqp_tcp_socket.h>
8+
9+
int main(int argc, char const *const *argv)
10+
{
11+
amqp_connection_state_t conn = amqp_new_connection();
12+
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
13+
amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT);
14+
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
15+
const amqp_channel_t KChannel = 1;
16+
amqp_channel_open(conn, KChannel);
17+
18+
amqp_bytes_t queueName(amqp_cstring_bytes("task_queue"));
19+
amqp_queue_declare(conn, KChannel, queueName, false, /*durable*/ true, false, true, amqp_empty_table);
20+
21+
std::string message("Hello World!");
22+
if (argc > 1)
23+
{
24+
std::stringstream s;
25+
copy(&argv[1], &argv[argc], std::ostream_iterator<const char*>(s, " "));
26+
message = s.str();
27+
}
28+
29+
amqp_basic_properties_t props;
30+
props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
31+
props.delivery_mode = AMQP_DELIVERY_PERSISTENT;
32+
33+
amqp_basic_publish(conn, KChannel, amqp_empty_bytes, /* routing key*/ queueName, false, false, &props, amqp_cstring_bytes(message.c_str()));
34+
std::cout << " [x] Sent " << message << std::endl;
35+
36+
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
37+
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
38+
amqp_destroy_connection(conn);
39+
return 0;
40+
}

cpp/publisher_confirms.cpp

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#include <string.h>
2+
#include <iostream>
3+
4+
#include <amqp.h>
5+
#include <amqp_tcp_socket.h>
6+
7+
int main(int argc, char const *const *argv)
8+
{
9+
amqp_connection_state_t conn = amqp_new_connection();
10+
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
11+
amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT);
12+
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
13+
const amqp_channel_t KChannel = 1;
14+
amqp_channel_open(conn, KChannel);
15+
16+
amqp_bytes_t queueName(amqp_cstring_bytes("hello"));
17+
amqp_queue_declare(conn, KChannel, queueName, false, false, false, false, amqp_empty_table);
18+
19+
amqp_confirm_select(conn, KChannel);
20+
amqp_basic_publish(conn, KChannel, amqp_empty_bytes, /* routing key*/ queueName, false, false, nullptr, amqp_cstring_bytes("Hello World!"));
21+
amqp_basic_publish(conn, KChannel, amqp_empty_bytes, /* routing key*/ queueName, false, false, nullptr, amqp_cstring_bytes("Hello World!"));
22+
23+
amqp_frame_t frame;
24+
amqp_simple_wait_frame(conn, &frame);
25+
if (frame.channel == KChannel)
26+
{
27+
if (frame.payload.method.id == AMQP_BASIC_ACK_METHOD)
28+
{
29+
amqp_basic_ack_t *ack = (amqp_basic_ack_t *)frame.payload.method.decoded;
30+
if (ack->multiple)
31+
std::cout << "Sucessfully sent messages up to delivery tag: " << ack->delivery_tag << std::endl;
32+
else
33+
std::cout << "Sucessfully sent message with delivery tag: " << ack->delivery_tag << std::endl;
34+
}
35+
else if (frame.payload.method.id == AMQP_BASIC_RETURN_METHOD)
36+
{
37+
// message wasn't routed to a queue, but returned
38+
amqp_message_t returned_message;
39+
amqp_read_message(conn, 1, &returned_message, 0);
40+
amqp_destroy_message(&returned_message);
41+
42+
amqp_simple_wait_frame(conn, &frame);
43+
if (frame.payload.method.id == AMQP_BASIC_ACK_METHOD)
44+
std::cout << "Message returned" << std::endl;
45+
}
46+
}
47+
48+
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
49+
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
50+
amqp_destroy_connection(conn);
51+
return 0;
52+
}

cpp/receive.cpp

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#include <iostream>
2+
#include <string.h>
3+
4+
#include <amqp.h>
5+
#include <amqp_tcp_socket.h>
6+
7+
8+
int main(int argc, char const *const *argv)
9+
{
10+
11+
amqp_connection_state_t conn = amqp_new_connection();
12+
13+
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
14+
15+
amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT);
16+
17+
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
18+
const amqp_channel_t KChannel = 1;
19+
amqp_channel_open(conn, KChannel);
20+
21+
amqp_bytes_t queueName(amqp_cstring_bytes("hello"));
22+
amqp_queue_declare(conn, KChannel, queueName, false, false, false, false, amqp_empty_table);
23+
24+
amqp_basic_consume(conn, KChannel, queueName, amqp_empty_bytes, false, /* auto ack*/true, false, amqp_empty_table);
25+
26+
for (;;)
27+
{
28+
amqp_maybe_release_buffers(conn);
29+
amqp_envelope_t envelope;
30+
amqp_consume_message(conn, &envelope, nullptr, 0);
31+
32+
std::cout << " [x] Received " << std::string((char *)envelope.message.body.bytes,(int)envelope.message.body.len) << std::endl;
33+
amqp_destroy_envelope(&envelope);
34+
}
35+
36+
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
37+
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
38+
amqp_destroy_connection(conn);
39+
40+
return 0;
41+
}

cpp/receive_logs.cpp

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#include <iostream>
2+
#include <string.h>
3+
#include <algorithm>
4+
#include <thread>
5+
#include <chrono>
6+
7+
#include <amqp.h>
8+
#include <amqp_tcp_socket.h>
9+
10+
11+
int main(int argc, char const *const *argv)
12+
{
13+
14+
amqp_connection_state_t conn = amqp_new_connection();
15+
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
16+
amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT);
17+
18+
amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
19+
const amqp_channel_t KChannel = 1;
20+
amqp_channel_open(conn, KChannel);
21+
22+
amqp_bytes_t exchangeName(amqp_cstring_bytes("logs"));
23+
amqp_exchange_declare(conn, KChannel, exchangeName, amqp_cstring_bytes("fanout"),
24+
false, false, false, false, amqp_empty_table);
25+
26+
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, KChannel, amqp_empty_bytes, false, false, true, false, amqp_empty_table);
27+
amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue);
28+
29+
amqp_queue_bind(conn, KChannel, queueName, exchangeName, amqp_empty_bytes, amqp_empty_table);
30+
31+
std::cout << "[*] Waiting for logs. To exit press CTRL+C'" << std::endl;
32+
amqp_basic_consume(conn, KChannel, queueName, amqp_empty_bytes, false, /* auto ack*/true, false, amqp_empty_table);
33+
34+
for (;;)
35+
{
36+
amqp_maybe_release_buffers(conn);
37+
amqp_envelope_t envelope;
38+
amqp_consume_message(conn, &envelope, nullptr, 0);
39+
40+
std::string message((char *)envelope.message.body.bytes,(int)envelope.message.body.len);
41+
std::cout << " [x] Received " << message << std::endl;
42+
43+
amqp_destroy_envelope(&envelope);
44+
}
45+
46+
amqp_bytes_free(queueName);
47+
amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
48+
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
49+
amqp_destroy_connection(conn);
50+
51+
return 0;
52+
}

0 commit comments

Comments
 (0)