Skip to content

Commit cca407b

Browse files
committed
Allow multiple transactions but one at a time
1 parent f8eff19 commit cca407b

File tree

4 files changed

+61
-46
lines changed

4 files changed

+61
-46
lines changed

cpp/examples/tx_send.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
109109
} else {
110110
session.txn_abort();
111111
}
112-
// TODO: Only one transaction is permitted per session.
113112
batch_index++;
114113
}
115114
}
@@ -148,7 +147,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
148147

149148
int main(int argc, char **argv) {
150149
std::string address("127.0.0.1:5672/examples");
151-
int message_count = 3;
150+
int message_count = 6;
152151
int batch_size = 3;
153152
example::options opts(argc, argv);
154153

cpp/include/proton/session.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,12 @@ class transaction_impl {
157157
proton::tracker _declare;
158158
proton::tracker _discharge;
159159
bool failed = false;
160-
bool is_declared = false;
160+
enum State {
161+
FREE,
162+
DECLARING,
163+
DECLARED,
164+
};
165+
enum State state = State::FREE;
161166
std::vector<proton::tracker> pending;
162167

163168
void commit();

cpp/include/proton/transaction.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
namespace proton {
3737

3838
class transaction_handler;
39-
class transaction_impl;
39+
// class transaction_impl;
4040

4141
// TODO: This should not be accessible to users.
4242
// class transaction_impl {

cpp/src/session.cpp

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -166,53 +166,58 @@ void* session::user_data() const {
166166

167167
// TODO : WE ARE NOT RETURNING TRANSACTION FOR NOW.
168168
void session::declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge) {
169-
if (session_context::get(pn_object())._txn_impl != nullptr)
170-
throw proton::error("Transaction is already declared for this session");
171-
proton::connection conn = this->connection();
172-
class InternalTransactionHandler : public proton::messaging_handler {
173-
// TODO: auto_settle
174-
175-
void on_tracker_settle(proton::tracker &t) override {
176-
std::cout<<" [InternalTransactionHandler][on_tracker_settle] called with tracker.txn"
177-
<< std::endl;
178-
if (!t.session().txn_is_empty()) {
179-
std::cout<<" [InternalTransactionHandler] Inside if condition" << std::endl;
180-
t.session().txn_handle_outcome(t);
169+
auto &txn_impl = session_context::get(pn_object())._txn_impl;
170+
if (txn_impl == nullptr) {
171+
// Create _txn_impl
172+
proton::connection conn = this->connection();
173+
class InternalTransactionHandler : public proton::messaging_handler {
174+
// TODO: auto_settle
175+
176+
void on_tracker_settle(proton::tracker &t) override {
177+
std::cout<<" [InternalTransactionHandler][on_tracker_settle] called with tracker.txn"
178+
<< std::endl;
179+
if (!t.session().txn_is_empty()) {
180+
std::cout<<" [InternalTransactionHandler] Inside if condition" << std::endl;
181+
t.session().txn_handle_outcome(t);
182+
}
181183
}
182-
}
183-
};
184+
};
184185

185-
// proton::target_options t;
186-
// std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
187-
// t.capabilities(cap);
188-
// t.type(PN_COORDINATOR);
186+
// proton::target_options t;
187+
// std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
188+
// t.capabilities(cap);
189+
// t.type(PN_COORDINATOR);
189190

190-
proton::coordinator_options co;
191-
std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
192-
co.capabilities(cap);
191+
proton::coordinator_options co;
192+
std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
193+
co.capabilities(cap);
193194

194195

195-
proton::sender_options so;
196-
so.name("txn-ctrl");
197-
so.coordinator(co);
196+
proton::sender_options so;
197+
so.name("txn-ctrl");
198+
so.coordinator(co);
198199

199-
static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it
200-
so.handler(internal_handler);
201-
std::cout<<" [declare_transaction] txn-name sender open with handler: " << &internal_handler << std::endl;
200+
static InternalTransactionHandler internal_handler; // internal_handler going out of scope. Fix it
201+
so.handler(internal_handler);
202+
std::cout<<" [declare_transaction] txn-name sender open with handler: " << &internal_handler << std::endl;
202203

203-
static proton::sender s = conn.open_sender("does not matter", so);
204+
static proton::sender s = conn.open_sender("does not matter", so);
204205

205-
settle_before_discharge = false;
206+
settle_before_discharge = false;
206207

207-
std::cout<<" [declare_transaction] calling mk_transaction_impl" << std::endl;
208+
std::cout<<" [declare_transaction] calling mk_transaction_impl" << std::endl;
208209

209-
// get the _txn_impl from session_context.
210-
session_context::get(pn_object())._txn_impl.reset(new transaction_impl(s, handler, settle_before_discharge));
211-
// std::cout << "Session object UPdated: " << _txn_impl << "For session at: " << this << std::endl;
212-
// std::cout<<" [declare_transaction] txn address:" << (void*)_txn_impl << std::endl;
210+
// get the _txn_impl from session_context.
211+
txn_impl.reset(new transaction_impl(s, handler, settle_before_discharge));
212+
// std::cout << "Session object UPdated: " << _txn_impl << "For session at: " << this << std::endl;
213+
// std::cout<<" [declare_transaction] txn address:" << (void*)_txn_impl << std::endl;
214+
215+
// _txn = txn;
216+
// return _txn_impl;
217+
}
218+
// Declare txn
219+
txn_impl->declare();
213220

214-
// _txn = txn;
215-
// return _txn_impl;
216221
}
217222

218223
// transaction::transaction() : _impl(NULL) {} // empty transaction, not yet ready
@@ -228,7 +233,7 @@ void session::txn_commit() {session_context::get(pn_object())._txn_impl->commit(
228233
void session::txn_abort() { session_context::get(pn_object())._txn_impl->abort(); }
229234
void session::txn_declare() { session_context::get(pn_object())._txn_impl->declare(); }
230235
bool session::txn_is_empty() { return session_context::get(pn_object())._txn_impl == NULL; }
231-
bool session::txn_is_declared() { return (!txn_is_empty()) && session_context::get(pn_object())._txn_impl->is_declared; }
236+
bool session::txn_is_declared() { return (!txn_is_empty()) && session_context::get(pn_object())._txn_impl->state == transaction_impl::State::DECLARED; }
232237
void session::txn_accept(delivery &t) { return session_context::get(pn_object())._txn_impl->accept(t); }
233238
proton::tracker session::txn_send(proton::sender s, proton::message msg) {
234239
return session_context::get(pn_object())._txn_impl->send(s, msg);
@@ -251,22 +256,23 @@ void session::txn_handle_outcome(proton::tracker t) {
251256
transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
252257
proton::transaction_handler &_handler,
253258
bool _settle_before_discharge)
254-
: txn_ctrl(_txn_ctrl), handler(&_handler), is_declared(false) {
259+
: txn_ctrl(_txn_ctrl), handler(&_handler) {
255260
// bool settle_before_discharge = _settle_before_discharge;
256-
declare();
261+
// declare();
257262
}
258263

259264
void transaction_impl::commit() {
260265
discharge(false);
261-
is_declared = false;
262266
}
263267

264268
void transaction_impl::abort() {
265269
discharge(true);
266-
is_declared = false;
267270
}
268271

269272
void transaction_impl::declare() {
273+
if (state != transaction_impl::State::FREE)
274+
throw proton::error("This session has some associcated transaction already");
275+
state = State::DECLARING;
270276
std::cout<<" [transaction_impl][declare] staring it" << std::endl;
271277

272278
proton::symbol descriptor("amqp:declare:list");
@@ -383,17 +389,19 @@ void transaction_impl::handle_outcome(proton::tracker t) {
383389
std::cout << " transaction_impl: handle_outcome.. txn_declared "
384390
"got txnid:: "
385391
<< vd[0] << std::endl;
386-
is_declared = true;
392+
state = State::DECLARED;
387393
handler->on_transaction_declared(t.session());
388394
} else if (pn_disposition_is_failed(disposition)) {
389395
std::cout << " transaction_impl: handle_outcome.. "
390396
"txn_declared_failed pn_disposition_is_failed "
391397
<< std::endl;
398+
state = State::FREE;
392399
handler->on_transaction_declare_failed(t.session());
393400
} else {
394401
std::cout
395402
<< " transaction_impl: handle_outcome.. txn_declared_failed "
396403
<< std::endl;
404+
state = State::FREE;
397405
handler->on_transaction_declare_failed(t.session());
398406
}
399407
} else if (_discharge == t) {
@@ -402,17 +410,20 @@ void transaction_impl::handle_outcome(proton::tracker t) {
402410
std::cout
403411
<< " transaction_impl: handle_outcome.. commit failed "
404412
<< std::endl;
413+
state = State::FREE;
405414
handler->on_transaction_commit_failed(t.session());
406415
// release pending
407416
}
408417
} else {
409418
if (failed) {
419+
state = State::FREE;
410420
handler->on_transaction_aborted(t.session());
411421
std::cout
412422
<< " transaction_impl: handle_outcome.. txn aborted"
413423
<< std::endl;
414424
// release pending
415425
} else {
426+
state = State::FREE;
416427
handler->on_transaction_committed(t.session());
417428
std::cout
418429
<< " transaction_impl: handle_outcome.. txn commited"

0 commit comments

Comments
 (0)