Skip to content

Commit ed624cd

Browse files
committed
Make example compile
1 parent ea79f47 commit ed624cd

File tree

10 files changed

+165
-46
lines changed

10 files changed

+165
-46
lines changed

cpp/examples/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ foreach(example
6060
scheduled_send
6161
service_bus
6262
multithreaded_client
63-
multithreaded_client_flow_control)
63+
multithreaded_client_flow_control
64+
tx_send)
6465
add_executable(${example} ${example}.cpp)
6566
target_link_libraries(${example} Proton::cpp Threads::Threads)
6667
endforeach()

cpp/examples/tx_send.cpp

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
4444
int committed = 0;
4545
int confirmed = 0;
4646
proton::container *container;
47-
proton::transaction_handler transaction_handler;
48-
proton::transaction *transaction;
47+
// proton::transaction_handler transaction_handler;
48+
proton::transaction transaction;
4949
proton::connection connection;
5050
public:
5151
tx_send(const std::string &s, int c, int b):
@@ -55,34 +55,48 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
5555
container = &c; // TODO: Fix error
5656
sender = c.open_sender(url);
5757
connection = sender.connection();
58-
transaction = NULL;
59-
c.declare_transaction(connection, transaction_handler);
58+
std::cout << " [on_container_start] declare_txn started..." << std::endl;
59+
transaction = c.declare_transaction(connection, *this);
60+
std::cout << " [on_container_start] completed!! txn: " << &transaction << std::endl;
6061
}
6162

63+
void on_transaction_aborted(proton::transaction&) {}
64+
void on_transaction_declare_failed(proton::transaction &) {}
65+
void on_transaction_commit_failed(proton::transaction&) {}
66+
67+
6268
void on_transaction_declared(proton::transaction &t) override {
63-
transaction = &t;
64-
send();
69+
std::cout<<"[on_transaction_declared] txn: "<<(&transaction)<< " new_txn: "<<(&t)<<std::endl;
70+
connection.close();
71+
// transaction = &t;
72+
// ASSUME: THIS FUNCTION DOESN"T WORK
73+
// send();
6574
}
6675

6776
void on_sendable(proton::sender &s) override {
68-
send();
77+
// send();
78+
// std::cout<<" [OnSendable] transaction: "<< &transaction << std::endl;
79+
// send(s);
6980
}
7081

