-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathdouble_lock_queue.h
107 lines (89 loc) · 2.45 KB
/
double_lock_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <string>
#include <atomic>
/**
 * Bounded blocking FIFO queue of ints guarded by two locks.
 *
 * Producers serialize on put_mutex and consumers serialize on get_mutex, so
 * one enqueue() and one dequeue() can make progress at the same time. The
 * only state both sides touch directly is res_cnt, which is therefore
 * accessed exclusively through atomic builtins.
 *
 * get_cond means "not empty"; put_cond means "not full".
 *
 * NOTE(review): add_tail() and get_head() run under different mutexes, so
 * List must tolerate one producer and one consumer operating concurrently --
 * confirm against List's implementation.
 */
class DoubleLockQueue
{
public:
    /**
     * @param capacity Maximum number of elements held at once. enqueue()
     *                 waits while the element count is >= capacity, so a
     *                 capacity <= 0 makes every enqueue() block forever.
     */
    DoubleLockQueue(int capacity)
    {
        this->res_max = capacity;
        this->res_cnt = 0;
    }

    /**
     * Appends element to the tail, blocking while the queue is full.
     *
     * @param element Value to store.
     */
    void enqueue(int element)
    {
        pthread_mutex_lock(&this->put_mutex);
        while (this->count() >= this->res_max)
            pthread_cond_wait(&this->put_cond, &this->put_mutex);

        this->list.add_tail(element);
        // Publish the element only after it is linked in; `before` is the
        // element count prior to this insertion.
        const int before = __atomic_fetch_add(&this->res_cnt, 1, __ATOMIC_SEQ_CST);

        // Still room after this insertion: pass the "not full" wake-up on to
        // the next waiting producer, since consumers only signal put_cond
        // when they take the queue from full to not-full.
        if (before + 1 < this->res_max)
            pthread_cond_signal(&this->put_cond);
        pthread_mutex_unlock(&this->put_mutex);

        // The queue was empty before this insertion, so a consumer may be
        // blocked waiting for "not empty".
        if (before == 0)
            wake_one(&this->get_mutex, &this->get_cond);
    }

    /**
     * Removes the head element, blocking while the queue is empty.
     *
     * @return The value handed back by List::get_head().
     */
    int dequeue()
    {
        // get_head() is presumed to store the removed head into `ret` --
        // confirm against List; zero-initialized so the return value is
        // never indeterminate.
        int ret = 0;

        pthread_mutex_lock(&this->get_mutex);
        while (this->count() == 0)
            pthread_cond_wait(&this->get_cond, &this->get_mutex);

        // No re-check of the count is needed here: only consumers decrement
        // res_cnt and they all hold get_mutex, so once the loop exits the
        // count cannot drop back to zero before we take our element.
        this->list.get_head(ret);
        const int before = __atomic_fetch_sub(&this->res_cnt, 1, __ATOMIC_SEQ_CST);

        // Elements remain after this removal: pass the "not empty" wake-up on
        // to the next waiting consumer, since producers only signal get_cond
        // when they take the queue from empty to non-empty.
        if (before > 1)
            pthread_cond_signal(&this->get_cond);
        pthread_mutex_unlock(&this->get_mutex);

        // The queue was full before this removal, so a producer may be
        // blocked waiting for "not full".
        if (before == this->res_max)
            wake_one(&this->put_mutex, &this->put_cond);

        return ret;
    }

    /**
     * @return A snapshot of the number of stored elements. The counter is
     *         raised after an element is linked in and lowered after one is
     *         removed, so the value is never negative; it may be stale by
     *         the time the caller uses it.
     */
    int size()
    {
        return this->count();
    }

private:
    /** Atomically reads the shared element counter. */
    int count()
    {
        return __atomic_load_n(&this->res_cnt, __ATOMIC_SEQ_CST);
    }

    /**
     * Signals `cond` while holding `mutex`.
     *
     * The waiter tests res_cnt and calls pthread_cond_wait() with this same
     * mutex held, so acquiring it before signaling guarantees the signal
     * cannot fall between the waiter's test and its wait (a lost wake-up).
     */
    static void wake_one(pthread_mutex_t *mutex, pthread_cond_t *cond)
    {
        pthread_mutex_lock(mutex);
        pthread_cond_signal(cond);
        pthread_mutex_unlock(mutex);
    }

    int res_max;    // capacity fixed at construction
    int res_cnt;    // element count; access only via __atomic_* builtins
    List<int> list;
    pthread_mutex_t put_mutex = PTHREAD_MUTEX_INITIALIZER;  // serializes producers
    pthread_mutex_t get_mutex = PTHREAD_MUTEX_INITIALIZER;  // serializes consumers
    pthread_cond_t put_cond = PTHREAD_COND_INITIALIZER;     // "not full"
    pthread_cond_t get_cond = PTHREAD_COND_INITIALIZER;     // "not empty"
};