diff --git a/src/libparodus_queues.c b/src/libparodus_queues.c index bc981bf..bed7515 100644 --- a/src/libparodus_queues.c +++ b/src/libparodus_queues.c @@ -28,8 +28,6 @@ typedef struct queue { unsigned max_msgs; int msg_count; pthread_mutex_t mutex; - pthread_cond_t not_empty_cond; - pthread_cond_t not_full_cond; void **msg_array; int head_index; int tail_index; @@ -75,34 +73,11 @@ int libpd_qcreate (libpd_mq_t *mq, const char *queue_name, return LIBPD_QERR_CREATE_MUTEX; } - err = pthread_cond_init (&newq->not_empty_cond, NULL); - if (err != 0) { - *exterr = err; - libpd_log_err (LEVEL_ERROR, err, ("Error creating not_empty_cond for queue %s\n", - queue_name)); - pthread_mutex_destroy (&newq->mutex); - free (newq); - return LIBPD_QERR_CREATE_NECOND; - } - - err = pthread_cond_init (&newq->not_full_cond, NULL); - if (err != 0) { - *exterr = err; - libpd_log_err (LEVEL_ERROR, err, ("Error creating not_full_cond for queue %s\n", - queue_name)); - pthread_mutex_destroy (&newq->mutex); - pthread_cond_destroy (&newq->not_empty_cond); - free (newq); - return LIBPD_QERR_CREATE_NFCOND; - } - newq->msg_array = malloc (array_size); if (NULL == newq->msg_array) { libpd_log (LEVEL_ERROR, ("Unable to allocate memory(2) for queue %s\n", queue_name)); pthread_mutex_destroy (&newq->mutex); - pthread_cond_destroy (&newq->not_empty_cond); - pthread_cond_destroy (&newq->not_full_cond); free (newq); return LIBPD_QERR_CREATE_ALLOC_2; } @@ -160,8 +135,6 @@ int libpd_qdestroy (libpd_mq_t *mq, free_msg_func_t *free_msg_func) } } free (q->msg_array); - pthread_cond_destroy (&q->not_empty_cond); - pthread_cond_destroy (&q->not_full_cond); pthread_mutex_unlock (&q->mutex); pthread_mutex_destroy (&q->mutex); free (q); @@ -169,42 +142,44 @@ int libpd_qdestroy (libpd_mq_t *mq, free_msg_func_t *free_msg_func) return 0; } +void backoff (unsigned *current_wait) +{ + unsigned current = *current_wait; + if (current == 0) + current = 10; + else if (current == 10) + current = 20; + else if (current == 20) + current = 50; + else if (current == 50) + current = 100; + else if (current == 100) + current = 200; + else + current = 500; + delay_ms (current); + *current_wait = current; +} + int libpd_qsend (libpd_mq_t mq, void *msg, unsigned timeout_ms, int *exterr) { queue_t *q = (queue_t*) mq; - struct timespec ts; - int rtn; + unsigned current_wait = 0; + unsigned total_wait = 0; *exterr = 0; if (NULL == mq) return LIBPD_QERR_SEND_NULL; - pthread_mutex_lock (&q->mutex); while (true) { + pthread_mutex_lock (&q->mutex); if (enqueue_msg (q, msg)) break; - rtn = get_expire_time (timeout_ms, &ts); - if (rtn != 0) { - *exterr = rtn; - libpd_log_err (LEVEL_ERROR, rtn, - ("gettimeofday error waiting to send queue\n")); - pthread_mutex_unlock (&q->mutex); - return LIBPD_QERR_SEND_EXPTIME; - } - rtn = pthread_cond_timedwait (&q->not_full_cond, &q->mutex, &ts); - if (rtn != 0) { - if (rtn == ETIMEDOUT) { - pthread_mutex_unlock (&q->mutex); - return 1; - } - *exterr = rtn; - libpd_log_err (LEVEL_ERROR, rtn, - ("pthread_cond_timedwait error waiting for not_full_cond\n")); - pthread_mutex_unlock (&q->mutex); - return LIBPD_QERR_SEND_CONDWAIT; - } + pthread_mutex_unlock (&q->mutex); + if (total_wait >= timeout_ms) + return 1; + backoff (¤t_wait); + total_wait += current_wait; } - if (q->msg_count == 1) - pthread_cond_signal (&q->not_empty_cond); pthread_mutex_unlock (&q->mutex); return 0; } @@ -212,43 +187,25 @@ int libpd_qsend (libpd_mq_t mq, void *msg, unsigned timeout_ms, int *exterr) int libpd_qreceive (libpd_mq_t mq, void **msg, unsigned timeout_ms, int *exterr) { queue_t *q = (queue_t*) mq; - struct timespec ts; + unsigned current_wait = 0; + unsigned total_wait = 0; void *msg__; - int rtn; *exterr = 0; if (NULL == mq) return LIBPD_QERR_RCV_NULL; - pthread_mutex_lock (&q->mutex); while (true) { + pthread_mutex_lock (&q->mutex); msg__ = dequeue_msg (q); + pthread_mutex_unlock (&q->mutex); if (NULL != msg__) break; - rtn = get_expire_time (timeout_ms, &ts); - if (rtn != 0) { - *exterr = rtn; - libpd_log_err (LEVEL_ERROR, rtn, - ("gettimeofday error waiting to receive on queue\n")); - pthread_mutex_unlock (&q->mutex); - return LIBPD_QERR_RCV_EXPTIME; - } - rtn = pthread_cond_timedwait (&q->not_empty_cond, &q->mutex, &ts); - if (rtn != 0) { - if (rtn == ETIMEDOUT) { - pthread_mutex_unlock (&q->mutex); - return 1; - } - *exterr = rtn; - libpd_log_err (LEVEL_ERROR, rtn, - ("pthread_cond_timedwait error waiting for not_empty_cond\n")); - pthread_mutex_unlock (&q->mutex); - return LIBPD_QERR_RCV_CONDWAIT; - } + if (total_wait >= timeout_ms) + return 1; + backoff (¤t_wait); + total_wait += current_wait; } *msg = msg__; - if ((q->msg_count+1) == (int)q->max_msgs) - pthread_cond_signal (&q->not_full_cond); - pthread_mutex_unlock (&q->mutex); return 0; }