71-
void send() {
82+
void send(proton::sender &s) {
7283
// TODO: Add more condition in while loop
73-
while (transaction && sender.credit() && (committed + current_batch) < total)
84+
// transaction != null
85+
while ( sender.credit() && (committed + current_batch) < total)
7486
{
7587
proton::message msg;
7688
std::map<std::string, int> m;
7789
m["sequence"] = committed + current_batch;
7890

7991
msg.id(committed + current_batch + 1);
8092
msg.body(m);
93+
transaction.send(sender, msg);
8194
current_batch += 1;
8295
if(current_batch == batch_size)
8396
{
84-
transaction->commit();
85-
transaction = NULL;
97+
transaction.commit();
98+
// WE DON"T CARE ANY MORE FOR NOW
99+
// transaction = NULL;
86100
}
87101
}
88102

@@ -94,13 +108,14 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
94108

95109
void on_transaction_committed(proton::transaction &t) override {
96110
committed += current_batch;
111+
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
97112
if(committed == total) {
98113
std::cout << "All messages committed";
99-
connection.close();
114+
// connection.close();
100115
}
101116
else {
102-
current_batch = 0;
103-
container->declare_transaction(connection, transaction_handler);
117+
// current_batch = 0;
118+
// container->declare_transaction(connection, transaction_handler);
104119
}
105120
}
106121

@@ -112,7 +127,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
112127

113128
int main(int argc, char **argv) {
114129
std::string address("127.0.0.1:5672/examples");
115-
int message_count = 100;
130+
int message_count = 10;
116131
int batch_size = 10;
117132
example::options opts(argc, argv);
118133

cpp/include/proton/tracker.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
#include "./internal/export.hpp"
2727
#include "./transfer.hpp"
2828
#include "./messaging_handler.hpp"
29-
#include "./transaction.hpp"
3029

3130
/// @file
3231
/// @copybrief proton::tracker

cpp/include/proton/transaction.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ PN_CPP_CLASS_EXTERN transaction {
4444
PN_CPP_EXTERN virtual void commit();
4545
PN_CPP_EXTERN virtual void abort();
4646
PN_CPP_EXTERN virtual void declare();
47+
PN_CPP_EXTERN virtual void handle_outcome(proton::tracker);
4748
PN_CPP_EXTERN virtual proton::tracker send(proton::sender s, proton::message msg);
4849
};
4950

cpp/include/proton/transfer.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ class transfer : public internal::object<pn_delivery_t> {
7878
PN_CPP_EXTERN bool settled() const;
7979

8080
// Set transaction
81-
PN_CPP_EXTERN void transaction(transaction& t);
81+
PN_CPP_EXTERN void transaction(transaction t);
8282

83-
PN_CPP_EXTERN class transaction* transaction() const;
83+
PN_CPP_EXTERN class transaction transaction() const;
8484

8585
/// Set user data on this transfer.
8686
PN_CPP_EXTERN void user_data(void* user_data) const;
@@ -93,9 +93,9 @@ class transfer : public internal::object<pn_delivery_t> {
9393
/// @endcond
9494
};
9595

96-
/// Human-readalbe name of the transfer::state
96+
/// Human-readable name of the transfer::state
9797
PN_CPP_EXTERN std::string to_string(enum transfer::state);
98-
/// Human-readalbe name of the transfer::state
98+
/// Human-readable name of the transfer::state
9999
PN_CPP_EXTERN std::ostream& operator<<(std::ostream&, const enum transfer::state);
100100

101101
} // proton

cpp/src/contexts.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "reconnect_options_impl.hpp"
2626

2727
#include "proton/work_queue.hpp"
28+
#include "proton/transaction.hpp"
2829
#include "proton/message.hpp"
2930

3031
#include "proton/object.h"
@@ -162,7 +163,7 @@ class transfer_context : public context {
162163
transfer_context() : user_data_(nullptr) {}
163164
static transfer_context& get(pn_delivery_t* s);
164165

165-
transaction* transaction_;
166+
std::unique_ptr<transaction> transaction_;
166167
void* user_data_;
167168
};
168169

cpp/src/messaging_adapter.cpp

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "proton/receiver_options.hpp"
3131
#include "proton/sender.hpp"
3232
#include "proton/sender_options.hpp"
33+
#include "proton/target_options.hpp"
3334
#include "proton/session.hpp"
3435
#include "proton/tracker.hpp"
3536
#include "proton/transport.hpp"
@@ -69,6 +70,11 @@ void on_link_flow(messaging_handler& handler, pn_event_t* event) {
6970
// TODO: process session flow data, if no link-specific data, just return.
7071
if (!lnk) return;
7172
int state = pn_link_state(lnk);
73+
if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
74+
std::cout << " on_link_flow, type: PN_COORDINATOR" << std::endl;
75+
return;
76+
77+
}
7278
if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
7379
link_context& lctx = link_context::get(lnk);
7480
if (pn_link_is_sender(lnk)) {
@@ -115,8 +121,14 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) {
115121
pn_delivery_t *dlv = pn_event_delivery(event);
116122
link_context& lctx = link_context::get(lnk);
117123
Tracing& ot = Tracing::getTracing();
124+
if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
125+
std::cout<< " on_delivery: COOORINDATOR.. " << &handler << std::endl;
126+
tracker t(make_wrapper<tracker>(dlv));
127+
std::cout<< " on_delivery: COOORINDATOR.. tracker" << &t << std::endl;
128+
handler.on_tracker_settle(t);
129+
}
118130

119-
if (pn_link_is_receiver(lnk)) {
131+
else if (pn_link_is_receiver(lnk)) {
120132
delivery d(make_wrapper<delivery>(dlv));
121133
if (pn_delivery_aborted(dlv)) {
122134
pn_delivery_settle(dlv);
@@ -274,23 +286,73 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) {
274286

275287
void on_link_remote_open(messaging_handler& handler, pn_event_t* event) {
276288
auto lnk = pn_event_link(event);
277-
// Currently don't implement (transaction) coordinator
289+
int type = pn_terminus_get_type(pn_link_remote_target(lnk));
290+
std::cout << " on_link_remote_open, type:" << type << std::endl;
278291
if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
279-
auto error = pn_link_condition(lnk);
280-
pn_condition_set_name(error, "amqp:not-implemented");
281-
pn_link_close(lnk);
292+
auto cond = pn_link_condition(lnk);
293+
if (pn_condition_is_set(cond)) {
294+
std::cout<<" Got condition on_link_remote_open(.PN_COORDINATOR): "
295+
<< pn_event_type_name(pn_event_type(event)) << " "
296+
<< pn_condition_get_name(cond) << " "
297+
<< pn_condition_get_description(cond) << std::endl;
298+
299+
pn_condition_set_name(cond, "amqp:on_link_remote_open:FAILED");
300+
pn_link_close(lnk);
301+
return;
302+
}
303+
std::cout<<" IN on_link_remote_open(.PN_COORDINATOR) success " << std::endl;
304+
305+
// WHY???
306+
// pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk));
307+
// pn_terminus_copy(pn_link_target(lnk), pn_link_remote_target(lnk));
308+
309+
// We need a new class?
310+
// auto coordinator = pn_link_remote_target(lnk);
311+
312+
313+
// proton::target_options to;
314+
// std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
315+
// to.capabilities(cap);
316+
// to.type(PN_COORDINATOR);
317+
318+
// proton::receiver_options ro;
319+
// ro.name("txn-ctrl");
320+
// ro.target(to);
321+
// ro.handler(handler);
322+
// receiver r(make_wrapper<receiver>(lnk));
323+
324+
// proton::receiver rcv = r.connection().open_receiver("does not matter", ro);
325+
std::cout<<" IN on_link_remote_open(.PN_COORDINATOR) have handler " << &handler << std::endl;
326+
327+
// handler.on_receiver_open(rcv);
328+
// credit_topup(lnk);
329+
330+
// pn_delivery_t *dlv = pn_event_delivery(event);
331+
// tracker t(make_wrapper<tracker>(dlv));
332+
333+
// // sender s(make_wrapper<sender>(lnk));
334+
// handler.on_tracker_settle(t);
335+
// TODO: find what to do...
336+
// HAHA.. treating coordinator like sender...
337+
// sender s(make_wrapper<sender>(lnk));
338+
// handler.on_sender_open(s);
339+
// pn_link_close(lnk);
282340
return;
283341
}
284342
if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link
285343
// Copy source and target from remote end.
344+
std::cout<<" Inside on_link_remote_open() .. PN_LOCAL_UNINIT " << std::endl;
345+
286346
pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk));
287347
pn_terminus_copy(pn_link_target(lnk), pn_link_remote_target(lnk));
288348
}
289349
if (pn_link_is_receiver(lnk)) {
350+
std::cout<<" Inside on_link_remote_open() .. pn_link_is_receiver " << std::endl;
290351
receiver r(make_wrapper<receiver>(lnk));
291352
handler.on_receiver_open(r);
292353
credit_topup(lnk);
293354
} else {
355+
std::cout<<" Inside on_link_remote_open() .. sender " << std::endl;
294356
sender s(make_wrapper<sender>(lnk));
295357
handler.on_sender_open(s);
296358
}

cpp/src/proactor_container_impl.cpp

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "proton/listener.h"
3535
#include "proton/proactor.h"
3636
#include "proton/transport.h"
37+
#include "proton/transaction.hpp"
3738

3839
#include "contexts.hpp"
3940
#include "messaging_adapter.hpp"
@@ -49,7 +50,7 @@
4950
#include <random>
5051

5152
// XXXX: Debug
52-
//#include <iostream>
53+
#include <iostream>
5354

5455
namespace proton {
5556

@@ -868,9 +869,11 @@ transaction container::impl::declare_transaction(proton::connection conn, proton
868869
class InternalTransactionHandler : public proton::messaging_handler {
869870
// TODO: auto_settle
870871
void on_tracker_settle(proton::tracker &t) override {
871-
if(t.transaction()) {
872-
//t.transaction()->handle_outcome(t);
873-
}
872+
std::cout<<" [InternalTransactionHandler][on_tracker_settle] called with tracker.txn"
873+
<< std::endl;
874+
// if(t.transaction()) {
875+
t.transaction().handle_outcome(t);
876+
// }
874877
}
875878

876879
// TODO: Add on_unhandled function
@@ -882,23 +885,28 @@ transaction container::impl::declare_transaction(proton::connection conn, proton
882885
proton::target_options t;
883886
std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
884887
t.capabilities(cap);
885-
// Type PN_COORDINATOR value is 3. It is a special target identifying a transaction coordinator.
886-
// TODO: Change the type from int to enum.
887-
t.type(3);
888+
t.type(PN_COORDINATOR);
888889

889890
proton::sender_options so;
890891
so.name("txn-ctrl");
891892
// Todo: Check the value, Or by deafult null?
892893
//so.source() ?
893894
so.target(t);
894-
InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it
895+
// TODO: FIX STATIC
896+
static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it
895897
so.handler(internal_handler);
898+
std::cout<<" [declare_transaction] txn-name sender open with handler: " << &internal_handler << std::endl;
899+
896900
proton::sender s = conn.open_sender("does not matter", so);
897901

898902
settle_before_discharge = false;
899903

900-
return mk_transaction_impl(s, handler, settle_before_discharge);
904+
std::cout<<" [declare_transaction] calling mk_transaction_impl" << std::endl;
905+
906+
auto txn = mk_transaction_impl(s, handler, settle_before_discharge);
907+
std::cout<<" [declare_transaction] txn address:" << &txn << std::endl;
901908

909+
return txn;
902910
}
903911

904912
}

0 commit comments

Comments
 (0)