diff --git a/min-webrtc/abseil-cpp b/min-webrtc/abseil-cpp index f8fa267ec..2f9e432cc 160000 --- a/min-webrtc/abseil-cpp +++ b/min-webrtc/abseil-cpp @@ -1 +1 @@ -Subproject commit f8fa267ec230b8577507b3798ac234999b72f348 +Subproject commit 2f9e432cce407ce0ae50676696666f33a77d42ac diff --git a/min-webrtc/libsrtp b/min-webrtc/libsrtp index 5b7c744eb..17b2d544e 160000 --- a/min-webrtc/libsrtp +++ b/min-webrtc/libsrtp @@ -1 +1 @@ -Subproject commit 5b7c744eb8310250ccc534f3f86a2015b3887a0a +Subproject commit 17b2d544ecf4f5d4d338baa41aa7e60eaa6c3e98 diff --git a/min-webrtc/target.mk b/min-webrtc/target.mk index e8e61fb97..6663fc16a 100644 --- a/min-webrtc/target.mk +++ b/min-webrtc/target.mk @@ -19,12 +19,15 @@ cflags += -I$(pwd)/webrtc webrtc := +# XXX: wrapping_async_dns_resolver.cc should have been deleted in 24510d43dccc88a47e2cd139b2190e25d4c7b700 webrtc += $(filter-out \ %/create_peerconnection_factory.cc \ %/field_trials_registry.cc \ + %/wrapping_async_dns_resolver.cc \ ,$(wildcard $(pwd)/webrtc/api/*.cc)) webrtc += $(wildcard $(pwd)/webrtc/api/crypto/*.cc) +webrtc += $(wildcard $(pwd)/webrtc/api/task_queue/*.cc) webrtc += $(wildcard $(pwd)/webrtc/api/transport/*.cc) webrtc += $(wildcard $(pwd)/webrtc/api/transport/media/*.cc) webrtc += $(wildcard $(pwd)/webrtc/api/transport/rtp/*.cc) @@ -32,16 +35,16 @@ webrtc += $(wildcard $(pwd)/webrtc/api/units/*.cc) webrtc += $(pwd)/webrtc/api/field_trials_registry.cc +webrtc += $(pwd)/webrtc/api/audio_codecs/audio_codec_pair_id.cc webrtc += $(pwd)/webrtc/api/audio_codecs/audio_encoder.cc + webrtc += $(pwd)/webrtc/api/call/transport.cc +webrtc += $(pwd)/webrtc/api/environment/environment_factory.cc webrtc += $(pwd)/webrtc/api/numerics/samples_stats_counter.cc webrtc += $(pwd)/webrtc/api/rtc_event_log/rtc_event.cc webrtc += $(pwd)/webrtc/api/rtc_event_log/rtc_event_log.cc -webrtc += $(pwd)/webrtc/api/task_queue/pending_task_safety_flag.cc -webrtc += $(pwd)/webrtc/api/task_queue/task_queue_base.cc - webrtc += $(pwd)/webrtc/api/video/color_space.cc webrtc += $(pwd)/webrtc/api/video/encoded_image.cc webrtc += $(pwd)/webrtc/api/video/hdr_metadata.cc @@ -103,8 +106,6 @@ webrtc += $(wildcard $(pwd)/webrtc/modules/rtp_rtcp/source/deprecated/*.cc) webrtc += $(wildcard $(pwd)/webrtc/modules/rtp_rtcp/source/rtcp_packet/*.cc) webrtc += $(wildcard $(pwd)/webrtc/modules/utility/source/*.cc) -webrtc += $(pwd)/webrtc/modules/utility/maybe_worker_thread.cc - webrtc += $(pwd)/webrtc/modules/video_coding/chain_diff_calculator.cc webrtc += $(pwd)/webrtc/modules/video_coding/encoded_frame.cc webrtc += $(pwd)/webrtc/modules/video_coding/frame_dependencies_calculator.cc @@ -190,11 +191,11 @@ webrtc := $(filter-out $(pwd)/webrtc/rtc_base/system/%,$(webrtc)) source += $(pwd)/webrtc/rtc_base/system/file_wrapper.cc webrtc := $(filter-out $(pwd)/webrtc/rtc_base/boringssl_%.cc,$(webrtc)) -webrtc := $(filter-out $(pwd)/webrtc/rtc_base/%_gcd.cc,$(webrtc)) -webrtc := $(filter-out $(pwd)/webrtc/rtc_base/%_libevent.cc,$(webrtc)) +webrtc := $(filter-out $(pwd)/webrtc/%_gcd.cc,$(webrtc)) +webrtc := $(filter-out $(pwd)/webrtc/%_libevent.cc,$(webrtc)) +webrtc := $(filter-out $(pwd)/webrtc/%_libevent_experiment.cc,$(webrtc)) webrtc := $(filter-out $(pwd)/webrtc/rtc_base/mac_%.cc,$(webrtc)) -webrtc := $(filter-out $(pwd)/webrtc/rtc_base/%_stdlib.cc,$(webrtc)) -webrtc := $(filter-out $(pwd)/webrtc/rtc_base/%_win.cc,$(webrtc)) +webrtc := $(filter-out $(pwd)/webrtc/%_win.cc,$(webrtc)) webrtc := $(filter-out $(pwd)/webrtc/rtc_base/win/%.cc,$(webrtc)) webrtc := $(filter-out $(pwd)/webrtc/rtc_base/win32%.cc,$(webrtc)) diff --git a/min-webrtc/webrtc b/min-webrtc/webrtc index b459deaf3..c8068f68f 160000 --- a/min-webrtc/webrtc +++ b/min-webrtc/webrtc @@ -1 +1 @@ -Subproject commit b459deaf380fb95ad0275dbe9e92cb7a8b4b99e5 +Subproject commit c8068f68f2de58e61b67c5b37c7a621642daf134 diff --git a/p2p/lwip.sh b/p2p/lwip.sh new file mode 100755 index 000000000..e5cf05244 --- /dev/null +++ b/p2p/lwip.sh @@ -0,0 +1,66 @@ +#!/bin/bash +set -e + +function fix() { + sed -e ' + s/PhysicalSocket/LwipSocket/g; + s/PHYSICAL/LOGICAL/g; + + s/\/orc/g; + s/orc::/rtc::/g; + + s/= {}/= {0}/g; + s/{}/= default;/g; + + s/#include /#include /g; + s/#include "rtc_base\/physical_socket_server\.h"/#include "logical_.hpp"/g; + + /#include /-1/g; + ' +} + +{ + diff -ru source/lwip.hpp <(fix <../min-webrtc/webrtc/rtc_base/physical_socket_server.h) | colordiff + diff -ru source/lwip.cpp <(fix <../min-webrtc/webrtc/rtc_base/physical_socket_server.cc) | colordiff +} | nl | less -R + +# e077ee472a6a14fb78aa59faa4549eea3a227958 <- remove IP_MTU + +#/GetSocketRecvTimestamp(.*{/,/^}/s/^ .*/ return -1;/; diff --git a/p2p/source/channel.cpp b/p2p/source/channel.cpp index 6f2907989..d618b533e 100644 --- a/p2p/source/channel.cpp +++ b/p2p/source/channel.cpp @@ -27,7 +27,6 @@ #include "channel.hpp" #include "peer.hpp" -#include "pirate.hpp" #include "tube.hpp" namespace orc { @@ -161,23 +160,6 @@ struct P { } }; -struct SctpDataChannel$SendDataMessage { typedef bool (webrtc::SctpDataChannel::*type)(const webrtc::DataBuffer &, bool); }; -template struct Pirate; - -struct SctpDataChannel$controller_ { typedef webrtc::SctpDataChannelControllerInterface *const (webrtc::SctpDataChannel::*type); }; -template struct Pirate; - -struct DataChannelController$DataChannelSendData { typedef bool (webrtc::DataChannelController::*type)(int, const webrtc::SendDataParams &, const rtc::CopyOnWriteBuffer &, cricket::SendDataResult *); }; -template struct Pirate; - -struct DataChannelController$network_thread { typedef rtc::Thread *(webrtc::DataChannelController::*type)() const; }; -template struct Pirate; - -#if 0 -struct SctpDataChannelTransport$sctp_transport_ { typedef cricket::SctpTransportInternal *const (webrtc::SctpDataChannelTransport::*type); }; -template struct Pirate; -#endif - task Channel::Send(const Buffer &data) { Trace("WebRTC", true, false, data); @@ -185,71 +167,11 @@ task Channel::Send(const Buffer &data) { rtc::CopyOnWriteBuffer buffer(size); data.copy(buffer.MutableData(), size); -#if 1 - static const webrtc::SendDataParams params([]() { - webrtc::SendDataParams params; - params.type = webrtc::DataMessageType::kBinary; - params.ordered = false; - params.max_rtx_count = 0; - return params; - }()); -#endif - -#if 0 - co_await Post([&]() { - orc_assert(channel_ != nullptr); -#if 0 - if (channel_->buffered_amount() == 0) - channel_->Send({buffer, true}); -#else - const auto sctp(reinterpret_cast(channel_.get() + 1)[1]); -#if 0 - sctp->Send({buffer, true}); -#else - if (sctp->state() != webrtc::DataChannelInterface::kOpen) - return; -#if 0 - if (!(sctp->*Loot::pointer)({buffer, true}, false)) - return; -#else - const auto provider(sctp->*Loot::pointer); - cricket::SendDataResult result; -#if 0 - provider->SendData(sctp->id(), params, buffer, &result); -#else - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast) - const auto controller(static_cast(provider)); - (controller->*Loot::pointer)(sctp->id(), params, buffer, &result); -#endif -#endif -#endif -#endif - }); -#else - // XXX: is this safe? orc_assert(channel_ != nullptr); + if (channel_->buffered_amount() == 0) + channel_->Send({buffer, true}); - const auto sctp(reinterpret_cast(channel_.get() + 1)[1]); - const auto provider(sctp->*Loot::pointer); - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast) - const auto controller(static_cast(provider)); - - co_await Post([&]() { - const auto interface(controller->data_channel_transport()); - if (!interface->IsReadyToSend()) { - orc_trace(); - return; - } -#if 1 // XXX - interface->SendData(sctp->id(), params, buffer); -#else - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast) - const auto transport(static_cast(interface)); - cricket::SendDataResult result; - (transport->*Loot::pointer)->SendData(sctp->id(), params, buffer, &result); -#endif - }, *(controller->*Loot::pointer)()); -#endif + co_return; } task Description(const S &base, std::vector ice) { diff --git a/p2p/source/lwip.cpp b/p2p/source/lwip.cpp index e10806f01..914c4b917 100644 --- a/p2p/source/lwip.cpp +++ b/p2p/source/lwip.cpp @@ -9,6 +9,9 @@ */ #include "logical_.hpp" +#include +#include + #if defined(_MSC_VER) && _MSC_VER < 1300 #pragma warning(disable : 4786) #endif @@ -19,10 +22,11 @@ #if 1 #include -#include #if 0 // "poll" will be used to wait for the signal dispatcher. #include +#elif 0 +#include #endif #include #endif @@ -31,20 +35,18 @@ #include #include #include + #undef SetPort #endif #include -#include -#include - -#include "rtc_base/arraysize.h" -#include "rtc_base/byte_order.h" +#include "rtc_base/async_dns_resolver.h" #include "rtc_base/checks.h" +#include "rtc_base/event.h" +#include "rtc_base/ip_address.h" #include "rtc_base/logging.h" #include "rtc_base/network_monitor.h" -#include "rtc_base/null_socket_server.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/field_trial.h" @@ -67,10 +69,14 @@ typedef void* SockOptArg; #endif // WEBRTC_POSIX +#if 1 && !0 && !0 + int64_t GetSocketRecvTimestamp_(int socket) { return -1; } +#endif + #if 0 typedef char* SockOptArg; #endif @@ -98,10 +104,10 @@ class ScopedSetTrue { bool* value_; }; -// Returns true if the the client is in the experiment to get timestamps -// from the socket implementation. -bool IsScmTimeStampExperimentEnabled() { - return webrtc::field_trial::IsEnabled("WebRTC-SCM-Timestamp"); +// Returns true if the experiement "WebRTC-SCM-Timestamp" is explicitly +// disabled. +bool IsScmTimeStampExperimentDisabled() { + return webrtc::field_trial::IsDisabled("WebRTC-SCM-Timestamp"); } } // namespace @@ -113,7 +119,7 @@ LwipSocket::LwipSocket(LwipSocketServer* ss, SOCKET s) error_(0), state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), resolver_(nullptr), - read_scm_timestamp_experiment_(IsScmTimeStampExperimentEnabled()) { + read_scm_timestamp_experiment_(!IsScmTimeStampExperimentDisabled()) { if (s_ != INVALID_SOCKET) { SetEnabledEvents(DE_READ | DE_WRITE); @@ -230,9 +236,8 @@ int LwipSocket::Connect(const SocketAddress& addr) { } if (addr.IsUnresolvedIP()) { RTC_LOG(LS_VERBOSE) << "Resolving addr in LwipSocket::Connect"; - resolver_ = new AsyncResolver(); - resolver_->SignalDone.connect(this, &LwipSocket::OnResolveResult); - resolver_->Start(addr); + resolver_ = std::make_unique(); + resolver_->Start(addr, [this] { OnResolveResult(resolver_->result()); }); state_ = CS_CONNECTING; return 0; } @@ -287,6 +292,16 @@ int LwipSocket::GetOption(Option opt, int* value) { if (ret == -1) { return -1; } + if (opt == OPT_DONTFRAGMENT) { +#if 0 && !0 + *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0; +#endif + } else if (opt == OPT_DSCP) { +#if 1 + // unshift DSCP value to get six most significant bits of IP DiffServ field + *value >>= 2; +#endif + } return ret; } @@ -295,7 +310,17 @@ int LwipSocket::SetOption(Option opt, int value) { int sopt; if (TranslateOption(opt, &slevel, &sopt) == -1) return -1; -#if 0 + if (opt == OPT_DONTFRAGMENT) { +#if 0 && !0 + value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT; +#endif + } else if (opt == OPT_DSCP) { +#if 1 + // shift DSCP value to fit six most significant bits of IP DiffServ field + value <<= 2; +#endif + } +#if 1 if (sopt == IPV6_TCLASS) { // Set the IPv4 option in all cases to support dual-stack sockets. // Don't bother checking the return code, as this is expected to fail if @@ -393,6 +418,31 @@ int LwipSocket::RecvFrom(void* buffer, SocketAddress* out_addr, int64_t* timestamp) { int received = DoReadFromSocket(buffer, length, out_addr, timestamp); + + UpdateLastError(); + int error = GetError(); + bool success = (received >= 0) || IsBlockingError(error); + if (udp_ || success) { + EnableEvents(DE_READ); + } + if (!success) { + RTC_LOG_F(LS_VERBOSE) << "Error = " << error; + } + return received; +} + +int LwipSocket::RecvFrom(ReceiveBuffer& buffer) { + int64_t timestamp = -1; + static constexpr int BUF_SIZE = 64 * 1024; + buffer.payload.EnsureCapacity(BUF_SIZE); + + int received = + DoReadFromSocket(buffer.payload.data(), buffer.payload.capacity(), + &buffer.source_address, ×tamp); + buffer.payload.SetSize(received > 0 ? received : 0); + if (received > 0 && timestamp != -1) { + buffer.arrival_time = webrtc::Timestamp::Micros(timestamp); + } UpdateLastError(); int error = GetError(); bool success = (received >= 0) || IsBlockingError(error); @@ -423,7 +473,7 @@ int LwipSocket::DoReadFromSocket(void* buffer, msg.msg_name = addr; msg.msg_namelen = addr_len; } - char control[CMSG_SPACE(sizeof(struct timeval))] = {}; + char control[CMSG_SPACE(sizeof(struct timeval))] = {0}; if (timestamp) { *timestamp = -1; msg.msg_control = &control; @@ -522,8 +572,7 @@ int LwipSocket::Close() { state_ = CS_CLOSED; SetEnabledEvents(0); if (resolver_) { - resolver_->Destroy(false); - resolver_ = nullptr; + resolver_.reset(); } return err; } @@ -547,14 +596,16 @@ int LwipSocket::DoSendTo(SOCKET socket, return ::lwip_sendto(socket, buf, len, flags, dest_addr, addrlen); } -void LwipSocket::OnResolveResult(AsyncResolverInterface* resolver) { - if (resolver != resolver_) { - return; - } - - int error = resolver_->GetError(); +void LwipSocket::OnResolveResult( + const webrtc::AsyncDnsResolverResult& result) { + int error = result.GetError(); if (error == 0) { - error = DoConnect(resolver_->address()); + SocketAddress address; + if (result.GetResolvedAddress(AF_INET, &address)) { + error = DoConnect(address); + } else { + Close(); + } } else { Close(); } @@ -678,7 +729,7 @@ bool SocketDispatcher::Initialize() { ioctlsocket(s_, FIONBIO, &argp); #elif 1 lwip_fcntl(s_, F_SETFL, lwip_fcntl(s_, F_GETFL, 0) | O_NONBLOCK); - if (IsScmTimeStampExperimentEnabled()) { + if (!IsScmTimeStampExperimentDisabled()) { int value = 1; // Attempt to get receive packet timestamp from the socket. if (::lwip_setsockopt(s_, SOL_SOCKET, -1, &value, sizeof(value)) != @@ -978,6 +1029,7 @@ int SocketDispatcher::Close() { } #if 1 +// Sets the value of a boolean value to false when signaled. class Signaler : public Dispatcher { public: Signaler(LwipSocketServer* ss, bool& flag_to_clear) @@ -1141,7 +1193,7 @@ LwipSocketServer::~LwipSocketServer() { delete signal_wakeup_; #if 0 if (epoll_fd_ != INVALID_SOCKET) { - close(epoll_fd_); + lwip_close(epoll_fd_); } #endif RTC_DCHECK(dispatcher_by_key_.empty()); @@ -1232,22 +1284,27 @@ int LwipSocketServer::ToCmsWait(webrtc::TimeDelta max_wait_duration) { #if 1 bool LwipSocketServer::Wait(webrtc::TimeDelta max_wait_duration, - bool process_io) { + bool process_io) { // We don't support reentrant waiting. RTC_DCHECK(!waiting_); ScopedSetTrue s(&waiting_); const int cmsWait = ToCmsWait(max_wait_duration); + +#if 0 + return WaitPoll(cmsWait, process_io); +#else #if 0 // We don't keep a dedicated "epoll" descriptor containing only the non-IO // (i.e. signaling) dispatcher, so "poll" will be used instead of the default // "select" to support sockets larger than FD_SETSIZE. if (!process_io) { - return WaitPoll(cmsWait, signal_wakeup_); + return WaitPollOneDispatcher(cmsWait, signal_wakeup_); } else if (epoll_fd_ != INVALID_SOCKET) { return WaitEpoll(cmsWait); } #endif return WaitSelect(cmsWait, process_io); +#endif } // `error_event` is true if we are responding to an event where we know an @@ -1317,6 +1374,34 @@ static void ProcessEvents(Dispatcher* dispatcher, } } +#if 0 || 0 +static void ProcessPollEvents(Dispatcher* dispatcher, const pollfd& pfd) { + bool readable = (pfd.revents & (POLLIN | POLLPRI)); + bool writable = (pfd.revents & POLLOUT); + bool error = (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP)); + + ProcessEvents(dispatcher, readable, writable, error, error); +} + +static pollfd DispatcherToPollfd(Dispatcher* dispatcher) { + pollfd fd{ + .fd = dispatcher->GetDescriptor(), + .events = 0, + .revents = 0, + }; + + uint32_t ff = dispatcher->GetRequestedEvents(); + if (ff & (DE_READ | DE_ACCEPT)) { + fd.events |= POLLIN; + } + if (ff & (DE_WRITE | DE_CONNECT)) { + fd.events |= POLLOUT; + } + + return fd; +} +#endif // WEBRTC_USE_POLL || WEBRTC_USE_EPOLL + bool LwipSocketServer::WaitSelect(int cmsWait, bool process_io) { // Calculate timing information @@ -1333,7 +1418,6 @@ bool LwipSocketServer::WaitSelect(int cmsWait, bool process_io) { stop_us = rtc::TimeMicros() + cmsWait * 1000; } - fd_set fdsRead; fd_set fdsWrite; // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the @@ -1359,7 +1443,6 @@ bool LwipSocketServer::WaitSelect(int cmsWait, bool process_io) { for (auto const& kv : dispatcher_by_key_) { uint64_t key = kv.first; Dispatcher* pdispatcher = kv.second; - // Query dispatchers for read and write wait state if (!process_io && (pdispatcher != signal_wakeup_)) continue; current_dispatcher_keys_.push_back(key); @@ -1400,9 +1483,9 @@ bool LwipSocketServer::WaitSelect(int cmsWait, bool process_io) { } else { // We have signaled descriptors CritScope cr(&crit_); - // Iterate only on the dispatchers whose sockets were passed into - // WSAEventSelect; this avoids the ABA problem (a socket being - // destroyed and a new one created with the same file descriptor). + // Iterate only on the dispatchers whose file descriptors were passed into + // select; this avoids the ABA problem (a socket being destroyed and a new + // one created with the same file descriptor). for (uint64_t key : current_dispatcher_keys_) { if (!dispatcher_by_key_.count(key)) continue; @@ -1519,11 +1602,11 @@ void LwipSocketServer::UpdateEpoll(Dispatcher* pdispatcher, uint64_t key) { bool LwipSocketServer::WaitEpoll(int cmsWait) { RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); - int64_t tvWait = -1; - int64_t tvStop = -1; + int64_t msWait = -1; + int64_t msStop = -1; if (cmsWait != kForeverMs) { - tvWait = cmsWait; - tvStop = TimeAfter(cmsWait); + msWait = cmsWait; + msStop = TimeAfter(cmsWait); } fWait_ = true; @@ -1533,7 +1616,7 @@ bool LwipSocketServer::WaitEpoll(int cmsWait) { // 0 means timeout // > 0 means count of descriptors ready int n = epoll_wait(epoll_fd_, epoll_events_.data(), epoll_events_.size(), - static_cast(tvWait)); + static_cast(msWait)); if (n < 0) { if (errno != EINTR) { RTC_LOG_E(LS_ERROR, EN, errno) << "epoll"; @@ -1566,9 +1649,9 @@ bool LwipSocketServer::WaitEpoll(int cmsWait) { } } - if (cmsWait != kForeverMS) { - tvWait = TimeDiff(tvStop, TimeMillis()); - if (tvWait <= 0) { + if (cmsWait != kForeverMs) { + msWait = TimeDiff(msStop, TimeMillis()); + if (msWait <= 0) { // Return success on timeout. return true; } @@ -1578,37 +1661,27 @@ bool LwipSocketServer::WaitEpoll(int cmsWait) { return true; } -bool LwipSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { +bool LwipSocketServer::WaitPollOneDispatcher(int cmsWait, + Dispatcher* dispatcher) { RTC_DCHECK(dispatcher); - int64_t tvWait = -1; - int64_t tvStop = -1; + int64_t msWait = -1; + int64_t msStop = -1; if (cmsWait != kForeverMs) { - tvWait = cmsWait; - tvStop = TimeAfter(cmsWait); + msWait = cmsWait; + msStop = TimeAfter(cmsWait); } fWait_ = true; - - struct pollfd fds = {0}; - int fd = dispatcher->GetDescriptor(); - fds.fd = fd; + const int fd = dispatcher->GetDescriptor(); while (fWait_) { - uint32_t ff = dispatcher->GetRequestedEvents(); - fds.events = 0; - if (ff & (DE_READ | DE_ACCEPT)) { - fds.events |= POLLIN; - } - if (ff & (DE_WRITE | DE_CONNECT)) { - fds.events |= POLLOUT; - } - fds.revents = 0; + auto fds = DispatcherToPollfd(dispatcher); // Wait then call handlers as appropriate // < 0 means error // 0 means timeout // > 0 means count of descriptors ready - int n = poll(&fds, 1, static_cast(tvWait)); + int n = poll(&fds, 1, static_cast(msWait)); if (n < 0) { if (errno != EINTR) { RTC_LOG_E(LS_ERROR, EN, errno) << "poll"; @@ -1625,17 +1698,85 @@ bool LwipSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { // We have signaled descriptors (should only be the passed dispatcher). RTC_DCHECK_EQ(n, 1); RTC_DCHECK_EQ(fds.fd, fd); + ProcessPollEvents(dispatcher, fds); + } + + if (cmsWait != kForeverMs) { + msWait = TimeDiff(msStop, TimeMillis()); + if (msWait < 0) { + // Return success on timeout. + return true; + } + } + } + + return true; +} - bool readable = (fds.revents & (POLLIN | POLLPRI)); - bool writable = (fds.revents & POLLOUT); - bool error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP)); +#elif 0 - ProcessEvents(dispatcher, readable, writable, error, error); +bool LwipSocketServer::WaitPoll(int cmsWait, bool process_io) { + int64_t msWait = -1; + int64_t msStop = -1; + if (cmsWait != kForeverMs) { + msWait = cmsWait; + msStop = TimeAfter(cmsWait); + } + + std::vector pollfds; + fWait_ = true; + + while (fWait_) { + { + CritScope cr(&crit_); + current_dispatcher_keys_.clear(); + pollfds.clear(); + pollfds.reserve(dispatcher_by_key_.size()); + + for (auto const& kv : dispatcher_by_key_) { + uint64_t key = kv.first; + Dispatcher* pdispatcher = kv.second; + if (!process_io && (pdispatcher != signal_wakeup_)) + continue; + current_dispatcher_keys_.push_back(key); + pollfds.push_back(DispatcherToPollfd(pdispatcher)); + } + } + + // Wait then call handlers as appropriate + // < 0 means error + // 0 means timeout + // > 0 means count of descriptors ready + int n = poll(pollfds.data(), pollfds.size(), static_cast(msWait)); + if (n < 0) { + if (errno != EINTR) { + RTC_LOG_E(LS_ERROR, EN, errno) << "poll"; + return false; + } + // Else ignore the error and keep going. If this EINTR was for one of the + // signals managed by this LwipSocketServer, the + // PosixSignalDeliveryDispatcher will be in the signaled state in the next + // iteration. + } else if (n == 0) { + // If timeout, return success + return true; + } else { + // We have signaled descriptors + CritScope cr(&crit_); + // Iterate only on the dispatchers whose file descriptors were passed into + // poll; this avoids the ABA problem (a socket being destroyed and a new + // one created with the same file descriptor). + for (size_t i = 0; i < current_dispatcher_keys_.size(); ++i) { + uint64_t key = current_dispatcher_keys_[i]; + if (!dispatcher_by_key_.count(key)) + continue; + ProcessPollEvents(dispatcher_by_key_.at(key), pollfds[i]); + } } if (cmsWait != kForeverMs) { - tvWait = TimeDiff(tvStop, TimeMillis()); - if (tvWait < 0) { + msWait = TimeDiff(msStop, TimeMillis()); + if (msWait < 0) { // Return success on timeout. return true; } @@ -1645,13 +1786,13 @@ bool LwipSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { return true; } -#endif // WEBRTC_USE_EPOLL +#endif // WEBRTC_USE_EPOLL, WEBRTC_USE_POLL #endif // WEBRTC_POSIX #if 0 bool LwipSocketServer::Wait(webrtc::TimeDelta max_wait_duration, - bool process_io) { + bool process_io) { // We don't support reentrant waiting. RTC_DCHECK(!waiting_); ScopedSetTrue s(&waiting_); diff --git a/p2p/source/lwip.hpp b/p2p/source/lwip.hpp index 0cea9a528..317dad024 100644 --- a/p2p/source/lwip.hpp +++ b/p2p/source/lwip.hpp @@ -11,13 +11,35 @@ #ifndef RTC_BASE_LOGICAL_SOCKET_SERVER_H_ #define RTC_BASE_LOGICAL_SOCKET_SERVER_H_ +#include "api/async_dns_resolver.h" +#include "api/units/time_delta.h" +#include "rtc_base/socket.h" +#include "rtc_base/socket_address.h" +#include "rtc_base/third_party/sigslot/sigslot.h" + +#if 1 +#if 0 +// On Linux, use epoll. + +#define WEBRTC_USE_EPOLL 1 +#elif 0 +// Fuchsia implements select and poll but not epoll, and testing shows that poll +// is faster than select. +#include + +#define WEBRTC_USE_POLL 1 +#else +// On other POSIX systems, use select by default. +#endif // WEBRTC_LINUX, WEBRTC_FUCHSIA +#endif // WEBRTC_POSIX + #include +#include #include +#include #include #include -#include "rtc_base/async_resolver.h" -#include "rtc_base/async_resolver_interface.h" #include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/socket_server.h" #include "rtc_base/synchronization/mutex.h" @@ -51,7 +73,7 @@ class Dispatcher { virtual void OnEvent(uint32_t ff, int err) = 0; #if 0 virtual WSAEVENT GetWSAEvent() = 0; - virtual SOCKET_ GetSocket() = 0; + virtual SOCKET GetSocket() = 0; virtual bool CheckSignalClose() = 0; #elif 1 virtual int GetDescriptor() = 0; @@ -86,15 +108,16 @@ class RTC_EXPORT LwipSocketServer : public SocketServer { static constexpr int kForeverMs = -1; static int ToCmsWait(webrtc::TimeDelta max_wait_duration); + #if 1 bool WaitSelect(int cmsWait, bool process_io); -#endif // WEBRTC_POSIX + #if 0 void AddEpoll(Dispatcher* dispatcher, uint64_t key); void RemoveEpoll(Dispatcher* dispatcher); void UpdateEpoll(Dispatcher* dispatcher, uint64_t key); bool WaitEpoll(int cmsWait); - bool WaitPoll(int cmsWait, Dispatcher* dispatcher); + bool WaitPollOneDispatcher(int cmsWait, Dispatcher* dispatcher); // This array is accessed in isolation by a thread calling into Wait(). // It's useless to use a SequenceChecker to guard it because a socket @@ -102,7 +125,16 @@ class RTC_EXPORT LwipSocketServer : public SocketServer { // to have to reset the sequence checker on Wait calls. std::array epoll_events_; const int epoll_fd_ = INVALID_SOCKET; -#endif // WEBRTC_USE_EPOLL + +#elif 0 + void AddPoll(Dispatcher* dispatcher, uint64_t key); + void RemovePoll(Dispatcher* dispatcher); + void UpdatePoll(Dispatcher* dispatcher, uint64_t key); + bool WaitPoll(int cmsWait, bool process_io); + +#endif // WEBRTC_USE_EPOLL, WEBRTC_USE_POLL +#endif // WEBRTC_POSIX + // uint64_t keys are used to uniquely identify a dispatcher in order to avoid // the ABA problem during the epoll loop (a dispatcher being destroyed and // replaced by one with the same address). @@ -113,9 +145,9 @@ class RTC_EXPORT LwipSocketServer : public SocketServer { std::unordered_map key_by_dispatcher_ RTC_GUARDED_BY(crit_); // A list of dispatcher keys that we're interested in for the current - // lwip_select() or WSAWaitForMultipleEvents() loop. Again, used to avoid the ABA - // problem (a socket being destroyed and a new one created with the same - // handle, erroneously receiving the events from the destroyed socket). + // lwip_select(), poll(), or WSAWaitForMultipleEvents() loop. Again, used to avoid + // the ABA problem (a socket being destroyed and a new one created with the + // same handle, erroneously receiving the events from the destroyed socket). // // Kept as a member variable just for efficiency. std::vector current_dispatcher_keys_; @@ -158,10 +190,12 @@ class LwipSocket : public Socket, public sigslot::has_slots<> { const SocketAddress& addr) override; int Recv(void* buffer, size_t length, int64_t* timestamp) override; + // TODO(webrtc:15368): Deprecate and remove. int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr, int64_t* timestamp) override; + int RecvFrom(ReceiveBuffer& buffer) override; int Listen(int backlog) override; Socket* Accept(SocketAddress* out_addr) override; @@ -170,6 +204,8 @@ class LwipSocket : public Socket, public sigslot::has_slots<> { SocketServer* socketserver() { return ss_; } + SOCKET GetSocketFD() const { return s_; } + protected: int DoConnect(const SocketAddress& connect_addr); @@ -192,7 +228,7 @@ class LwipSocket : public Socket, public sigslot::has_slots<> { SocketAddress* out_addr, int64_t* timestamp); - void OnResolveResult(AsyncResolverInterface* resolver); + void OnResolveResult(const webrtc::AsyncDnsResolverResult& resolver); void UpdateLastError(); void MaybeRemapSendError(); @@ -211,7 +247,7 @@ class LwipSocket : public Socket, public sigslot::has_slots<> { mutable webrtc::Mutex mutex_; int error_ RTC_GUARDED_BY(mutex_); ConnState state_; - AsyncResolver* resolver_; + std::unique_ptr resolver_; #if !defined(NDEBUG) std::string dbg_addr_; @@ -235,7 +271,7 @@ class SocketDispatcher : public Dispatcher, public LwipSocket { #if 0 WSAEVENT GetWSAEvent() override; - SOCKET_ GetSocket() override; + SOCKET GetSocket() override; bool CheckSignalClose() override; #elif 1 int GetDescriptor() override; diff --git a/p2p/source/remote.cpp b/p2p/source/remote.cpp index 44334d3e9..2d119348f 100644 --- a/p2p/source/remote.cpp +++ b/p2p/source/remote.cpp @@ -34,6 +34,7 @@ #include #include +#include #include "dns.hpp" #include "event.hpp" diff --git a/srv-daemon/source/main.cpp b/srv-daemon/source/main.cpp index 8d5492771..12b56ef21 100644 --- a/srv-daemon/source/main.cpp +++ b/srv-daemon/source/main.cpp @@ -100,7 +100,7 @@ task Symmetric(const S &base) { const auto ice(ices->at(i)); orc_assert(ice != nullptr); const auto &candidate(ice->candidate()); - if (candidate.type() != "stun") + if (!candidate.is_stun()) continue; if (!reflexive.emplace(candidate.related_address(), candidate.address()).second) co_return true;