diff --git a/src/libparodus.c b/src/libparodus.c index 532eb89..9f1148e 100644 --- a/src/libparodus.c +++ b/src/libparodus.c @@ -44,6 +44,11 @@ #define URL_SIZE 32 +#define DBG_QMEM 1 + +#if DBG_QMEM +#include +#endif typedef struct { int run_state; @@ -62,8 +67,17 @@ typedef struct { pthread_t wrp_receiver_tid; pthread_mutex_t send_mutex; bool auth_received; +#if DBG_QMEM + libpd_mq_t saved_wrp_queue; +#endif } __instance_t; +#if DBG_QMEM +#define CHECK_QMEM(inst) assert (inst->wrp_queue == inst->saved_wrp_queue) +#else +#define CHECK_QMEM(inst) +#endif + #define SOCK_SEND_TIMEOUT_MS 2000 #define MAX_RECONNECT_RETRY_DELAY_SECS 63 @@ -522,6 +536,9 @@ int libparodus_init_dbg (libpd_instance_t *instance, libpd_cfg_t *libpd_cfg, SETERR (oserr, LIBPD_ERR_INIT_QUEUE + err); return LIBPD_ERROR_INIT_QUEUE; } +#if DBG_QMEM + inst->saved_wrp_queue = inst->wrp_queue; +#endif libpd_log (LEVEL_INFO, ("LIBPARODUS: Created queues\n")); err = create_thread (&inst->wrp_receiver_tid, wrp_receiver_thread, inst); @@ -752,7 +769,7 @@ int libparodus_receive_dbg (libpd_instance_t instance, wrp_msg_t **msg, err_info->err_detail = LIBPD_ERR_RCV_NULL_INST; return LIBPD_ERROR_RCV_NULL_INST; } - + CHECK_QMEM(inst); if (!inst->cfg.receive) { libpd_log (LEVEL_ERROR, ("No receive option on libparodus_receive\n")); err_info->err_detail = LIBPD_ERR_RCV_CFG; diff --git a/src/libparodus_queues.c b/src/libparodus_queues.c index bc981bf..a3adea4 100644 --- a/src/libparodus_queues.c +++ b/src/libparodus_queues.c @@ -23,6 +23,12 @@ #include #include "libparodus_log.h" +#define DBG_QMEM 1 + +#if DBG_QMEM +#include +#endif + typedef struct queue { const char *queue_name; unsigned max_msgs; @@ -33,8 +39,32 @@ typedef struct queue { void **msg_array; int head_index; int tail_index; +#if DBG_QMEM + unsigned saved_max_msgs; + void **saved_msg_array; +#endif } queue_t; +#if DBG_QMEM +#define check_max_msgs(q) assert (q->max_msgs == q->saved_max_msgs) +#define check_msg_array(q) assert (q->msg_array == q->saved_msg_array) +#define validate_head_index(q) assert ( \ + (q->head_index >= -1) && (q->head_index < (int)q->max_msgs) \ + ) +#define validate_tail_index(q) assert ( \ + (q->tail_index >= -1) && (q->tail_index < (int)q->max_msgs) \ + ) +#define CHECK_QMEM(q) {\ + check_max_msgs(q); check_msg_array(q); validate_head_index(q); validate_tail_index(q); \ + } +#else +#define check_max_msgs(q) +#define check_msg_array(q) +#define validate_head_index(q) +#define validate_tail_index(q) +#define CHECK_QMEM(q) +#endif + int libpd_qcreate (libpd_mq_t *mq, const char *queue_name, unsigned max_msgs, int *exterr) { @@ -106,7 +136,10 @@ int libpd_qcreate (libpd_mq_t *mq, const char *queue_name, free (newq); return LIBPD_QERR_CREATE_ALLOC_2; } - +#if DBG_QMEM + newq->saved_max_msgs = newq->max_msgs; + newq->saved_msg_array = newq->msg_array; +#endif *mq = (libpd_mq_t) newq; return 0; } @@ -178,6 +211,7 @@ int libpd_qsend (libpd_mq_t mq, void *msg, unsigned timeout_ms, int *exterr) *exterr = 0; if (NULL == mq) return LIBPD_QERR_SEND_NULL; + CHECK_QMEM(q); pthread_mutex_lock (&q->mutex); while (true) { if (enqueue_msg (q, msg)) @@ -190,6 +224,7 @@ int libpd_qsend (libpd_mq_t mq, void *msg, unsigned timeout_ms, int *exterr) pthread_mutex_unlock (&q->mutex); return LIBPD_QERR_SEND_EXPTIME; } + CHECK_QMEM(q); rtn = pthread_cond_timedwait (&q->not_full_cond, &q->mutex, &ts); if (rtn != 0) { if (rtn == ETIMEDOUT) { @@ -219,6 +254,7 @@ int libpd_qreceive (libpd_mq_t mq, void **msg, unsigned timeout_ms, int *exterr) *exterr = 0; if (NULL == mq) return LIBPD_QERR_RCV_NULL; + CHECK_QMEM(q); pthread_mutex_lock (&q->mutex); while (true) { msg__ = dequeue_msg (q); @@ -232,6 +268,7 @@ int libpd_qreceive (libpd_mq_t mq, void **msg, unsigned timeout_ms, int *exterr) pthread_mutex_unlock (&q->mutex); return LIBPD_QERR_RCV_EXPTIME; } + CHECK_QMEM(q); rtn = pthread_cond_timedwait (&q->not_empty_cond, &q->mutex, &ts); if (rtn != 0) { if (rtn == ETIMEDOUT) {