Skip to content

Commit f1c9c71

Browse files
authoredFeb 13, 2025··
fix yamux callback (#294)
Signed-off-by: turuslan <turuslan.devbox@gmail.com>
1 parent fae0af7 commit f1c9c71

File tree

4 files changed

+15
-99
lines changed

4 files changed

+15
-99
lines changed
 

‎include/libp2p/muxer/yamux/yamux_stream.hpp

-3
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,6 @@ namespace libp2p::connection {
147147
/// True if the stream is writable, until FIN sent
148148
bool is_writable_ = true;
149149

150-
/// If set to true, then no more callbacks to client
151-
bool no_more_callbacks_ = false;
152-
153150
/// True after FIN sent
154151
bool fin_sent_ = false;
155152

‎src/muxer/yamux/yamux_stream.cpp

+7-52
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,7 @@ namespace libp2p::connection {
5555

5656
void YamuxStream::deferReadCallback(outcome::result<size_t> res,
5757
ReadCallbackFunc cb) {
58-
if (no_more_callbacks_) {
59-
log()->debug("{} closed by client, ignoring callback", stream_id_);
60-
return;
61-
}
62-
feedback_.deferCall([wptr = weak_from_this(), res, cb = std::move(cb)]() {
63-
auto self = wptr.lock();
64-
if (self && !self->no_more_callbacks_) {
65-
cb(res);
66-
}
67-
});
58+
feedback_.deferCall([res, cb{std::move(cb)}] { cb(res); });
6859
}
6960

7061
void YamuxStream::writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) {
@@ -73,16 +64,7 @@ namespace libp2p::connection {
7364

7465
void YamuxStream::deferWriteCallback(std::error_code ec,
7566
WriteCallbackFunc cb) {
76-
if (no_more_callbacks_) {
77-
log()->debug("{} closed by client, ignoring callback", stream_id_);
78-
return;
79-
}
80-
feedback_.deferCall([wptr = weak_from_this(), ec, cb = std::move(cb)]() {
81-
auto self = wptr.lock();
82-
if (self && !self->no_more_callbacks_) {
83-
cb(ec);
84-
}
85-
});
67+
feedback_.deferCall([ec, cb{std::move(cb)}] { cb(ec); });
8668
}
8769

8870
bool YamuxStream::isClosed() const {
@@ -92,12 +74,8 @@ namespace libp2p::connection {
9274
void YamuxStream::close(VoidResultHandlerFunc cb) {
9375
if (isClosed()) {
9476
if (cb) {
95-
feedback_.deferCall([wptr{weak_from_this()}, cb{std::move(cb)}] {
96-
auto self = wptr.lock();
97-
if (self) {
98-
cb(*self->close_reason_);
99-
}
100-
});
77+
feedback_.deferCall(
78+
[cb{std::move(cb)}, ec{*close_reason_}] { cb(ec); });
10179
}
10280
return;
10381
}
@@ -137,7 +115,6 @@ namespace libp2p::connection {
137115
}
138116

139117
void YamuxStream::reset() {
140-
no_more_callbacks_ = true;
141118
feedback_.resetStream(stream_id_);
142119
doClose(Error::STREAM_RESET_BY_HOST, true);
143120
}
@@ -163,14 +140,7 @@ namespace libp2p::connection {
163140
}
164141

165142
if (cb) {
166-
feedback_.deferCall([wptr = weak_from_this(), cb = std::move(cb), ec]() {
167-
auto self = wptr.lock();
168-
if (!self) {
169-
return;
170-
}
171-
if (self->no_more_callbacks_) {
172-
return;
173-
}
143+
feedback_.deferCall([cb{std::move(cb)}, ec] {
174144
if (!ec) {
175145
cb(outcome::success());
176146
} else {
@@ -336,7 +306,7 @@ namespace libp2p::connection {
336306
return;
337307
}
338308

339-
if (result.cb && !no_more_callbacks_) {
309+
if (result.cb) {
340310
result.cb(result.size_to_ack);
341311
}
342312
}
@@ -373,37 +343,22 @@ namespace libp2p::connection {
373343
VoidResultHandlerFunc window_size_cb;
374344
window_size_cb.swap(window_size_cb_);
375345

376-
if (no_more_callbacks_) {
377-
return;
378-
}
379-
380346
// now we are detached from *this* and may be killed from inside callbacks
381347
// we will call
382-
auto wptr = weak_from_this();
348+
auto self = shared_from_this();
383349

384350
if (read_cb_and_res.first) {
385351
read_cb_and_res.first(read_cb_and_res.second);
386352
}
387353

388-
if (wptr.expired() || no_more_callbacks_) {
389-
return;
390-
}
391-
392354
for (const auto &cb : write_callbacks) {
393355
cb(ec);
394-
if (wptr.expired() || no_more_callbacks_) {
395-
return;
396-
}
397356
}
398357

399358
if (window_size_cb) {
400359
window_size_cb(ec);
401360
}
402361

403-
if (wptr.expired() || no_more_callbacks_) {
404-
return;
405-
}
406-
407362
if (close_cb_and_res.first) {
408363
close_cb_and_res.first(close_cb_and_res.second);
409364
}

‎src/muxer/yamux/yamuxed_connection.cpp

+1-6
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ namespace libp2p::connection {
644644
}
645645

646646
// this instance may be killed inside further callback
647-
auto wptr = weak_from_this();
647+
auto self = shared_from_this();
648648

649649
if (stream_id != 0) {
650650
// pass write ack to stream about data size written except header size
@@ -669,11 +669,6 @@ namespace libp2p::connection {
669669
}
670670
}
671671

672-
if (wptr.expired()) {
673-
// *this* no longer exists
674-
return;
675-
}
676-
677672
is_writing_ = false;
678673

679674
if (not write_queue_.empty()) {

‎src/transport/tcp/tcp_connection.cpp

+7-38
Original file line numberDiff line numberDiff line change
@@ -92,17 +92,14 @@ namespace libp2p::transport {
9292
template <typename Callback>
9393
auto closeOnError(TcpConnection &conn, Callback cb) {
9494
return [cb{std::move(cb)}, wptr{conn.weak_from_this()}](auto ec,
95-
auto result) {
96-
if (auto self = wptr.lock()) {
97-
if (ec) {
98-
cb(ec);
95+
size_t result) {
96+
if (ec) {
97+
cb(ec);
98+
if (auto self = wptr.lock()) {
9999
self->close(ec);
100-
return;
101100
}
102-
TRACE("{} {}", self->str(), result);
103-
cb(result);
104101
} else {
105-
log().debug("connection wptr expired");
102+
cb(result);
106103
}
107104
};
108105
}
@@ -203,42 +200,14 @@ namespace libp2p::transport {
203200
closeOnError(*this, std::move(cb)));
204201
}
205202

206-
namespace {
207-
template <typename Callback, typename Arg>
208-
void deferCallback(boost::asio::io_context &ctx,
209-
std::weak_ptr<TcpConnection> wptr,
210-
bool &closed_by_host,
211-
Callback cb,
212-
Arg arg) {
213-
// defers callback to the next event loop cycle,
214-
// cb will be called iff TcpConnection is still alive
215-
// and was not closed by host's side
216-
boost::asio::post(
217-
ctx,
218-
[wptr = std::move(wptr), cb = std::move(cb), arg, &closed_by_host]() {
219-
if (!wptr.expired() && !closed_by_host) {
220-
cb(arg);
221-
}
222-
});
223-
}
224-
} // namespace
225-
226203
void TcpConnection::deferReadCallback(outcome::result<size_t> res,
227204
ReadCallbackFunc cb) {
228-
deferCallback(context_,
229-
weak_from_this(),
230-
std::ref(closed_by_host_),
231-
std::move(cb),
232-
res);
205+
boost::asio::post(context_, [res, cb{std::move(cb)}] { cb(res); });
233206
}
234207

235208
void TcpConnection::deferWriteCallback(std::error_code ec,
236209
WriteCallbackFunc cb) {
237-
deferCallback(context_,
238-
weak_from_this(),
239-
std::ref(closed_by_host_),
240-
std::move(cb),
241-
ec);
210+
boost::asio::post(context_, [ec, cb{std::move(cb)}] { cb(ec); });
242211
}
243212

244213
outcome::result<void> TcpConnection::saveMultiaddresses() {

0 commit comments

Comments
 (0)
Please sign in to comment.