Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub server #156

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ option(HAVE_ASSERT_PANIC "assert_panic disabled by default" OFF)
option(HAVE_LOGGING "logging enabled by default" ON)
option(HAVE_STATS "stats enabled by default" ON)
option(TARGET_PINGSERVER "build pingserver binary" ON)
option(TARGET_PUBSUB "build pubsub binary" ON)
option(TARGET_SLIMCACHE "build slimcache binary" ON)
option(TARGET_TWEMCACHE "build twemcache binary" ON)
option(COVERAGE "code coverage" OFF)
Expand Down
3 changes: 3 additions & 0 deletions config/pubsub.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
debug_log_level: 6
# debug_log_file: pubsub.log
# debug_log_nbuf: 16384
1 change: 1 addition & 0 deletions src/core/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ struct context {
};

bool admin_init;
bool pubsub_init;
bool server_init;
bool worker_init;
31 changes: 30 additions & 1 deletion src/core/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <sysexits.h>

void
core_run(void *arg_worker)
worker_run(void *arg_worker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we renaming this? I think this name can cause some confusion, since it creates threads for both worker and server threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename these to core_worker_run and core_pubsub_run

{
pthread_t worker, server;
int ret;
Expand All @@ -37,3 +37,32 @@ core_run(void *arg_worker)
error:
exit(EX_OSERR);
}

void
pubsub_run(void *arg_pubsub)
{
pthread_t pubsub, server;
int ret;

if (!admin_init || !server_init || !pubsub_init) {
log_crit("cannot run: admin/server/pubsub have to be initialized");
return;
}

ret = pthread_create(&pubsub, NULL, core_pubsub_evloop, arg_pubsub);
if (ret != 0) {
log_crit("pthread create failed for pubsub thread: %s", strerror(ret));
goto error;
}

ret = pthread_create(&server, NULL, core_server_evloop, NULL);
if (ret != 0) {
log_crit("pthread create failed for server thread: %s", strerror(ret));
goto error;
}

core_admin_evloop();

error:
exit(EX_OSERR);
}
4 changes: 3 additions & 1 deletion src/core/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/

#include "admin/admin.h"
#include "data/pubsub.h"
#include "data/shared.h"
#include "data/server.h"
#include "data/worker.h"

void core_run(void *arg_worker);
void worker_run(void *arg_worker);
void pubsub_run(void *arg_pubsub);
1 change: 1 addition & 0 deletions src/core/data/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
set(SOURCE
${SOURCE}
${CMAKE_CURRENT_SOURCE_DIR}/pubsub.c
${CMAKE_CURRENT_SOURCE_DIR}/server.c
${CMAKE_CURRENT_SOURCE_DIR}/worker.c
PARENT_SCOPE)
302 changes: 302 additions & 0 deletions src/core/data/pubsub.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
#include "pubsub.h"

#include "shared.h"

#include <time/time.h>

#include <buffer/cc_buf.h>
#include <buffer/cc_dbuf.h>
#include <cc_debug.h>
#include <cc_event.h>
#include <cc_ring_array.h>
#include <channel/cc_channel.h>
#include <channel/cc_pipe.h>
#include <channel/cc_tcp.h>

#include <stream/cc_sockio.h>

#include <sysexits.h>

#define PUBSUB_MODULE_NAME "core::pubsub"

pubsub_metrics_st *pubsub_metrics = NULL;

static struct context context;
struct context *ctx = &context;

static channel_handler_st handlers;
static channel_handler_st *hdl = &handlers;

struct pubsub_processor *processor;

static inline rstatus_i
_pubsub_write(struct buf_sock *s)
{
rstatus_i status;

log_verb("writing on buf_sock %p", s);

ASSERT(s != NULL);
ASSERT(s->wbuf != NULL && s->rbuf != NULL);

status = buf_tcp_write(s);

return status;
}

/* the caller only needs to check the return status of this function if
* it previously received a write event and wants to re-register the
* read event upon full, successful write.
*/
static inline rstatus_i
_pubsub_event_write(struct buf_sock *s)
{
rstatus_i status;
struct tcp_conn *c = s->ch;

status = _pubsub_write(s);
if (status == CC_ERETRY || status == CC_EAGAIN) { /* retry write */
/* by removing current masks and only listen to write event(s), we are
* effectively stopping processing incoming data until we can write
* something to the (kernel) buffer for the channel. This is sensible
* because either the local network or the client is backed up when
* kernel write buffer is full, and this allows us to propagate back
* pressure to the sending side.
*/

event_del(ctx->evb, hdl->wid(c));
event_add_write(ctx->evb, hdl->wid(c), s);
} else if (status == CC_ERROR) {
c->state = CHANNEL_TERM;
}

if (processor->write(s) < 0) {
log_debug("handler signals channel termination");
s->ch->state = CHANNEL_TERM;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c->state = CHANNEL_TERM to be consistent with the one above

This comment was marked as spam.

return CC_ERROR;
}

return status;
}

static inline void
_pubsub_read(struct buf_sock *s)
{
log_verb("reading on buf_sock %p", s);

ASSERT(s != NULL);
ASSERT(s->wbuf != NULL && s->rbuf != NULL);

/* TODO(kyang): consider refactoring dbuf_tcp_read and buf_tcp_read to have no return status
at all, since the return status is already given by the connection state */
buf_tcp_read(s);
}

static inline void
pubsub_close(struct buf_sock *s)
{
log_info("pubsub core close on buf_sock %p", s);

processor->error(s);
event_del(ctx->evb, hdl->rid(s->ch));
hdl->term(s->ch);
buf_sock_return(&s);
}

/* read event over an existing connection */
static inline void
_pubsub_event_read(struct buf_sock *s)
{
ASSERT(s != NULL);

_pubsub_read(s);
if (processor->read(s) < 0) {
log_debug("handler signals channel termination");
s->ch->state = CHANNEL_TERM;
return;
}
if (buf_rsize(s->wbuf) > 0) {
log_verb("attempt to write");
_pubsub_event_write(s);
}
}

static void
pubsub_add_conn(void)
{
struct buf_sock *s;
char buf[RING_ARRAY_DEFAULT_CAP]; /* buffer for discarding pipe data */
int i;
rstatus_i status;

/* server pushes connection on to the ring array before writing to the pipe,
* therefore, we should read from the pipe first and take the connections
* off the ring array to match the number of bytes received.
*
* Once we move server to its own thread, it is possible that there are more
* connections added to the queue when we are processing, it is OK to wait
* for the next read event in that case.
*/

i = pipe_recv(pipe_c, buf, RING_ARRAY_DEFAULT_CAP);
if (i < 0) { /* errors, do not read from ring array */
log_warn("not adding new connections due to pipe error");
return;
}

/* each byte in the pipe corresponds to a new connection, which we will
* now get from the ring array
*/
for (; i > 0; --i) {
status = ring_array_pop(&s, conn_arr);
if (status != CC_OK) {
log_warn("event number does not match conn queue: missing %d conns",
i);
return;
}
log_verb("Adding new buf_sock %p to pubsub thread", s);
s->owner = ctx;
s->hdl = hdl;
event_add_read(ctx->evb, hdl->rid(s->ch), s);
}
}

static void
_pubsub_event(void *arg, uint32_t events)
{
struct buf_sock *s = arg;
log_verb("pubsub event %06"PRIX32" on buf_sock %p", events, s);

if (s == NULL) {
/* event on pipe_c, new connection */
if (events & EVENT_READ) {
pubsub_add_conn();
} else if (events & EVENT_ERR) {
log_error("error event received on conn_fds pipe");
} else {
/* there should never be any write events on the pipe from pubsub */
NOT_REACHED();
}
} else {
/* event on one of the connections */

if (events & EVENT_READ) {
log_verb("processing pubsub read event on buf_sock %p", s);
INCR(pubsub_metrics, pubsub_event_read);
_pubsub_event_read(s);
} else if (events & EVENT_WRITE) {
log_verb("processing pubsub write event on buf_sock %p", s);
INCR(pubsub_metrics, pubsub_event_write);
if (_pubsub_event_write(s) == CC_OK) {
/* write backlog cleared up, re-add read event (only) */
event_del(ctx->evb, hdl->wid(s->ch));
event_add_read(ctx->evb, hdl->rid(s->ch), s);
}
} else if (events & EVENT_ERR) {
s->ch->state = CHANNEL_TERM;
INCR(pubsub_metrics, pubsub_event_error);
} else {
NOT_REACHED();
}

/* TODO(yao): come up with a robust policy about channel connection
* and pending data. Since an error can either be server (usually
* memory) issues or client issues (bad syntax etc), or requested (quit)
* it is hard to determine whether the channel should be immediately
* closed or not. A simplistic approach might be to always close asap,
* and clients should not initiate closing unless they have received all
* their responses. This is not as nice as the TCP half-close behavior,
* but simpler to implement and probably fine initially.
*/
if (s->ch->state == CHANNEL_TERM || s->ch->state == CHANNEL_ERROR) {
pubsub_close(s);
}
}
}

void
core_pubsub_setup(pubsub_options_st *options, pubsub_metrics_st *metrics)
{
int timeout = PUBSUB_TIMEOUT;
int nevent = PUBSUB_NEVENT;

log_info("set up the %s module", PUBSUB_MODULE_NAME);

if (pubsub_init) {
log_warn("pubsub has already been setup, re-creating");
core_pubsub_teardown();
}

pubsub_metrics = metrics;

if (options != NULL) {
timeout = option_uint(&options->pubsub_timeout);
nevent = option_uint(&options->pubsub_nevent);
}

ctx->timeout = timeout;
ctx->evb = event_base_create(nevent, _pubsub_event);
if (ctx->evb == NULL) {
log_crit("failed to setup pubsub thread core; could not create event_base");
exit(EX_CONFIG);
}

hdl->accept = (channel_accept_fn)tcp_accept;
hdl->reject = (channel_reject_fn)tcp_reject;
hdl->open = (channel_open_fn)tcp_listen;
hdl->term = (channel_term_fn)tcp_close;
hdl->recv = (channel_recv_fn)tcp_recv;
hdl->send = (channel_send_fn)tcp_send;
hdl->rid = (channel_id_fn)tcp_read_id;
hdl->wid = (channel_id_fn)tcp_write_id;

event_add_read(ctx->evb, pipe_read_id(pipe_c), NULL);

pubsub_init = true;
}

void
core_pubsub_teardown(void)
{
log_info("tear down the %s module", PUBSUB_MODULE_NAME);

if (!pubsub_init) {
log_warn("%s has never been setup", PUBSUB_MODULE_NAME);
} else {
event_base_destroy(&(ctx->evb));
}
pubsub_metrics = NULL;
pubsub_init = false;
}

static rstatus_i
_pubsub_evwait(void)
{
int n;

n = event_wait(ctx->evb, ctx->timeout);
if (n < 0) {
return n;
}

INCR(pubsub_metrics, pubsub_event_loop);
INCR_N(pubsub_metrics, pubsub_event_total, n);
time_update();

return CC_OK;
}

void *
core_pubsub_evloop(void *arg)
{
processor = arg;

for(;;) {
if (_pubsub_evwait() != CC_OK) {
log_crit("pubsub core event loop exited due to failure");
break;
}
}

exit(1);
}
Loading