Skip to content

Commit

Permalink
wrap zmq crate with libzmq-like C API and use from qzmq
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Nov 2, 2023
1 parent e7c6292 commit 7b0fc2a
Show file tree
Hide file tree
Showing 5 changed files with 859 additions and 57 deletions.
6 changes: 3 additions & 3 deletions src/corelib/qzmq/src/qzmqcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@
#include "qzmqcontext.h"

#include <assert.h>
#include <zmq.h>
#include "rust/wzmq.h"

namespace QZmq {

Context::Context(int ioThreads)
{
context_ = zmq_init(ioThreads);
context_ = wzmq_init(ioThreads);
assert(context_);
}

Context::~Context()
{
zmq_term(context_);
wzmq_term(context_);
}

}
108 changes: 54 additions & 54 deletions src/corelib/qzmq/src/qzmqsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include <QTimer>
#include <QSocketNotifier>
#include <QMutex>
#include <zmq.h>
#include "rust/wzmq.h"
#include "qzmqcontext.h"

namespace QZmq {
Expand All @@ -39,56 +39,56 @@ static int get_fd(void *sock)
{
int fd;
size_t opt_len = sizeof(fd);
int ret = zmq_getsockopt(sock, ZMQ_FD, &fd, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_FD, &fd, &opt_len);
assert(ret == 0);
return fd;
}

static void set_subscribe(void *sock, const char *data, int size)
{
size_t opt_len = size;
int ret = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, data, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_SUBSCRIBE, data, opt_len);
assert(ret == 0);
}

static void set_unsubscribe(void *sock, const char *data, int size)
{
size_t opt_len = size;
zmq_setsockopt(sock, ZMQ_UNSUBSCRIBE, data, opt_len);
wzmq_setsockopt(sock, WZMQ_UNSUBSCRIBE, data, opt_len);
// note: we ignore errors, such as unsubscribing a nonexisting filter
}

static void set_linger(void *sock, int value)
{
size_t opt_len = sizeof(value);
int ret = zmq_setsockopt(sock, ZMQ_LINGER, &value, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_LINGER, &value, opt_len);
assert(ret == 0);
}

static int get_identity(void *sock, char *data, int size)
{
size_t opt_len = size;
int ret = zmq_getsockopt(sock, ZMQ_IDENTITY, data, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_IDENTITY, data, &opt_len);
assert(ret == 0);
return (int)opt_len;
}

static void set_identity(void *sock, const char *data, int size)
{
size_t opt_len = size;
int ret = zmq_setsockopt(sock, ZMQ_IDENTITY, data, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_IDENTITY, data, opt_len);
if(ret != 0)
printf("%d\n", errno);
assert(ret == 0);
}

#if ZMQ_VERSION_MAJOR >= 4
#if WZMQ_VERSION_MAJOR >= 4

static void set_immediate(void *sock, bool on)
{
int v = on ? 1 : 0;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_IMMEDIATE, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_IMMEDIATE, &v, opt_len);
assert(ret == 0);
}

Expand All @@ -98,21 +98,21 @@ static void set_immediate(void *sock, bool on)
{
int v = on ? 1 : 0;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_DELAY_ATTACH_ON_CONNECT, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_DELAY_ATTACH_ON_CONNECT, &v, opt_len);
assert(ret == 0);
}

#endif

#if (ZMQ_VERSION_MAJOR >= 4) || ((ZMQ_VERSION_MAJOR >= 3) && (ZMQ_VERSION_MINOR >= 2))
#if (WZMQ_VERSION_MAJOR >= 4) || ((WZMQ_VERSION_MAJOR >= 3) && (WZMQ_VERSION_MINOR >= 2))

#define USE_MSG_IO

static bool get_rcvmore(void *sock)
{
int more;
size_t opt_len = sizeof(more);
int ret = zmq_getsockopt(sock, ZMQ_RCVMORE, &more, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_RCVMORE, &more, &opt_len);
assert(ret == 0);
return more ? true : false;
}
Expand All @@ -124,7 +124,7 @@ static int get_events(void *sock)
int events;
size_t opt_len = sizeof(events);

