diff --git a/CHANGELOG.md b/CHANGELOG.md index d165b05..90e7baf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Switched from nanomsg (Release 1.1.2) to NNG (Release v1.0.1) - Revert from NNG - Update to use nanomsg version 1.1.4 +- Add notify-stress example app ## [1.0.0] - 2018-06-19 ### Added diff --git a/examples/notify-stress/CMakeLists.txt b/examples/notify-stress/CMakeLists.txt new file mode 100755 index 0000000..e754d93 --- /dev/null +++ b/examples/notify-stress/CMakeLists.txt @@ -0,0 +1,162 @@ +# Copyright 2016 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 2.8.7) + +project(notify-stress) +include(ExternalProject) + +set(INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR}/_install) +set(PREFIX_DIR ${CMAKE_CURRENT_BINARY_DIR}/_prefix) +set(INCLUDE_DIR ${INSTALL_DIR}/include) +set(LIBRARY_DIR ${INSTALL_DIR}/lib) +set(LIBRARY_DIR64 ${INSTALL_DIR}/lib64) +set(COMMON_LIBRARY_DIR ${INSTALL_DIR}/lib/${CMAKE_LIBRARY_ARCHITECTURE}) +set(BIN_DIR ${INSTALL_DIR}/bin) + +include_directories(${INCLUDE_DIR} + ${INCLUDE_DIR}/cjson + ${INCLUDE_DIR}/msgpack + ${INCLUDE_DIR}/trower-base64 + ${INCLUDE_DIR}/wrp-c + ${INCLUDE_DIR}/wdmp-c + ${INCLUDE_DIR}/libparodus + ${INCLUDE_DIR}/cimplog +) + +add_definitions(-std=c99) +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_GNU_SOURCE ") + +# pthread external dependency +#------------------------------------------------------------------------------- +find_package (Threads) + +if (NOT BUILD_YOCTO) + +# base64 external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(trower-base64 + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/trower-base64 + GIT_REPOSITORY https://github.com/Comcast/trower-base64.git + GIT_TAG "v1.1.1" + CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF +) +add_library(libtrower-base64 STATIC SHARED IMPORTED) +add_dependencies(libtrower-base64 trower-base64) + +# nanoMsg external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(nanomsg + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/nanomsg + GIT_REPOSITORY https://github.com/nanomsg/nanomsg.git + GIT_TAG "1.1.4" + CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} +) +add_library(libnanomsg STATIC SHARED IMPORTED) +add_dependencies(libnanomsg nanomsg) + + +# cJSON external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(cJSON + #PREFIX ${PREFIX_DIR}/cJSON + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/cJSON + GIT_REPOSITORY https://github.com/DaveGamble/cJSON.git + GIT_TAG "v1.7.8" + CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF +) +add_library(libcJSON STATIC SHARED IMPORTED) +add_dependencies(libcJSON cJSON) + + +# msgpack-c external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(msgpack + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/msgpack + GIT_REPOSITORY https://github.com/msgpack/msgpack-c.git + GIT_TAG "cpp-3.1.1" + CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} + -DMSGPACK_ENABLE_CXX=OFF + -DMSGPACK_BUILD_EXAMPLES=OFF +) +add_library(libmsgpack STATIC SHARED IMPORTED) +add_dependencies(libmsgpack msgpack) + +# cimplog external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(cimplog + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/cimplog + GIT_REPOSITORY https://github.com/Comcast/cimplog.git + GIT_TAG "1.0.2" + CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF +) +add_library(libcimplog STATIC SHARED IMPORTED) +add_dependencies(libcimplog cimplog) + + +# wrp-c external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(wrp-c + DEPENDS trower-base64 msgpack cimplog + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/wrp-c + GIT_REPOSITORY https://github.com/Comcast/wrp-c.git + GIT_TAG "1.0.1" + CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} + -DMSGPACK_ENABLE_CXX=OFF + -DMSGPACK_BUILD_EXAMPLES=OFF + -DBUILD_TESTING=OFF + -DMAIN_PROJ_BUILD=ON + -DMAIN_PROJ_LIB_PATH=${LIBRARY_DIR} + -DMAIN_PROJ_INCLUDE_PATH=${INCLUDE_DIR} +) +add_library(libwrp-c STATIC SHARED IMPORTED) +add_dependencies(libwrp-c wrp-c) + +# wdmp-c external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(wdmp-c + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/wdmp-c + GIT_REPOSITORY https://github.com/Comcast/wdmp-c.git + GIT_TAG "c227bfa1e129404f21fa0272f411c7a472a0f237" + CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} + -DBUILD_TESTING=OFF +) +add_library(libwdmp-c STATIC SHARED IMPORTED) +add_dependencies(libwdmp-c wdmp-c) + +# libparodus external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(libparodus + DEPENDS trower-base64 msgpack nanomsg wrp-c + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/libparodus + #GIT_REPOSITORY https://github.com/Comcast/libparodus.git + #GIT_TAG "fcef57c79dcf9d2576aff481f1a61fe71ae93813" + GIT_REPOSITORY https://github.com/bill1600/libparodus.git + GIT_TAG "e3f632c7627fcac876ccf93a516dbb4ff38671f1" + CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} + -DMAIN_PROJ_BUILD=ON + -DMAIN_PROJ_LIB_PATH=${LIBRARY_DIR} + -DMAIN_PROJ_LIB64_PATH=${LIBRARY_DIR64} + -DMAIN_PROJ_COMMON_PATH=${COMMON_LIBRARY_DIR} + -DMAIN_PROJ_INCLUDE_PATH=${INCLUDE_DIR} +) +add_library(liblibparodus STATIC SHARED IMPORTED) +add_dependencies(liblibparodus libparodus) + +endif () # NOT BUILD_YOCTO + + +link_directories ( ${LIBRARY_DIR} ${COMMON_LIBRARY_DIR} ${LIBRARY_DIR64} ) + +add_subdirectory(src) diff --git a/examples/notify-stress/src/CMakeLists.txt b/examples/notify-stress/src/CMakeLists.txt new file mode 100644 index 0000000..a699ad3 --- /dev/null +++ b/examples/notify-stress/src/CMakeLists.txt @@ -0,0 +1,33 @@ +# Copyright 2016 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set(SOURCES notify-stress.c) + +add_executable(notify-stress ${SOURCES}) + +target_link_libraries (notify-stress + ${CMAKE_THREAD_LIBS_INIT} + -llibparodus + -lnanomsg + -lwrp-c + -lmsgpackc + -ltrower-base64 + -lm + -lcimplog + -lcjson + -lpthread + -lrt + ) + +install (TARGETS notify-stress DESTINATION bin) diff --git a/examples/notify-stress/src/notify-stress.c b/examples/notify-stress/src/notify-stress.c new file mode 100755 index 0000000..5d69cb4 --- /dev/null +++ b/examples/notify-stress/src/notify-stress.c @@ -0,0 +1,381 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +/*---------------------------------------------------------------------------- + * This is a test application to stress parodus by sending lots of notifications. + * options: + * -p + * -c + * -n + * + * num_msgs defaults to 1 + * when specified as 0, sends messages continuously + * ---------------------------------------------------------------------------*/ + + +/*----------------------------------------------------------------------------*/ +/* Macros */ +/*----------------------------------------------------------------------------*/ +#define CONTENT_TYPE_JSON "application/json" +#define CLIENT_URL "tcp://127.0.0.1:6668" +#define PARODUS_URL "tcp://127.0.0.1:6666" +#define MAX_PARAMETERNAME_LEN 512 +#define URL_SIZE 64 +#define LOGGING_MODULE "notify-stress" +#define OPTIONS_STRING_FORMAT "p:c:n:" +#define debug_error(...) cimplog_error(LOGGING_MODULE, __VA_ARGS__) +#define debug_info(...) cimplog_info(LOGGING_MODULE, __VA_ARGS__) +#define debug_print(...) cimplog_debug(LOGGING_MODULE, __VA_ARGS__) + +/*----------------------------------------------------------------------------*/ +/* File Scoped Variables */ +/*----------------------------------------------------------------------------*/ +static libpd_instance_t hpd_instance; +static struct option command_line_options[] = { + {"parodus_url", optional_argument, 0, 'p'}, + {"client_url", optional_argument, 0, 'c'}, + {"num_msgs", required_argument, 0, 'n'}, + {0,0,0,0} +}; +static char *parodus_url = NULL; +static char *client_url = NULL; +static unsigned num_msgs = 1; + +/*----------------------------------------------------------------------------*/ +/* Function Prototypes */ +/*----------------------------------------------------------------------------*/ +static void sig_handler(int sig); +static int libpd_client_mgr(void); +static int send_notification(char const *buff); +static int connect_parodus(void); +static void* parodus_receive_wait(void *vp); +static void start_parodus_receive_thread(void); +static int get_options( int argc, char **argv); +static void wait_random (void); +/*----------------------------------------------------------------------------*/ +/* External Functions */ +/*----------------------------------------------------------------------------*/ +int main( int argc, char **argv) +{ + debug_info ("RAND_MAX is %ld (0x%lx)\n", RAND_MAX, RAND_MAX); + srandom (getpid()); + + signal(SIGTERM, sig_handler); + signal(SIGINT, sig_handler); + signal(SIGUSR1, sig_handler); + signal(SIGUSR2, sig_handler); + signal(SIGSEGV, sig_handler); + signal(SIGBUS, sig_handler); + signal(SIGKILL, sig_handler); + signal(SIGFPE, sig_handler); + signal(SIGILL, sig_handler); + signal(SIGQUIT, sig_handler); + signal(SIGHUP, sig_handler); + signal(SIGALRM, sig_handler); + + if (get_options (argc, argv) != 0) + return 4; + + if (libpd_client_mgr() == 0) { + int i=0; + int sendStatus = 0; + while (sendStatus == 0) { + sendStatus = send_notification("Hello Parodus"); + i++; + if (num_msgs != 0) + if (i >= num_msgs) + break; + if ((i&0x1FF) == 0) + wait_random (); + } + } + + if( NULL != parodus_url ) { + free(parodus_url); + } + if( NULL != client_url ) { + free(client_url); + } + + return 0; +} + +/*----------------------------------------------------------------------------*/ +/* Internal functions */ +/*----------------------------------------------------------------------------*/ +// waits from 0 to 0x7FFFF (1048575) usecs +// 1 2 secs +static void wait_random (void) +{ + long usecs = random() >> 11; + struct timeval timeout; + if (usecs < 1000000l) { + timeout.tv_sec = 1; + timeout.tv_usec = usecs; + } else { + timeout.tv_sec = 2; + timeout.tv_usec = usecs - 1000000l; + } + debug_info ("wait %ld secs %ld usecs\n", timeout.tv_sec, timeout.tv_usec); + select (0, NULL, NULL, NULL, &timeout); +} + +unsigned int parse_num_arg (const char *arg, const char *arg_name) +{ + unsigned int result = 0; + int i; + char c; + + if (arg[0] == '\0') { + debug_error ("Empty %s argument\n", arg_name); + return (unsigned int) -1; + } + for (i=0; '\0' != (c=arg[i]); i++) + { + if ((c<'0') || (c>'9')) { + debug_error ("Non-numeric %s argument\n", arg_name); + return (unsigned int) -1; + } + result = (result*10) + c - '0'; + } + return result; +} + + +static int get_options( int argc, char **argv) +{ + int item; + int options_index = 0; + + while( -1 != (item = getopt_long(argc, argv, OPTIONS_STRING_FORMAT, + command_line_options, &options_index)) ) + { + switch( item ) { + case 'p': + parodus_url = strdup(optarg); + break; + case 'c': + client_url = strdup(optarg); + break; + case 'n': + num_msgs = parse_num_arg (optarg, "num_msgs"); + if (num_msgs == (unsigned int) -1) + return -1; + if (num_msgs > 1000000) { + debug_info ("Using 1000000 msgs\n"); + num_msgs = 1000000; + } + break; + default: + break; + } + } + return 0; +} + + +static void sig_handler(int sig) +{ + if( sig == SIGINT ) { + signal(SIGINT, sig_handler); /* reset it to this function */ + debug_info("SIGINT received!\n"); + exit(0); + } else if( sig == SIGUSR1 ) { + signal(SIGUSR1, sig_handler); /* reset it to this function */ + debug_info("SIGUSR1 received!\n"); + } else if( sig == SIGUSR2 ) { + debug_info("SIGUSR2 received!\n"); + } else if( sig == SIGCHLD ) { + signal(SIGCHLD, sig_handler); /* reset it to this function */ + debug_info("SIGHLD received!\n"); + } else if( sig == SIGPIPE ) { + signal(SIGPIPE, sig_handler); /* reset it to this function */ + debug_info("SIGPIPE received!\n"); + } else if( sig == SIGALRM ) { + signal(SIGALRM, sig_handler); /* reset it to this function */ + debug_info("SIGALRM received!\n"); + } else { + debug_info("Signal %d received!\n", sig); + exit(0); + } +} + +static int libpd_client_mgr() +{ + int status; + debug_print("Connect parodus, etc. \n"); + status = connect_parodus(); + if (status == 0) + start_parodus_receive_thread(); + return status; +} + +static int send_notification(char const* buff) +{ + wrp_msg_t *notif_wrp_msg = NULL; + int retry_count = 0; + int sendStatus = -1; + int backoffRetryTime = 0; + int c = 2; + int i; + char source[] = "mac:PCApplication"; + cJSON *notifyPayload = cJSON_CreateObject(); + char *stringifiedNotifyPayload; + char msg[4096]; + + /* Create JSON payload */ + for(i=0; i<8; i++) + msg[i] = '-'; + for (i=8; i<4088; i++) + msg[i] = 'X'; + for (i=4088; i<4096; i++) + msg[i] = '#'; + msg[4095] = 0; + cJSON_AddStringToObject(notifyPayload,"device_id", source); + cJSON_AddStringToObject(notifyPayload,"iot", buff); + cJSON_AddStringToObject(notifyPayload,"reboot-reason", msg); + + stringifiedNotifyPayload = cJSON_PrintUnformatted(notifyPayload); + + /* Create WRP message to send Parodus */ + notif_wrp_msg = (wrp_msg_t *)malloc(sizeof(wrp_msg_t)); + memset(notif_wrp_msg, 0, sizeof(wrp_msg_t)); + notif_wrp_msg ->msg_type = WRP_MSG_TYPE__EVENT; + notif_wrp_msg ->u.event.source = source; + notif_wrp_msg ->u.event.dest = "event:IOT_NOTIFICATION"; + notif_wrp_msg->u.event.content_type = CONTENT_TYPE_JSON; + notif_wrp_msg ->u.event.payload = stringifiedNotifyPayload; + notif_wrp_msg ->u.event.payload_size = strlen(stringifiedNotifyPayload); + + debug_print("buf: %s\n",buff); + debug_info("Notification payload %s\n",stringifiedNotifyPayload); + debug_print("source: %s\n",notif_wrp_msg ->u.event.source); + debug_print("destination: %s\n", notif_wrp_msg ->u.event.dest); + debug_print("content_type is %s\n",notif_wrp_msg->u.event.content_type); + + /* Send message to Parodus */ + while( retry_count <= 3 ) { + backoffRetryTime = (int) pow(2, c) -1; + sendStatus = libparodus_send(hpd_instance, notif_wrp_msg ); + if(sendStatus == 0) { + retry_count = 0; + debug_info("Notification successfully sent to parodus\n"); + break; + } else { + debug_error("Failed to send Notification: '%s', retrying ....\n",libparodus_strerror(sendStatus)); + debug_print("sendNotification backoffRetryTime %d seconds\n", backoffRetryTime); + sleep(backoffRetryTime); + c++; + retry_count++; + } + } + debug_print("sendStatus is %d\n",sendStatus); + free (notif_wrp_msg ); + free(stringifiedNotifyPayload); + cJSON_Delete(notifyPayload); + return sendStatus; +} + +static int connect_parodus() +{ + int backoffRetryTime = 0; + int backoff_max_time = 5; + int i; + int max_retry_sleep; + int c = 2; //Retry Backoff count shall start at c=2 & calculate 2^c - 1. + int retval = -1; + max_retry_sleep = (int) pow(2, backoff_max_time) -1; + debug_info("max_retry_sleep is %d\n", max_retry_sleep ); + + if( NULL == parodus_url ) { + parodus_url = strdup(PARODUS_URL); + } + if( NULL == client_url ) { + client_url = strdup(CLIENT_URL); + } + + libpd_cfg_t cfg = { .service_name = "hello-parodus", + .receive = true, + .keepalive_timeout_secs = 64, + .parodus_url = parodus_url, + .client_url = client_url + }; + + debug_info("Configurations => service_name : %s parodus_url : %s client_url : %s\n", cfg.service_name, cfg.parodus_url, cfg.client_url ); + (void)retval; + + for (i=0; i<2; i++) + { + if( backoffRetryTime < max_retry_sleep ) { + backoffRetryTime = (int) pow(2, c) -1; + } + debug_print("New backoffRetryTime value calculated as %d seconds\n", backoffRetryTime); + int ret = libparodus_init (&hpd_instance, &cfg); + if( ret ==0 ) { + debug_info("Init for parodus Success..!!\n"); + return 0; + } else { + debug_error("Init for parodus (url %s) failed: '%s'\n", cfg.parodus_url, libparodus_strerror(ret)); + sleep(backoffRetryTime); + c++; + + if( backoffRetryTime == max_retry_sleep ) { + c = 2; + backoffRetryTime = 0; + debug_print("backoffRetryTime reached max value, reseting to initial value\n"); + } + } + retval = libparodus_shutdown(&hpd_instance); + } + return -1; +} + +static void* parodus_receive_wait(void *vp) +{ + int rtn; + wrp_msg_t *wrp_msg; + + debug_print("parodus_receive_wait\n"); + while( 1 ) { + rtn = libparodus_receive(hpd_instance, &wrp_msg, 2000); + debug_print(" rtn = %d\n", rtn); + if( 0 == rtn ) { + debug_info("Got something from parodus.\n"); + } else if( 1 == rtn || 2 == rtn ) { + debug_info("Timed out or message closed.\n"); + continue; + } else { + debug_info("Libparodus failed to receive message: '%s'\n",libparodus_strerror(rtn)); + } + if( NULL != wrp_msg ) { + free(wrp_msg); + } + sleep(5); + } + libparodus_shutdown(&hpd_instance); + debug_print("End of parodus_upstream\n"); + return 0; +} + +static void start_parodus_receive_thread() +{ + int err = 0; + pthread_t threadId; + err = pthread_create(&threadId, NULL, parodus_receive_wait, NULL); + if( err != 0 ) { + debug_error("Error creating thread :[%s]\n", strerror(err)); + exit(1); + } else { + debug_print("Parodus Receive wait thread created Successfully %d\n", (int ) threadId); + } +}