diff --git a/commandline.c b/commandline.c index 00c77b29..d49b3b15 100644 --- a/commandline.c +++ b/commandline.c @@ -220,7 +220,7 @@ JNIEXPORT jint JNICALL Java_org_servalproject_servald_ServalD_rawCommand(JNIEnv */ int parseCommandLine(struct cli_context *context, const char *argv0, int argc, const char *const *args) { - fd_clearstats(); + fd_clearstats(current_fdqueue()); IN(); struct cli_parsed parsed; @@ -228,9 +228,11 @@ int parseCommandLine(struct cli_context *context, const char *argv0, int argc, c switch (result) { case 0: // Do not run the command if the configuration does not load ok. - if (((parsed.commands[parsed.cmdi].flags & CLIFLAG_PERMISSIVE_CONFIG) ? cf_reload_permissive() : cf_reload()) != -1) + if (((parsed.commands[parsed.cmdi].flags & CLIFLAG_PERMISSIVE_CONFIG) ? cf_reload_permissive() : cf_reload()) != -1) { + fdqueues_init(); result = cli_invoke(&parsed, context); - else { + fdqueues_free(); + } else { strbuf b = strbuf_alloca(160); strbuf_append_argv(b, argc, args); result = WHYF("configuration defective, not running command: %s", strbuf_str(b)); @@ -253,8 +255,12 @@ int parseCommandLine(struct cli_context *context, const char *argv0, int argc, c rhizome_close_db(); OUT(); - if (config.debug.timing) - fd_showstats(); + if (config.debug.timing) { + INFO("Main thread stats:"); + fd_showstats(&main_fdqueue); + INFO("Rhizome thread stats:"); + fd_showstats(&rhizome_fdqueue); + } return result; } diff --git a/fdqueue.c b/fdqueue.c index 63b54a55..f1bd19df 100644 --- a/fdqueue.c +++ b/fdqueue.c @@ -18,43 +18,113 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include +#include +#include #include "serval.h" #include "conf.h" #include "str.h" #include "strbuf.h" #include "strbuf_helpers.h" +#include "fdqueue.h" +#include "parallel.h" -#define MAX_WATCHED_FDS 128 -struct pollfd fds[MAX_WATCHED_FDS]; -int fdcount=0; -struct sched_ent *fd_callbacks[MAX_WATCHED_FDS]; -struct sched_ent *next_alarm=NULL; -struct sched_ent *next_deadline=NULL; -struct profile_total poll_stats={NULL,0,"Idle (in poll)",0,0,0}; +#ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER +#define PTHREAD_RECURSIVE_MUTEX_INITIALIZER \ + PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP +#endif + +fdqueue main_fdqueue = { + .poll_stats = { .name = "Main fdqueue" }, + .mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER, + .cond_is_active = PTHREAD_COND_INITIALIZER, + .cond_change = PTHREAD_COND_INITIALIZER +}; + +fdqueue rhizome_fdqueue = { + .poll_stats = { .name = "Rhizome fdqueue" }, + .mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER, + .cond_is_active = PTHREAD_COND_INITIALIZER, + .cond_change = PTHREAD_COND_INITIALIZER +}; + +fdqueue *current_fdqueue(void) { + if (pthread_self() == rhizome_thread) { + return &rhizome_fdqueue; + } + return &main_fdqueue; +} + +static inline int is_active(fdqueue *fdq) { + return fdq->next_alarm || fdq->next_deadline || fdq->fdcount > 0; +} + +static void fdqueue_init(fdqueue *fdq) { + fdq->fds = fdq->intfds + 1; + if (pipe(fdq->pipefd)) { + FATAL("pipe failed"); + } + fdq->intfds[0].fd = fdq->pipefd[0]; + fdq->intfds[0].events = POLLIN; +} + +void fdqueues_init(void) { + fdqueue_init(&main_fdqueue); + fdqueue_init(&rhizome_fdqueue); +} + +static void fdqueue_free(fdqueue *fdq) { + close(fdq->pipefd[0]); + close(fdq->pipefd[1]); + fdq->intfds[0].fd = 0; +} + +void fdqueues_free(void) { + fdqueue_free(&main_fdqueue); + fdqueue_free(&rhizome_fdqueue); +} + +/* write 1 char to pipe fd for poll() to always be non-blocking */ +static inline void add_poll_nonblock(fdqueue *fdq) { + char c = 0; + write(fdq->pipefd[1], &c, 1); +} + +/* read 1 char from pipe fd */ +static inline void remove_poll_nonblock(fdqueue *fdq) { + char c; + read(fdq->pipefd[0], &c, 1); +} #define alloca_alarm_name(alarm) ((alarm)->stats ? alloca_str_toprint((alarm)->stats->name) : "Unnamed") -void list_alarms() +void list_alarms(fdqueue *fdq) { DEBUG("Alarms;"); + + add_poll_nonblock(fdq); + pthread_mutex_lock(&fdq->mutex); + remove_poll_nonblock(fdq); + time_ms_t now = gettime_ms(); struct sched_ent *alarm; - for (alarm = next_deadline; alarm; alarm = alarm->_next) + for (alarm = fdq->next_deadline; alarm; alarm = alarm->_next) DEBUGF("%p %s deadline in %lldms", alarm->function, alloca_alarm_name(alarm), alarm->deadline - now); - for (alarm = next_alarm; alarm; alarm = alarm->_next) + for (alarm = fdq->next_alarm; alarm; alarm = alarm->_next) DEBUGF("%p %s in %lldms, deadline in %lldms", alarm->function, alloca_alarm_name(alarm), alarm->alarm - now, alarm->deadline - now); DEBUG("File handles;"); int i; - for (i = 0; i < fdcount; ++i) - DEBUGF("%s watching #%d", alloca_alarm_name(fd_callbacks[i]), fds[i].fd); + for (i = 0; i < fdq->fdcount; ++i) + DEBUGF("%s watching #%d", alloca_alarm_name(fdq->fd_callbacks[i]), fdq->fds[i].fd); + pthread_mutex_unlock(&fdq->mutex); } -int deadline(struct sched_ent *alarm) +static int deadline(struct sched_ent *alarm) { - struct sched_ent *node = next_deadline, *last = NULL; + fdqueue *fdq = alarm->fdqueue; + struct sched_ent *node = fdq->next_deadline, *last = NULL; if (alarm->deadline < alarm->alarm) alarm->deadline = alarm->alarm; @@ -65,7 +135,7 @@ int deadline(struct sched_ent *alarm) node = node->_next; } if (last == NULL){ - next_deadline = alarm; + fdq->next_deadline = alarm; }else{ last->_next = alarm; } @@ -78,7 +148,23 @@ int deadline(struct sched_ent *alarm) int is_scheduled(const struct sched_ent *alarm) { - return alarm->_next || alarm->_prev || alarm == next_alarm || alarm == next_deadline; + fdqueue *fdq = alarm->fdqueue; + if (!fdq) { + return 0; + } + if (!multithread) { + fdq = &main_fdqueue; + } + int res; + + add_poll_nonblock(fdq); + pthread_mutex_lock(&fdq->mutex); + remove_poll_nonblock(fdq); + + res = alarm->_next || alarm->_prev || alarm == fdq->next_alarm + || alarm == fdq->next_deadline; + pthread_mutex_unlock(&fdq->mutex); + return res; } // add an alarm to the list of scheduled function calls. @@ -94,14 +180,30 @@ int _schedule(struct __sourceloc __whence, struct sched_ent *alarm) WARNF("schedule() called from %s() %s:%d without supplying an alarm name", __whence.function,__whence.file,__whence.line); - struct sched_ent *node = next_alarm, *last = NULL; - - if (is_scheduled(alarm)) - FATAL("Scheduling an alarm that is already scheduled"); - if (!alarm->function) return WHY("Can't schedule if you haven't set the function pointer"); - + + fdqueue *fdq = alarm->fdqueue; + if (!fdq || !multithread) { + fdq = alarm->fdqueue = &main_fdqueue; + } + + add_poll_nonblock(fdq); + pthread_mutex_lock(&fdq->mutex); + remove_poll_nonblock(fdq); + + struct sched_ent *node = fdq->next_alarm, *last = NULL; + + if (is_scheduled(alarm)) { + pthread_mutex_unlock(&fdq->mutex); + FATAL("Scheduling an alarm that is already scheduled"); + } + + if (!is_active(fdq)) { + /* it will become active before releasing the mutex */ + pthread_cond_signal(&fdq->cond_is_active); + } + time_ms_t now = gettime_ms(); if (alarm->deadline < alarm->alarm) @@ -116,9 +218,13 @@ int _schedule(struct __sourceloc __whence, struct sched_ent *alarm) } // if the alarm has already expired, move straight to the deadline queue - if (alarm->alarm <= now) - return deadline(alarm); - + if (alarm->alarm <= now) { + int res = deadline(alarm); + pthread_cond_signal(&fdq->cond_change); + pthread_mutex_unlock(&fdq->mutex); + return res; + } + while(node!=NULL){ if (node->alarm > alarm->alarm) break; @@ -126,7 +232,8 @@ int _schedule(struct __sourceloc __whence, struct sched_ent *alarm) node = node->_next; } if (last == NULL){ - next_alarm = alarm; + fdq->next_alarm = alarm; + pthread_cond_signal(&fdq->cond_change); }else{ last->_next=alarm; } @@ -134,7 +241,9 @@ int _schedule(struct __sourceloc __whence, struct sched_ent *alarm) if(node!=NULL) node->_prev = alarm; alarm->_next = node; - + + pthread_mutex_unlock(&fdq->mutex); + return 0; } @@ -145,21 +254,40 @@ int _unschedule(struct __sourceloc __whence, struct sched_ent *alarm) if (config.debug.io) DEBUGF("unschedule(alarm=%s)", alloca_alarm_name(alarm)); + fdqueue *fdq = alarm->fdqueue; + if (!fdq) { + /* was never scheduled */ + return 0; + } + if (!multithread) { + fdq = alarm->fdqueue = &main_fdqueue; + } + + add_poll_nonblock(fdq); + pthread_mutex_lock(&fdq->mutex); + remove_poll_nonblock(fdq); + struct sched_ent *prev = alarm->_prev; struct sched_ent *next = alarm->_next; - if (prev) + if (prev) { prev->_next = next; - else if(next_alarm==alarm) - next_alarm = next; - else if(next_deadline==alarm) - next_deadline = next; + } else if (fdq->next_alarm == alarm) { + fdq->next_alarm = next; + pthread_cond_signal(&fdq->cond_change); + } else if (fdq->next_deadline == alarm) { + fdq->next_deadline = next; + pthread_cond_signal(&fdq->cond_change); + } if (next) next->_prev = prev; alarm->_prev = NULL; alarm->_next = NULL; + + pthread_mutex_unlock(&fdq->mutex); + return 0; } @@ -174,22 +302,42 @@ int _watch(struct __sourceloc __whence, struct sched_ent *alarm) if (!alarm->function) return WHY("Can't watch if you haven't set the function pointer"); - - if (alarm->_poll_index>=0 && fd_callbacks[alarm->_poll_index]==alarm){ + + fdqueue *fdq = alarm->fdqueue; + if (!fdq || !multithread) { + fdq = alarm->fdqueue = &main_fdqueue; + } + + add_poll_nonblock(fdq); + pthread_mutex_lock(&fdq->mutex); + remove_poll_nonblock(fdq); + + if (!is_active(fdq)) { + /* it will become active before releasing the mutex */ + pthread_cond_signal(&fdq->cond_is_active); + } + pthread_cond_signal(&fdq->cond_change); + + if (alarm->_poll_index >= 0 && fdq->fd_callbacks[alarm->_poll_index] == alarm) { // updating event flags if (config.debug.io) DEBUGF("Updating watch %s, #%d for %d", alloca_alarm_name(alarm), alarm->poll.fd, alarm->poll.events); }else{ if (config.debug.io) DEBUGF("Adding watch %s, #%d for %d", alloca_alarm_name(alarm), alarm->poll.fd, alarm->poll.events); - if (fdcount>=MAX_WATCHED_FDS) + if (fdq->fdcount >= MAX_WATCHED_FDS) { + pthread_mutex_unlock(&fdq->mutex); return WHY("Too many file handles to watch"); - fd_callbacks[fdcount]=alarm; + } + fdq->fd_callbacks[fdq->fdcount] = alarm; alarm->poll.revents = 0; - alarm->_poll_index=fdcount; - fdcount++; + alarm->_poll_index = fdq->fdcount; + fdq->fdcount++; } - fds[alarm->_poll_index]=alarm->poll; + fdq->fds[alarm->_poll_index] = alarm->poll; + + pthread_mutex_unlock(&fdq->mutex); + return 0; } @@ -199,22 +347,36 @@ int _unwatch(struct __sourceloc __whence, struct sched_ent *alarm) if (config.debug.io) DEBUGF("unwatch(alarm=%s)", alloca_alarm_name(alarm)); + fdqueue *fdq = alarm->fdqueue; + if (!multithread) { + fdq = alarm->fdqueue = &main_fdqueue; + } + + add_poll_nonblock(fdq); + pthread_mutex_lock(&fdq->mutex); + remove_poll_nonblock(fdq); + int index = alarm->_poll_index; - if (index <0 || fds[index].fd!=alarm->poll.fd) + if (index < 0 || fdq->fds[index].fd != alarm->poll.fd) { + pthread_mutex_unlock(&fdq->mutex); return WHY("Attempted to unwatch a handle that is not being watched"); - - fdcount--; - if (index!=fdcount){ + } + + fdq->fdcount--; + if (index != fdq->fdcount) { // squash fds - fds[index] = fds[fdcount]; - fd_callbacks[index] = fd_callbacks[fdcount]; - fd_callbacks[index]->_poll_index=index; + fdq->fds[index] = fdq->fds[fdq->fdcount]; + fdq->fd_callbacks[index] = fdq->fd_callbacks[fdq->fdcount]; + fdq->fd_callbacks[index]->_poll_index = index; } - fds[fdcount].fd=-1; - fd_callbacks[fdcount]=NULL; + fdq->fds[fdq->fdcount].fd = -1; + fdq->fd_callbacks[fdq->fdcount] = NULL; alarm->_poll_index=-1; if (config.debug.io) DEBUGF("%s stopped watching #%d for %d", alloca_alarm_name(alarm), alarm->poll.fd, alarm->poll.events); + + pthread_mutex_unlock(&fdq->mutex); + return 0; } @@ -223,6 +385,7 @@ static void call_alarm(struct sched_ent *alarm, int revents) IN(); if (!alarm) FATAL("Attempted to call with no alarm"); + fdqueue *fdq = alarm->fdqueue; struct call_stats call_stats; call_stats.totals = alarm->stats; @@ -230,123 +393,162 @@ static void call_alarm(struct sched_ent *alarm, int revents) alarm, alloca_alarm_name(alarm)); if (call_stats.totals) - fd_func_enter(__HERE__, &call_stats); + fd_func_enter(__HERE__, fdq, &call_stats); alarm->poll.revents = revents; + pthread_mutex_unlock(&fdq->mutex); alarm->function(alarm); + pthread_mutex_lock(&fdq->mutex); if (call_stats.totals) - fd_func_exit(__HERE__, &call_stats); + fd_func_exit(__HERE__, fdq, &call_stats); if (config.debug.io) DEBUGF("Alarm %p returned",alarm); OUT(); } -int fd_poll() +int fd_poll(fdqueue *fdq, int wait) { IN(); - int i, r=0; - int ms=60000; - time_ms_t now = gettime_ms(); - - if (!next_alarm && !next_deadline && fdcount==0) - RETURN(0); - - /* move alarms that have elapsed to the deadline queue */ - while (next_alarm!=NULL&&next_alarm->alarm <=now){ - struct sched_ent *alarm = next_alarm; - unschedule(alarm); - deadline(alarm); - } - - /* work out how long we can block in poll */ - if (next_deadline) - ms = 0; - else if (next_alarm){ - ms = next_alarm->alarm - now; + if (!multithread) { + fdq = &main_fdqueue; } - - /* Make sure we don't have any silly timeouts that will make us wait forever. */ - if (ms<0) ms=0; - - /* check if any file handles have activity */ - { + pthread_mutex_lock(&fdq->mutex); + int i, r=0; + int invalidated; + int ms; + time_ms_t now; + + do { + invalidated = 0; + if (wait) { + while (!is_active(fdq)) { + pthread_cond_wait(&fdq->cond_is_active, &fdq->mutex); + } + } else { + if (!is_active(fdq)) { + pthread_mutex_unlock(&fdq->mutex); + RETURN(0); + } + } + + now = gettime_ms(); + + /* move alarms that have elapsed to the deadline queue */ + while (fdq->next_alarm && fdq->next_alarm->alarm <= now) { + struct sched_ent *alarm = fdq->next_alarm; + unschedule(alarm); + deadline(alarm); + } + + /* check if any file handles have activity */ struct call_stats call_stats; - call_stats.totals=&poll_stats; - fd_func_enter(__HERE__, &call_stats); - if (fdcount==0){ - if (ms>=1000) - sleep(ms/1000); - else - usleep(ms*1000); - }else{ - if (config.debug.io) DEBUGF("poll(X,%d,%d)",fdcount,ms); - r = poll(fds, fdcount, ms); - if (config.debug.io) { - strbuf b = strbuf_alloca(1024); - int i; - for (i = 0; i < fdcount; ++i) { - if (i) - strbuf_puts(b, ", "); - strbuf_sprintf(b, "%d:", fds[i].fd); - strbuf_append_poll_events(b, fds[i].events); - strbuf_putc(b, ':'); - strbuf_append_poll_events(b, fds[i].revents); - } - DEBUGF("poll(fds=(%s), fdcount=%d, ms=%d) = %d", strbuf_str(b), fdcount, ms, r); + call_stats.totals = &fdq->poll_stats; + fd_func_enter(__HERE__, fdq, &call_stats); + if (fdq->fdcount == 0) { + if (fdq->fdcount == 0 && !fdq->next_deadline) { + /* wait for the next alarm or the next change */ + struct timespec timeout; + MS_TO_TIMESPEC(fdq->next_alarm->alarm, &timeout); + int retcode = + pthread_cond_timedwait(&fdq->cond_change, &fdq->mutex, &timeout); + invalidated = retcode != ETIMEDOUT; + } + } else { + if (fdq->next_deadline) { + ms = 0; + } else if (fdq->next_alarm) { + ms = fdq->next_alarm->alarm - now; + if (ms < 0) { + ms = 0; + } + } else { + /* infinite timeout */ + ms = -1; + } + + if (config.debug.io) DEBUGF("poll(X,%d,%d)", fdq->fdcount, ms); + + /* poll on fdq->intdfs which contains fds + the interrupt fd */ + r = poll(fdq->intfds, fdq->fdcount + 1, ms); + + if (fdq->intfds[0].revents) { + /* interrupted (another thread wants the mutex) */ + invalidated = 1; + pthread_mutex_unlock(&fdq->mutex); + usleep(1000); /* release the lock for 1 ms */ + pthread_mutex_lock(&fdq->mutex); + } else { + if (config.debug.io) { + strbuf b = strbuf_alloca(1024); + int i; + for (i = 0; i < fdq->fdcount; ++i) { + if (i) + strbuf_puts(b, ", "); + strbuf_sprintf(b, "%d:", fdq->fds[i].fd); + strbuf_append_poll_events(b, fdq->fds[i].events); + strbuf_putc(b, ':'); + strbuf_append_poll_events(b, fdq->fds[i].revents); + } + DEBUGF("poll(fds=(%s), fdcount=%d, ms=%d) = %d", strbuf_str(b), fdq->fdcount, ms, r); + } } } - fd_func_exit(__HERE__, &call_stats); + fd_func_exit(__HERE__, fdq, &call_stats); now=gettime_ms(); - } + } while (invalidated); // Reading new data takes priority over everything else // Are any handles marked with POLLIN? - int in_count=0; - if (r>0){ - for (i=0;i 0) { + for (i = 0; i < fdq->fdcount; i++) + if (fdq->fds[i].revents & POLLIN) in_count++; } /* call one alarm function, but only if its deadline time has elapsed OR there is no incoming file activity */ - if (next_deadline && (next_deadline->deadline <=now || (in_count==0))){ - struct sched_ent *alarm = next_deadline; + if (fdq->next_deadline && (fdq->next_deadline->deadline <= now || in_count == 0)) { + struct sched_ent *alarm = fdq->next_deadline; unschedule(alarm); call_alarm(alarm, 0); now=gettime_ms(); // after running a timed alarm, unless we already know there is data to read we want to check for more incoming IO before we send more outgoing. - if (in_count==0) + if (in_count==0) { + pthread_mutex_unlock(&fdq->mutex); RETURN(1); + } } /* If file descriptors are ready, then call the appropriate functions */ - if (r>0) { - for(i=fdcount -1;i>=0;i--){ - if (fds[i].revents) { + if (r > 0) { + for (i = fdq->fdcount -1; i >= 0; i--){ + if (fdq->fds[i].revents) { // if any handles have POLLIN set, don't process any other handles - if (!(fds[i].revents&POLLIN || in_count==0)) + if (!(fdq->fds[i].revents & POLLIN || in_count == 0)) continue; - - int fd = fds[i].fd; - /* Call the alarm callback with the socket in non-blocking mode */ - errno=0; - set_nonblock(fd); - // Work around OSX behaviour that doesn't set POLLERR on - // devices that have been deconfigured, e.g., a USB serial adapter - // that has been removed. - if (errno == ENXIO) fds[i].revents|=POLLERR; - call_alarm(fd_callbacks[i], fds[i].revents); - /* The alarm may have closed and unwatched the descriptor, make sure this descriptor still matches */ - if (ifds[i].fd; + /* Call the alarm callback with the socket in non-blocking mode */ + errno=0; + set_nonblock(fd); + // Work around OSX behaviour that doesn't set POLLERR on + // devices that have been deconfigured, e.g., a USB serial adapter + // that has been removed. + if (errno == ENXIO) fdq->fds[i].revents|=POLLERR; + call_alarm(fdq->fd_callbacks[i], fdq->fds[i].revents); + /* The alarm may have closed and unwatched the descriptor, make sure this descriptor still matches */ + if (i < fdq->fdcount && fdq->fds[i].fd == fd){ + if (set_block(fdq->fds[i].fd)) + FATALF("Alarm %p %s has a bad descriptor that wasn't closed!", fdq->fd_callbacks[i], alloca_alarm_name(fdq->fd_callbacks[i])); + } } } } + + pthread_mutex_unlock(&fdq->mutex); + RETURN(1); OUT(); } diff --git a/fdqueue.h b/fdqueue.h new file mode 100644 index 00000000..e07a9bbd --- /dev/null +++ b/fdqueue.h @@ -0,0 +1,73 @@ +/* + Copyright (C) 2012 Serval Project. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + */ + +#ifndef __SERVALD_FDQUEUE_H +#define __SERVALD_FDQUEUE_H + +#include +#include "serval.h" + +#define MAX_WATCHED_FDS 128 + +/* fdqueue used for all actions (including MDP) except Rhizome */ +extern struct fdqueue main_fdqueue; + +/* fdqueue used for Rhizome actions */ +extern struct fdqueue rhizome_fdqueue; + +typedef struct fdqueue { + + /* a pipe fd is added at intfds[0] for interrupting poll() */ + int pipefd[2]; + struct pollfd intfds[MAX_WATCHED_FDS+1]; + + struct pollfd *fds; /* must be initialized to intfds + 1 */ + int fdcount; + struct sched_ent *fd_callbacks[MAX_WATCHED_FDS]; + struct sched_ent *next_alarm; + struct sched_ent *next_deadline; + struct profile_total poll_stats; + + /* thread associated with the fdqueue */ + pthread_t thread; + + /* mutex to be acquired for every fdqueue items access */ + pthread_mutex_t mutex; + + /* signaled when the queue state changes from inactive to active (see + * is_active(fdqueue *) in fdqueue.c) */ + pthread_cond_t cond_is_active; + + /* signaled when next_alarm or next_deadline is changed or when a new + * fd is being watched */ + pthread_cond_t cond_change; + + /* for performance_timing.c */ + struct profile_total *stats_head; + struct call_stats *current_call; + +} fdqueue; + +void fdqueues_init(void); +void fdqueues_free(void); + +/* return the fdqueue associated to the current thread */ +fdqueue *current_fdqueue(void); + +#endif diff --git a/headerfiles.mk b/headerfiles.mk index 45528bd9..5b26e730 100644 --- a/headerfiles.mk +++ b/headerfiles.mk @@ -22,4 +22,6 @@ HDRS= fifo.h \ constants.h \ monitor-client.h \ mdp_client.h \ - sqlite-amalgamation-3070900/sqlite3.h + sqlite-amalgamation-3070900/sqlite3.h \ + fdqueue.h \ + parallel.h diff --git a/log.h b/log.h index b6790ee3..1b673a4f 100644 --- a/log.h +++ b/log.h @@ -127,6 +127,7 @@ struct strbuf; #define FATAL(X) FATALF("%s", (X)) #define FATALF_perror(F,...) FATALF(F ": %s [errno=%d]", ##__VA_ARGS__, strerror(errno), errno) #define FATAL_perror(X) FATALF_perror("%s", (X)) +#define OUT_OF_MEMORY FATAL("Out of memory") #define WHYF(F,...) (LOGF(LOG_LEVEL_ERROR, F, ##__VA_ARGS__), -1) #define WHY(X) WHYF("%s", (X)) diff --git a/main.c b/main.c index 38279a38..040d9afa 100644 --- a/main.c +++ b/main.c @@ -33,6 +33,13 @@ int main(int argc, char **argv) srandomdev(); server_save_argv(argc, (const char*const*)argv); cf_init(); + + /* workaround: when running tests, if no trace is logged before + * fdqueues_init(), then fd 0 will not be taken for logfile, so it will be + * used by a pipe (internal to fdqueues). When starting background server, + * fd 0 will be closed, so fdqueues would be broken. */ + INFO(""); + int status = parseCommandLine(NULL, argv[0], argc - 1, (const char*const*)&argv[1]); #if defined WIN32 WSACleanup(); diff --git a/overlay.c b/overlay.c index 8f13c489..1571c5b4 100644 --- a/overlay.c +++ b/overlay.c @@ -71,6 +71,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "serval.h" #include "conf.h" #include "rhizome.h" +#include "parallel.h" #include "strbuf.h" int overlayMode=0; @@ -85,6 +86,9 @@ int overlayServerMode() send periodic traffic. This means we need to */ INFO("Running in overlay mode."); + multithread = 1; + main_fdqueue.thread = main_thread = pthread_self(); + /* Get keyring available for use. Required for MDP, and very soon as a complete replacement for the HLR for DNA lookups, even in non-overlay mode. */ @@ -104,16 +108,19 @@ int overlayServerMode() of wifi latency anyway, so we'll live with it. Larger values will affect voice transport, and smaller values would affect CPU and energy use, and make the simulation less realistic. */ -#define SCHEDULE(X, Y, D) { \ +#define SCHEDULE_Q(X, Y, D, Q) { \ static struct profile_total _stats_##X={.name="" #X "",}; \ static struct sched_ent _sched_##X={\ .stats = &_stats_##X, \ .function=X,\ }; \ -_sched_##X.alarm=(gettime_ms()+Y);\ -_sched_##X.deadline=(gettime_ms()+Y+D);\ +_sched_##X.alarm=(gettime_ms()+(Y));\ +_sched_##X.deadline=(gettime_ms()+(Y)+(D));\ +_sched_##X.fdqueue=(Q);\ schedule(&_sched_##X); } - + +#define SCHEDULE(X, Y, D) SCHEDULE_Q(X,Y,D,NULL) + /* Periodically check for server shut down */ SCHEDULE(server_shutdown_check, 0, 100); @@ -148,17 +155,23 @@ schedule(&_sched_##X); } SCHEDULE(overlay_interface_discover, 1, 100); /* Periodically advertise bundles */ - SCHEDULE(overlay_rhizome_advertise, 1000, 10000); + SCHEDULE_Q(overlay_rhizome_advertise, 1000, 10000, &rhizome_fdqueue); /* Calculate (and possibly show) CPU usage stats periodically */ SCHEDULE(fd_periodicstats, 3000, 500); #undef SCHEDULE + if (pthread_create(&rhizome_thread, NULL, rhizome_run, NULL)) { + FATAL("Cannot create rhizome pthread"); + } + // log message used by tests to wait for the server to start INFO("Server started, entering main loop"); /* Check for activitiy and respond to it */ - while(fd_poll()); + while(fd_poll(&main_fdqueue, 1)); + + pthread_join(rhizome_thread, NULL); RETURN(0); OUT(); diff --git a/overlay_mdp.c b/overlay_mdp.c index 190ea96b..34fc9097 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -26,6 +26,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "overlay_packet.h" #include "mdp_client.h" #include "crypto.h" +#include "parallel.h" struct profile_total mdp_stats={.name="overlay_mdp_poll"}; @@ -551,6 +552,14 @@ int overlay_mdp_encode_ports(struct overlay_buffer *plaintext, int dst_port, int return 0; } +void overlay_mdp_dispatch_alarm(struct sched_ent *alarm) { + ASSERT_THREAD(main_thread); + overlay_mdp_frame *mdp = alarm->context; + free(alarm); + overlay_mdp_dispatch(mdp, 0, NULL, 0); + free(mdp); +} + /* Construct MDP packet frame from overlay_mdp_frame structure (need to add return address from bindings list, and copy payload etc). @@ -560,6 +569,7 @@ int overlay_mdp_encode_ports(struct overlay_buffer *plaintext, int dst_port, int int overlay_mdp_dispatch(overlay_mdp_frame *mdp,int userGeneratedFrameP, struct sockaddr_un *recvaddr,int recvaddrlen) { + ASSERT_THREAD(main_thread); IN(); if (mdp->out.payload_length > sizeof(mdp->out.payload)) diff --git a/overlay_mdp_services.c b/overlay_mdp_services.c index e7386d1a..5b7b8070 100644 --- a/overlay_mdp_services.c +++ b/overlay_mdp_services.c @@ -28,10 +28,28 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "rhizome.h" #include "crypto.h" #include "log.h" +#include "parallel.h" +int rhizome_mdp_send_block(unsigned char *unicast_dest_sid, + unsigned char *bid, uint64_t version, + uint64_t file_offset, uint32_t bitmap, + uint16_t block_length); +void rhizome_mdp_send_block_alarm(struct sched_ent *alarm) +{ + ASSERT_THREAD(rhizome_thread); + struct rmsb_arg *arg = alarm->context; + free(alarm); + rhizome_mdp_send_block(arg->unicast ? arg->unicast_dest_sid : NULL, + arg->bid, arg->version, arg->file_offset, + arg->bitmap, arg->block_length); + free(arg); +} -int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t version, uint64_t fileOffset, uint32_t bitmap, uint16_t blockLength) +int rhizome_mdp_send_block(unsigned char *unicast_dest_sid, + unsigned char *id, uint64_t version, + uint64_t fileOffset, uint32_t bitmap, + uint16_t blockLength) { IN(); if (blockLength>1024) RETURN(-1); @@ -60,78 +78,73 @@ int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t int ret=rhizome_open_read(&read, filehash, 0); if (!ret){ - overlay_mdp_frame reply; - bzero(&reply,sizeof(reply)); - // Reply is broadcast, so we cannot authcrypt, and signing is too time consuming - // for low devices. The result is that an attacker can prevent rhizome transfers - // if they want to by injecting fake blocks. The alternative is to not broadcast - // back replies, and then we can authcrypt. - // multiple receivers starting at different times, we really need merkle-tree hashing. - // so multiple receivers is not realistic for now. So use non-broadcast unicode - // for now would seem the safest. But that would stop us from allowing multiple - // receivers in the special case where additional nodes begin listening in from the - // beginning. - reply.packetTypeAndFlags=MDP_TX|MDP_NOCRYPT|MDP_NOSIGN; - bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE); - reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE; - int send_broadcast=1; - - if (dest){ - if (!(dest->reachable&REACHABLE_DIRECT)) - send_broadcast=0; - if (dest->reachable&REACHABLE_UNICAST && dest->interface && dest->interface->prefer_unicast) - send_broadcast=0; - } - - if (send_broadcast){ - // send replies to broadcast so that others can hear blocks and record them - // (not that preemptive listening is implemented yet). - memset(reply.out.dst.sid,0xff,SID_SIZE); - reply.out.ttl=1; - }else{ - // if we get a request from a peer that we can only talk to via unicast, send data via unicast too. - bcopy(dest->sid, reply.out.dst.sid, SID_SIZE); - } - - reply.out.dst.port=MDP_PORT_RHIZOME_RESPONSE; - reply.out.queue=OQ_OPPORTUNISTIC; - reply.out.payload[0]='B'; // reply contains blocks - // include 16 bytes of BID prefix for identification - bcopy(id, &reply.out.payload[1], 16); - // and version of manifest - bcopy(&version, &reply.out.payload[1+16], sizeof(uint64_t)); - int i; - for(i=0;i<32;i++){ - if (bitmap&(1<<(31-i))) - continue; - - if (overlay_queue_remaining(reply.out.queue) < 10) - break; - + for (i = 0; i < 32; i++) { + if (bitmap & (1 << (31 - i))) + continue; + // calculate and set offset of block read.offset = fileOffset+i*blockLength; - + // stop if we passed the length of the file // (but we may not know the file length until we attempt a read) if (read.length!=-1 && read.offset>read.length) - break; - - write_uint64(&reply.out.payload[1+16+8], read.offset); + break; + + overlay_mdp_frame *reply = calloc(1, sizeof(overlay_mdp_frame)); + if (!reply) OUT_OF_MEMORY; + // Reply is broadcast, so we cannot authcrypt, and signing is too time consuming + // for low devices. The result is that an attacker can prevent rhizome transfers + // if they want to by injecting fake blocks. The alternative is to not broadcast + // back replies, and then we can authcrypt. + // multiple receivers starting at different times, we really need merkle-tree hashing. + // so multiple receivers is not realistic for now. So use non-broadcast unicode + // for now would seem the safest. But that would stop us from allowing multiple + // receivers in the special case where additional nodes begin listening in from the + // beginning. + reply->packetTypeAndFlags = MDP_TX|MDP_NOCRYPT|MDP_NOSIGN; + bcopy(my_subscriber->sid, reply->out.src.sid, SID_SIZE); + reply->out.src.port = MDP_PORT_RHIZOME_RESPONSE; + + if (!unicast_dest_sid) { + // send replies to broadcast so that others can hear blocks and record them + // (not that preemptive listening is implemented yet). + memset(reply->out.dst.sid, 0xff, SID_SIZE); + reply->out.ttl = 1; + }else{ + // if we get a request from a peer that we can only talk to via unicast, send data via unicast too. + bcopy(unicast_dest_sid, reply->out.dst.sid, SID_SIZE); + } + + reply->out.dst.port = MDP_PORT_RHIZOME_RESPONSE; + reply->out.queue = OQ_OPPORTUNISTIC; + reply->out.payload[0] = 'B'; // reply contains blocks + // include 16 bytes of BID prefix for identification + bcopy(id, &reply->out.payload[1], 16); + // and version of manifest + bcopy(&version, &reply->out.payload[1+16], sizeof(uint64_t)); + + /* overlay_queue_remaining cannot be called by a thread other than + main_thread */ + //if (overlay_queue_remaining(reply->out.queue) < 10) + // break; - int bytes_read = rhizome_read(&read, &reply.out.payload[1+16+8+8], blockLength); - if (bytes_read<=0) - break; + write_uint64(&reply->out.payload[1+16+8], read.offset); - reply.out.payload_length=1+16+8+8+bytes_read; + int bytes_read = rhizome_read(&read, &reply->out.payload[1+16+8+8], blockLength); + if (bytes_read<=0) { + free(reply); + break; + } + + reply->out.payload_length=1+16+8+8+bytes_read; // Mark the last block of the file, if required if (read.offset >= read.length) - reply.out.payload[0]='T'; + reply->out.payload[0]='T'; // send packet - if (overlay_mdp_dispatch(&reply,0 /* system generated */, NULL,0)) - break; + post_runnable(overlay_mdp_dispatch_alarm, reply, &main_fdqueue); } } rhizome_read_close(&read); @@ -142,18 +155,24 @@ int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp) { - uint64_t version= - read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]); - uint64_t fileOffset= - read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8]); - uint32_t bitmap= - read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8]); - uint16_t blockLength= - read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]); - - struct subscriber *source = find_subscriber(mdp->out.src.sid, SID_SIZE, 0); - - return rhizome_mdp_send_block(source, &mdp->out.payload[0], version, fileOffset, bitmap, blockLength); + struct rmsb_arg *arg = malloc(sizeof(struct rmsb_arg)); + if (!arg) OUT_OF_MEMORY; + arg->version = read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]); + arg->file_offset = read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8]); + arg->bitmap = read_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8]); + arg->block_length = read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]); + + struct subscriber *sid = find_subscriber(mdp->out.src.sid, SID_SIZE, 0); + arg->unicast = sid && (!(sid->reachable & REACHABLE_DIRECT) + || ((sid->reachable & REACHABLE_UNICAST) + && sid->interface + && sid->interface->prefer_unicast)); + if (arg->unicast) { + memcpy(arg->unicast_dest_sid, sid->sid, SID_SIZE); + } + memcpy(arg->bid, &mdp->out.payload[0], RHIZOME_MANIFEST_ID_BYTES); + post_runnable(rhizome_mdp_send_block_alarm, arg, &rhizome_fdqueue); + return 0; } int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) @@ -167,12 +186,15 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) case 'B': /* data block */ case 'T': /* terminal data block */ { - if (mdp->out.payload_length<(1+16+8+8+1)) RETURN(-1); - unsigned char *bidprefix=&mdp->out.payload[1]; - uint64_t version=read_uint64(&mdp->out.payload[1+16]); - uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]); - int count=mdp->out.payload_length-(1+16+8+8); - unsigned char *bytes=&mdp->out.payload[1+16+8+8]; + struct rrc_arg *arg = malloc(sizeof(struct rrc_arg)); + if (!arg) OUT_OF_MEMORY; + memcpy(arg->bidprefix, &mdp->out.payload[1], 16); + arg->version = read_uint64(&mdp->out.payload[1+16]); + arg->offset = read_uint64(&mdp->out.payload[1+16+8]); + arg->count = mdp->out.payload_length-(1+16+8+8); + arg->bytes = malloc(arg->count * sizeof(char)); + memcpy(arg->bytes, &mdp->out.payload[1+16+8+8], arg->count); + arg->type = type; /* Now see if there is a slot that matches. If so, then see if the bytes are in the window, and write them. @@ -181,7 +203,8 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp) a slot to capture this files as it is being requested by someone else. */ - rhizome_received_content(bidprefix,version,offset,count,bytes,type); + + post_runnable(rhizome_received_content_alarm, arg, &rhizome_fdqueue); RETURN(-1); } @@ -379,21 +402,44 @@ static int overlay_mdp_service_trace(overlay_mdp_frame *mdp){ RETURN(ret); } +static void rhizome_retrieve_and_advertise_manifest(const char *id_hex); + +void rhizome_retrieve_and_advertise_manifest_alarm(struct sched_ent *alarm) +{ + ASSERT_THREAD(rhizome_thread); + char *id_hex = alarm->context; + free(alarm); + rhizome_retrieve_and_advertise_manifest(id_hex); + free(id_hex); +} + +static void rhizome_retrieve_and_advertise_manifest(const char *id_hex) +{ + ASSERT_THREAD(rhizome_thread); + rhizome_manifest *manifest = rhizome_new_manifest(); + if (!manifest) OUT_OF_MEMORY; + if (!rhizome_retrieve_manifest(id_hex, manifest)) { + struct ram_arg *arg = malloc(sizeof(struct ram_arg)); + if (!arg) OUT_OF_MEMORY; + arg->manifest_all_bytes = manifest->manifest_all_bytes; + memcpy(arg->manifestdata, manifest->manifestdata, + manifest->manifest_all_bytes); + post_runnable(rhizome_advertise_manifest_alarm, arg, &main_fdqueue); + } + rhizome_manifest_free(manifest); +} + static int overlay_mdp_service_manifest_response(overlay_mdp_frame *mdp){ int offset=0; - char id_hex[RHIZOME_MANIFEST_ID_STRLEN]; - + while (offsetout.payload_length){ unsigned char *bar=&mdp->out.payload[offset]; + char *id_hex = malloc(RHIZOME_MANIFEST_ID_STRLEN * sizeof(char)); + if (!id_hex) OUT_OF_MEMORY; tohex(id_hex, &bar[RHIZOME_BAR_PREFIX_OFFSET], RHIZOME_BAR_PREFIX_BYTES); strcat(id_hex, "%"); - rhizome_manifest *m = rhizome_new_manifest(); - if (!m) - return WHY("Unable to allocate manifest"); - if (!rhizome_retrieve_manifest(id_hex, m)){ - rhizome_advertise_manifest(m); - } - rhizome_manifest_free(m); + post_runnable(rhizome_retrieve_and_advertise_manifest_alarm, id_hex, + &rhizome_fdqueue); offset+=RHIZOME_BAR_BYTES; } diff --git a/overlay_packetformats.c b/overlay_packetformats.c index 536e97d0..a453b0a0 100644 --- a/overlay_packetformats.c +++ b/overlay_packetformats.c @@ -23,6 +23,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "strbuf.h" #include "overlay_buffer.h" #include "overlay_packet.h" +#include "parallel.h" struct sockaddr_in loopback; @@ -84,7 +85,20 @@ int process_incoming_frame(time_ms_t now, struct overlay_interface *interface, s break; // data frames case OF_TYPE_RHIZOME_ADVERT: - overlay_rhizome_saw_advertisements(id,f,now); + ; // only a statement can follow a label + struct orsa_arg *arg = malloc(sizeof(struct orsa_arg)); + if (!arg) OUT_OF_MEMORY; + arg->id = id; + arg->payload = ob_dup(f->payload); + /* ob_dup() does not do what expected + * See https://github.com/servalproject/serval-dna/pull/66 */ + arg->payload->position = f->payload->position; + memcpy(arg->src_sid, f->source->sid, SID_SIZE); + arg->src_reachable = f->source->reachable; + memcpy(&arg->recvaddr, &f->recvaddr, sizeof(struct sockaddr_in)); + arg->now = now; + post_runnable(overlay_rhizome_saw_advertisements_alarm, arg, + &rhizome_fdqueue); break; case OF_TYPE_DATA: case OF_TYPE_DATA_VOICE: diff --git a/overlay_queue.c b/overlay_queue.c index 739f66b4..419784d8 100644 --- a/overlay_queue.c +++ b/overlay_queue.c @@ -21,6 +21,7 @@ #include "conf.h" #include "overlay_buffer.h" #include "overlay_packet.h" +#include "parallel.h" #include "str.h" #include "strbuf.h" @@ -59,6 +60,7 @@ static void overlay_send_packet(struct sched_ent *alarm); static int overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame); int overlay_queue_init(){ + ASSERT_THREAD(main_thread); /* Set default congestion levels for queues */ int i; for(i=0;icontext; + free(alarm); + struct overlay_frame *frame = calloc(1, sizeof(struct overlay_frame)); + if (!frame) OUT_OF_MEMORY; + frame->type = OF_TYPE_RHIZOME_ADVERT; + frame->source = my_subscriber; + frame->ttl = 1; + frame->queue = OQ_OPPORTUNISTIC; + frame->payload = payload; + if (overlay_payload_enqueue(frame)) { + op_free(frame); + } +} + int overlay_payload_enqueue(struct overlay_frame *p) { + ASSERT_THREAD(main_thread); /* Add payload p to queue q. Queues get scanned from first to last, so we should append new entries @@ -270,6 +289,7 @@ static void overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destination, int unicast, int packet_version, overlay_interface *interface, struct sockaddr_in addr){ + ASSERT_THREAD(main_thread); packet->interface = interface; packet->context.interface = interface; packet->i = (interface - overlay_interfaces); @@ -295,6 +315,7 @@ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destinati } int overlay_queue_schedule_next(time_ms_t next_allowed_packet){ + ASSERT_THREAD(main_thread); if (next_packet.alarm==0 || next_allowed_packet < next_packet.alarm){ if (!next_packet.function){ @@ -369,6 +390,7 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){ static void overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, time_ms_t now){ + ASSERT_THREAD(main_thread); struct overlay_frame *frame = queue->first; // TODO stop when the packet is nearly full? @@ -594,6 +616,7 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim // fill a packet from our outgoing queues and send it static int overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) { + ASSERT_THREAD(main_thread); int i; IN(); // while we're looking at queues, work out when to schedule another packet @@ -626,6 +649,7 @@ overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) { // when the queue timer elapses, send a packet static void overlay_send_packet(struct sched_ent *alarm){ + ASSERT_THREAD(main_thread); struct outgoing_packet packet; bzero(&packet, sizeof(struct outgoing_packet)); packet.seq=-1; @@ -633,6 +657,7 @@ static void overlay_send_packet(struct sched_ent *alarm){ } int overlay_send_tick_packet(struct overlay_interface *interface){ + ASSERT_THREAD(main_thread); struct outgoing_packet packet; bzero(&packet, sizeof(struct outgoing_packet)); packet.seq=-1; @@ -645,6 +670,7 @@ int overlay_send_tick_packet(struct overlay_interface *interface){ // de-queue all packets that have been sent to this subscriber & have arrived. int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq) { + ASSERT_THREAD(main_thread); int interface_id = interface - overlay_interfaces; int i; time_ms_t now = gettime_ms(); diff --git a/parallel.c b/parallel.c new file mode 100644 index 00000000..544789ac --- /dev/null +++ b/parallel.c @@ -0,0 +1,53 @@ +/* + Copyright (C) 2012 Serval Project. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + */ + +#include +#include "parallel.h" +#include "serval.h" + +int multithread; + +pthread_t main_thread; +pthread_t rhizome_thread; + +/* thread function, must have type: void *f(void *) */ +void *rhizome_run(void *arg) { + rhizome_fdqueue.thread = rhizome_thread = pthread_self(); + while (fd_poll(&rhizome_fdqueue, 1)); + return NULL; +} + +void post_runnable(ALARM_FUNCP function, void *arg, fdqueue *fdq) { + if (!multithread) { + fdq = &main_fdqueue; + } + time_ms_t now = gettime_ms(); + static struct profile_total stats = { .name = "post_runnable/generic" }; + struct sched_ent *alarm = malloc(sizeof(struct sched_ent)); + if (!alarm) OUT_OF_MEMORY; + *alarm = (struct sched_ent) { + .function = function, + .alarm = now, + .deadline = now + 2000, + .stats = &stats, + .context = arg, + .fdqueue = fdq, + }; + schedule(alarm); +} diff --git a/parallel.h b/parallel.h new file mode 100644 index 00000000..f8568b1e --- /dev/null +++ b/parallel.h @@ -0,0 +1,113 @@ +/* + Copyright (C) 2012 Serval Project. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + */ + +#ifndef __SERVALD_PARALLEL_H +#define __SERVALD_PARALLEL_H + +#include +#include "serval.h" +#include "rhizome.h" + +extern int multithread; + +extern pthread_t main_thread; +extern pthread_t rhizome_thread; + +#define ASSERT_THREAD(P)\ + if (multithread && pthread_self() != (P)) {\ + FATAL("Not called from the expected thread");\ + } + +/* rhizome thread function */ +void *rhizome_run(void *arg); + +/* schedule a function call with the specified arguments on fdq */ +void post_runnable(ALARM_FUNCP function, void *arg, fdqueue *fdq); + +/* following structs are used for passing args from one thread to another */ + +/* *alarm->context is an overlay_mdp_frame */ +void overlay_mdp_dispatch_alarm(struct sched_ent *alarm); + +/* overlay_rhizome_saw_advertisements argument */ +struct orsa_arg { + int id; + struct overlay_buffer *payload; + unsigned char src_sid[SID_SIZE]; + int src_reachable; + int use_new_sync_protocol; + struct sockaddr_in recvaddr; + time_ms_t now; +}; + +/* *alarm->context is a struct orsa_arg */ +void overlay_rhizome_saw_advertisements_alarm(struct sched_ent *alarm); + +/* *alarm->context is a struct overlay_buffer (payload) */ +void overlay_payload_enqueue_alarm(struct sched_ent *alarm); + +/* rhizome_received_content argument */ +struct rrc_arg { + int type; + unsigned char bidprefix[16]; + uint64_t version; + uint64_t offset; + int count; + unsigned char* bytes; +}; + +/* *alarm->context is a struct rrc_arg */ +void rhizome_received_content_alarm(struct sched_ent *alarm); + +/* rhizome_mdp_send_block argument */ +struct rmsb_arg { + int unicast; + unsigned char unicast_dest_sid[SID_SIZE]; + unsigned char bid[RHIZOME_MANIFEST_ID_BYTES]; + uint64_t version; + uint64_t file_offset; + uint32_t bitmap; + uint16_t block_length; +}; + +/* *alarm->context is a struct rmsb_arg */ +void rhizome_mdp_send_block_alarm(struct sched_ent *alarm); + +/* rhizome_advertise_manifest_alarm argument */ +struct ram_arg { + int manifest_all_bytes; + unsigned char manifestdata[MAX_MANIFEST_BYTES]; +}; + +/* *alarm->context is a struct ram_arg */ +void rhizome_advertise_manifest_alarm(struct sched_ent *alarm); + +/* *alarm->context is a char * (id_hex) */ +void rhizome_retrieve_and_advertise_manifest_alarm(struct sched_ent *alarm); + +/* rhizome_sync_send_requests argument */ +struct rssr_arg { + unsigned char sid[SID_SIZE]; + struct rhizome_sync *state; +}; + +/* *alarm->context is a struct rssr_arg */ +void rhizome_sync_send_requests_alarm(struct sched_ent *alarm); + +#endif diff --git a/performance_timing.c b/performance_timing.c index 86c4c6f1..c026b10d 100644 --- a/performance_timing.c +++ b/performance_timing.c @@ -20,9 +20,6 @@ #include "serval.h" #include "conf.h" -struct profile_total *stats_head=NULL; -struct call_stats *current_call=NULL; - void fd_clearstat(struct profile_total *s){ s->max_time = 0; s->total_time = 0; @@ -126,9 +123,9 @@ struct profile_total *sort(struct profile_total *list){ return left_head; } -int fd_clearstats() +int fd_clearstats(fdqueue *fdq) { - struct profile_total *stats = stats_head; + struct profile_total *stats = fdq->stats_head; while(stats!=NULL){ fd_clearstat(stats); stats = stats->_next; @@ -136,13 +133,13 @@ int fd_clearstats() return 0; } -int fd_showstats() +int fd_showstats(fdqueue *fdq) { struct profile_total total={NULL, 0, "Total", 0,0,0}; - stats_head = sort(stats_head); + fdq->stats_head = sort(fdq->stats_head); - struct profile_total *stats = stats_head; + struct profile_total *stats = fdq->stats_head; while(stats!=NULL){ /* Get total time spent doing everything */ fd_tallystats(&total,stats); @@ -151,7 +148,7 @@ int fd_showstats() // Show periodic rhizome transfer information, but only // if there are some active rhizome transfers. - if (rhizome_active_fetch_count()!=0) + if (fdq == &rhizome_fdqueue && rhizome_active_fetch_count()!=0) INFOF("Rhizome transfer progress: %d,%d,%d,%d,%d,%d (remaining %d)", rhizome_active_fetch_bytes_received(0), rhizome_active_fetch_bytes_received(1), @@ -164,7 +161,7 @@ int fd_showstats() // Report any functions that take too much time if (!config.debug.timing) { - stats = stats_head; + stats = fdq->stats_head; while(stats!=NULL){ /* If a function spends more than 1 second in any notionally 3 second period, then dob on it */ @@ -176,7 +173,7 @@ int fd_showstats() } else { INFOF("servald time usage stats:"); - stats = stats_head; + stats = fdq->stats_head; while(stats!=NULL){ /* Get total time spent doing everything */ if (stats->calls) @@ -191,16 +188,16 @@ int fd_showstats() void fd_periodicstats(struct sched_ent *alarm) { - fd_showstats(); - fd_clearstats(); + fd_showstats(alarm->fdqueue); + fd_clearstats(alarm->fdqueue); alarm->alarm = gettime_ms()+3000; alarm->deadline = alarm->alarm+1000; schedule(alarm); } -void dump_stack(int log_level) +void dump_stack(fdqueue *fdq, int log_level) { - struct call_stats *call = current_call; + struct call_stats *call = fdq->current_call; while(call){ if (call->totals) LOGF(log_level, "%s",call->totals->name); @@ -208,7 +205,16 @@ void dump_stack(int log_level) } } -int fd_func_enter(struct __sourceloc __whence, struct call_stats *this_call) +void dump_stacks(int log_level) +{ + INFOF("Main thread stack:"); + dump_stack(&main_fdqueue, log_level); + INFOF("Rhizome thread stack:"); + dump_stack(&rhizome_fdqueue, log_level); +} + +int fd_func_enter(struct __sourceloc __whence, fdqueue * fdq, + struct call_stats *this_call) { if (config.debug.profiling) DEBUGF("%s called from %s() %s:%d", @@ -216,12 +222,13 @@ int fd_func_enter(struct __sourceloc __whence, struct call_stats *this_call) this_call->enter_time=gettime_ms(); this_call->child_time=0; - this_call->prev = current_call; - current_call = this_call; + this_call->prev = fdq->current_call; + fdq->current_call = this_call; return 0; } -int fd_func_exit(struct __sourceloc __whence, struct call_stats *this_call) +int fd_func_exit(struct __sourceloc __whence, fdqueue * fdq, + struct call_stats *this_call) { // If current_call does not match this_call, then all bets are off as to where it points. It // probably points to somewhere on the stack (see the IN() macro) that has since been overwritten, @@ -231,22 +238,22 @@ int fd_func_exit(struct __sourceloc __whence, struct call_stats *this_call) DEBUGF("%s called from %s() %s:%d", __FUNCTION__,__whence.function,__whence.file,__whence.line); - if (current_call != this_call) + if (fdq->current_call != this_call) FATAL("performance timing stack trace corrupted"); time_ms_t now = gettime_ms(); time_ms_t elapsed = now - this_call->enter_time; - current_call = this_call->prev; + fdq->current_call = this_call->prev; if (this_call->totals && !this_call->totals->_initialised){ this_call->totals->_initialised=1; - this_call->totals->_next = stats_head; + this_call->totals->_next = fdq->stats_head; fd_clearstat(this_call->totals); - stats_head = this_call->totals; + fdq->stats_head = this_call->totals; } - if (current_call) - current_call->child_time+=elapsed; + if (fdq->current_call) + fdq->current_call->child_time+=elapsed; elapsed-=this_call->child_time; diff --git a/rhizome.h b/rhizome.h index e8caec0f..75818c9a 100644 --- a/rhizome.h +++ b/rhizome.h @@ -335,7 +335,7 @@ int rhizome_list_manifests(struct cli_context *context, const char *service, con const char *sender_sid, const char *recipient_sid, int limit, int offset, char count_rows); int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest *m); -int rhizome_advertise_manifest(rhizome_manifest *m); +int rhizome_advertise_manifest(int manifest_all_bytes, unsigned char *manifestdata); int rhizome_delete_bundle(const char *manifestid); int rhizome_delete_manifest(const char *manifestid); int rhizome_delete_payload(const char *manifestid); diff --git a/rhizome_bundle.c b/rhizome_bundle.c index b174846e..d3c4e777 100644 --- a/rhizome_bundle.c +++ b/rhizome_bundle.c @@ -20,6 +20,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include #include "serval.h" #include "conf.h" +#include "parallel.h" #include "rhizome.h" #include "str.h" @@ -502,6 +503,7 @@ static void _log_manifest_trace(struct __sourceloc __whence, const char *operati rhizome_manifest *_rhizome_new_manifest(struct __sourceloc __whence) { + ASSERT_THREAD(rhizome_thread); if (manifest_first_free<0) { /* Setup structures */ int i; @@ -554,6 +556,7 @@ rhizome_manifest *_rhizome_new_manifest(struct __sourceloc __whence) void _rhizome_manifest_free(struct __sourceloc __whence, rhizome_manifest *m) { + ASSERT_THREAD(rhizome_thread); if (!m) return; int i; int mid=m->manifest_record_number; diff --git a/rhizome_direct.c b/rhizome_direct.c index 1d3294ed..6b996baf 100644 --- a/rhizome_direct.c +++ b/rhizome_direct.c @@ -504,7 +504,7 @@ static int rhizome_sync_with_peers(int mode, int peer_count, const struct config rhizome_direct_sync_request *s = rhizome_direct_new_sync_request(rhizome_direct_http_dispatch, 65536, 0, mode, state); rhizome_direct_start_sync_request(s); if (rd_sync_handle_count > 0) - while (fd_poll() && rd_sync_handle_count > 0) + while (fd_poll(&rhizome_fdqueue, 1) && rd_sync_handle_count > 0) ; } return 0; diff --git a/rhizome_direct_http.c b/rhizome_direct_http.c index ccc42a59..f347bb81 100644 --- a/rhizome_direct_http.c +++ b/rhizome_direct_http.c @@ -877,7 +877,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r) fetch the file for import is all handled asynchronously, so just wait for it to finish. */ while (rhizome_any_fetch_active() || rhizome_any_fetch_queued()) - fd_poll(); + fd_poll(&rhizome_fdqueue, 1); } } else if (type==1&&r->pushP) { diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 0bf3d20a..0d2c11a9 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -26,6 +26,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "str.h" #include "strbuf_helpers.h" #include "overlay_address.h" +#include "parallel.h" /* Represents a queued fetch of a bundle payload, for which the manifest is already known. */ @@ -119,18 +120,19 @@ struct rhizome_fetch_candidate queue5[2]; /* Static allocation of the queue structures. Must be in order of ascending log_size_threshold. */ struct rhizome_fetch_queue rhizome_fetch_queues[] = { - { .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .log_size_threshold = 10, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .log_size_threshold = 13, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .log_size_threshold = 16, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue3), .candidate_queue = queue3, .log_size_threshold = 19, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue4), .candidate_queue = queue4, .log_size_threshold = 22, .active = { .state = RHIZOME_FETCH_FREE } }, - { .candidate_queue_size = NELS(queue5), .candidate_queue = queue5, .log_size_threshold = 0xFF, .active = { .state = RHIZOME_FETCH_FREE } } + { .candidate_queue_size = NELS(queue0), .candidate_queue = queue0, .log_size_threshold = 10, .active = { .state = RHIZOME_FETCH_FREE, .alarm = { .fdqueue = &rhizome_fdqueue } } }, + { .candidate_queue_size = NELS(queue1), .candidate_queue = queue1, .log_size_threshold = 13, .active = { .state = RHIZOME_FETCH_FREE, .alarm = { .fdqueue = &rhizome_fdqueue } } }, + { .candidate_queue_size = NELS(queue2), .candidate_queue = queue2, .log_size_threshold = 16, .active = { .state = RHIZOME_FETCH_FREE, .alarm = { .fdqueue = &rhizome_fdqueue } } }, + { .candidate_queue_size = NELS(queue3), .candidate_queue = queue3, .log_size_threshold = 19, .active = { .state = RHIZOME_FETCH_FREE, .alarm = { .fdqueue = &rhizome_fdqueue } } }, + { .candidate_queue_size = NELS(queue4), .candidate_queue = queue4, .log_size_threshold = 22, .active = { .state = RHIZOME_FETCH_FREE, .alarm = { .fdqueue = &rhizome_fdqueue } } }, + { .candidate_queue_size = NELS(queue5), .candidate_queue = queue5, .log_size_threshold = 0xFF, .active = { .state = RHIZOME_FETCH_FREE, .alarm = { .fdqueue = &rhizome_fdqueue } } } }; #define NQUEUES NELS(rhizome_fetch_queues) int rhizome_active_fetch_count() { + ASSERT_THREAD(rhizome_thread); int i,active=0; for(i=0;i=NQUEUES) return -1; if (rhizome_fetch_queues[q].active.state==RHIZOME_FETCH_FREE) return -1; return (int)rhizome_fetch_queues[q].active.write_state.file_offset; } int rhizome_fetch_queue_bytes(){ + ASSERT_THREAD(rhizome_thread); int i,j,bytes=0; for(i=0;icandidate_queue[i]; struct rhizome_fetch_candidate * e = &q->candidate_queue[q->candidate_queue_size - 1]; if (config.debug.rhizome_rx) @@ -259,6 +267,7 @@ static struct rhizome_fetch_candidate *rhizome_fetch_insert(struct rhizome_fetch */ static void rhizome_fetch_unqueue(struct rhizome_fetch_queue *q, int i) { + ASSERT_THREAD(rhizome_thread); assert(i >= 0 && i < q->candidate_queue_size); struct rhizome_fetch_candidate *c = &q->candidate_queue[i]; if (config.debug.rhizome_rx) @@ -279,6 +288,7 @@ static void rhizome_fetch_unqueue(struct rhizome_fetch_queue *q, int i) */ int rhizome_any_fetch_active() { + ASSERT_THREAD(rhizome_thread); int i; for (i = 0; i < NQUEUES; ++i) if (rhizome_fetch_queues[i].active.state != RHIZOME_FETCH_FREE) @@ -292,6 +302,7 @@ int rhizome_any_fetch_active() */ int rhizome_any_fetch_queued() { + ASSERT_THREAD(rhizome_thread); int i; for (i = 0; i < NQUEUES; ++i) if (rhizome_fetch_queues[i].candidate_queue[0].manifest) @@ -301,6 +312,7 @@ int rhizome_any_fetch_queued() int rhizome_manifest_version_cache_lookup(rhizome_manifest *m) { + ASSERT_THREAD(rhizome_thread); char id[RHIZOME_MANIFEST_ID_STRLEN + 1]; if (!rhizome_manifest_get(m, "id", id, sizeof id)) // dodgy manifest, we don't want to receive it @@ -341,6 +353,7 @@ ignored_manifest_cache ignored; int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len) { + ASSERT_THREAD(rhizome_thread); if (prefix_len < RHIZOME_BAR_PREFIX_BYTES) FATAL("Prefix length is too short"); @@ -363,6 +376,7 @@ int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len) int rhizome_queue_ignore_manifest(unsigned char *bid_prefix, int prefix_len, int timeout) { + ASSERT_THREAD(rhizome_thread); if (prefix_len < RHIZOME_BAR_PREFIX_BYTES) FATAL("Prefix length is too short"); @@ -389,6 +403,7 @@ int rhizome_queue_ignore_manifest(unsigned char *bid_prefix, int prefix_len, int static int rhizome_import_received_bundle(struct rhizome_manifest *m) { + ASSERT_THREAD(rhizome_thread); m->finalised = 1; m->manifest_bytes = m->manifest_all_bytes; // store the signatures too if (config.debug.rhizome_rx) { @@ -401,6 +416,7 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m) static int schedule_fetch(struct rhizome_fetch_slot *slot) { + ASSERT_THREAD(rhizome_thread); IN(); int sock = -1; /* TODO Don't forget to implement resume */ @@ -539,6 +555,7 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot) static enum rhizome_start_fetch_result rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct sockaddr_in *peerip,unsigned const char *peersid) { + ASSERT_THREAD(rhizome_thread); IN(); if (slot->state != RHIZOME_FETCH_FREE) RETURN(SLOTBUSY); @@ -645,6 +662,7 @@ rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, const unsigned char peersid[SID_SIZE], const unsigned char *prefix, size_t prefix_length) { + ASSERT_THREAD(rhizome_thread); assert(peerip); struct rhizome_fetch_slot *slot = rhizome_find_fetch_slot(MAX_MANIFEST_BYTES); if (slot == NULL) @@ -675,6 +693,7 @@ rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip, */ static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot) { + ASSERT_THREAD(rhizome_thread); IN(); struct rhizome_fetch_queue *q; for (q = (struct rhizome_fetch_queue *) slot; q >= rhizome_fetch_queues; --q) { @@ -715,6 +734,7 @@ static void rhizome_start_next_queued_fetch(struct rhizome_fetch_slot *slot) */ static void rhizome_start_next_queued_fetches(struct sched_ent *alarm) { + ASSERT_THREAD(rhizome_thread); IN(); int i; for (i = 0; i < NQUEUES; ++i) @@ -724,6 +744,7 @@ static void rhizome_start_next_queued_fetches(struct sched_ent *alarm) /* Search all fetch slots, including active downloads, for a matching manifest */ rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length){ + ASSERT_THREAD(rhizome_thread); int i, j; for (i = 0; i < NQUEUES; ++i) { struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; @@ -744,6 +765,7 @@ rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length){ /* Do we have space to add a fetch candidate of this size? */ int rhizome_fetch_has_queue_space(unsigned char log2_size){ + ASSERT_THREAD(rhizome_thread); int i; for (i = 0; i < NQUEUES; ++i) { struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i]; @@ -780,6 +802,7 @@ struct profile_total rsnqf_stats={.name="rhizome_start_next_queued_fetches"}; int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]) { + ASSERT_THREAD(rhizome_thread); IN(); if (!config.rhizome.fetch){ @@ -904,11 +927,14 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock } } + /* no need to synchronize is_schedule() and schedule(): only this thread can + schedule sched_activate */ if (!is_scheduled(&sched_activate)) { sched_activate.function = rhizome_start_next_queued_fetches; sched_activate.stats = &rsnqf_stats; sched_activate.alarm = gettime_ms() + rhizome_fetch_delay_ms(); sched_activate.deadline = sched_activate.alarm + config.rhizome.idle_timeout; + sched_activate.fdqueue = &rhizome_fdqueue; schedule(&sched_activate); } @@ -918,6 +944,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) { + ASSERT_THREAD(rhizome_thread); if (config.debug.rhizome_rx) DEBUGF("close Rhizome fetch slot=%d", slotno(slot)); assert(slot->state != RHIZOME_FETCH_FREE); @@ -951,6 +978,7 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot) static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) { + ASSERT_THREAD(rhizome_thread); IN(); struct rhizome_fetch_slot *slot=(struct rhizome_fetch_slot*)alarm; @@ -972,6 +1000,7 @@ static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm) static int rhizome_fetch_mdp_touch_timeout(struct rhizome_fetch_slot *slot) { + ASSERT_THREAD(rhizome_thread); // 266ms @ 1mbit (WiFi broadcast speed) = 32x1024 byte packets. // But on a packet radio interface at perhaps 50kbit, this is clearly // a bad policy. Ideally we should know about the interface speed @@ -988,6 +1017,7 @@ static int rhizome_fetch_mdp_touch_timeout(struct rhizome_fetch_slot *slot) static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) { + ASSERT_THREAD(rhizome_thread); IN(); // only issue new requests every 133ms. // we automatically re-issue once we have received all packets in this @@ -995,19 +1025,18 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) // faster. Optimising behaviour when there is no packet loss is an // outstanding task. - overlay_mdp_frame mdp; - - bzero(&mdp,sizeof(mdp)); - bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); - mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; - bcopy(slot->peer_sid,mdp.out.dst.sid,SID_SIZE); - mdp.out.dst.port=MDP_PORT_RHIZOME_REQUEST; - mdp.out.ttl=1; - mdp.packetTypeAndFlags=MDP_TX; - - mdp.out.queue=OQ_ORDINARY; - mdp.out.payload_length=RHIZOME_MANIFEST_ID_BYTES+8+8+4+2; - bcopy(slot->bid,&mdp.out.payload[0],RHIZOME_MANIFEST_ID_BYTES); + overlay_mdp_frame *mdp = calloc(1, sizeof(overlay_mdp_frame)); + if (!mdp) OUT_OF_MEMORY; + bcopy(my_subscriber->sid, mdp->out.src.sid, SID_SIZE); + mdp->out.src.port = MDP_PORT_RHIZOME_RESPONSE; + bcopy(slot->peer_sid, mdp->out.dst.sid, SID_SIZE); + mdp->out.dst.port = MDP_PORT_RHIZOME_REQUEST; + mdp->out.ttl = 1; + mdp->packetTypeAndFlags = MDP_TX; + + mdp->out.queue = OQ_ORDINARY; + mdp->out.payload_length= RHIZOME_MANIFEST_ID_BYTES+8+8+4+2; + bcopy(slot->bid, &mdp->out.payload[0], RHIZOME_MANIFEST_ID_BYTES); uint32_t bitmap=0; int requests=32; @@ -1026,18 +1055,18 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) offset+=slot->mdpRXBlockLength; } - write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES], slot->bidVersion); - write_uint64(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8], slot->write_state.file_offset); - write_uint32(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8], bitmap); - write_uint16(&mdp.out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4], slot->mdpRXBlockLength); + write_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES], slot->bidVersion); + write_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8], slot->write_state.file_offset); + write_uint32(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8], bitmap); + write_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4], slot->mdpRXBlockLength); if (config.debug.rhizome_tx) DEBUGF("src sid=%s, dst sid=%s, mdpRXWindowStart=0x%"PRIx64, - alloca_tohex_sid(mdp.out.src.sid),alloca_tohex_sid(mdp.out.dst.sid), + alloca_tohex_sid(mdp->out.src.sid), alloca_tohex_sid(mdp->out.dst.sid), slot->write_state.file_offset); - overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); - + post_runnable(overlay_mdp_dispatch_alarm, mdp, &main_fdqueue); + // remember when we sent the request so that we can adjust the inter-request // interval based on how fast the packets arrive. slot->mdpResponsesOutstanding=requests; @@ -1052,6 +1081,7 @@ static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot) static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) { + ASSERT_THREAD(rhizome_thread); /* In Rhizome Direct we use the same fetch slot system, but we aren't actually a running servald instance, so we cannot fall back to MDP. This is detected by checking if we have a SID for this instance. If not, then we are not a @@ -1110,6 +1140,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot) void rhizome_fetch_write(struct rhizome_fetch_slot *slot) { + ASSERT_THREAD(rhizome_thread); IN(); if (config.debug.rhizome_rx) DEBUGF("write_nonblock(%d, %s)", slot->alarm.poll.fd, alloca_toprint(-1, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs)); @@ -1142,6 +1173,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot) int rhizome_write_complete(struct rhizome_fetch_slot *slot) { + ASSERT_THREAD(rhizome_thread); IN(); if (slot->manifest) { @@ -1211,6 +1243,7 @@ int rhizome_write_complete(struct rhizome_fetch_slot *slot) int rhizome_write_content(struct rhizome_fetch_slot *slot, unsigned char *buffer, int bytes) { + ASSERT_THREAD(rhizome_thread); IN(); if (bytes<=0) @@ -1247,10 +1280,21 @@ int rhizome_write_content(struct rhizome_fetch_slot *slot, unsigned char *buffer OUT(); } +void rhizome_received_content_alarm(struct sched_ent *alarm) { + ASSERT_THREAD(rhizome_thread); + struct rrc_arg *arg = alarm->context; + free(alarm); + rhizome_received_content(arg->bidprefix, arg->version, arg->offset, + arg->count, arg->bytes, arg->type); + free(arg->bytes); + free(arg); +} + int rhizome_received_content(unsigned char *bidprefix, uint64_t version, uint64_t offset, int count,unsigned char *bytes,int type) { + ASSERT_THREAD(rhizome_thread); IN(); int i; for(i=0;ipoll.revents & POLLOUT) { @@ -1427,6 +1472,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm) */ int unpack_http_response(char *response, struct http_response_parts *parts) { + ASSERT_THREAD(rhizome_thread); IN(); parts->code = -1; parts->reason = NULL; diff --git a/rhizome_http.c b/rhizome_http.c index 3de26d38..ae1e010e 100644 --- a/rhizome_http.c +++ b/rhizome_http.c @@ -172,6 +172,7 @@ int rhizome_http_server_start(int (*parse_func)(rhizome_http_request *), server_alarm.stats=&server_stats; server_alarm.poll.fd = rhizome_server_socket; server_alarm.poll.events = POLLIN; + server_alarm.fdqueue = &rhizome_fdqueue; watch(&server_alarm); return 0; @@ -297,6 +298,7 @@ void rhizome_server_poll(struct sched_ent *alarm) request->alarm.poll.events=POLLIN; request->alarm.alarm = gettime_ms()+RHIZOME_IDLE_TIMEOUT; request->alarm.deadline = request->alarm.alarm+RHIZOME_IDLE_TIMEOUT; + request->alarm.fdqueue = &rhizome_fdqueue; // watch for the incoming http request watch(&request->alarm); // set an inactivity timeout to close the connection diff --git a/rhizome_packetformats.c b/rhizome_packetformats.c index 9ed7b3e2..de7134d8 100644 --- a/rhizome_packetformats.c +++ b/rhizome_packetformats.c @@ -29,6 +29,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include #include #include +#include "parallel.h" /* Android doesn't have log2(), and we don't really need to do floating point math to work out how big a file is. @@ -174,6 +175,7 @@ static int append_bars(struct overlay_buffer *e, sqlite_retry_state *retry, cons */ int64_t bundles_available=0; void overlay_rhizome_advertise(struct sched_ent *alarm){ + ASSERT_THREAD(rhizome_thread); bundles_available=0; static int64_t bundle_last_rowid=INT64_MAX; @@ -195,20 +197,14 @@ void overlay_rhizome_advertise(struct sched_ent *alarm){ if (bundles_available<1) goto end; - struct overlay_frame *frame = malloc(sizeof(struct overlay_frame)); - bzero(frame,sizeof(struct overlay_frame)); - frame->type = OF_TYPE_RHIZOME_ADVERT; - frame->source = my_subscriber; - frame->ttl = 1; - frame->queue = OQ_OPPORTUNISTIC; - frame->payload = ob_new(); - ob_limitsize(frame->payload, 800); - - ob_append_byte(frame->payload, 2); - ob_append_ui16(frame->payload, rhizome_http_server_port); + struct overlay_buffer *payload = ob_new(); + if (!payload) OUT_OF_MEMORY; + ob_limitsize(payload, 800); + ob_append_byte(payload, 2); + ob_append_ui16(payload, rhizome_http_server_port); int64_t rowid=0; - int count = append_bars(frame->payload, &retry, + int count = append_bars(payload, &retry, "SELECT BAR,ROWID FROM MANIFESTS ORDER BY ROWID DESC LIMIT 3", &rowid); @@ -216,16 +212,14 @@ void overlay_rhizome_advertise(struct sched_ent *alarm){ if (bundle_last_rowid>rowid || bundle_last_rowid<=0) bundle_last_rowid=rowid; - count = append_bars(frame->payload, &retry, + count = append_bars(payload, &retry, "SELECT BAR,ROWID FROM MANIFESTS WHERE ROWID < %lld ORDER BY ROWID DESC LIMIT 17", &bundle_last_rowid); if (count<17) bundle_last_rowid=INT64_MAX; } - if (overlay_payload_enqueue(frame)) - op_free(frame); - + post_runnable(overlay_payload_enqueue_alarm, payload, &main_fdqueue); end: sqlite_set_tracefunc(oldfunc); alarm->alarm = gettime_ms()+config.rhizome.advertise.interval; @@ -236,8 +230,18 @@ void overlay_rhizome_advertise(struct sched_ent *alarm){ #define HAS_PORT (1<<1) #define HAS_MANIFESTS (1<<0) +void rhizome_advertise_manifest_alarm(struct sched_ent *alarm) { + ASSERT_THREAD(main_thread); + struct ram_arg *arg = alarm->context; + free(alarm); + rhizome_advertise_manifest(arg->manifest_all_bytes, arg->manifestdata); + free(arg); +} + /* Queue an advertisment for a single manifest */ -int rhizome_advertise_manifest(rhizome_manifest *m){ +int rhizome_advertise_manifest(int manifest_all_bytes, + unsigned char *manifestdata) { + ASSERT_THREAD(main_thread); struct overlay_frame *frame = malloc(sizeof(struct overlay_frame)); bzero(frame,sizeof(struct overlay_frame)); frame->type = OF_TYPE_RHIZOME_ADVERT; @@ -249,8 +253,8 @@ int rhizome_advertise_manifest(rhizome_manifest *m){ if (ob_append_byte(frame->payload, HAS_PORT|HAS_MANIFESTS)) goto error; if (ob_append_ui16(frame->payload, is_rhizome_http_enabled()?rhizome_http_server_port:0)) goto error; - if (ob_append_ui16(frame->payload, m->manifest_all_bytes)) goto error; - if (ob_append_bytes(frame->payload, m->manifestdata, m->manifest_all_bytes)) goto error; + if (ob_append_ui16(frame->payload, manifest_all_bytes)) goto error; + if (ob_append_bytes(frame->payload, manifestdata, manifest_all_bytes)) goto error; ob_append_byte(frame->payload, 0xFF); if (overlay_payload_enqueue(frame)) goto error; return 0; @@ -262,17 +266,33 @@ int rhizome_advertise_manifest(rhizome_manifest *m){ time_ms_t lookup_time=0; -int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long now) +void overlay_rhizome_saw_advertisements_alarm(struct sched_ent *alarm) { + ASSERT_THREAD(rhizome_thread); + struct orsa_arg *arg = alarm->context; + free(alarm); + overlay_rhizome_saw_advertisements(arg->id, arg->payload, arg->src_sid, + arg->src_reachable, + arg->use_new_sync_protocol, arg->recvaddr, + arg->now); + free(arg->payload); + free(arg); +} + +int overlay_rhizome_saw_advertisements(int i, struct overlay_buffer *payload, + unsigned char *src_sid, + int src_reachable, + int use_new_sync_protocol, + struct sockaddr_in recvaddr, + time_ms_t now) { + ASSERT_THREAD(rhizome_thread); IN(); - if (!f) - RETURN(-1); if (!(rhizome_db && config.rhizome.fetch)) RETURN(0); - int ad_frame_type=ob_get(f->payload); - struct sockaddr_in httpaddr = f->recvaddr; + int ad_frame_type = ob_get(payload); + struct sockaddr_in httpaddr = recvaddr; httpaddr.sin_port = htons(RHIZOME_HTTP_PORT); int manifest_length; rhizome_manifest *m=NULL; @@ -280,24 +300,24 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long int (*oldfunc)() = sqlite_set_tracefunc(is_debug_rhizome_ads); if (ad_frame_type & HAS_PORT){ - httpaddr.sin_port = htons(ob_get_ui16(f->payload)); + httpaddr.sin_port = htons(ob_get_ui16(payload)); } if (ad_frame_type & HAS_MANIFESTS){ /* Extract whole manifests */ - while(f->payload->position < f->payload->sizeLimit) { - if (ob_getbyte(f->payload, f->payload->position)==0xff){ - f->payload->position++; - break; + while (payload->position < payload->sizeLimit) { + if (ob_getbyte(payload, payload->position) == 0xff) { + payload->position++; + break; } - - manifest_length=ob_get_ui16(f->payload); + + manifest_length=ob_get_ui16(payload); if (manifest_length==0) continue; - unsigned char *data = ob_get_bytes_ptr(f->payload, manifest_length); + unsigned char *data = ob_get_bytes_ptr(payload, manifest_length); if (!data) { WHYF("Illegal manifest length field in rhizome advertisement frame %d vs %d.", - manifest_length, f->payload->sizeLimit - f->payload->position); + manifest_length, payload->sizeLimit - payload->position); break; } @@ -377,7 +397,7 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long DEBUG("Not seen before."); // start the fetch process! - rhizome_suggest_queue_manifest_import(m, &httpaddr,f->source->sid); + rhizome_suggest_queue_manifest_import(m, &httpaddr, src_sid); // the above function will free the manifest structure, make sure we don't free it again m=NULL; @@ -390,22 +410,20 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long } // if we're using the new sync protocol, ignore the rest of the packet - if (f->source->sync_state) + if (use_new_sync_protocol) goto end; - overlay_mdp_frame mdp; - - bzero(&mdp,sizeof(mdp)); - mdp.out.payload_length=0; - + overlay_mdp_frame *mdp = calloc(1, sizeof(overlay_mdp_frame)); + if (!mdp) OUT_OF_MEMORY; + // parse BAR's unsigned char *bars[50]; int bar_count=0; - while(ob_remaining(f->payload)>0 && bar_count<50){ + while(ob_remaining(payload)>0 && bar_count<50){ unsigned char *bar; - bars[bar_count]=bar=ob_get_bytes_ptr(f->payload, RHIZOME_BAR_BYTES); + bars[bar_count]=bar=ob_get_bytes_ptr(payload, RHIZOME_BAR_BYTES); if (!bar){ - WARNF("Expected whole BAR @%x (only %d bytes remain)", ob_position(f->payload), ob_remaining(f->payload)); + WARNF("Expected whole BAR @%x (only %d bytes remain)", ob_position(payload), ob_remaining(payload)); break; } @@ -444,22 +462,21 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long test_count++; if (rhizome_is_bar_interesting(bars[index])==1){ // add a request for the manifest - if (mdp.out.payload_length==0){ - bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); - mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; - bcopy(f->source->sid,mdp.out.dst.sid,SID_SIZE); - mdp.out.dst.port=MDP_PORT_RHIZOME_MANIFEST_REQUEST; - if (f->source->reachable&REACHABLE_DIRECT) - mdp.out.ttl=1; - else - mdp.out.ttl=64; - mdp.packetTypeAndFlags=MDP_TX; - - mdp.out.queue=OQ_ORDINARY; + if (mdp->out.payload_length == 0) { + bcopy(my_subscriber->sid, mdp->out.src.sid,SID_SIZE); + mdp->out.src.port = MDP_PORT_RHIZOME_RESPONSE; + bcopy(src_sid, mdp->out.dst.sid, SID_SIZE); + mdp->out.dst.port = MDP_PORT_RHIZOME_MANIFEST_REQUEST; + if (src_reachable & REACHABLE_DIRECT) + mdp->out.ttl = 1; + else + mdp->out.ttl = 64; + mdp->packetTypeAndFlags = MDP_TX; + mdp->out.queue = OQ_ORDINARY; } DEBUGF("Requesting manifest for BAR %s", alloca_tohex(bars[index], RHIZOME_BAR_BYTES)); - bcopy(bars[index], &mdp.out.payload[mdp.out.payload_length], RHIZOME_BAR_BYTES); - mdp.out.payload_length+=RHIZOME_BAR_BYTES; + bcopy(bars[index], &mdp->out.payload[mdp->out.payload_length], RHIZOME_BAR_BYTES); + mdp->out.payload_length += RHIZOME_BAR_BYTES; } } @@ -470,8 +487,11 @@ int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, long long else lookup_time = (end_time - start_time); - if (mdp.out.payload_length>0) - overlay_mdp_dispatch(&mdp,0 /* system generated */,NULL,0); + if (mdp->out.payload_length > 0) { + post_runnable(overlay_mdp_dispatch_alarm, mdp, &main_fdqueue); + } else { + free(mdp); + } end: sqlite_set_tracefunc(oldfunc); diff --git a/rhizome_sync.c b/rhizome_sync.c index d91ee770..83c7cf19 100644 --- a/rhizome_sync.c +++ b/rhizome_sync.c @@ -24,6 +24,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "mdp_client.h" #include "log.h" #include "conf.h" +#include "parallel.h" #define MSG_TYPE_BARS 0 #define MSG_TYPE_REQ 1 @@ -55,38 +56,53 @@ struct rhizome_sync struct bar_entry bars[CACHE_BARS]; }; -static void rhizome_sync_request(struct subscriber *subscriber, uint64_t token, unsigned char forwards) +static void rhizome_sync_request(unsigned char *sid, uint64_t token, unsigned char forwards) { - overlay_mdp_frame mdp; - bzero(&mdp,sizeof(mdp)); + overlay_mdp_frame *mdp = calloc(1, sizeof(overlay_mdp_frame)); + if (!mdp) OUT_OF_MEMORY; - bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); - mdp.out.src.port=MDP_PORT_RHIZOME_SYNC; - bcopy(subscriber->sid,mdp.out.dst.sid,SID_SIZE); - mdp.out.dst.port=MDP_PORT_RHIZOME_SYNC; - mdp.packetTypeAndFlags=MDP_TX; - mdp.out.queue=OQ_OPPORTUNISTIC; + bcopy(my_subscriber->sid, mdp->out.src.sid, SID_SIZE); + mdp->out.src.port = MDP_PORT_RHIZOME_SYNC; + bcopy(sid, mdp->out.dst.sid, SID_SIZE); + mdp->out.dst.port = MDP_PORT_RHIZOME_SYNC; + mdp->packetTypeAndFlags = MDP_TX; + mdp->out.queue = OQ_OPPORTUNISTIC; - struct overlay_buffer *b = ob_static(mdp.out.payload, sizeof(mdp.out.payload)); + struct overlay_buffer *b = ob_static(mdp->out.payload, sizeof(mdp->out.payload)); ob_append_byte(b, MSG_TYPE_REQ); ob_append_byte(b, forwards); ob_append_packed_ui64(b, token); - mdp.out.payload_length = ob_position(b); + mdp->out.payload_length = ob_position(b); if (config.debug.rhizome) - DEBUGF("Sending request to %s for BARs from %"PRIu64" %s", alloca_tohex_sid(subscriber->sid), token, forwards?"forwards":"backwards"); - overlay_mdp_dispatch(&mdp,0,NULL,0); + DEBUGF("Sending request to %s for BARs from %"PRIu64" %s", alloca_tohex_sid(sid), token, forwards?"forwards":"backwards"); + + post_runnable(overlay_mdp_dispatch_alarm, mdp, &main_fdqueue); ob_free(b); } -static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhizome_sync *state) +static void rhizome_sync_send_requests(unsigned char *sid, + struct rhizome_sync *state); + +void rhizome_sync_send_requests_alarm(struct sched_ent *alarm) +{ + ASSERT_THREAD(rhizome_thread); + struct rssr_arg *arg = alarm->context; + free(alarm); + rhizome_sync_send_requests(arg->sid, arg->state); + free(arg); +} + +static void rhizome_sync_send_requests(unsigned char *sid, + struct rhizome_sync *state) { + ASSERT_THREAD(rhizome_thread); int i, requests=0; time_ms_t now = gettime_ms(); // send requests for manifests that we have room to fetch - overlay_mdp_frame mdp; - bzero(&mdp,sizeof(mdp)); + overlay_mdp_frame *mdp = calloc(1, sizeof(overlay_mdp_frame)); + if (!mdp) OUT_OF_MEMORY; for (i=0;i < state->bar_count;i++){ if (state->bars[i].next_request > now) @@ -108,28 +124,31 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi if (m && m->version >= version) continue; - if (mdp.out.payload_length==0){ - bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); - mdp.out.src.port=MDP_PORT_RHIZOME_RESPONSE; - bcopy(subscriber->sid,mdp.out.dst.sid,SID_SIZE); - mdp.out.dst.port=MDP_PORT_RHIZOME_MANIFEST_REQUEST; - mdp.packetTypeAndFlags=MDP_TX; + if (mdp->out.payload_length == 0) { + bcopy(my_subscriber->sid, mdp->out.src.sid, SID_SIZE); + mdp->out.src.port = MDP_PORT_RHIZOME_RESPONSE; + bcopy(sid, mdp->out.dst.sid, SID_SIZE); + mdp->out.dst.port = MDP_PORT_RHIZOME_MANIFEST_REQUEST; + mdp->packetTypeAndFlags = MDP_TX; - mdp.out.queue=OQ_OPPORTUNISTIC; + mdp->out.queue = OQ_OPPORTUNISTIC; } - if (mdp.out.payload_length + RHIZOME_BAR_BYTES>MDP_MTU) + if (mdp->out.payload_length + RHIZOME_BAR_BYTES>MDP_MTU) break; if (config.debug.rhizome) DEBUGF("Requesting manifest for BAR %s", alloca_tohex(state->bars[i].bar, RHIZOME_BAR_BYTES)); - bcopy(state->bars[i].bar, &mdp.out.payload[mdp.out.payload_length], RHIZOME_BAR_BYTES); - mdp.out.payload_length+=RHIZOME_BAR_BYTES; + bcopy(state->bars[i].bar, &mdp->out.payload[mdp->out.payload_length], RHIZOME_BAR_BYTES); + mdp->out.payload_length += RHIZOME_BAR_BYTES; state->bars[i].next_request = now+1000; requests++; if (requests>=BARS_PER_RESPONSE) break; } - if (mdp.out.payload_length!=0) - overlay_mdp_dispatch(&mdp,0,NULL,0); + if (mdp->out.payload_length != 0) { + post_runnable(overlay_mdp_dispatch_alarm, mdp, &main_fdqueue); + } else { + free(mdp); + } // send request for more bars if we have room to cache them if (state->bar_count >= CACHE_BARS) @@ -137,13 +156,13 @@ static void rhizome_sync_send_requests(struct subscriber *subscriber, struct rhi if (state->next_request<=now){ if (state->sync_end < state->highest_seen){ - rhizome_sync_request(subscriber, state->sync_end, 1); + rhizome_sync_request(sid, state->sync_end, 1); }else if(state->sync_start >0){ - rhizome_sync_request(subscriber, state->sync_start, 0); + rhizome_sync_request(sid, state->sync_start, 0); }else if(!state->sync_complete){ state->sync_complete = 1; if (config.debug.rhizome) - DEBUGF("BAR sync with %s complete", alloca_tohex_sid(subscriber->sid)); + DEBUGF("BAR sync with %s complete", alloca_tohex_sid(sid)); } state->next_request = now+5000; } @@ -310,26 +329,26 @@ static uint64_t max_token=0; static void sync_send_response(struct subscriber *dest, int forwards, uint64_t token) { IN(); - overlay_mdp_frame mdp; - bzero(&mdp,sizeof(mdp)); + overlay_mdp_frame *mdp = calloc(1, sizeof(overlay_mdp_frame)); + if (!mdp) OUT_OF_MEMORY; - bcopy(my_subscriber->sid,mdp.out.src.sid,SID_SIZE); - mdp.out.src.port=MDP_PORT_RHIZOME_SYNC; - mdp.out.dst.port=MDP_PORT_RHIZOME_SYNC; - mdp.packetTypeAndFlags=MDP_TX; - mdp.out.queue=OQ_OPPORTUNISTIC; + bcopy(my_subscriber->sid, mdp->out.src.sid, SID_SIZE); + mdp->out.src.port = MDP_PORT_RHIZOME_SYNC; + mdp->out.dst.port = MDP_PORT_RHIZOME_SYNC; + mdp->packetTypeAndFlags = MDP_TX; + mdp->out.queue = OQ_OPPORTUNISTIC; if (dest){ - bcopy(dest->sid,mdp.out.dst.sid,SID_SIZE); + bcopy(dest->sid, mdp->out.dst.sid, SID_SIZE); }else{ - memset(mdp.out.dst.sid, 0xFF, SID_SIZE); - mdp.packetTypeAndFlags|=(MDP_NOCRYPT|MDP_NOSIGN); + memset(mdp->out.dst.sid, 0xFF, SID_SIZE); + mdp->packetTypeAndFlags|=(MDP_NOCRYPT|MDP_NOSIGN); } if (!dest) - mdp.out.ttl=1; + mdp->out.ttl=1; - struct overlay_buffer *b = ob_static(mdp.out.payload, sizeof(mdp.out.payload)); + struct overlay_buffer *b = ob_static(mdp->out.payload, sizeof(mdp->out.payload)); ob_append_byte(b, MSG_TYPE_BARS); ob_checkpoint(b); @@ -403,10 +422,12 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t sqlite3_finalize(statement); if (count){ - mdp.out.payload_length = ob_position(b); + mdp->out.payload_length = ob_position(b); if (config.debug.rhizome) DEBUGF("Sending %d BARs from %"PRIu64" to %"PRIu64, count, token, last); - overlay_mdp_dispatch(&mdp,0,NULL,0); + post_runnable(overlay_mdp_dispatch_alarm, mdp, &main_fdqueue); + } else { + free(mdp); } ob_free(b); OUT(); @@ -442,7 +463,11 @@ int overlay_mdp_service_rhizome_sync(struct overlay_frame *frame, overlay_mdp_fr break; } ob_free(b); - rhizome_sync_send_requests(frame->source, state); + struct rssr_arg *arg = malloc(sizeof(struct rssr_arg)); + if (!arg) OUT_OF_MEMORY; + memcpy(arg->sid, frame->source->sid, SID_SIZE); + arg->state = state; + post_runnable(rhizome_sync_send_requests_alarm, arg, &rhizome_fdqueue); return 0; } diff --git a/serval.h b/serval.h index 0726eca9..50c85e78 100644 --- a/serval.h +++ b/serval.h @@ -308,6 +308,12 @@ struct call_stats{ struct call_stats *prev; }; +#define MS_TO_TIMESPEC(MS, TS)\ + (TS)->tv_sec = (MS) / 1000;\ + (TS)->tv_nsec = ((MS) % 1000) * 1000000; + +#include "fdqueue.h" + struct sched_ent; typedef void (*ALARM_FUNCP) (struct sched_ent *alarm); @@ -325,6 +331,7 @@ struct sched_ent{ time_ms_t deadline; struct profile_total *stats; int _poll_index; + fdqueue *fdqueue; }; struct limit_state{ @@ -544,7 +551,12 @@ int overlay_queue_schedule_next(time_ms_t next_allowed_packet); int overlay_send_tick_packet(struct overlay_interface *interface); int overlay_queue_ack(struct subscriber *neighbour, struct overlay_interface *interface, uint32_t ack_mask, int ack_seq); -int overlay_rhizome_saw_advertisements(int i, struct overlay_frame *f, time_ms_t now); +int overlay_rhizome_saw_advertisements(int i, struct overlay_buffer *payload, + unsigned char *src_sid, + int src_reachable, + int use_new_sync_protocol, + struct sockaddr_in recvaddr, + time_ms_t now); int rhizome_server_get_fds(struct pollfd *fds,int *fdcount,int fdmax); int rhizome_saw_voice_traffic(); int overlay_saw_mdp_containing_frame(struct overlay_frame *f, time_ms_t now); @@ -747,7 +759,7 @@ int _unwatch(struct __sourceloc whence, struct sched_ent *alarm); #define unschedule(alarm) _unschedule(__WHENCE__, alarm) #define watch(alarm) _watch(__WHENCE__, alarm) #define unwatch(alarm) _unwatch(__WHENCE__, alarm) -int fd_poll(); +int fd_poll(fdqueue *fdqueue, int wait); void overlay_interface_discover(struct sched_ent *alarm); void overlay_packetradio_poll(struct sched_ent *alarm); @@ -781,18 +793,20 @@ int limit_is_allowed(struct limit_state *state); int limit_init(struct limit_state *state, int rate_micro_seconds); /* function timing routines */ -int fd_clearstats(); -int fd_showstats(); -int fd_checkalarms(); -int fd_func_enter(struct __sourceloc __whence, struct call_stats *this_call); -int fd_func_exit(struct __sourceloc __whence, struct call_stats *this_call); -void dump_stack(int log_level); +int fd_clearstats(fdqueue *fdq); +int fd_showstats(fdqueue *fdq); +int fd_func_enter(struct __sourceloc __whence, fdqueue * fdq, + struct call_stats *this_call); +int fd_func_exit(struct __sourceloc __whence, fdqueue * fdq, + struct call_stats *this_call); +void dump_stack(fdqueue *fdq, int log_level); +void dump_stacks(int log_level); #define IN() static struct profile_total _aggregate_stats={NULL,0,__FUNCTION__,0,0,0}; \ struct call_stats _this_call={.totals=&_aggregate_stats}; \ - fd_func_enter(__HERE__, &_this_call); + fd_func_enter(__HERE__, current_fdqueue(), &_this_call); -#define OUT() fd_func_exit(__HERE__, &_this_call) +#define OUT() fd_func_exit(__HERE__, current_fdqueue(), &_this_call) #define RETURN(X) do { OUT(); return (X); } while (0); #define RETURNNULL do { OUT(); return (NULL); } while (0); diff --git a/server.c b/server.c index 31e52110..2db4dcdc 100644 --- a/server.c +++ b/server.c @@ -374,7 +374,7 @@ void signal_handler(int signal) LOGF(LOG_LEVEL_FATAL, "Caught signal %s", buf); LOGF(LOG_LEVEL_FATAL, "The following clue may help: %s",crash_handler_clue); - dump_stack(LOG_LEVEL_FATAL); + dump_stacks(LOG_LEVEL_FATAL); serverCleanUp(); exit(0); @@ -387,8 +387,8 @@ void crash_handler(int signal) signame(buf, sizeof(buf), signal); LOGF(LOG_LEVEL_FATAL, "Caught signal %s", buf); LOGF(LOG_LEVEL_FATAL, "The following clue may help: %s",crash_handler_clue); - dump_stack(LOG_LEVEL_FATAL); - + dump_stacks(LOG_LEVEL_FATAL); + BACKTRACE; if (config.server.respawn_on_crash) { int i; diff --git a/sourcefiles.mk b/sourcefiles.mk index eb63a20e..b0fe2a1d 100644 --- a/sourcefiles.mk +++ b/sourcefiles.mk @@ -40,6 +40,7 @@ SERVAL_SOURCES = \ $(SERVAL_BASE)overlay_packetformats.c \ $(SERVAL_BASE)overlay_payload.c \ $(SERVAL_BASE)packetformats.c \ + $(SERVAL_BASE)parallel.c \ $(SERVAL_BASE)performance_timing.c \ $(SERVAL_BASE)randombytes.c \ $(SERVAL_BASE)route_link.c \ diff --git a/vomp_console.c b/vomp_console.c index a3ee4c05..c3de2c7d 100644 --- a/vomp_console.c +++ b/vomp_console.c @@ -358,7 +358,7 @@ int app_vomp_console(const struct cli_parsed *parsed, struct cli_context *contex watch(&stdin_state.alarm); while(monitor_client_fd!=-1){ - fd_poll(); + fd_poll(&main_fdqueue, 1); } unwatch(&stdin_state.alarm);