Skip to content

Commit

Permalink
proxy: ask for router-resp in requests (#48079)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges authored Oct 4, 2024
1 parent 2fff730 commit 5050428
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 19 deletions.
72 changes: 53 additions & 19 deletions src/core/zhttpmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class ZhttpManager::Private : public QObject
QZmq::Socket *server_in_stream_sock;
QZmq::Socket *server_out_sock;
QZmq::Valve *client_in_valve;
QZmq::Valve *client_out_stream_valve;
QZmq::Valve *server_in_valve;
QZmq::Valve *server_in_stream_valve;
QByteArray instanceId;
Expand All @@ -110,6 +111,7 @@ class ZhttpManager::Private : public QObject
Connection sosConnection;
Connection rrConnection;
Connection clientConnection;
Connection clientOutStreamConnection;
Connection serverConnection;
Connection serverStreamConnection;
Connection refreshTimerConnection;
Expand All @@ -125,6 +127,7 @@ class ZhttpManager::Private : public QObject
server_in_stream_sock(0),
server_out_sock(0),
client_in_valve(0),
client_out_stream_valve(0),
server_in_valve(0),
server_in_stream_valve(0),
ipcFileMode(-1),
Expand Down Expand Up @@ -186,11 +189,13 @@ class ZhttpManager::Private : public QObject
rrConnection.disconnect();
cossConnection.disconnect();
delete client_req_sock;
delete client_out_stream_valve;
delete client_out_stream_sock;

client_out_stream_sock = new QZmq::Socket(QZmq::Socket::Router, this);
cossConnection = client_out_stream_sock->messagesWritten.connect(boost::bind(&Private::client_out_stream_messagesWritten, this, boost::placeholders::_1));

client_out_stream_sock->setIdentity(instanceId);
client_out_stream_sock->setWriteQueueEnabled(false);
client_out_stream_sock->setHwm(DEFAULT_HWM);
client_out_stream_sock->setShutdownWaitTime(CLIENT_STREAM_WAIT_TIME);
Expand All @@ -203,13 +208,19 @@ class ZhttpManager::Private : public QObject
return false;
}

client_out_stream_valve = new QZmq::Valve(client_out_stream_sock, this);
clientOutStreamConnection = client_out_stream_valve->readyRead.connect(boost::bind(&Private::client_out_stream_readyRead, this, boost::placeholders::_1));

client_out_stream_valve->open();

return true;
}

bool setupClientIn()
{
rrConnection.disconnect();
delete client_req_sock;
delete client_in_valve;
delete client_in_sock;

client_in_sock = new QZmq::Socket(QZmq::Socket::Sub, this);
Expand Down Expand Up @@ -259,6 +270,7 @@ class ZhttpManager::Private : public QObject

bool setupServerIn()
{
delete server_in_valve;
delete server_in_sock;

server_in_sock = new QZmq::Socket(QZmq::Socket::Pull, this);
Expand Down Expand Up @@ -563,38 +575,28 @@ class ZhttpManager::Private : public QObject
}
}

void client_in_readyRead(const QList<QByteArray> &msg)
void processClientIn(const QByteArray &receiver, const QByteArray &msg)
{
if(msg.count() != 1)
{
log_warning("zhttp/zws client: received message with parts != 1, skipping");
return;
}

int at = msg[0].indexOf(' ');
if(at == -1)
{
log_warning("zhttp/zws client: received message with invalid format, skipping");
return;
}

QByteArray receiver = msg[0].mid(0, at);
QByteArray dataRaw = msg[0].mid(at + 1);
if(dataRaw.length() < 1 || dataRaw[0] != 'T')
if(msg.length() < 1 || msg[0] != 'T')
{
log_warning("zhttp/zws client: received message with invalid format (missing type), skipping");
return;
}

QVariant data = TnetString::toVariant(dataRaw.mid(1));
QVariant data = TnetString::toVariant(msg.mid(1));
if(data.isNull())
{
log_warning("zhttp/zws client: received message with invalid format (tnetstring parse failed), skipping");
return;
}

if(log_outputLevel() >= LOG_LEVEL_DEBUG)
LogUtil::logVariantWithContent(LOG_LEVEL_DEBUG, data, "body", "zhttp/zws client: IN %s", receiver.data());
{
if(!receiver.isEmpty())
LogUtil::logVariantWithContent(LOG_LEVEL_DEBUG, data, "body", "zhttp/zws client: IN %s", receiver.data());
else
LogUtil::logVariantWithContent(LOG_LEVEL_DEBUG, data, "body", "zhttp/zws client: IN");
}

ZhttpResponsePacket p;
if(!p.fromVariant(data))
Expand Down Expand Up @@ -633,6 +635,38 @@ class ZhttpManager::Private : public QObject
}
}

void client_out_stream_readyRead(const QList<QByteArray> &msg)
{
if(msg.count() != 3)
{
log_warning("zhttp/zws client: received router message with parts != 3, skipping");
return;
}

processClientIn(QByteArray(), msg[2]);
}

void client_in_readyRead(const QList<QByteArray> &msg)
{
if(msg.count() != 1)
{
log_warning("zhttp/zws client: received pub message with parts != 1, skipping");
return;
}

int at = msg[0].indexOf(' ');
if(at == -1)
{
log_warning("zhttp/zws client: received pub message with invalid format, skipping");
return;
}

QByteArray receiver = msg[0].mid(0, at);
QByteArray dataRaw = msg[0].mid(at + 1);

processClientIn(receiver, dataRaw);
}

void server_in_readyRead(const QList<QByteArray> &msg)
{
if(msg.count() != 1)
Expand Down
1 change: 1 addition & 0 deletions src/core/zhttprequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ public slots:
if(!requestBodyBuf.isEmpty() || !bodyFinished)
p.more = true;
p.stream = true;
p.routerResp = true;
p.connectHost = connectHost;
p.connectPort = connectPort;
if(ignorePolicies)
Expand Down
1 change: 1 addition & 0 deletions src/core/zwebsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ public slots:
p.type = ZhttpRequestPacket::Data;
p.uri = requestUri;
p.headers = requestHeaders;
p.routerResp = true;
p.connectHost = connectHost;
p.connectPort = connectPort;
if(ignorePolicies)
Expand Down

0 comments on commit 5050428

Please sign in to comment.