Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/libparodus.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@

#define URL_SIZE 32

#define DBG_QMEM 1

#if DBG_QMEM
#include <assert.h>
#endif

typedef struct {
int run_state;
Expand All @@ -62,8 +67,17 @@ typedef struct {
pthread_t wrp_receiver_tid;
pthread_mutex_t send_mutex;
bool auth_received;
#if DBG_QMEM
libpd_mq_t saved_wrp_queue;
#endif
} __instance_t;

#if DBG_QMEM
#define CHECK_QMEM(inst) assert (inst->wrp_queue == inst->saved_wrp_queue)
#else
#define CHECK_QMEM(inst)
#endif

#define SOCK_SEND_TIMEOUT_MS 2000

#define MAX_RECONNECT_RETRY_DELAY_SECS 63
Expand Down Expand Up @@ -522,6 +536,9 @@ int libparodus_init_dbg (libpd_instance_t *instance, libpd_cfg_t *libpd_cfg,
SETERR (oserr, LIBPD_ERR_INIT_QUEUE + err);
return LIBPD_ERROR_INIT_QUEUE;
}
#if DBG_QMEM
inst->saved_wrp_queue = inst->wrp_queue;
#endif
libpd_log (LEVEL_INFO, ("LIBPARODUS: Created queues\n"));
err = create_thread (&inst->wrp_receiver_tid, wrp_receiver_thread,
inst);
Expand Down Expand Up @@ -752,7 +769,7 @@ int libparodus_receive_dbg (libpd_instance_t instance, wrp_msg_t **msg,
err_info->err_detail = LIBPD_ERR_RCV_NULL_INST;
return LIBPD_ERROR_RCV_NULL_INST;
}

CHECK_QMEM(inst);
if (!inst->cfg.receive) {
libpd_log (LEVEL_ERROR, ("No receive option on libparodus_receive\n"));
err_info->err_detail = LIBPD_ERR_RCV_CFG;
Expand Down
39 changes: 38 additions & 1 deletion src/libparodus_queues.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
#include <pthread.h>
#include "libparodus_log.h"

#define DBG_QMEM 1

#if DBG_QMEM
#include <assert.h>
#endif

typedef struct queue {
const char *queue_name;
unsigned max_msgs;
Expand All @@ -33,8 +39,32 @@ typedef struct queue {
void **msg_array;
int head_index;
int tail_index;
#if DBG_QMEM
unsigned saved_max_msgs;
void **saved_msg_array;
#endif
} queue_t;

#if DBG_QMEM
#define check_max_msgs(q) assert (q->max_msgs == q->saved_max_msgs)
#define check_msg_array(q) assert (q->msg_array == q->saved_msg_array)
#define validate_head_index(q) assert ( \
(q->head_index >= -1) && (q->head_index < (int)q->max_msgs) \
)
#define validate_tail_index(q) assert ( \
(q->tail_index >= -1) && (q->tail_index < (int)q->max_msgs) \
)
#define CHECK_QMEM(q) {\
check_max_msgs(q); check_msg_array(q); validate_head_index(q); validate_tail_index(q); \
}
#else
#define check_max_msgs(q)
#define check_msg_array(q)
#define validate_head_index(q)
#define validate_tail_index(q)
#define CHECK_QMEM(q)
#endif

int libpd_qcreate (libpd_mq_t *mq, const char *queue_name,
unsigned max_msgs, int *exterr)
{
Expand Down Expand Up @@ -106,7 +136,10 @@ int libpd_qcreate (libpd_mq_t *mq, const char *queue_name,
free (newq);
return LIBPD_QERR_CREATE_ALLOC_2;
}

#if DBG_QMEM
newq->saved_max_msgs = newq->max_msgs;
newq->saved_msg_array = newq->msg_array;
#endif
*mq = (libpd_mq_t) newq;
return 0;
}
Expand Down Expand Up @@ -178,6 +211,7 @@ int libpd_qsend (libpd_mq_t mq, void *msg, unsigned timeout_ms, int *exterr)
*exterr = 0;
if (NULL == mq)
return LIBPD_QERR_SEND_NULL;
CHECK_QMEM(q);
pthread_mutex_lock (&q->mutex);
while (true) {
if (enqueue_msg (q, msg))
Expand All @@ -190,6 +224,7 @@ int libpd_qsend (libpd_mq_t mq, void *msg, unsigned timeout_ms, int *exterr)
pthread_mutex_unlock (&q->mutex);
return LIBPD_QERR_SEND_EXPTIME;
}
CHECK_QMEM(q);
rtn = pthread_cond_timedwait (&q->not_full_cond, &q->mutex, &ts);
if (rtn != 0) {
if (rtn == ETIMEDOUT) {
Expand Down Expand Up @@ -219,6 +254,7 @@ int libpd_qreceive (libpd_mq_t mq, void **msg, unsigned timeout_ms, int *exterr)
*exterr = 0;
if (NULL == mq)
return LIBPD_QERR_RCV_NULL;
CHECK_QMEM(q);
pthread_mutex_lock (&q->mutex);
while (true) {
msg__ = dequeue_msg (q);
Expand All @@ -232,6 +268,7 @@ int libpd_qreceive (libpd_mq_t mq, void **msg, unsigned timeout_ms, int *exterr)
pthread_mutex_unlock (&q->mutex);
return LIBPD_QERR_RCV_EXPTIME;
}
CHECK_QMEM(q);
rtn = pthread_cond_timedwait (&q->not_empty_cond, &q->mutex, &ts);
if (rtn != 0) {
if (rtn == ETIMEDOUT) {
Expand Down