Skip to content

Commit

Permalink
Add locking around sockets, since QUdpSocket isn't thread safe enough.
Browse files Browse the repository at this point in the history
This fixes overte-org#1119
  • Loading branch information
daleglass committed Aug 27, 2024
1 parent eb168e4 commit 964dbbb
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
55 changes: 43 additions & 12 deletions libraries/networking/src/udt/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ Socket::Socket(QObject* parent, bool shouldChangeSocketOptions) :
}

void Socket::bind(SocketType socketType, const QHostAddress& address, quint16 port) {
_networkSocket.bind(socketType, address, port);
{
Guard socketLock(_socketMutex);
_networkSocket.bind(socketType, address, port);
}

if (_shouldChangeSocketOptions) {
setSystemBufferSizes(socketType);
Expand All @@ -68,36 +71,48 @@ void Socket::bind(SocketType socketType, const QHostAddress& address, quint16 po
}

#if defined(Q_OS_LINUX)
auto sd = _networkSocket.socketDescriptor(socketType);
int val = IP_PMTUDISC_DONT;
setsockopt(sd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
{
Guard socketLock(_socketMutex);
auto sd = _networkSocket.socketDescriptor(socketType);
int val = IP_PMTUDISC_DONT;
setsockopt(sd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
}
#elif defined(Q_OS_WIN)
auto sd = _networkSocket.socketDescriptor(socketType);
int val = 0; // false
if (setsockopt(sd, IPPROTO_IP, IP_DONTFRAGMENT, (const char *)&val, sizeof(val))) {
auto wsaErr = WSAGetLastError();
qCWarning(networking) << "Socket::bind Cannot setsockopt IP_DONTFRAGMENT" << wsaErr;
{
Guard socketLock(_socketMutex);

auto sd = _networkSocket.socketDescriptor(socketType);
int val = 0; // false
if (setsockopt(sd, IPPROTO_IP, IP_DONTFRAGMENT, (const char *)&val, sizeof(val))) {
auto wsaErr = WSAGetLastError();
qCWarning(networking) << "Socket::bind Cannot setsockopt IP_DONTFRAGMENT" << wsaErr;
}
}
#endif
}
}

void Socket::rebind(SocketType socketType) {
Guard socketLock(_socketMutex);
rebind(socketType, _networkSocket.localPort(socketType));
}

void Socket::rebind(SocketType socketType, quint16 localPort) {
Guard socketLock(_socketMutex);
_networkSocket.abort(socketType);
bind(socketType, QHostAddress::AnyIPv4, localPort);
}

#if defined(WEBRTC_DATA_CHANNELS)
const WebRTCSocket* Socket::getWebRTCSocket() {
Guard socketLock(_socketMutex);
return _networkSocket.getWebRTCSocket();
}
#endif

void Socket::setSystemBufferSizes(SocketType socketType) {
Guard socketLock(_socketMutex);

for (int i = 0; i < 2; i++) {
QAbstractSocket::SocketOption bufferOpt;
QString bufferTypeString;
Expand Down Expand Up @@ -245,6 +260,8 @@ qint64 Socket::writeDatagram(const char* data, qint64 size, const SockAddr& sock
}

qint64 Socket::writeDatagram(const QByteArray& datagram, const SockAddr& sockAddr) {
Guard socketLock(_socketMutex);

auto socketType = sockAddr.getType();

// don't attempt to write the datagram if we're unbound. Just drop it.
Expand Down Expand Up @@ -355,6 +372,8 @@ void Socket::messageFailed(Connection* connection, Packet::MessageNumber message
}

void Socket::checkForReadyReadBackup() {
Guard socketLock(_socketMutex);

if (_networkSocket.hasPendingDatagrams()) {
qCDebug(networking) << "Socket::checkForReadyReadBackup() detected blocked readyRead signal. Flushing pending datagrams.";

Expand All @@ -377,14 +396,20 @@ void Socket::checkForReadyReadBackup() {
}
}

// This is here just so that we can do locking in the while below.
bool Socket::hasPending(int &packetSizeWithHeader) {
Guard socketLock(_socketMutex);
return _networkSocket.hasPendingDatagrams() &&
(packetSizeWithHeader = _networkSocket.pendingDatagramSize()) != -1;
}

void Socket::readPendingDatagrams() {
using namespace std::chrono;
static const auto MAX_PROCESS_TIME { 100ms };
const auto abortTime = system_clock::now() + MAX_PROCESS_TIME;
int packetSizeWithHeader = -1;

while (_networkSocket.hasPendingDatagrams() &&
(packetSizeWithHeader = _networkSocket.pendingDatagramSize()) != -1) {
while (hasPending(packetSizeWithHeader)) {
if (system_clock::now() > abortTime) {
// We've been running for too long, stop processing packets for now
// Once we've processed the event queue, we'll come back to packet processing
Expand All @@ -409,7 +434,11 @@ void Socket::readPendingDatagrams() {
auto buffer = std::unique_ptr<char[]>(new char[packetSizeWithHeader]);

// pull the datagram
auto sizeRead = _networkSocket.readDatagram(buffer.get(), packetSizeWithHeader, &senderSockAddr);
int sizeRead = 0;
{
Guard socketLock(_socketMutex);
sizeRead = _networkSocket.readDatagram(buffer.get(), packetSizeWithHeader, &senderSockAddr);
}

// save information for this packet, in case it is the one that sticks readyRead
_lastPacketSizeRead = sizeRead;
Expand Down Expand Up @@ -552,6 +581,8 @@ std::vector<SockAddr> Socket::getConnectionSockAddrs() {
}

void Socket::handleSocketError(SocketType socketType, QAbstractSocket::SocketError socketError) {
Guard socketLock(_socketMutex);

int wsaError = 0;
static std::atomic<int> previousWsaError(0);
#ifdef WIN32
Expand Down
5 changes: 5 additions & 0 deletions libraries/networking/src/udt/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class Socket : public QObject {

using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>;
using Guard = std::lock_guard<Mutex>;


public:
using StatsVector = std::vector<std::pair<SockAddr, ConnectionStats::Stats>>;
Expand Down Expand Up @@ -123,6 +125,7 @@ private slots:
std::vector<SockAddr> getConnectionSockAddrs();
void connectToSendSignal(const SockAddr& destinationAddr, QObject* receiver, const char* slot);

bool hasPending(int &packetSizeWithHeader);
Q_INVOKABLE void writeReliablePacket(Packet* packet, const SockAddr& sockAddr);
Q_INVOKABLE void writeReliablePacketList(PacketList* packetList, const SockAddr& sockAddr);

Expand All @@ -135,6 +138,8 @@ private slots:

Mutex _unreliableSequenceNumbersMutex;
Mutex _connectionsHashMutex;
Mutex _socketMutex;


std::unordered_map<SockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<SockAddr, SequenceNumber> _unreliableSequenceNumbers;
Expand Down

0 comments on commit 964dbbb

Please sign in to comment.