Skip to content

Commit 8f36a12

Browse files
committed
Has errors
1 parent 22ea517 commit 8f36a12

File tree

8 files changed

+594
-317
lines changed

8 files changed

+594
-317
lines changed

cpp/examples/tx_send.cpp

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
4949
int confirmed = 0;
5050

5151
proton::session session;
52-
proton::transaction transaction;
52+
// proton::transaction transaction;
5353
public:
5454
tx_send(const std::string &s, int c, int b):
5555
url(s), total(c), batch_size(b), sent(0) {}
@@ -65,32 +65,31 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
6565
std::cout << " [on_session_open] declare_txn ended..." << std::endl;
6666
}
6767

68-
void on_transaction_declare_failed(proton::transaction) {}
69-
void on_transaction_commit_failed(proton::transaction t) {
68+
void on_transaction_declare_failed(proton::session) {}
69+
void on_transaction_commit_failed(proton::session s) {
7070
std::cout << "Transaction Commit Failed" << std::endl;
71-
t.connection().close();
71+
s.connection().close();
7272
exit(-1);
7373
}
7474

75-
void on_transaction_declared(proton::transaction t) override {
76-
std::cout << "[on_transaction_declared] txn called " << (&t)
75+
void on_transaction_declared(proton::session s) override {
76+
std::cout << "[on_transaction_declared] Session: " << (&s)
7777
<< std::endl;
78-
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
79-
<< "\t" << transaction.is_empty() << std::endl;
80-
transaction = t;
78+
std::cout << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty())
79+
<< "\t" << std::endl;
8180

8281
send(sender);
8382
}
8483

8584
void on_sendable(proton::sender &s) override {
86-
std::cout << " [OnSendable] transaction: " << &transaction
85+
std::cout << " [OnSendable] session: " << &session
8786
<< std::endl;
8887
send(s);
8988
}
9089

9190
void send(proton::sender &s) {
9291
static int unique_id = 10000;
93-
while (!transaction.is_empty() && sender.credit() &&
92+
while (!session.txn_is_empty() && sender.credit() &&
9493
(committed + current_batch) < total) {
9594
proton::message msg;
9695
std::map<std::string, int> m;
@@ -100,18 +99,17 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
10099
msg.body(m);
101100
std::cout << "##### [example] transaction send msg: " << msg
102101
<< std::endl;
103-
transaction.send(sender, msg);
102+
session.txn_send(sender, msg);
104103
current_batch += 1;
105104
if(current_batch == batch_size)
106105
{
107106
std::cout << " >> Txn attempt commit" << std::endl;
108107
if (batch_index % 2 == 0) {
109-
transaction.commit();
108+
session.txn_commit();
110109
} else {
111-
transaction.abort();
110+
session.txn_abort();
112111
}
113-
114-
transaction = proton::transaction();
112+
// TODO: Only one transaction is permitted per session.
115113
batch_index++;
116114
}
117115
}
@@ -123,20 +121,20 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
123121
<< std::endl;
124122
}
125123

126-
void on_transaction_committed(proton::transaction t) override {
124+
void on_transaction_committed(proton::session s) override {
127125
committed += current_batch;
128126
current_batch = 0;
129127
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
130128
if(committed == total) {
131129
std::cout << "All messages committed" << std::endl;
132-
t.connection().close();
130+
s.connection().close();
133131
}
134132
else {
135133
session.declare_transaction(*this);
136134
}
137135
}
138136

139-
void on_transaction_aborted(proton::transaction t) override {
137+
void on_transaction_aborted(proton::session s) override {
140138
std::cout << "Meesages Aborted ....." << std::endl;
141139
current_batch = 0;
142140
session.declare_transaction(*this);
@@ -150,7 +148,7 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
150148

151149
int main(int argc, char **argv) {
152150
std::string address("127.0.0.1:5672/examples");
153-
int message_count = 6;
151+
int message_count = 3;
154152
int batch_size = 3;
155153
example::options opts(argc, argv);
156154

cpp/include/proton/session.hpp

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@
2929
#include "./sender.hpp"
3030

3131
#include <string>
32+
#include <iostream>
3233

3334
/// @file
3435
/// @copybrief proton::session
3536

3637
struct pn_session_t;
3738

3839
namespace proton {
40+
class transaction_impl;
3941

4042
/// A container of senders and receivers.
4143
class
@@ -105,11 +107,32 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp
105107
/// Get user data from this session.
106108
PN_CPP_EXTERN void* user_data() const;
107109

108-
PN_CPP_EXTERN transaction declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false);
110+
PN_CPP_EXTERN void declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false);
111+
112+
113+
// static transaction mk_transaction_impl(sender &s, transaction_handler &h,
114+
// bool f);
115+
// PN_CPP_EXTERN transaction(transaction_impl *impl);
116+
117+
// PN_CPP_EXTERN transaction();
118+
// PN_CPP_EXTERN ~transaction();
119+
PN_CPP_EXTERN bool txn_is_empty();
120+
PN_CPP_EXTERN void txn_commit();
121+
PN_CPP_EXTERN void txn_abort();
122+
PN_CPP_EXTERN void txn_declare();
123+
PN_CPP_EXTERN void txn_handle_outcome(proton::tracker);
124+
PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message msg);
125+
PN_CPP_EXTERN void txn_accept(delivery &t);
126+
PN_CPP_EXTERN proton::connection txn_connection() const;
127+
128+
// PN_CPP_EXTERN session_context& get_session_context();
129+
130+
// transaction _txn;
109131

110132
/// @cond INTERNAL
111133
friend class internal::factory<session>;
112134
friend class session_iterator;
135+
friend class transaction_impl;
113136
/// @endcond
114137
};
115138

@@ -124,6 +147,39 @@ class session_iterator : public internal::iter_base<session, session_iterator> {
124147
PN_CPP_EXTERN session_iterator operator++();
125148
};
126149

150+
151+
class transaction_impl {
152+
public:
153+
proton::sender txn_ctrl;
154+
proton::transaction_handler *handler = nullptr;
155+
proton::binary id;
156+
proton::tracker _declare;
157+
proton::tracker _discharge;
158+
bool failed = false;
159+
std::vector<proton::tracker> pending;
160+
161+
void commit();
162+
void abort();
163+
void declare();
164+
proton::tracker send(proton::sender s, proton::message msg);
165+
166+
void discharge(bool failed);
167+
void release_pending();
168+
void accept(delivery &d);
169+
void update(tracker &d, uint64_t state);
170+
void set_id(binary _id);
171+
172+
proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
173+
void handle_outcome(proton::tracker t);
174+
transaction_impl(proton::sender &_txn_ctrl,
175+
proton::transaction_handler &_handler,
176+
bool _settle_before_discharge);
177+
178+
// delete copy and assignment operator to ensure no copy of this object is
179+
// every made transaction_impl(const transaction_impl&) = delete;
180+
// transaction_impl& operator=(const transaction_impl&) = delete;
181+
};
182+
127183
/// A range of sessions.
128184
typedef internal::iter_range<session_iterator> session_range;
129185

cpp/include/proton/transaction.hpp

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -71,54 +71,54 @@ class transaction_impl;
7171
// // transaction_impl& operator=(const transaction_impl&) = delete;
7272
// };
7373

74-
class
75-
PN_CPP_CLASS_EXTERN transaction {
76-
private:
77-
// PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl,
78-
// proton::transaction_handler& _handler, bool _settle_before_discharge);
74+
// class
75+
// PN_CPP_CLASS_EXTERN transaction {
76+
// private:
77+
// // PN_CPP_EXTERN transaction(proton::sender& _txn_ctrl,
78+
// // proton::transaction_handler& _handler, bool _settle_before_discharge);
7979

80-
static transaction mk_transaction_impl(sender &s, transaction_handler &h,
81-
bool f);
82-
PN_CPP_EXTERN transaction(transaction_impl *impl);
83-
transaction_impl *_impl;
80+
// static transaction mk_transaction_impl(sender &s, transaction_handler &h,
81+
// bool f);
82+
// PN_CPP_EXTERN transaction(transaction_impl *impl);
83+
// transaction_impl *_impl;
8484

85-
public:
86-
// TODO:
87-
// PN_CPP_EXTERN transaction(transaction &o);
88-
PN_CPP_EXTERN transaction();
89-
PN_CPP_EXTERN ~transaction();
90-
PN_CPP_EXTERN bool is_empty();
91-
PN_CPP_EXTERN void commit();
92-
PN_CPP_EXTERN void abort();
93-
PN_CPP_EXTERN void declare();
94-
PN_CPP_EXTERN void handle_outcome(proton::tracker);
95-
PN_CPP_EXTERN proton::tracker send(proton::sender s, proton::message msg);
96-
PN_CPP_EXTERN void accept(delivery &t);
97-
PN_CPP_EXTERN proton::connection connection() const;
98-
99-
friend class transaction_impl;
100-
friend class session;
101-
};
85+
// public:
86+
// // TODO:
87+
// // PN_CPP_EXTERN transaction(transaction &o);
88+
// PN_CPP_EXTERN transaction();
89+
// PN_CPP_EXTERN ~transaction();
90+
// PN_CPP_EXTERN bool is_empty();
91+
// PN_CPP_EXTERN void commit();
92+
// PN_CPP_EXTERN void abort();
93+
// PN_CPP_EXTERN void declare();
94+
// PN_CPP_EXTERN void handle_outcome(proton::tracker);
95+
// PN_CPP_EXTERN proton::tracker send(proton::sender s, proton::message msg);
96+
// PN_CPP_EXTERN void accept(delivery &t);
97+
// PN_CPP_EXTERN proton::connection connection() const;
98+
99+
// friend class transaction_impl;
100+
// friend class session;
101+
// };
102102

103103
class
104104
PN_CPP_CLASS_EXTERN transaction_handler {
105105
public:
106106
PN_CPP_EXTERN virtual ~transaction_handler();
107107

108108
/// Called when a local transaction is declared.
109-
PN_CPP_EXTERN virtual void on_transaction_declared(transaction);
109+
PN_CPP_EXTERN virtual void on_transaction_declared(session);
110110

111111
/// Called when a local transaction is discharged successfully.
112-
PN_CPP_EXTERN virtual void on_transaction_committed(transaction);
112+
PN_CPP_EXTERN virtual void on_transaction_committed(session);
113113

114114
/// Called when a local transaction is discharged unsuccessfully (aborted).
115-
PN_CPP_EXTERN virtual void on_transaction_aborted(transaction);
115+
PN_CPP_EXTERN virtual void on_transaction_aborted(session);
116116

117117
/// Called when a local transaction declare fails.
118-
PN_CPP_EXTERN virtual void on_transaction_declare_failed(transaction);
118+
PN_CPP_EXTERN virtual void on_transaction_declare_failed(session);
119119

120120
/// Called when the commit of a local transaction fails.
121-
PN_CPP_EXTERN virtual void on_transaction_commit_failed(transaction);
121+
PN_CPP_EXTERN virtual void on_transaction_commit_failed(session);
122122
};
123123

124124
} // namespace proton

cpp/include/proton/transfer.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ class transfer : public internal::object<pn_delivery_t> {
9595
PN_CPP_EXTERN bool settled() const;
9696

9797
// Set transaction
98-
PN_CPP_EXTERN void transaction(transaction t);
98+
// PN_CPP_EXTERN void transaction(transaction t);
9999

100-
PN_CPP_EXTERN class transaction transaction() const;
100+
// PN_CPP_EXTERN class transaction transaction() const;
101101

102102
/// Set user data on this transfer.
103103
PN_CPP_EXTERN void user_data(void* user_data) const;

cpp/src/contexts.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
#include "reconnect_options_impl.hpp"
2626

2727
#include "proton/work_queue.hpp"
28-
#include "proton/transaction.hpp"
28+
#include "proton/session.hpp"
2929
#include "proton/message.hpp"
3030

3131
#include "proton/object.h"
@@ -42,7 +42,7 @@ namespace proton {
4242

4343
class proton_handler;
4444
class connector;
45-
class transaction;
45+
// class transaction;
4646

4747
namespace io {class link_namer;}
4848

@@ -153,7 +153,7 @@ class session_context : public context {
153153
public:
154154
session_context() : handler(0), user_data_(nullptr) {}
155155
static session_context& get(pn_session_t* s);
156-
156+
std::unique_ptr<transaction_impl> _txn_impl;
157157
messaging_handler* handler;
158158
void* user_data_;
159159
};
@@ -163,7 +163,7 @@ class transfer_context : public context {
163163
transfer_context() : user_data_(nullptr) {}
164164
static transfer_context& get(pn_delivery_t* s);
165165

166-
std::unique_ptr<transaction> transaction_;
166+
// std::unique_ptr<transaction> transaction_;
167167
void* user_data_;
168168
};
169169

0 commit comments

Comments
 (0)