diff --git a/protobuf-c-rpc/protobuf-c-rpc-client.c b/protobuf-c-rpc/protobuf-c-rpc-client.c index 0dd25a5..d2bab9f 100644 --- a/protobuf-c-rpc/protobuf-c-rpc-client.c +++ b/protobuf-c-rpc/protobuf-c-rpc-client.c @@ -1,19 +1,33 @@ #include #include -#include +#ifdef WIN32 +#define _WINSOCK_DEPRECATED_NO_WARNINGS +#include +#include +#include "protobuf-c-rpc-win.h" +#else #include #include #include +#include +#endif #include +#ifndef _WIN32_WCE #include +#include +#endif #include #include #include -#include #include "protobuf-c-rpc.h" #include "protobuf-c-rpc-data-buffer.h" +#ifdef WIN32 +// Visual C++ knows about inline and __inline, but Visual C only knows about __inline. +#define inline __inline +#endif + #define protobuf_c_rpc_assert(x) assert(x) #undef TRUE @@ -106,9 +120,14 @@ static void destroy_client_rpc (ProtobufCService *service); static void set_fd_nonblocking(int fd) { +#ifndef WIN32 /*LINUX*/ int flags = fcntl (fd, F_GETFL); protobuf_c_rpc_assert (flags >= 0); fcntl (fd, F_SETFL, flags | O_NONBLOCK); +#else + unsigned long flags = 1UL; + ioctlsocket(fd, FIONBIO, &flags); +#endif } static void @@ -228,14 +247,18 @@ set_state_connected (ProtobufC_RPC_Client *client) } static void -handle_client_fd_connect_events (int fd, - unsigned events, - void *callback_data) +handle_client_fd_connect_events (ProtobufC_RPC_FD fd, + unsigned events, + void *callback_data) { ProtobufC_RPC_Client *client = callback_data; socklen_t size_int = sizeof (int); int fd_errno = EINVAL; +#ifndef WIN32 /*LINUX*/ if (getsockopt (fd, SOL_SOCKET, SO_ERROR, &fd_errno, &size_int) < 0) +#elif defined(WIN32) + if (getsockopt (fd, SOL_SOCKET, SO_ERROR, (char*) &fd_errno, &size_int) < 0) +#endif { /* Note: this behavior is vaguely hypothetically broken, * in terms of ignoring getsockopt's error; @@ -284,7 +307,11 @@ begin_connecting (ProtobufC_RPC_Client *client, set_fd_nonblocking (client->fd); if (connect (client->fd, address, addr_len) < 0) { +#ifndef WIN32 /*LINUX*/ if (errno == EINPROGRESS) +#elif defined(WIN32) + if (WSAGetLastError() == WSAEWOULDBLOCK) +#endif { /* register interest in fd */ protobuf_c_rpc_dispatch_watch_fd (client->dispatch, @@ -349,6 +376,7 @@ begin_name_lookup (ProtobufC_RPC_Client *client) client->info.name_lookup.pending = 0; switch (client->address_type) { +#ifndef WIN32 /*LINUX*/ case PROTOBUF_C_RPC_ADDRESS_LOCAL: { struct sockaddr_un addr; @@ -358,7 +386,7 @@ begin_name_lookup (ProtobufC_RPC_Client *client) sizeof (addr)); return; } - +#endif case PROTOBUF_C_RPC_ADDRESS_TCP: { /* parse hostname:port from client->name */ @@ -454,6 +482,7 @@ enqueue_request (ProtobufC_RPC_Client *client, { const ProtobufCServiceDescriptor *desc = client->base_service.descriptor; const ProtobufCMethodDescriptor *method = desc->methods + method_index; + uint32_t request_id; protobuf_c_rpc_assert (method_index < desc->n_methods); @@ -462,22 +491,23 @@ enqueue_request (ProtobufC_RPC_Client *client, if (client->info.connected.first_free_request_id == 0) grow_closure_array (client); - uint32_t request_id = client->info.connected.first_free_request_id; + request_id = client->info.connected.first_free_request_id; /* Serialize the message */ - ProtobufC_RPC_Payload payload = {method_index, - request_id, - (ProtobufCMessage *)input}; - - client->rpc_protocol.serialize_func (desc, client->allocator, + { + ProtobufC_RPC_Payload payload = {method_index, request_id, (ProtobufCMessage *)input}; + client->rpc_protocol.serialize_func (desc, client->allocator, &client->outgoing.base, payload); + } /* Add closure to request-tree */ - Closure *cl = client->info.connected.closures + (request_id - 1); - client->info.connected.first_free_request_id = POINTER_TO_UINT (cl->closure_data); - cl->response_type = method->output; - cl->closure = closure; - cl->closure_data = closure_data; + { + Closure *cl = client->info.connected.closures + (request_id - 1); + client->info.connected.first_free_request_id = POINTER_TO_UINT (cl->closure_data); + cl->response_type = method->output; + cl->closure = closure; + cl->closure_data = closure_data; + } } static const ProtobufCMessageDescriptor * @@ -494,13 +524,14 @@ get_rcvd_message_descriptor (const ProtobufC_RPC_Payload *payload, void *data) client_failed (client, "bad request-id in response from server"); return NULL; } - - Closure *closure = client->info.connected.closures + (request_id - 1); - return closure->response_type; + { + Closure *closure = client->info.connected.closures + (request_id - 1); + return closure->response_type; + } } static void -handle_client_fd_events (int fd, +handle_client_fd_events (ProtobufC_RPC_FD fd, unsigned events, void *func_data) { @@ -569,12 +600,14 @@ handle_client_fd_events (int fd, } /* invoke closure */ - Closure *closure = client->info.connected.closures + (payload.request_id - 1); - closure->closure (payload.message, closure->closure_data); - closure->response_type = NULL; - closure->closure = NULL; - closure->closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id); - client->info.connected.first_free_request_id = payload.request_id; + { + Closure *closure = client->info.connected.closures + (payload.request_id - 1); + closure->closure (payload.message, closure->closure_data); + closure->response_type = NULL; + closure->closure = NULL; + closure->closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id); + client->info.connected.first_free_request_id = payload.request_id; + } /* clean up */ if (payload.message) @@ -606,19 +639,22 @@ static ProtobufC_RPC_Protocol_Status client_serialize (const ProtobufCServiceDes if (!protobuf_c_message_check (payload.message)) return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; - size_t message_length = protobuf_c_message_get_packed_size (payload.message); - uint32_t header[3]; - header[0] = uint32_to_le (payload.method_index); - header[1] = uint32_to_le (message_length); - header[2] = payload.request_id; - out_buffer->append (out_buffer, sizeof (header), (const uint8_t *) header); - - size_t packed_length = protobuf_c_message_pack_to_buffer (payload.message, + { + size_t message_length = protobuf_c_message_get_packed_size (payload.message); + uint32_t header[3]; + header[0] = uint32_to_le (payload.method_index); + header[1] = uint32_to_le (message_length); + header[2] = payload.request_id; + out_buffer->append (out_buffer, sizeof (header), (const uint8_t *) header); + { + size_t packed_length = protobuf_c_message_pack_to_buffer (payload.message, out_buffer); - if (packed_length != message_length) - return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; + if (packed_length != message_length) + return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; - return PROTOBUF_C_RPC_PROTOCOL_STATUS_SUCCESS; + return PROTOBUF_C_RPC_PROTOCOL_STATUS_SUCCESS; + } + } } static ProtobufC_RPC_Protocol_Status client_deserialize (const ProtobufCServiceDescriptor *descriptor, @@ -628,18 +664,22 @@ static ProtobufC_RPC_Protocol_Status client_deserialize (const ProtobufCServiceD ProtobufC_RPC_Get_Descriptor get_descriptor, void *get_descriptor_data) { + uint32_t header[4]; + uint32_t status_code; + uint32_t message_length; + ProtobufCMessage *msg; + if (!allocator || !in_buffer || !payload) return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; - uint32_t header[4]; if (in_buffer->size < sizeof (header)) return PROTOBUF_C_RPC_PROTOCOL_STATUS_INCOMPLETE_BUFFER; /* try processing buffer */ protobuf_c_rpc_data_buffer_peek (in_buffer, header, sizeof (header)); - uint32_t status_code = uint32_from_le (header[0]); + status_code = uint32_from_le (header[0]); payload->method_index = uint32_from_le (header[1]); - uint32_t message_length = uint32_from_le (header[2]); + message_length = uint32_from_le (header[2]); payload->request_id = header[3]; /* already native-endian */ if (sizeof (header) + message_length > in_buffer->size) @@ -648,15 +688,16 @@ static ProtobufC_RPC_Protocol_Status client_deserialize (const ProtobufCServiceD /* Discard the RPC header */ protobuf_c_rpc_data_buffer_discard (in_buffer, sizeof (header)); - ProtobufCMessage *msg; if (status_code == PROTOBUF_C_RPC_STATUS_CODE_SUCCESS) { + uint8_t *packed_data; + /* read message and unpack */ const ProtobufCMessageDescriptor *desc = get_descriptor (payload, get_descriptor_data); if (!desc) return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; - uint8_t *packed_data = allocator->alloc (allocator, message_length); + packed_data = allocator->alloc (allocator, message_length); if (!packed_data && message_length > 0) return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; @@ -807,7 +848,31 @@ trivial_sync_libc_resolver (ProtobufCRPCDispatch *dispatch, struct hostent *ent; ent = gethostbyname (name); if (ent == NULL) + { +#ifdef WIN32 + /* hstrerror isn't available on Windows. + Error mapping taken from http://linux.die.net/man/3/hstrerror */ + switch (h_errno) { + case HOST_NOT_FOUND: + failed_func ("The specified host is unknown.", callback_data); + break; + case NO_ADDRESS: + failed_func ("The requested name is valid but does not have an IP address.", callback_data); + break; + case NO_RECOVERY: + failed_func ("A nonrecoverable name server error occurred.", callback_data); + break; + case TRY_AGAIN: + failed_func ("A temporary error occurred on an authoritative name server. Try again later.", callback_data); + break; + default: + failed_func (strerror (h_errno), callback_data); + break; + } +#else failed_func (hstrerror (h_errno), callback_data); +#endif + } else found_func ((const uint8_t *) ent->h_addr_list[0], callback_data); } @@ -836,17 +901,20 @@ ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type, rv->error_handler = error_handler; rv->error_handler_data = "protobuf-c rpc client"; rv->info.init.idle = protobuf_c_rpc_dispatch_add_idle (dispatch, handle_init_idle, rv); - ProtobufC_RPC_Protocol default_rpc_protocol = {client_serialize, client_deserialize}; - rv->rpc_protocol = default_rpc_protocol; - - size_t name_len = strlen (name); - rv->name = allocator->alloc (allocator, name_len + 1); - if (!rv->name) - return NULL; - strncpy (rv->name, name, name_len); - rv->name[name_len] = '\0'; - - return &rv->base_service; + { + ProtobufC_RPC_Protocol default_rpc_protocol = {client_serialize, client_deserialize}; + size_t name_len; + + rv->rpc_protocol = default_rpc_protocol; + name_len = strlen (name); + rv->name = allocator->alloc (allocator, name_len + 1); + if (!rv->name) + return NULL; + strncpy (rv->name, name, name_len); + rv->name[name_len] = '\0'; + + return &rv->base_service; + } } protobuf_c_boolean diff --git a/protobuf-c-rpc/protobuf-c-rpc-data-buffer.c b/protobuf-c-rpc/protobuf-c-rpc-data-buffer.c index 2f77ad1..59019c9 100644 --- a/protobuf-c-rpc/protobuf-c-rpc-data-buffer.c +++ b/protobuf-c-rpc/protobuf-c-rpc-data-buffer.c @@ -18,7 +18,9 @@ #define BUFFER_RECYCLING 0 +#ifndef _WIN32_WCE #include +#endif #if HAVE_SYS_UIO_H /* writev function isn't available on Windows */ #include #endif @@ -33,6 +35,14 @@ #if HAVE_ALLOCA_H # include #endif + +#ifdef WIN32 +#define _WINSOCK_DEPRECATED_NO_WARNINGS +#include +#include +#include "protobuf-c-rpc-win.h" +#endif + #include #include "protobuf-c-rpc.h" #include "protobuf-c-rpc-data-buffer.h" @@ -45,6 +55,11 @@ #define PROTOBUF_C_RPC_FRAGMENT_DATA_SIZE 4096 #define PROTOBUF_C_RPC_FRAGMENT_DATA(frag) ((uint8_t*)(((ProtobufCRPCDataBufferFragment*)(frag))+1)) +#ifdef WIN32 +// Visual C++ knows about inline and __inline, but Visual C only knows about __inline. +#define inline __inline +#endif + /* --- ProtobufCRPCDataBufferFragment implementation --- */ static inline int protobuf_c_rpc_data_buffer_fragment_avail (ProtobufCRPCDataBufferFragment *frag) @@ -560,7 +575,6 @@ errno_is_ignorable (int e) return e == EINTR || e == EAGAIN; } -#if HAVE_SYS_UIO_H /** * protobuf_c_rpc_data_buffer_writev: * @read_from: buffer to take data from. @@ -576,10 +590,14 @@ errno_is_ignorable (int e) */ int protobuf_c_rpc_data_buffer_writev (ProtobufCRPCDataBuffer *read_from, - int fd) + ProtobufC_RPC_FD fd) { int rv; +#if HAVE_SYS_UIO_H struct iovec *iov; +#elif defined(WIN32) + WSABUF *iov; +#endif int nfrag, i; ProtobufCRPCDataBufferFragment *frag_at = read_from->first_frag; CHECK_INTEGRITY (read_from); @@ -589,25 +607,37 @@ protobuf_c_rpc_data_buffer_writev (ProtobufCRPCDataBuffer *read_from, #endif ; nfrag++) frag_at = frag_at->next; +#if HAVE_SYS_UIO_H iov = (struct iovec *) alloca (sizeof (struct iovec) * nfrag); +#elif defined(WIN32) + iov = (WSABUF *) alloca (sizeof (WSABUF) * nfrag); +#endif frag_at = read_from->first_frag; for (i = 0; i < nfrag; i++) { +#if HAVE_SYS_UIO_H iov[i].iov_len = frag_at->buf_length; iov[i].iov_base = protobuf_c_rpc_data_buffer_fragment_start (frag_at); +#elif defined(WIN32) + iov[i].len = frag_at->buf_length; + iov[i].buf = protobuf_c_rpc_data_buffer_fragment_start (frag_at); +#endif frag_at = frag_at->next; } +#if HAVE_SYS_UIO_H rv = writev (fd, iov, nfrag); if (rv < 0 && errno_is_ignorable (errno)) return 0; if (rv <= 0) return rv; +#elif defined(WIN32) + if (WSASend(fd, iov, nfrag, &rv, 0, NULL, NULL)) + return -1; // consult errno (WSAGetLastError) +#endif protobuf_c_rpc_data_buffer_discard (read_from, rv); return rv; } -#endif -#if HAVE_SYS_UIO_H /** * protobuf_c_rpc_data_buffer_writev_len: * @read_from: buffer to take data from. @@ -626,11 +656,15 @@ protobuf_c_rpc_data_buffer_writev (ProtobufCRPCDataBuffer *read_from, #define MIN(a,b) ((a) < (b) ? (a) : (b)) int protobuf_c_rpc_data_buffer_writev_len (ProtobufCRPCDataBuffer *read_from, - int fd, + ProtobufC_RPC_FD fd, size_t max_bytes) { int rv; +#if HAVE_SYS_UIO_H struct iovec *iov; +#elif defined(WIN32) + WSABUF *iov; +#endif int nfrag, i; size_t bytes; ProtobufCRPCDataBufferFragment *frag_at = read_from->first_frag; @@ -644,25 +678,38 @@ protobuf_c_rpc_data_buffer_writev_len (ProtobufCRPCDataBuffer *read_from, bytes += frag_at->buf_length; frag_at = frag_at->next; } +#if HAVE_SYS_UIO_H iov = (struct iovec *) alloca (sizeof (struct iovec) * nfrag); +#elif defined(WIN32) + iov = (WSABUF *) alloca (sizeof (WSABUF) * nfrag); +#endif frag_at = read_from->first_frag; for (bytes = max_bytes, i = 0; i < nfrag && bytes > 0; i++) { size_t frag_bytes = MIN (frag_at->buf_length, bytes); +#if HAVE_SYS_UIO_H iov[i].iov_len = frag_bytes; iov[i].iov_base = protobuf_c_rpc_data_buffer_fragment_start (frag_at); +#elif defined(WIN32) + iov[i].len = frag_bytes; + iov[i].buf = protobuf_c_rpc_data_buffer_fragment_start (frag_at); +#endif frag_at = frag_at->next; bytes -= frag_bytes; } +#if HAVE_SYS_UIO_H rv = writev (fd, iov, i); if (rv < 0 && errno_is_ignorable (errno)) return 0; if (rv <= 0) return rv; +#elif defined(WIN32) + if (WSASend(fd, iov, i, &rv, 0, NULL, NULL)) + return -1; // consult errno (WSAGetLastError) +#endif protobuf_c_rpc_data_buffer_discard (read_from, rv); return rv; } -#endif /** * protobuf_c_rpc_data_buffer_read_in_fd: @@ -678,7 +725,7 @@ protobuf_c_rpc_data_buffer_writev_len (ProtobufCRPCDataBuffer *read_from, /* TODO: zero-copy! */ int protobuf_c_rpc_data_buffer_read_in_fd(ProtobufCRPCDataBuffer *write_to, - int read_from) + ProtobufC_RPC_FD read_from) { char buf[8192]; int rv = read (read_from, buf, sizeof (buf)); diff --git a/protobuf-c-rpc/protobuf-c-rpc-data-buffer.h b/protobuf-c-rpc/protobuf-c-rpc-data-buffer.h index e6f2c0b..455cf26 100644 --- a/protobuf-c-rpc/protobuf-c-rpc-data-buffer.h +++ b/protobuf-c-rpc/protobuf-c-rpc-data-buffer.h @@ -116,12 +116,12 @@ size_t protobuf_c_rpc_data_buffer_transfer (ProtobufCRPCDataBuffer /* file-descriptor mucking */ int protobuf_c_rpc_data_buffer_writev (ProtobufCRPCDataBuffer *read_from, - int fd); + ProtobufC_RPC_FD fd); int protobuf_c_rpc_data_buffer_writev_len (ProtobufCRPCDataBuffer *read_from, - int fd, + ProtobufC_RPC_FD fd, size_t max_bytes); int protobuf_c_rpc_data_buffer_read_in_fd (ProtobufCRPCDataBuffer *write_to, - int read_from); + ProtobufC_RPC_FD read_from); /* This deallocates memory used by the buffer-- you are responsible * for the allocation and deallocation of the ProtobufCRPCDataBuffer itself. */ diff --git a/protobuf-c-rpc/protobuf-c-rpc-dispatch.c b/protobuf-c-rpc/protobuf-c-rpc-dispatch.c index ec8f552..ae6b7ee 100644 --- a/protobuf-c-rpc/protobuf-c-rpc-dispatch.c +++ b/protobuf-c-rpc/protobuf-c-rpc-dispatch.c @@ -38,40 +38,49 @@ #if HAVE_ALLOCA_H # include #endif +#ifndef WIN32 #include #include +#endif + #include #include #include #if HAVE_SYS_POLL_H # include # define USE_POLL 1 +#elif WIN32 +# define _WINSOCK_DEPRECATED_NO_WARNINGS +# include +# include +# include "protobuf-c-rpc-win.h" +# if(_WIN32_WINNT >= 0x0600) +# define USE_POLL 1 +# else +/* Windows XP / WinCE select() path */ +# define USE_POLL 0 +# endif #elif HAVE_SYS_SELECT_H # include # define USE_POLL 0 #endif - -/* windows annoyances: use select, use a full-fledges map for fds */ -#ifdef WIN32 -# include -# define USE_POLL 0 -# define HAVE_SMALL_FDS 0 -#endif #include #include +#ifndef _WIN32_WCE #include +#endif #include "protobuf-c-rpc.h" #include "protobuf-c-rpc-dispatch.h" -#include "gskrbtreemacros.h" #include "gsklistmacros.h" +#ifdef WIN32 +// Visual C++ knows about inline and __inline, but Visual C only knows about __inline. +#define inline __inline +#endif + #define DEBUG_DISPATCH_INTERNALS 0 #define DEBUG_DISPATCH 0 -#ifndef HAVE_SMALL_FDS -# define HAVE_SMALL_FDS 1 -#endif - #define protobuf_c_rpc_assert(condition) assert(condition) #define ALLOC_WITH_ALLOCATOR(allocator, size) ((allocator)->alloc ((allocator)->allocator_data, (size))) @@ -92,23 +101,12 @@ struct _Callback typedef struct _FDMap FDMap; struct _FDMap { + int fd; int notify_desired_index; /* -1 if not an known fd */ int change_index; /* -1 if no prior change */ int closed_since_notify_started; }; -#if !HAVE_SMALL_FDS -typedef struct _FDMapNode FDMapNode; -struct _FDMapNode -{ - ProtobufC_RPC_FD fd; - FDMapNode *left, *right, *parent; - protobuf_c_boolean is_red; - FDMap map; -}; -#endif - - typedef struct _RealDispatch RealDispatch; struct _RealDispatch { @@ -116,16 +114,13 @@ struct _RealDispatch Callback *callbacks; /* parallels notifies_desired */ size_t notifies_desired_alloced; size_t changes_alloced; -#if HAVE_SMALL_FDS - FDMap *fd_map; /* map indexed by fd */ - size_t fd_map_size; /* number of elements of fd_map */ -#else - FDMapNode *fd_map_tree; /* map indexed by fd */ -#endif + FDMap *fd_map; /* map sorted by fd */ + size_t fd_map_size; /* number of valid elements of fd_map */ + size_t fd_map_capacity; /* number of elements allocated in fd_map */ protobuf_c_boolean is_dispatching; - ProtobufCRPCDispatchTimer *timer_tree; + ProtobufCRPCDispatchTimer *timer_list; ProtobufCAllocator *allocator; ProtobufCRPCDispatchTimer *recycled_timeouts; @@ -141,9 +136,7 @@ struct _ProtobufCRPCDispatchTimer unsigned long timeout_secs; unsigned timeout_usecs; - /* red-black tree stuff */ - ProtobufCRPCDispatchTimer *left, *right, *parent; - protobuf_c_boolean is_red; + ProtobufCRPCDispatchTimer *prev, *next; /* user callback */ ProtobufCRPCDispatchTimerFunc func; @@ -160,38 +153,56 @@ struct _ProtobufCRPCDispatchIdle ProtobufCRPCDispatchIdleFunc func; void *func_data; }; -/* Define the tree of timers, as per gskrbtreemacros.h */ -#define TIMER_GET_IS_RED(n) ((n)->is_red) -#define TIMER_SET_IS_RED(n,v) ((n)->is_red = (v)) -#define TIMERS_COMPARE(a,b, rv) \ - if (a->timeout_secs < b->timeout_secs) rv = -1; \ - else if (a->timeout_secs > b->timeout_secs) rv = 1; \ - else if (a->timeout_usecs < b->timeout_usecs) rv = -1; \ - else if (a->timeout_usecs > b->timeout_usecs) rv = 1; \ - else if (a < b) rv = -1; \ - else if (a > b) rv = 1; \ - else rv = 0; -#define GET_TIMER_TREE(d) \ - (d)->timer_tree, ProtobufCRPCDispatchTimer *, \ - TIMER_GET_IS_RED, TIMER_SET_IS_RED, \ - parent, left, right, \ - TIMERS_COMPARE - -#if !HAVE_SMALL_FDS -#define FD_MAP_NODES_COMPARE(a,b, rv) \ - if (a->fd < b->fd) rv = -1; \ - else if (a->fd > b->fd) rv = 1; \ - else rv = 0; -#define GET_FD_MAP_TREE(d) \ - (d)->fd_map_tree, FDMapNode *, \ - TIMER_GET_IS_RED, TIMER_SET_IS_RED, \ - parent, left, right, \ - FD_MAP_NODES_COMPARE -#define COMPARE_FD_TO_FD_MAP_NODE(a,b, rv) \ - if (a < b->fd) rv = -1; \ - else if (a > b->fd) rv = 1; \ - else rv = 0; + +static inline int +compare_timers (ProtobufCRPCDispatchTimer * a, ProtobufCRPCDispatchTimer * b) +{ + if (a->timeout_secs < b->timeout_secs) + return -1; + else if (a->timeout_secs > b->timeout_secs) + return 1; + else if (a->timeout_usecs < b->timeout_usecs) + return -1; + else if (a->timeout_usecs > b->timeout_usecs) + return 1; + else if (a < b) + return -1; + else if (a > b) + return 1; + else + return 0; +} + +static inline void +get_timestamp(struct timeval* timestamp) +{ +#ifndef WIN32 /*LINUX*/ + gettimeofday (timestamp, NULL); +#else + // Number of 100 nanosecond between January 1, 1601 and January 1, 1970. + static const ULONGLONG epochBias = 116444736000000000ULL; + static const ULONGLONG hundredsOfNanosecondsPerMicrosecond = 10; + static const ULONGLONG microsecondsPerSecond = 1000000; + FILETIME fileTime; + ULARGE_INTEGER dateTime; + ULONGLONG t; + + if (!timestamp) + return; + + GetSystemTimeAsFileTime(&fileTime); + // As per Windows documentation for FILETIME, copy the resulting FILETIME structure to a + // ULARGE_INTEGER structure using memcpy (using memcpy instead of direct assignment can + // prevent alignment faults on 64-bit Windows). + memcpy(&dateTime, &fileTime, sizeof(dateTime)); + + // Windows file times are in 100s of nanoseconds. + t = (dateTime.QuadPart - epochBias) / hundredsOfNanosecondsPerMicrosecond; + + timestamp->tv_sec = (long)(t / microsecondsPerSecond); + timestamp->tv_usec = (long)(t % microsecondsPerSecond); #endif +} /* declare the idle-handler list */ #define GET_IDLE_LIST(d) \ @@ -202,6 +213,7 @@ ProtobufCRPCDispatch *protobuf_c_rpc_dispatch_new (ProtobufCAllocator *allocator { RealDispatch *rv = ALLOC (sizeof (RealDispatch)); struct timeval tv; + rv->base.n_changes = 0; rv->notifies_desired_alloced = 8; rv->base.notifies_desired = ALLOC (sizeof (ProtobufC_RPC_FDNotify) * rv->notifies_desired_alloced); @@ -209,44 +221,31 @@ ProtobufCRPCDispatch *protobuf_c_rpc_dispatch_new (ProtobufCAllocator *allocator rv->callbacks = ALLOC (sizeof (Callback) * rv->notifies_desired_alloced); rv->changes_alloced = 8; rv->base.changes = ALLOC (sizeof (ProtobufC_RPC_FDNotifyChange) * rv->changes_alloced); -#if HAVE_SMALL_FDS - rv->fd_map_size = 16; - rv->fd_map = ALLOC (sizeof (FDMap) * rv->fd_map_size); - memset (rv->fd_map, 255, sizeof (FDMap) * rv->fd_map_size); -#else - rv->fd_map_tree = NULL; -#endif + rv->fd_map_size = 0; + rv->fd_map_capacity = 16; + rv->fd_map = ALLOC (sizeof (FDMap) * rv->fd_map_capacity); + memset(rv->fd_map, 255, sizeof(FDMap) * rv->fd_map_capacity); rv->allocator = allocator; - rv->timer_tree = NULL; + rv->timer_list = NULL; rv->first_idle = rv->last_idle = NULL; rv->base.has_idle = 0; rv->recycled_idles = NULL; rv->recycled_timeouts = NULL; rv->is_dispatching = 0; +#ifndef WIN32 /* need to handle SIGPIPE more gracefully than default */ signal (SIGPIPE, SIG_IGN); +#endif + + get_timestamp(&tv); - gettimeofday (&tv, NULL); rv->base.last_dispatch_secs = tv.tv_sec; rv->base.last_dispatch_usecs = tv.tv_usec; return &rv->base; } -#if !HAVE_SMALL_FDS -void free_fd_tree_recursive (ProtobufCAllocator *allocator, - FDMapNode *node) -{ - if (node) - { - free_fd_tree_recursive (allocator, node->left); - free_fd_tree_recursive (allocator, node->right); - FREE (node); - } -} -#endif - /* XXX: leaking timer_tree seemingly? */ void protobuf_c_rpc_dispatch_free(ProtobufCRPCDispatch *dispatch) @@ -256,7 +255,7 @@ protobuf_c_rpc_dispatch_free(ProtobufCRPCDispatch *dispatch) while (d->recycled_timeouts != NULL) { ProtobufCRPCDispatchTimer *t = d->recycled_timeouts; - d->recycled_timeouts = t->right; + d->recycled_timeouts = t->next; FREE (t); } while (d->recycled_idles != NULL) @@ -268,12 +267,7 @@ protobuf_c_rpc_dispatch_free(ProtobufCRPCDispatch *dispatch) FREE (d->base.notifies_desired); FREE (d->base.changes); FREE (d->callbacks); - -#if HAVE_SMALL_FDS FREE (d->fd_map); -#else - free_fd_tree_recursive (allocator, d->fd_map_tree); -#endif FREE (d); } @@ -298,11 +292,18 @@ system_free(void *allocator_data, void *data) free(data); } +/* Visual C doesn't understand this initialization syntax. static ProtobufCAllocator protobuf_c_rpc__allocator = { .alloc = &system_alloc, .free = &system_free, .allocator_data = NULL, }; +*/ +static ProtobufCAllocator protobuf_c_rpc__allocator = { + &system_alloc, + &system_free, + NULL, +}; /* TODO: perhaps thread-private dispatches make more sense? */ static ProtobufCRPCDispatch *def = NULL; @@ -314,34 +315,19 @@ ProtobufCRPCDispatch *protobuf_c_rpc_dispatch_default (void) return def; } -#if HAVE_SMALL_FDS static void -enlarge_fd_map (RealDispatch *d, - unsigned fd) +enlarge_fd_map (RealDispatch *d) { - size_t new_size = d->fd_map_size * 2; + size_t new_capacity = d->fd_map_capacity * 2; FDMap *new_map; ProtobufCAllocator *allocator = d->allocator; - while (fd >= new_size) - new_size *= 2; - new_map = ALLOC (sizeof (FDMap) * new_size); + new_map = ALLOC (sizeof (FDMap) * new_capacity); + memset (new_map, 255, sizeof(FDMap) * new_capacity); memcpy (new_map, d->fd_map, d->fd_map_size * sizeof (FDMap)); - memset (new_map + d->fd_map_size, - 255, - sizeof (FDMap) * (new_size - d->fd_map_size)); FREE (d->fd_map); d->fd_map = new_map; - d->fd_map_size = new_size; -} - -static inline void -ensure_fd_map_big_enough (RealDispatch *d, - unsigned fd) -{ - if (fd >= d->fd_map_size) - enlarge_fd_map (d, fd); + d->fd_map_capacity = new_capacity; } -#endif static unsigned allocate_notifies_desired_index (RealDispatch *d) @@ -386,40 +372,51 @@ allocate_change_index (RealDispatch *d) static inline FDMap * get_fd_map (RealDispatch *d, ProtobufC_RPC_FD fd) { -#if HAVE_SMALL_FDS - if ((unsigned)fd >= d->fd_map_size) + uint32_t low = 0, high = d->fd_map_size, mid; + FDMap * map = NULL; + while (low < high) { + mid = (low + high) / 2; + map = d->fd_map + mid; + if (map->fd >= fd) + high = mid; + else + low = mid + 1; + } + map = d->fd_map + low; + if (!map || map->fd != fd) return NULL; - else - return d->fd_map + fd; -#else - FDMapNode *node; - GSK_RBTREE_LOOKUP_COMPARATOR (GET_FD_MAP_TREE (d), fd, COMPARE_FD_TO_FD_MAP_NODE, node); - return node ? &node->map : NULL; -#endif + return map; } + static inline FDMap * force_fd_map (RealDispatch *d, ProtobufC_RPC_FD fd) { -#if HAVE_SMALL_FDS - ensure_fd_map_big_enough (d, fd); - return d->fd_map + fd; -#else - { - FDMap *fm = get_fd_map (d, fd); - ProtobufCAllocator *allocator = d->allocator; - if (fm == NULL) - { - FDMapNode *node = ALLOC (sizeof (FDMapNode)); - FDMapNode *conflict; - node->fd = fd; - memset (&node->map, 255, sizeof (FDMap)); - GSK_RBTREE_INSERT (GET_FD_MAP_TREE (d), node, conflict); - assert (conflict == NULL); - fm = &node->map; - } - return fm; - } -#endif + FDMap *fm = get_fd_map (d, fd); + if (fm == NULL) + { + uint32_t low = 0, high = d->fd_map_size, mid; + FDMap * map = NULL; + if (d->fd_map_size == d->fd_map_capacity) + { + enlarge_fd_map(d); + } + while (low < high) + { + mid = (low + high) / 2; + map = d->fd_map + mid; + + if (map->fd >= fd) + high = mid; + else + low = mid + 1; + } + fm = d->fd_map + low; + memmove(fm + 1, fm, sizeof(FDMap) * (d->fd_map_size - low)); + memset(fm, 255, sizeof(FDMap)); + fm->fd = fd; + d->fd_map_size++; + } + return fm; } static void @@ -563,20 +560,48 @@ protobuf_c_rpc_dispatch_fd_closed(ProtobufCRPCDispatch *dispatch, } static void -free_timer (ProtobufCRPCDispatchTimer *timer) +remove_timer(RealDispatch *d, ProtobufCRPCDispatchTimer * timer) { - RealDispatch *d = timer->dispatch; - timer->right = d->recycled_timeouts; + if (timer == d->timer_list) + { + if (!timer->next) + d->timer_list = NULL; + else + d->timer_list = timer->next; + } + else if (timer->prev) + timer->prev->next = timer->next; + if (timer->next) + timer->next->prev = timer->prev; + + timer->next = d->recycled_timeouts; d->recycled_timeouts = timer; } +static void +insert_timer(RealDispatch *d, ProtobufCRPCDispatchTimer *timer, ProtobufCRPCDispatchTimer * after) +{ + if (after) { + if (after->next) + after->next->prev = timer; + timer->next = after->next; + timer->prev = after; + after->next = timer; + } + else { + timer->next = d->timer_list; + timer->prev = NULL; + d->timer_list->prev = timer; + d->timer_list = timer; + } +} + void protobuf_c_rpc_dispatch_dispatch (ProtobufCRPCDispatch *dispatch, size_t n_notifies, ProtobufC_RPC_FDNotify *notifies) { RealDispatch *d = (RealDispatch *) dispatch; - unsigned fd_max; unsigned i; struct timeval tv; @@ -586,24 +611,18 @@ protobuf_c_rpc_dispatch_dispatch (ProtobufCRPCDispatch *dispatch, protobuf_c_rpc_assert (!d->is_dispatching); d->is_dispatching = 1; - gettimeofday (&tv, NULL); + get_timestamp (&tv); dispatch->last_dispatch_secs = tv.tv_sec; dispatch->last_dispatch_usecs = tv.tv_usec; - - fd_max = 0; - for (i = 0; i < n_notifies; i++) - if (fd_max < (unsigned) notifies[i].fd) - fd_max = notifies[i].fd; - ensure_fd_map_big_enough (d, fd_max); for (i = 0; i < n_notifies; i++) - d->fd_map[notifies[i].fd].closed_since_notify_started = 0; + force_fd_map(d, notifies[i].fd)->closed_since_notify_started = 0; for (i = 0; i < n_notifies; i++) { unsigned fd = notifies[i].fd; - if (!d->fd_map[fd].closed_since_notify_started - && d->fd_map[fd].notify_desired_index != -1) + FDMap * fdmap = force_fd_map(d, fd); + if (fdmap->notify_desired_index != -1) { - unsigned nd_ind = d->fd_map[fd].notify_desired_index; + unsigned nd_ind = fdmap->notify_desired_index; unsigned events = d->base.notifies_desired[nd_ind].events & notifies[i].events; if (events != 0) d->callbacks[nd_ind].func (fd, events, d->callbacks[nd_ind].data); @@ -612,7 +631,7 @@ protobuf_c_rpc_dispatch_dispatch (ProtobufCRPCDispatch *dispatch, /* clear changes */ for (i = 0; i < dispatch->n_changes; i++) - d->fd_map[dispatch->changes[i].fd].change_index = -1; + force_fd_map(d, dispatch->changes[i].fd)->change_index = -1; dispatch->n_changes = 0; /* handle idle functions */ @@ -621,8 +640,12 @@ protobuf_c_rpc_dispatch_dispatch (ProtobufCRPCDispatch *dispatch, ProtobufCRPCDispatchIdle *idle = d->first_idle; ProtobufCRPCDispatchIdleFunc func = idle->func; void *data = idle->func_data; +#ifndef _MSC_VER GSK_LIST_REMOVE_FIRST (GET_IDLE_LIST (d)); - +#else + // MSVC 2008 (and possibly later versions) does not expand macros within macros as required by this code, so we'll do it by hand: + GSK_LIST_REMOVE_FIRST_(ProtobufCRPCDispatchIdle *, d->first_idle, d->last_idle, prev, next); +#endif idle->func = NULL; /* set to NULL to render remove_idle a no-op */ func (dispatch, data); @@ -632,23 +655,21 @@ protobuf_c_rpc_dispatch_dispatch (ProtobufCRPCDispatch *dispatch, dispatch->has_idle = 0; /* handle timers */ - while (d->timer_tree != NULL) + while (d->timer_list != NULL) { - ProtobufCRPCDispatchTimer *min_timer; - GSK_RBTREE_FIRST (GET_TIMER_TREE (d), min_timer); + ProtobufCRPCDispatchTimer *min_timer = d->timer_list; if (min_timer->timeout_secs < (unsigned long) tv.tv_sec || (min_timer->timeout_secs == (unsigned long) tv.tv_sec && min_timer->timeout_usecs <= (unsigned) tv.tv_usec)) { ProtobufCRPCDispatchTimerFunc func = min_timer->func; void *func_data = min_timer->func_data; - GSK_RBTREE_REMOVE (GET_TIMER_TREE (d), min_timer); + remove_timer(d, min_timer); /* Set to NULL as a way to tell protobuf_c_rpc_dispatch_remove_timer() that we are in the middle of notifying */ min_timer->func = NULL; min_timer->func_data = NULL; func (&d->base, func_data); - free_timer (min_timer); } else { @@ -658,7 +679,7 @@ protobuf_c_rpc_dispatch_dispatch (ProtobufCRPCDispatch *dispatch, break; } } - if (d->timer_tree == NULL) + if (d->timer_list == NULL) d->base.has_timeout = 0; /* Finish reentrance guard. */ @@ -679,6 +700,8 @@ protobuf_c_rpc_dispatch_clear_changes (ProtobufCRPCDispatch *dispatch) dispatch->n_changes = 0; } +#if USE_POLL + static inline unsigned events_to_pollfd_events (unsigned ev) { @@ -694,10 +717,20 @@ pollfd_events_to_events (unsigned ev) ; } +#endif + void protobuf_c_rpc_dispatch_run (ProtobufCRPCDispatch *dispatch) { +#if USE_POLL struct pollfd *fds; +#else + fd_set fdread; + fd_set fdwrite; + fd_set fdexcep; + int maxfd = 0; + struct timeval timeval_timeout; +#endif void *to_free = NULL, *to_free2 = NULL; size_t n_events; RealDispatch *d = (RealDispatch *) dispatch; @@ -705,8 +738,10 @@ protobuf_c_rpc_dispatch_run (ProtobufCRPCDispatch *dispatch) unsigned i; int timeout; ProtobufC_RPC_FDNotify *events; + +#if USE_POLL if (dispatch->n_notifies_desired < 128) - fds = alloca (sizeof (struct pollfd) * dispatch->n_notifies_desired); + fds = (struct pollfd*) alloca (sizeof (struct pollfd) * dispatch->n_notifies_desired); else to_free = fds = ALLOC (sizeof (struct pollfd) * dispatch->n_notifies_desired); for (i = 0; i < dispatch->n_notifies_desired; i++) @@ -715,6 +750,21 @@ protobuf_c_rpc_dispatch_run (ProtobufCRPCDispatch *dispatch) fds[i].events = events_to_pollfd_events (dispatch->notifies_desired[i].events); fds[i].revents = 0; } +#else + FD_ZERO(&fdread); + FD_ZERO(&fdwrite); + FD_ZERO(&fdexcep); + for (i = 0; i < dispatch->n_notifies_desired; i++) + { + if (dispatch->notifies_desired[i].events & PROTOBUF_C_RPC_EVENT_READABLE) + FD_SET(dispatch->notifies_desired[i].fd, &fdread); + if (dispatch->notifies_desired[i].events & PROTOBUF_C_RPC_EVENT_WRITABLE) + FD_SET(dispatch->notifies_desired[i].fd, &fdwrite); + FD_SET(dispatch->notifies_desired[i].fd, &fdexcep); + if (dispatch->notifies_desired[i].fd > maxfd) + maxfd = dispatch->notifies_desired[i].fd; + } +#endif /* compute timeout */ if (dispatch->has_idle) @@ -724,7 +774,7 @@ protobuf_c_rpc_dispatch_run (ProtobufCRPCDispatch *dispatch) else { struct timeval tv; - gettimeofday (&tv, NULL); + get_timestamp (&tv); if (dispatch->timeout_secs < (unsigned long) tv.tv_sec || (dispatch->timeout_secs == (unsigned long) tv.tv_sec && dispatch->timeout_usecs <= (unsigned) tv.tv_usec)) @@ -746,8 +796,12 @@ protobuf_c_rpc_dispatch_run (ProtobufCRPCDispatch *dispatch) timeout = ds * 1000 + (du + 999) / 1000; } } - +#if USE_POLL +#ifdef WIN32 + if (dispatch->n_notifies_desired && WSAPoll(fds, dispatch->n_notifies_desired, timeout) < 0) +#else if (poll (fds, dispatch->n_notifies_desired, timeout) < 0) +#endif { if (errno == EINTR) return; /* probably a signal interrupted the poll-- let the user have control */ @@ -760,12 +814,34 @@ protobuf_c_rpc_dispatch_run (ProtobufCRPCDispatch *dispatch) for (i = 0; i < dispatch->n_notifies_desired; i++) if (fds[i].revents) n_events++; +#else + if (timeout != -1) + { + timeval_timeout.tv_sec = timeout / 1000; + timeval_timeout.tv_usec = (timeout % 1000) * 1000; + } + if (dispatch->n_notifies_desired) + n_events = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, timeout == -1 ? NULL : &timeval_timeout); + else + n_events = 0; + if (n_events == SOCKET_ERROR) + { + if (errno == EINTR) + return; /* probably a signal interrupted the poll-- let the user have control */ + + /* i don't really know what would plausibly cause this */ + fprintf (stderr, "select error: %s\n", strerror (errno)); + return; + } +#endif if (n_events < 128) - events = alloca (sizeof (ProtobufC_RPC_FDNotify) * n_events); + events = (ProtobufC_RPC_FDNotify*) alloca (sizeof (ProtobufC_RPC_FDNotify) * n_events); else to_free2 = events = ALLOC (sizeof (ProtobufC_RPC_FDNotify) * n_events); n_events = 0; for (i = 0; i < dispatch->n_notifies_desired; i++) + { +#if USE_POLL if (fds[i].revents) { events[n_events].fd = fds[i].fd; @@ -776,6 +852,20 @@ protobuf_c_rpc_dispatch_run (ProtobufCRPCDispatch *dispatch) if (events[n_events].events != 0) n_events++; } +#else + if (FD_ISSET(dispatch->notifies_desired[i].fd, &fdread) || + FD_ISSET(dispatch->notifies_desired[i].fd, &fdwrite)) + { + events[n_events].fd = dispatch->notifies_desired[i].fd; + events[n_events].events = 0; + if (FD_ISSET(dispatch->notifies_desired[i].fd, &fdread)) + events[n_events].events |= PROTOBUF_C_RPC_EVENT_READABLE; + if (FD_ISSET(dispatch->notifies_desired[i].fd, &fdwrite)) + events[n_events].events |= PROTOBUF_C_RPC_EVENT_WRITABLE; + n_events++; + } +#endif + } protobuf_c_rpc_dispatch_dispatch (dispatch, n_events, events); if (to_free) FREE (to_free); @@ -798,28 +888,37 @@ protobuf_c_rpc_dispatch_add_timer(ProtobufCRPCDispatch *dispatch, if (d->recycled_timeouts != NULL) { rv = d->recycled_timeouts; - d->recycled_timeouts = rv->right; + d->recycled_timeouts = rv->next; } else { rv = d->allocator->alloc (d->allocator, sizeof (ProtobufCRPCDispatchTimer)); } + rv->dispatch = d; rv->timeout_secs = timeout_secs; rv->timeout_usecs = timeout_usecs; rv->func = func; rv->func_data = func_data; - rv->dispatch = d; - GSK_RBTREE_INSERT (GET_TIMER_TREE (d), rv, conflict); - - /* is this the first element in the tree */ - for (at = rv; at != NULL; at = at->parent) - if (at->parent && at->parent->right == at) - break; - if (at == NULL) /* yes, so set the public members */ + rv->next = NULL; + rv->prev = NULL; + + if (!d->timer_list) { dispatch->has_timeout = 1; dispatch->timeout_secs = rv->timeout_secs; dispatch->timeout_usecs = rv->timeout_usecs; + d->timer_list = rv; + } + else + { + for (at = d->timer_list; at != NULL; at = at->next) { + if (compare_timers(rv, at) < 0) { + insert_timer(d, rv, at->prev); + break; + } + } + if (at == NULL) + insert_timer(d, rv, at); } return rv; } @@ -855,16 +954,15 @@ void protobuf_c_rpc_dispatch_remove_timer (ProtobufCRPCDispatchTimer *timer) may_be_first = d->base.timeout_usecs == timer->timeout_usecs && d->base.timeout_secs == timer->timeout_secs; - GSK_RBTREE_REMOVE (GET_TIMER_TREE (d), timer); + remove_timer(d, timer); if (may_be_first) { - if (d->timer_tree == NULL) + if (d->timer_list == NULL) d->base.has_timeout = 0; else { - ProtobufCRPCDispatchTimer *min; - GSK_RBTREE_FIRST (GET_TIMER_TREE (d), min); + ProtobufCRPCDispatchTimer *min = d->timer_list; d->base.timeout_secs = min->timeout_secs; d->base.timeout_usecs = min->timeout_usecs; } @@ -887,7 +985,13 @@ protobuf_c_rpc_dispatch_add_idle (ProtobufCRPCDispatch *dispatch, ProtobufCAllocator *allocator = d->allocator; rv = ALLOC (sizeof (ProtobufCRPCDispatchIdle)); } +#ifndef _MSC_VER GSK_LIST_APPEND (GET_IDLE_LIST (d), rv); +#else + // MSVC 2008 (and possibly later versions) does not expand macros within macros as required by this code, so we'll do it by hand: + GSK_LIST_APPEND_(ProtobufCRPCDispatchIdle *, d->first_idle, d->last_idle, prev, next, rv); +#endif + rv->func = func; rv->func_data = func_data; rv->dispatch = d; @@ -901,7 +1005,12 @@ protobuf_c_rpc_dispatch_remove_idle (ProtobufCRPCDispatchIdle *idle) if (idle->func != NULL) { RealDispatch *d = idle->dispatch; +#ifndef _MSC_VER GSK_LIST_REMOVE (GET_IDLE_LIST (d), idle); +#else + // MSVC 2008 (and possibly later versions) does not expand macros within macros as required by this code, so we'll do it by hand: + GSK_LIST_REMOVE_(ProtobufCRPCDispatchIdle *, d->first_idle, d->last_idle, prev, next, idle); +#endif idle->next = d->recycled_idles; d->recycled_idles = idle; } diff --git a/protobuf-c-rpc/protobuf-c-rpc-server.c b/protobuf-c-rpc/protobuf-c-rpc-server.c index 110c0e3..9516fd9 100644 --- a/protobuf-c-rpc/protobuf-c-rpc-server.c +++ b/protobuf-c-rpc/protobuf-c-rpc-server.c @@ -1,17 +1,32 @@ #include +#ifndef _WIN32_WCE #include +#include +#endif #include #include #include + +#ifdef WIN32 +#define _WINSOCK_DEPRECATED_NO_WARNINGS +#include +#include +#include "protobuf-c-rpc-win.h" +#else #include #include -#include #include +#endif #include "protobuf-c-rpc.h" #include "protobuf-c-rpc-data-buffer.h" #include "gsklistmacros.h" +#ifdef WIN32 +// Visual C++ knows about inline and __inline, but Visual C only knows about __inline. +#define inline __inline +#endif + #undef TRUE #define TRUE 1 #undef FALSE @@ -79,11 +94,13 @@ struct _ProtobufC_RPC_Server ServerRequest *recycled_requests; /* multithreading support */ +#ifndef WIN32 ProtobufC_RPC_IsRpcThreadFunc is_rpc_thread_func; void * is_rpc_thread_data; int proxy_pipe[2]; unsigned proxy_extra_data_len; uint8_t proxy_extra_data[sizeof (void*)]; +#endif ProtobufC_RPC_Error_Func error_handler; void *error_handler_data; @@ -107,10 +124,14 @@ errno_is_ignorable (int e) static void set_fd_nonblocking(int fd) { +#ifndef WIN32 /*LINUX*/ int flags = fcntl (fd, F_GETFL); - if (flags >= 0) fcntl (fd, F_SETFL, flags | O_NONBLOCK); +#else + unsigned long flags = 1UL; + ioctlsocket(fd, FIONBIO, &flags); +#endif } static void @@ -139,7 +160,12 @@ server_connection_close (ServerConnection *conn) protobuf_c_rpc_data_buffer_clear (&conn->outgoing); /* remove this connection from the server's list */ +#ifndef _MSC_VER GSK_LIST_REMOVE (GET_CONNECTION_LIST (conn->server), conn); +#else + // MSVC 2008 (and possibly later versions) does not expand macros within macros as required by this code, so we'll do it by hand: + GSK_LIST_REMOVE_(ServerConnection *, conn->server->first_connection, conn->server->last_connection, prev, next, conn); +#endif /* disassocate all the requests from the connection */ while (conn->first_pending_request != NULL) @@ -259,7 +285,12 @@ create_server_request (ServerConnection *conn, rv->request_id = request_id; rv->method_index = method_index; conn->n_pending_requests++; +#ifndef _MSC_VER GSK_LIST_APPEND (GET_PENDING_REQUEST_LIST (conn), rv); +#else + // MSVC 2008 (and possibly later versions) does not expand macros within macros as required by this code, so we'll do it by hand: + GSK_LIST_APPEND_ (ServerRequest *, conn->first_pending_request, conn->last_pending_request, info.alive.prev, info.alive.next, rv); +#endif return rv; } @@ -290,7 +321,7 @@ uint32_to_le (uint32_t le) } #define uint32_from_le uint32_to_le /* make the code more readable, i guess */ -static void handle_server_connection_events (int fd, +static void handle_server_connection_events (ProtobufC_RPC_FD fd, unsigned events, void *data); static void @@ -301,15 +332,18 @@ server_connection_response_closure (const ProtobufCMessage *message, ProtobufC_RPC_Server *server = request->server; protobuf_c_boolean must_proxy = 0; ProtobufCAllocator *allocator = server->allocator; + uint8_t buffer_slab[512]; + +#ifndef WIN32 if (server->is_rpc_thread_func != NULL) { must_proxy = !server->is_rpc_thread_func (server, server->dispatch, server->is_rpc_thread_data); } +#endif - - uint8_t buffer_slab[512]; + { ProtobufCBufferSimple buffer_simple = PROTOBUF_C_BUFFER_SIMPLE_INIT (buffer_slab); ProtobufC_RPC_Payload payload = { request->method_index, @@ -326,6 +360,7 @@ server_connection_response_closure (const ProtobufCMessage *message, return; } +#ifndef WIN32 if (must_proxy) { ProxyResponse *pr = allocator->alloc (allocator, sizeof (ProxyResponse) + buffer_simple.len); @@ -352,7 +387,9 @@ server_connection_response_closure (const ProtobufCMessage *message, allocator->free (allocator, pr); } } - else if (request->conn == NULL) + else +#endif + if (request->conn == NULL) { /* defunct request */ allocator->free (allocator, request); @@ -369,12 +406,19 @@ server_connection_response_closure (const ProtobufCMessage *message, PROTOBUF_C_RPC_EVENT_READABLE|PROTOBUF_C_RPC_EVENT_WRITABLE, handle_server_connection_events, conn); +#ifndef _MSC_VER GSK_LIST_REMOVE (GET_PENDING_REQUEST_LIST (conn), request); +#else + // MSVC 2008 (and possibly later versions) does not expand macros within macros as required by this code, so we'll do it by hand: + GSK_LIST_REMOVE_ (ServerRequest *, conn->first_pending_request, conn->last_pending_request, info.alive.prev, info.alive.next, request); +#endif + conn->n_pending_requests--; free_server_request (server, request); } PROTOBUF_C_BUFFER_SIMPLE_CLEAR (&buffer_simple); + } } static const ProtobufCMessageDescriptor * @@ -396,7 +440,7 @@ get_rcvd_message_descriptor (const ProtobufC_RPC_Payload *payload, void *data) } static void -handle_server_connection_events (int fd, +handle_server_connection_events (ProtobufC_RPC_FD fd, unsigned events, void *data) { @@ -451,6 +495,8 @@ handle_server_connection_events (int fd, else while (conn->incoming.size > 0) { + ServerRequest *server_request; + /* Deserialize the buffer */ ProtobufC_RPC_Payload payload = {0}; ProtobufC_RPC_Protocol_Status status = @@ -472,9 +518,9 @@ handle_server_connection_events (int fd, } /* Invoke service (note that it may call back immediately) */ - ServerRequest *server_request = create_server_request (conn, - payload.request_id, - payload.method_index); + server_request = create_server_request (conn, + payload.request_id, + payload.method_index); service->invoke (service, payload.method_index, payload.message, server_connection_response_closure, server_request); @@ -502,10 +548,11 @@ server_serialize (const ProtobufCServiceDescriptor *descriptor, ProtobufCBuffer *out_buffer, ProtobufC_RPC_Payload payload) { + uint32_t header[4]; + if (!out_buffer) return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; - uint32_t header[4]; if (!protobuf_c_message_check (payload.message)) { /* send failed response */ @@ -540,17 +587,18 @@ server_deserialize (const ProtobufCServiceDescriptor *descriptor, ProtobufC_RPC_Get_Descriptor get_descriptor, void *get_descriptor_data) { + uint32_t header[3]; + uint32_t message_length; + uint8_t *packed_data; + ProtobufCMessage *message; + const ProtobufCMessageDescriptor *desc; + if (!allocator || !in_buffer || !payload) return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; - uint32_t header[3]; if (in_buffer->size < sizeof (header)) return PROTOBUF_C_RPC_PROTOCOL_STATUS_INCOMPLETE_BUFFER; - uint32_t message_length; - uint8_t *packed_data; - ProtobufCMessage *message; - protobuf_c_rpc_data_buffer_peek (in_buffer, header, sizeof (header)); payload->method_index = uint32_from_le (header[0]); message_length = uint32_from_le (header[1]); @@ -560,8 +608,7 @@ server_deserialize (const ProtobufCServiceDescriptor *descriptor, if (in_buffer->size < sizeof (header) + message_length) return PROTOBUF_C_RPC_PROTOCOL_STATUS_INCOMPLETE_BUFFER; - const ProtobufCMessageDescriptor *desc = - get_descriptor (payload, get_descriptor_data); + desc = get_descriptor (payload, get_descriptor_data); if (!desc) return PROTOBUF_C_RPC_PROTOCOL_STATUS_FAILED; @@ -583,7 +630,7 @@ server_deserialize (const ProtobufCServiceDescriptor *descriptor, static void -handle_server_listener_readable (int fd, +handle_server_listener_readable (ProtobufC_RPC_FD fd, unsigned events, void *data) { @@ -608,7 +655,12 @@ handle_server_listener_readable (int fd, conn->n_pending_requests = 0; conn->first_pending_request = conn->last_pending_request = NULL; conn->server = server; +#ifndef _MSC_VER GSK_LIST_APPEND (GET_CONNECTION_LIST (server), conn); +#else + // MSVC 2008 (and possibly later versions) does not expand macros within macros as required by this code, so we'll do it by hand: + GSK_LIST_APPEND_ (ServerConnection *, server->first_connection, server->last_connection, prev, next, conn); +#endif protobuf_c_rpc_dispatch_watch_fd (server->dispatch, conn->fd, PROTOBUF_C_RPC_EVENT_READABLE, handle_server_connection_events, conn); } @@ -633,26 +685,32 @@ server_new_from_fd (ProtobufC_RPC_FD listening_fd, server->error_handler_data = "protobuf-c rpc server"; server->listening_fd = listening_fd; server->recycled_requests = NULL; +#ifndef WIN32 server->is_rpc_thread_func = NULL; server->is_rpc_thread_data = NULL; server->proxy_pipe[0] = server->proxy_pipe[1] = -1; server->proxy_extra_data_len = 0; - ProtobufC_RPC_Protocol default_rpc_protocol = {server_serialize, server_deserialize}; - server->rpc_protocol = default_rpc_protocol; - - size_t name_len = strlen (bind_name); - server->bind_name = allocator->alloc (allocator, name_len + 1); - if (!server->bind_name) - return NULL; - strncpy (server->bind_name, bind_name, name_len); - server->bind_name[name_len] = '\0'; - - set_fd_nonblocking (listening_fd); - protobuf_c_rpc_dispatch_watch_fd (dispatch, listening_fd, PROTOBUF_C_RPC_EVENT_READABLE, +#endif + { + size_t name_len; + ProtobufC_RPC_Protocol default_rpc_protocol = {server_serialize, server_deserialize}; + server->rpc_protocol = default_rpc_protocol; + + name_len = strlen (bind_name); + server->bind_name = allocator->alloc (allocator, name_len + 1); + if (!server->bind_name) + return NULL; + strncpy (server->bind_name, bind_name, name_len); + server->bind_name[name_len] = '\0'; + + set_fd_nonblocking (listening_fd); + protobuf_c_rpc_dispatch_watch_fd (dispatch, listening_fd, PROTOBUF_C_RPC_EVENT_READABLE, handle_server_listener_readable, server); - return server; + return server; + } } +#ifndef WIN32 /*LINUX*/ /* this function is for handling the common problem that we bind over-and-over again to the same unix path. @@ -707,6 +765,8 @@ _gsk_socket_address_local_maybe_delete_stale_socket (const char *path, fprintf (stderr, "unable to delete %s: %s\n", path, strerror(errno)); } +#endif + ProtobufC_RPC_Server * protobuf_c_rpc_server_new (ProtobufC_RPC_AddressType type, const char *name, @@ -717,10 +777,13 @@ protobuf_c_rpc_server_new (ProtobufC_RPC_AddressType type, int protocol_family; struct sockaddr *address; socklen_t address_len; +#ifndef WIN32 /*LINUX*/ struct sockaddr_un addr_un; +#endif struct sockaddr_in addr_in; switch (type) { +#ifndef WIN32 /*LINUX*/ case PROTOBUF_C_RPC_ADDRESS_LOCAL: protocol_family = PF_UNIX; memset (&addr_un, 0, sizeof (addr_un)); @@ -732,6 +795,7 @@ protobuf_c_rpc_server_new (ProtobufC_RPC_AddressType type, address, address_len); break; +#endif case PROTOBUF_C_RPC_ADDRESS_TCP: protocol_family = PF_INET; memset (&addr_in, 0, sizeof (addr_in)); @@ -777,8 +841,11 @@ protobuf_c_rpc_server_destroy (ProtobufC_RPC_Server *server, while (server->first_connection != NULL) server_connection_close (server->first_connection); - if (server->address_type == PROTOBUF_C_RPC_ADDRESS_LOCAL) +#ifndef WIN32 + if (server->address_type == PROTOBUF_C_RPC_ADDRESS_LOCAL) /* unix-domain socket */ unlink (server->bind_name); +#endif + server->allocator->free (server->allocator, server->bind_name); while (server->recycled_requests != NULL) @@ -798,6 +865,7 @@ protobuf_c_rpc_server_destroy (ProtobufC_RPC_Server *server, return rv; } +#ifndef WIN32 /* Number of proxied requests to try to grab in a single read */ #define PROXY_BUF_SIZE 256 static void @@ -838,7 +906,12 @@ handle_proxy_pipe_readable (ProtobufC_RPC_FD fd, PROTOBUF_C_RPC_EVENT_READABLE|PROTOBUF_C_RPC_EVENT_WRITABLE, handle_server_connection_events, conn); +#ifndef _MSC_VER GSK_LIST_REMOVE (GET_PENDING_REQUEST_LIST (conn), request); +#else + // MSVC 2008 (and possibly later versions) does not expand macros within macros as required by this code, so we'll do it by hand: + GSK_LIST_REMOVE_(ServerRequest *, conn->first_pending_request, conn->last_pending_request, info.alive.prev, info.alive.next, request); +#endif conn->n_pending_requests--; free_server_request (conn->server, request); @@ -873,6 +946,7 @@ protobuf_c_rpc_server_configure_threading (ProtobufC_RPC_Server *server, PROTOBUF_C_RPC_EVENT_READABLE, handle_proxy_pipe_readable, server); } +#endif void protobuf_c_rpc_server_set_error_handler (ProtobufC_RPC_Server *server, diff --git a/protobuf-c-rpc/protobuf-c-rpc.h b/protobuf-c-rpc/protobuf-c-rpc.h index c699c03..2fdd868 100644 --- a/protobuf-c-rpc/protobuf-c-rpc.h +++ b/protobuf-c-rpc/protobuf-c-rpc.h @@ -36,7 +36,9 @@ typedef enum { +#ifndef WIN32 PROTOBUF_C_RPC_ADDRESS_LOCAL, /* unix-domain socket */ +#endif PROTOBUF_C_RPC_ADDRESS_TCP /* host/port tcp socket */ } ProtobufC_RPC_AddressType; @@ -203,9 +205,12 @@ typedef protobuf_c_boolean (*ProtobufC_RPC_IsRpcThreadFunc) (ProtobufC_RPC_Server *server, ProtobufCRPCDispatch *dispatch, void *is_rpc_data); + +#ifndef WIN32 void protobuf_c_rpc_server_configure_threading (ProtobufC_RPC_Server *server, ProtobufC_RPC_IsRpcThreadFunc func, void *is_rpc_data); +#endif /* Error handling */