int ret = zmq_getsockopt(sock, ZMQ_EVENTS, &events, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_EVENTS, &events, &opt_len);
if(ret == 0)
{
return (int)events;
Expand All @@ -138,7 +138,7 @@ static int get_sndhwm(void *sock)
{
int hwm;
size_t opt_len = sizeof(hwm);
int ret = zmq_getsockopt(sock, ZMQ_SNDHWM, &hwm, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_SNDHWM, &hwm, &opt_len);
assert(ret == 0);
return (int)hwm;
}
Expand All @@ -147,15 +147,15 @@ static void set_sndhwm(void *sock, int value)
{
int v = value;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_SNDHWM, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_SNDHWM, &v, opt_len);
assert(ret == 0);
}

static int get_rcvhwm(void *sock)
{
int hwm;
size_t opt_len = sizeof(hwm);
int ret = zmq_getsockopt(sock, ZMQ_RCVHWM, &hwm, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_RCVHWM, &hwm, &opt_len);
assert(ret == 0);
return (int)hwm;
}
Expand All @@ -164,7 +164,7 @@ static void set_rcvhwm(void *sock, int value)
{
int v = value;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_RCVHWM, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_RCVHWM, &v, opt_len);
assert(ret == 0);
}

Expand All @@ -183,31 +183,31 @@ static void set_tcp_keepalive(void *sock, int value)
{
int v = value;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_TCP_KEEPALIVE, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_TCP_KEEPALIVE, &v, opt_len);
assert(ret == 0);
}

static void set_tcp_keepalive_idle(void *sock, int value)
{
int v = value;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_TCP_KEEPALIVE_IDLE, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_TCP_KEEPALIVE_IDLE, &v, opt_len);
assert(ret == 0);
}

static void set_tcp_keepalive_cnt(void *sock, int value)
{
int v = value;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_TCP_KEEPALIVE_CNT, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_TCP_KEEPALIVE_CNT, &v, opt_len);
assert(ret == 0);
}

static void set_tcp_keepalive_intvl(void *sock, int value)
{
int v = value;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_TCP_KEEPALIVE_INTVL, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_TCP_KEEPALIVE_INTVL, &v, opt_len);
assert(ret == 0);
}

Expand All @@ -217,7 +217,7 @@ static bool get_rcvmore(void *sock)
{
qint64 more;
size_t opt_len = sizeof(more);
int ret = zmq_getsockopt(sock, ZMQ_RCVMORE, &more, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_RCVMORE, &more, &opt_len);
assert(ret == 0);
return more ? true : false;
}
Expand All @@ -229,7 +229,7 @@ static int get_events(void *sock)
quint32 events;
size_t opt_len = sizeof(events);

int ret = zmq_getsockopt(sock, ZMQ_EVENTS, &events, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_EVENTS, &events, &opt_len);
if(ret == 0)
{
return (int)events;
Expand All @@ -243,7 +243,7 @@ static int get_hwm(void *sock)
{
quint64 hwm;
size_t opt_len = sizeof(hwm);
int ret = zmq_getsockopt(sock, ZMQ_HWM, &hwm, &opt_len);
int ret = wzmq_getsockopt(sock, WZMQ_HWM, &hwm, &opt_len);
assert(ret == 0);
return (int)hwm;
}
Expand All @@ -252,7 +252,7 @@ static void set_hwm(void *sock, int value)
{
quint64 v = value;
size_t opt_len = sizeof(v);
int ret = zmq_setsockopt(sock, ZMQ_HWM, &v, opt_len);
int ret = wzmq_setsockopt(sock, WZMQ_HWM, &v, opt_len);
assert(ret == 0);
}

Expand Down Expand Up @@ -390,20 +390,20 @@ class Socket::Private : public QObject
int ztype = 0;
switch(type)
{
case Socket::Pair: ztype = ZMQ_PAIR; break;
case Socket::Dealer: ztype = ZMQ_DEALER; break;
case Socket::Router: ztype = ZMQ_ROUTER; break;
case Socket::Req: ztype = ZMQ_REQ; break;
case Socket::Rep: ztype = ZMQ_REP; break;
case Socket::Push: ztype = ZMQ_PUSH; break;
case Socket::Pull: ztype = ZMQ_PULL; break;
case Socket::Pub: ztype = ZMQ_PUB; break;
case Socket::Sub: ztype = ZMQ_SUB; break;
case Socket::Pair: ztype = WZMQ_PAIR; break;
case Socket::Dealer: ztype = WZMQ_DEALER; break;
case Socket::Router: ztype = WZMQ_ROUTER; break;
case Socket::Req: ztype = WZMQ_REQ; break;
case Socket::Rep: ztype = WZMQ_REP; break;
case Socket::Push: ztype = WZMQ_PUSH; break;
case Socket::Pull: ztype = WZMQ_PULL; break;
case Socket::Pub: ztype = WZMQ_PUB; break;
case Socket::Sub: ztype = WZMQ_SUB; break;
default:
assert(0);
}

sock = zmq_socket(context->context(), ztype);
sock = wzmq_socket(context->context(), ztype);
assert(sock != NULL);

sn_read = new QSocketNotifier(get_fd(sock), QSocketNotifier::Read, this);
Expand All @@ -422,7 +422,7 @@ class Socket::Private : public QObject
updateTimer->deleteLater();

set_linger(sock, shutdownWaitTime);
zmq_close(sock);
wzmq_close(sock);

if(usingGlobalContext)
removeGlobalContextRef();
Expand All @@ -447,29 +447,29 @@ class Socket::Private : public QObject

do
{
zmq_msg_t msg;
wzmq_msg_t msg;

int ret = zmq_msg_init(&msg);
int ret = wzmq_msg_init(&msg);
assert(ret == 0);

#ifdef USE_MSG_IO
ret = zmq_msg_recv(&msg, sock, ZMQ_DONTWAIT);
ret = wzmq_msg_recv(&msg, sock, WZMQ_DONTWAIT);
#else
ret = zmq_recv(sock, &msg, ZMQ_NOBLOCK);
ret = wzmq_recv(sock, &msg, WZMQ_NOBLOCK);
#endif

if(ret < 0)
{
ret = zmq_msg_close(&msg);
ret = wzmq_msg_close(&msg);
assert(ret == 0);

ok = false;
break;
}

QByteArray buf((const char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
QByteArray buf((const char *)wzmq_msg_data(&msg), wzmq_msg_size(&msg));

ret = zmq_msg_close(&msg);
ret = wzmq_msg_close(&msg);
assert(ret == 0);

out += buf;
Expand Down Expand Up @@ -522,8 +522,8 @@ class Socket::Private : public QObject
bool canWriteOld = canWrite;
bool canReadOld = canRead;

canWrite = (flags & ZMQ_POLLOUT);
canRead = (flags & ZMQ_POLLIN);
canWrite = (flags & WZMQ_POLLOUT);
canRead = (flags & WZMQ_POLLIN);

return (canWrite != canWriteOld || canRead != canReadOld);
}
Expand All @@ -534,28 +534,28 @@ class Socket::Private : public QObject
{
const QByteArray &buf = message[n];

zmq_msg_t msg;
wzmq_msg_t msg;

int ret = zmq_msg_init_size(&msg, buf.size());
int ret = wzmq_msg_init_size(&msg, buf.size());
assert(ret == 0);

memcpy(zmq_msg_data(&msg), buf.data(), buf.size());
memcpy(wzmq_msg_data(&msg), buf.data(), buf.size());

#ifdef USE_MSG_IO
ret = zmq_msg_send(&msg, sock, ZMQ_DONTWAIT | (n + 1 < message.count() ? ZMQ_SNDMORE : 0));
ret = wzmq_msg_send(&msg, sock, WZMQ_DONTWAIT | (n + 1 < message.count() ? WZMQ_SNDMORE : 0));
#else
ret = zmq_send(sock, &msg, ZMQ_NOBLOCK | (n + 1 < message.count() ? ZMQ_SNDMORE : 0));
ret = wzmq_send(sock, &msg, WZMQ_NOBLOCK | (n + 1 < message.count() ? WZMQ_SNDMORE : 0));
#endif

if(ret < 0)
{
ret = zmq_msg_close(&msg);
ret = wzmq_msg_close(&msg);
assert(ret == 0);

return false;
}

ret = zmq_msg_close(&msg);
ret = wzmq_msg_close(&msg);
assert(ret == 0);
}

Expand Down Expand Up @@ -722,13 +722,13 @@ void Socket::setTcpKeepAliveParameters(int idle, int count, int interval)

void Socket::connectToAddress(const QString &addr)
{
int ret = zmq_connect(d->sock, addr.toUtf8().data());
int ret = wzmq_connect(d->sock, addr.toUtf8().data());
assert(ret == 0);
}

bool Socket::bind(const QString &addr)
{
int ret = zmq_bind(d->sock, addr.toUtf8().data());
int ret = wzmq_bind(d->sock, addr.toUtf8().data());
if(ret != 0)
return false;

Expand Down
Loading

0 comments on commit 7b0fc2a

Please sign in to comment.