|
| 1 | +#include <cstdlib> |
| 2 | +#include <cstdio> |
| 3 | +#include <cstdint> |
| 4 | +#include <cstring> |
| 5 | +#include <unistd.h> |
| 6 | +#include <pthread.h> |
| 7 | +#include <vector> |
| 8 | +#include <memory> |
| 9 | + |
| 10 | +// Lock-free, thread-safe FIFO. |
| 11 | +template<typename T, unsigned int buffer_size=256> |
| 12 | +class RingBuffer |
| 13 | +{ |
| 14 | + T backing_buf[buffer_size]; |
| 15 | + volatile unsigned int head = 0; |
| 16 | + volatile unsigned int tail = 0; |
| 17 | + |
| 18 | +public: |
| 19 | + RingBuffer() |
| 20 | + { |
| 21 | + memset((void*) backing_buf, 0, sizeof(backing_buf)); |
| 22 | + head = 0; |
| 23 | + tail = 0; |
| 24 | + __sync_synchronize(); |
| 25 | + } |
| 26 | + |
| 27 | + // Reader-side functions |
| 28 | + bool empty(const unsigned int& tail_local) const |
| 29 | + { |
| 30 | + return head == tail_local; |
| 31 | + } |
| 32 | + |
| 33 | + T read(unsigned int& tail_local) |
| 34 | + { |
| 35 | + while (empty(tail_local)); // Buffer is empty, block. |
| 36 | + |
| 37 | + T data = std::move(backing_buf[tail_local]); |
| 38 | + tail_local = (tail_local + 1) % buffer_size; |
| 39 | + tail = tail_local; |
| 40 | + return std::move(data); |
| 41 | + } |
| 42 | + |
| 43 | + // Writer-side functions |
| 44 | + bool full(const unsigned int& head_local) const |
| 45 | + { |
| 46 | + return (head_local + 1) % buffer_size == tail; |
| 47 | + } |
| 48 | + |
| 49 | + void write(T data, unsigned int& head_local) |
| 50 | + { |
| 51 | + while (full(head_local)); // Buffer is full, block. |
| 52 | + |
| 53 | + backing_buf[head_local] = std::move(data); |
| 54 | + head_local = (head_local + 1) % buffer_size; |
| 55 | + head = head_local; |
| 56 | + } |
| 57 | +}; |
| 58 | + |
| 59 | +struct Request |
| 60 | +{ |
| 61 | + char *str; |
| 62 | + uint64_t result; |
| 63 | + |
| 64 | + Request(char *str, int result) : str(strdup(str)), result(result) {} |
| 65 | + ~Request() { free(str); } |
| 66 | +}; |
| 67 | + |
| 68 | +// Work queue |
| 69 | +RingBuffer<Request*> wq; |
| 70 | +// Results queue |
| 71 | +RingBuffer<std::unique_ptr<Request>, 65536*2> rq; |
| 72 | + |
| 73 | +void* thread_consumer(void* arg) |
| 74 | +{ |
| 75 | + uint64_t i = 0; |
| 76 | + unsigned int wq_tail = 0; |
| 77 | + unsigned int rq_head = 0; |
| 78 | + |
| 79 | + while (1) { |
| 80 | + auto data = wq.read(wq_tail); |
| 81 | + data->result = strlen(data->str); |
| 82 | + rq.write(std::unique_ptr<Request>(data), rq_head); |
| 83 | + } |
| 84 | +} |
| 85 | + |
| 86 | +unsigned int get_number(bool prompt=true) |
| 87 | +{ |
| 88 | + unsigned int num; |
| 89 | + if (prompt) { |
| 90 | + printf("> "); |
| 91 | + fflush(stdout); |
| 92 | + } |
| 93 | + scanf("%u", &num); |
| 94 | + int c; |
| 95 | + do { |
| 96 | + c = getchar(); |
| 97 | + } while(c != EOF && c != '\n'); |
| 98 | + return num; |
| 99 | +} |
| 100 | + |
| 101 | +void manage_results(std::vector<std::unique_ptr<Request>>& results) |
| 102 | +{ |
| 103 | + printf("%lu results:\n", results.size()); |
| 104 | + if (!results.size()) |
| 105 | + return; |
| 106 | + for (int i = 0; i < results.size(); i++) { |
| 107 | + auto& result = results[i]; |
| 108 | + printf("#%d: %s", i, result.get() ? result->str : "<deleted>"); |
| 109 | + } |
| 110 | + printf("Choose a result: #"); |
| 111 | + fflush(stdout); |
| 112 | + unsigned int i = get_number(false); |
| 113 | + if (i >= results.size()) |
| 114 | + exit(1); |
| 115 | + auto& result = results[i]; |
| 116 | + printf("Result #%d selected\n", i); |
| 117 | + if (!result) { |
| 118 | + puts("<deleted>"); |
| 119 | + return; |
| 120 | + } |
| 121 | + switch (get_number()) { |
| 122 | + // View result |
| 123 | + case 1: { |
| 124 | + printf("Input: %s\n", result->str); |
| 125 | + printf("Result: %ld\n", result->result); |
| 126 | + return; |
| 127 | + } |
| 128 | + |
| 129 | + // Delete result |
| 130 | + case 2: |
| 131 | + puts("Result deleted"); |
| 132 | + result.reset(); |
| 133 | + return; |
| 134 | + |
| 135 | + default: |
| 136 | + exit(1); |
| 137 | + } |
| 138 | + return; |
| 139 | +} |
| 140 | + |
| 141 | +void thread_producer() |
| 142 | +{ |
| 143 | + char buf[64]; |
| 144 | + memset(buf, 0, sizeof(buf)); |
| 145 | + |
| 146 | + unsigned int wq_head = 0; |
| 147 | + unsigned int rq_tail = 0; |
| 148 | + |
| 149 | + std::vector<std::unique_ptr<Request>> results; |
| 150 | + |
| 151 | + while (1) { |
| 152 | + switch (get_number()) { |
| 153 | + // New request |
| 154 | + case 1: { |
| 155 | + puts("How many requests in this job?"); |
| 156 | + unsigned int count = get_number(); |
| 157 | + if (count > 100000) { |
| 158 | + puts("Too many!"); |
| 159 | + exit(1); |
| 160 | + } |
| 161 | + for (unsigned int i = 0; i < count; i++) { |
| 162 | + if (!fgets_unlocked(buf, sizeof(buf), stdin)) |
| 163 | + exit(0); |
| 164 | + |
| 165 | + wq.write(new Request{buf, 0}, wq_head); |
| 166 | + } |
| 167 | + break; |
| 168 | + } |
| 169 | + |
| 170 | + // Receive results |
| 171 | + case 2: { |
| 172 | + unsigned int n = 0; |
| 173 | + while (!rq.empty(rq_tail)) { |
| 174 | + results.push_back(std::unique_ptr<Request>(rq.read(rq_tail))), n++; |
| 175 | + } |
| 176 | + printf("Received %u results\n", n); |
| 177 | + break; |
| 178 | + } |
| 179 | + |
| 180 | + // Manage results |
| 181 | + case 3: |
| 182 | + manage_results(results); |
| 183 | + break; |
| 184 | + |
| 185 | + // Clear results |
| 186 | + case 4: |
| 187 | + puts("All saved results cleared"); |
| 188 | + results.clear(); |
| 189 | + break; |
| 190 | + |
| 191 | + // Exit |
| 192 | + case 5: |
| 193 | + puts("Bye"); |
| 194 | + |
| 195 | + default: |
| 196 | + exit(1); |
| 197 | + } |
| 198 | + |
| 199 | + } |
| 200 | +} |
| 201 | + |
| 202 | +int main() |
| 203 | +{ |
| 204 | + alarm(60); |
| 205 | + |
| 206 | + puts("hihgly scalable strlen() service"); |
| 207 | + puts("1. New job"); |
| 208 | + puts("2. Receive results"); |
| 209 | + puts("3. Manage results"); |
| 210 | + puts("3.1. View result"); |
| 211 | + puts("3.2. Delete result"); |
| 212 | + puts("4. Clear results history"); |
| 213 | + puts("5. Exit"); |
| 214 | + |
| 215 | + pthread_t pt_C; |
| 216 | + pthread_create(&pt_C, 0, thread_consumer, 0); |
| 217 | + |
| 218 | + thread_producer(); |
| 219 | +} |
0 commit comments