@@ -160,6 +160,49 @@ void* session::user_data() const {
160160 return sctx.user_data_ ;
161161}
162162
163+
164+
165+ class transaction_impl {
166+ public:
167+ proton::sender txn_ctrl;
168+ proton::transaction_handler *handler = nullptr ;
169+ proton::binary id;
170+ proton::tracker _declare;
171+ proton::tracker _discharge;
172+ bool failed = false ;
173+ enum State {
174+ FREE,
175+ DECLARING,
176+ DECLARED,
177+ DISCHARE,
178+ };
179+ enum State state = State::FREE;
180+ std::vector<proton::tracker> pending;
181+
182+ void commit ();
183+ void abort ();
184+ void declare ();
185+ proton::tracker send (proton::sender s, proton::message msg);
186+
187+ void discharge (bool failed);
188+ void release_pending ();
189+ void accept (delivery &d);
190+ void update (tracker &d, uint64_t state);
191+ void set_id (binary _id);
192+
193+ proton::tracker send_ctrl (proton::symbol descriptor, proton::value _value);
194+ void handle_outcome (proton::tracker t);
195+ transaction_impl (proton::sender &_txn_ctrl,
196+ proton::transaction_handler &_handler,
197+ bool _settle_before_discharge);
198+ ~transaction_impl ();
199+ // delete copy and assignment operator to ensure no copy of this object is
200+ // every made transaction_impl(const transaction_impl&) = delete;
201+ // transaction_impl& operator=(const transaction_impl&) = delete;
202+ };
203+
204+
205+
163206// session_context& session::get_session_context() {
164207// return session_context::get(pn_object());
165208// }
@@ -208,11 +251,11 @@ void session::declare_transaction(proton::transaction_handler &handler, bool set
208251 std::cout<<" [declare_transaction] calling mk_transaction_impl" << std::endl;
209252
210253 // get the _txn_impl from session_context.
211- txn_impl. reset ( new transaction_impl (s, handler, settle_before_discharge) );
254+ txn_impl = new transaction_impl (s, handler, settle_before_discharge);
212255 // std::cout << "Session object UPdated: " << _txn_impl << "For session at: " << this << std::endl;
213256 // std::cout<<" [declare_transaction] txn address:" << (void*)_txn_impl << std::endl;
214257
215- // _txn = txn;
258+ // _txn = txn ;
216259 // return _txn_impl;
217260 }
218261 // Declare txn
@@ -229,7 +272,10 @@ void session::declare_transaction(proton::transaction_handler &handler, bool set
229272 // : _impl(impl) {}
230273// transaction::transaction( transaction_impl* impl): _impl(impl){}
231274// transaction::~transaction() = default;
232- void session::txn_commit () {session_context::get (pn_object ())._txn_impl ->commit (); }
275+
276+ // clean up txn internally
277+ void session::txn_delete () { auto &_txn_impl = session_context::get (pn_object ())._txn_impl ; delete _txn_impl; _txn_impl = nullptr ;}
278+ void session::txn_commit () { session_context::get (pn_object ())._txn_impl ->commit (); }
233279void session::txn_abort () { session_context::get (pn_object ())._txn_impl ->abort (); }
234280void session::txn_declare () { session_context::get (pn_object ())._txn_impl ->declare (); }
235281bool session::txn_is_empty () { return session_context::get (pn_object ())._txn_impl == NULL ; }
@@ -253,12 +299,18 @@ void session::txn_handle_outcome(proton::tracker t) {
253299// return _txn_impl->txn_ctrl.connection();
254300// }
255301
302+
256303transaction_impl::transaction_impl (proton::sender &_txn_ctrl,
257304 proton::transaction_handler &_handler,
258305 bool _settle_before_discharge)
259306 : txn_ctrl(_txn_ctrl), handler(&_handler) {
260307 // bool settle_before_discharge = _settle_before_discharge;
261308 // declare();
309+
310+ std::cout<<" # transaction_impl created!" << std::endl;
311+ }
312+ transaction_impl::~transaction_impl () {
313+ std::cout<<" # transaction_impl deleted!" << std::endl;
262314}
263315
264316void transaction_impl::commit () {
@@ -291,6 +343,10 @@ void transaction_impl::declare() {
291343}
292344
293345void transaction_impl::discharge (bool _failed) {
346+ if (state != transaction_impl::State::DECLARED)
347+ throw proton::error (" Only a delcared txn can be discharged." );
348+ state = State::DISCHARE;
349+
294350 failed = _failed;
295351 proton::symbol descriptor (" amqp:discharge:list" );
296352 std::list<proton::value> vd;
@@ -328,6 +384,8 @@ proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, proton::v
328384}
329385
330386proton::tracker transaction_impl::send (proton::sender s, proton::message msg) {
387+ if (state != transaction_impl::State::DECLARED)
388+ throw proton::error (" Only a delcared txn can send a message" );
331389 proton::tracker tracker = s.send (msg);
332390 std::cout << " transaction_impl::send " << id << " , done: " << msg
333391 << " tracker: " << tracker << std::endl;
@@ -395,14 +453,20 @@ void transaction_impl::handle_outcome(proton::tracker t) {
395453 std::cout << " transaction_impl: handle_outcome.. "
396454 " txn_declared_failed pn_disposition_is_failed "
397455 << std::endl;
456+
398457 state = State::FREE;
458+ t.session ().txn_delete ();
399459 handler->on_transaction_declare_failed (t.session ());
460+ return ;
400461 } else {
401462 std::cout
402463 << " transaction_impl: handle_outcome.. txn_declared_failed "
403464 << std::endl;
465+ // state = State::BAD;
404466 state = State::FREE;
467+ t.session ().txn_delete ();
405468 handler->on_transaction_declare_failed (t.session ());
469+ return ;
406470 }
407471 } else if (_discharge == t) {
408472 if (pn_disposition_is_failed (disposition)) {
@@ -411,23 +475,29 @@ void transaction_impl::handle_outcome(proton::tracker t) {
411475 << " transaction_impl: handle_outcome.. commit failed "
412476 << std::endl;
413477 state = State::FREE;
478+ t.session ().txn_delete ();
414479 handler->on_transaction_commit_failed (t.session ());
415480 // release pending
481+ return ;
416482 }
417483 } else {
418484 if (failed) {
419485 state = State::FREE;
486+ t.session ().txn_delete ();
420487 handler->on_transaction_aborted (t.session ());
421488 std::cout
422489 << " transaction_impl: handle_outcome.. txn aborted"
423490 << std::endl;
424491 // release pending
492+ return ;
425493 } else {
426494 state = State::FREE;
495+ t.session ().txn_delete ();
427496 handler->on_transaction_committed (t.session ());
428497 std::cout
429498 << " transaction_impl: handle_outcome.. txn commited"
430499 << std::endl;
500+ return ;
431501 }
432502 }
433503 pending.clear ();
0 commit comments