Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub server #156

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open

Conversation

thinkingfish
Copy link
Contributor

This is a pubsub implementation that's compatible with Redis' pub/sub, but it's a standalone server, not mixed with other redis features.

Also, the publisher does not need to wait for the message fanout to complete before receiving a response, which fully decouples publishing from subscribing.

This is a prototype and many error/corner cases are not yet handled. Nor are unit tests added for the corresponding modules.


if (processor->write(s) < 0) {
log_debug("handler signals channel termination");
s->ch->state = CHANNEL_TERM;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c->state = CHANNEL_TERM to be consistent with the one above

This comment was marked as spam.

struct element el_r = {.type = ELEM_INT}; /* reply, an integer */
struct listener *l;
struct topic *t;
uint32_t nsub = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this would be more readable without this temporary variable and just using el_r.num

This comment was marked as spam.

log_debug("listener not added");
}
if (!listener_add_topic(l, t)) {
log_debug("topic not added");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these errors should probably write an error message to the client instead of failing silently?

This comment was marked as spam.

if (prev == NULL) {
SLIST_REMOVE_HEAD(bucket, l_sle);
} else {
SLIST_REMOVE_AFTER(prev, l_sle);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would happen if s was not on the list? prev'd be the last element and it'll segfault or something? should there be an ASSERT(l != NULL)?

This comment was marked as spam.

t->name.len = name->len;
t->name.data = cc_alloc(t->name.len);
if (t->name.data == NULL) {
cc_free(t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing cc_free(t->idx);

This comment was marked as spam.


/*
* a topic is an endpoint that clients can subscribe to, equivalent to
* "channel" in the original redis protocol.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason to call it topic instead of channel? do we refer to something else as channel? you could add the reason to the comment here

This comment was marked as spam.

if (!admin_init) {
log_warn("%s has never been setup", PUBSUB_ADMIN_MODULE_NAME);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc_free(buf);?

This comment was marked as spam.

#include <stream/cc_sockio.h>

/*
* a listener is a client that has subscribed to at least one channel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that a listener may be subscribed to 0 channels (after unsubscribe it is not deleted)?

This comment was marked as spam.

@@ -10,7 +10,7 @@
#include <sysexits.h>

void
core_run(void *arg_worker)
worker_run(void *arg_worker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we renaming this? I think this name can cause some confusion, since it creates threads for both worker and server threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename these to core_worker_run and core_pubsub_run

@CLAassistant
Copy link

CLAassistant commented Nov 17, 2019

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Yao Yue seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants