Skip to content

Commit f8eff19

Browse files
committed
Add coordinator class
1 parent 8f36a12 commit f8eff19

File tree

13 files changed

+158
-19
lines changed

13 files changed

+158
-19
lines changed

cpp/examples/tx_recv.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler {
4646
int committed = 0;
4747

4848
proton::session session;
49-
proton::transaction transaction;
49+
// proton::transaction transaction;
5050
public:
5151
tx_recv(const std::string &s, int c, int b):
5252
url(s), expected(c), batch_size(b) {}
@@ -62,38 +62,38 @@ class tx_recv : public proton::messaging_handler, proton::transaction_handler {
6262
std::cout << " [on_session_open] declare_txn ended..." << std::endl;
6363
}
6464

65-
void on_transaction_declare_failed(proton::transaction) {}
66-
void on_transaction_commit_failed(proton::transaction t) {
65+
void on_transaction_declare_failed(proton::session) {}
66+
void on_transaction_commit_failed(proton::session s) {
6767
std::cout << "Transaction Commit Failed" << std::endl;
68-
t.connection().close();
68+
s.connection().close();
6969
exit(-1);
7070
}
7171

72-
void on_transaction_declared(proton::transaction t) override {
73-
std::cout << "[on_transaction_declared] txn called " << (&t)
72+
void on_transaction_declared(proton::session s) override {
73+
std::cout << "[on_transaction_declared] txn called " << (&s)
7474
<< std::endl;
75-
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
76-
<< "\t" << transaction.is_empty() << std::endl;
75+
// std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
76+
// << "\t" << transaction.is_empty() << std::endl;
7777
receiver.add_credit(batch_size);
78-
transaction = t;
78+
// transaction = t;
7979
}
8080

8181
void on_message(proton::delivery &d, proton::message &msg) override {
8282
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl;
83-
transaction.accept(d);
83+
session.txn_accept(d);
8484
current_batch += 1;
8585
if(current_batch == batch_size) {
86-
transaction = proton::transaction(); // null
86+
// transaction = proton::transaction(); // null
8787
}
8888
}
8989

90-
void on_transaction_committed(proton::transaction t) override {
90+
void on_transaction_committed(proton::session s) override {
9191
committed += current_batch;
9292
current_batch = 0;
9393
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
9494
if(committed == expected) {
9595
std::cout << "All messages committed" << std::endl;
96-
t.connection().close();
96+
s.connection().close();
9797
}
9898
else {
9999
session.declare_transaction(*this);

cpp/examples/tx_send.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
8989

9090
void send(proton::sender &s) {
9191
static int unique_id = 10000;
92-
while (!session.txn_is_empty() && sender.credit() &&
92+
while (session.txn_is_declared() && sender.credit() &&
9393
(committed + current_batch) < total) {
9494
proton::message msg;
9595
std::map<std::string, int> m;

cpp/include/proton/fwd.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class sender_options;
5252
class session;
5353
class session_options;
5454
class source_options;
55+
class coordinator_options;
5556
class ssl;
5657
class target_options;
5758
class tracker;

cpp/include/proton/sender_options.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ class sender_options {
9494
/// Options for the receiver node of the receiver.
9595
PN_CPP_EXTERN sender_options& target(const target_options&);
9696

97+
/// Options for the coordinator node of the receiver.
98+
PN_CPP_EXTERN sender_options& coordinator(const coordinator_options&);
99+
97100
/// Set the link name. If not set a unique name is generated.
98101
PN_CPP_EXTERN sender_options& name(const std::string& name);
99102

cpp/include/proton/session.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp
117117
// PN_CPP_EXTERN transaction();
118118
// PN_CPP_EXTERN ~transaction();
119119
PN_CPP_EXTERN bool txn_is_empty();
120+
PN_CPP_EXTERN bool txn_is_declared();
120121
PN_CPP_EXTERN void txn_commit();
121122
PN_CPP_EXTERN void txn_abort();
122123
PN_CPP_EXTERN void txn_declare();
@@ -156,6 +157,7 @@ class transaction_impl {
156157
proton::tracker _declare;
157158
proton::tracker _discharge;
158159
bool failed = false;
160+
bool is_declared = false;
159161
std::vector<proton::tracker> pending;
160162

161163
void commit();

cpp/include/proton/target.hpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,29 @@ class target : public terminus {
6565
/// @endcond
6666
};
6767

68+
69+
/// TODO: A point of coordinator for messages.
70+
///
71+
class coordinator : public terminus {
72+
public:
73+
/// Create an empty coordinator.
74+
coordinator() = default;
75+
76+
/// The address of the coordinator.
77+
PN_CPP_EXTERN std::string address() const;
78+
private:
79+
coordinator(pn_terminus_t* t);
80+
coordinator(const sender&);
81+
coordinator(const receiver&);
82+
83+
84+
/// @cond INTERNAL
85+
friend class proton::internal::factory<coordinator>;
86+
friend class sender;
87+
friend class receiver;
88+
/// @endcond
89+
};
90+
6891
} // proton
6992

7093
#endif // PROTON_TARGET_HPP

cpp/include/proton/target_options.hpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,39 @@ class target_options {
103103
/// @endcond
104104
};
105105

106+
class coordinator_options {
107+
public:
108+
/// Create an empty set of options.
109+
PN_CPP_EXTERN coordinator_options();
110+
111+
/// Copy options.
112+
PN_CPP_EXTERN coordinator_options(const coordinator_options&);
113+
114+
PN_CPP_EXTERN ~coordinator_options();
115+
116+
/// Copy options.
117+
PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&);
118+
119+
/// Set the address for the coordinator. It is unset by default. The
120+
/// address is ignored if dynamic() is true.
121+
PN_CPP_EXTERN coordinator_options& address(const std::string& addr);
122+
123+
/// **Unsettled API** Extension capabilities that are supported/requested
124+
PN_CPP_EXTERN coordinator_options& capabilities(const std::vector<symbol>&);
125+
126+
private:
127+
void apply(coordinator&) const;
128+
129+
class impl;
130+
std::unique_ptr<impl> impl_;
131+
132+
/// @cond INTERNAL
133+
friend class coordinator;
134+
friend class sender_options;
135+
friend class receiver_options;
136+
/// @endcond
137+
};
138+
106139
} // proton
107140

108141
#endif // PROTON_TARGET_OPTIONS_HPP

cpp/include/proton/terminus.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class terminus {
122122
friend class internal::factory<terminus>;
123123
friend class source;
124124
friend class target;
125+
friend class coordinator;
125126
/// @endcond
126127
};
127128

cpp/src/node_options.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,5 +209,36 @@ target_options& target_options::type(int t) { impl_->type = t; return *this;}
209209
void target_options::apply(target& s) const { impl_->apply(s); }
210210

211211

212+
class coordinator_options::impl {
213+
public:
214+
option<std::string> address;
215+
option<std::vector<symbol> > capabilities;
216+
217+
void apply(coordinator& c) {
218+
if(address.set) {
219+
pn_terminus_set_address(unwrap(c), address.value.c_str());
220+
}
221+
if (capabilities.set) {
222+
value(pn_terminus_capabilities(unwrap(c))) = capabilities.value;
223+
}
224+
pn_terminus_set_type(unwrap(c), pn_terminus_type_t(PN_COORDINATOR)); ;
225+
}
226+
};
227+
228+
coordinator_options::coordinator_options() : impl_(new impl()) {}
229+
coordinator_options::coordinator_options(const coordinator_options& x) : impl_(new impl()) {
230+
*this = x;
231+
}
232+
coordinator_options::~coordinator_options() = default;
233+
234+
coordinator_options& coordinator_options::operator=(const coordinator_options& x) {
235+
*impl_ = *x.impl_;
236+
return *this;
237+
}
238+
239+
coordinator_options& coordinator_options::address(const std::string &addr) { impl_->address = addr; return *this; }
240+
coordinator_options& coordinator_options::capabilities(const std::vector<symbol>& c) { impl_->capabilities = c; return *this; }
241+
void coordinator_options::apply(coordinator& c) const { impl_->apply(c); }
242+
212243

213244
} // namespace proton

cpp/src/proton_bits.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class acceptor;
6969
class terminus;
7070
class source;
7171
class target;
72+
class coordinator;
7273
class reactor;
7374
class messaging_handler;
7475

@@ -109,6 +110,7 @@ template <> struct wrapped<error_condition> { typedef pn_condition_t type; };
109110
template <> struct wrapped<terminus> { typedef pn_terminus_t type; };
110111
template <> struct wrapped<source> { typedef pn_terminus_t type; };
111112
template <> struct wrapped<target> { typedef pn_terminus_t type; };
113+
template <> struct wrapped<coordinator> { typedef pn_terminus_t type; };
112114

113115
template <class T> struct wrapper {};
114116
template <> struct wrapper<pn_data_t> { typedef internal::data type; };

0 commit comments

Comments
 (0)