Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zephyr federated support #232

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#ifdef FEDERATED
#ifdef PLATFORM_ZEPHYR
#else
#include <sys/socket.h>
#include <netinet/in.h>
#endif
#include <errno.h>
#include <math.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include "platform.h"
#include "clock-sync.h"
Expand Down
57 changes: 35 additions & 22 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifdef FEDERATED
#ifdef PLATFORM_ARDUINO
#error To be implemented. No support for federation on Arduino yet.
#elif PLATFORM_ZEPHYR
#warning Federated support on Zephyr is still experimental.
#else
#include <arpa/inet.h> // inet_ntop & inet_pton
#include <netdb.h> // Defines gethostbyname().
#include <netdb.h> // Defines getaddrinfo(), freeaddrinfo() and struct addrinfo.
#include <netinet/in.h> // Defines struct sockaddr_in
#include <regex.h>
#include <strings.h> // Defines bzero().
Expand Down Expand Up @@ -1027,33 +1029,43 @@ void connect_to_rti(const char* hostname, int port) {
int result = -1;
int count_retries = 0;

struct addrinfo hints;
struct addrinfo *res;

memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET; /* Allow IPv4 */
hints.ai_socktype = SOCK_STREAM; /* Stream socket */
hints.ai_protocol = IPPROTO_TCP; /* TCP protocol */
hints.ai_addr = NULL;
hints.ai_next = NULL;
hints.ai_flags = AI_NUMERICSERV; /* Allow only numeric port numbers */

while (result < 0) {
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
_fed.socket_TCP_RTI = socket(AF_INET, SOCK_STREAM, 0);
// Convert port number to string
char str[6];
sprintf(str,"%u",uport);

// Get address structure matching hostname and hints criteria, and
// set port to the port number provided in str. There should only
// ever be one matching address structure, and we connect to that.
int server = getaddrinfo(hostname, &str, &hints, &res);
if (server != 0) {
lf_print_error_and_exit("No host for RTI matching given hostname: %s", hostname);
}

// Create a socket matching hints criteria
_fed.socket_TCP_RTI = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (_fed.socket_TCP_RTI < 0) {
lf_print_error_and_exit("Creating socket to RTI.");
lf_print_error_and_exit("Failed to create socket to RTI.");
}

struct hostent *server = gethostbyname(hostname);
if (server == NULL) {
lf_print_error_and_exit("ERROR, no such host for RTI: %s\n", hostname);
result = connect(_fed.socket_TCP_RTI, res->ai_addr, res->ai_addrlen);
if (result == 0) {
lf_print("Successfully connected to RTI.");
}
// Server file descriptor.
struct sockaddr_in server_fd;
// Zero out the server_fd struct.
bzero((char*)&server_fd, sizeof(server_fd));

// Set up the server_fd fields.
server_fd.sin_family = AF_INET; // IPv4
bcopy((char*)server->h_addr,
(char*)&server_fd.sin_addr.s_addr,
(size_t)server->h_length);
// Convert the port number from host byte order to network byte order.
server_fd.sin_port = htons(uport);
result = connect(
_fed.socket_TCP_RTI,
(struct sockaddr *)&server_fd,
sizeof(server_fd));
freeaddrinfo(res); /* No longer needed */

// If this failed, try more ports, unless a specific port was given.
if (result != 0
&& !specific_port_given
Expand Down Expand Up @@ -2404,6 +2416,7 @@ void terminate_execution() {
// possibility of deadlock. To ensure this, this
// function should NEVER be called while holding any mutex lock.
lf_mutex_lock(&outbound_socket_mutex);
// FIXME: Should this be _fed.number_of_outbound_p2p_connections instead?
for (int i=0; i < NUMBER_OF_FEDERATES; i++) {
// Close outbound connections, in case they have not closed themselves.
// This will result in EOF being sent to the remote federate, I think.
Expand Down
10 changes: 10 additions & 0 deletions core/federated/net_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ void encode_tag(
* @return true if there is a match, false otherwise.
*/
bool match_regex(const char* str, char* regex) {
#ifndef PLATFORM_ZEPHYR
regex_t regex_compiled;
regmatch_t group;
bool valid = false;
Expand All @@ -591,6 +592,9 @@ bool match_regex(const char* str, char* regex) {
}
regfree(&regex_compiled);
return valid;
#else
return true;
siljesu marked this conversation as resolved.
Show resolved Hide resolved
#endif
}


Expand Down Expand Up @@ -637,6 +641,8 @@ bool validate_user(const char* user) {
return match_regex(user, username_regex);
}

#ifndef PLATFORM_ZEPHYR

/**
* Extract one match group from the rti_addr regex .
* @return true if SUCCESS, else false.
Expand Down Expand Up @@ -671,10 +677,13 @@ bool extract_match_groups(const char* rti_addr, char** rti_addr_strs, bool** rti
return true;
}

#endif

/**
* Extract the host, port and user from rti_addr.
*/
void extract_rti_addr_info(const char* rti_addr, rti_addr_info_t* rti_addr_info) {
#ifndef PLATFORM_ZEPHYR
const char* regex_str = "(([a-zA-Z0-9_-]{1,254})@)?([a-zA-Z0-9.]{1,255})(:([0-9]{1,5}))?";
size_t max_groups = 6;
// The group indices of each field of interest in the regex.
Expand Down Expand Up @@ -710,5 +719,6 @@ void extract_rti_addr_info(const char* rti_addr, rti_addr_info_t* rti_addr_info)
}
}
regfree(&regex_compiled);
#endif
}
#endif
33 changes: 32 additions & 1 deletion core/platform/lf_zephyr_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ int lf_notify_of_event() {
#warning "Threaded support on Zephyr is still experimental."

// FIXME: What is an appropriate stack size?
#define _LF_STACK_SIZE 1024
#define _LF_STACK_SIZE 4096
// FIXME: What is an appropriate thread prio?
#define _LF_THREAD_PRIORITY 5

Expand All @@ -363,7 +363,38 @@ int lf_notify_of_event() {
#define USER_THREADS 0
#endif

#if defined(FEDERATED) && defined(FEDERATED_DECENTRALIZED)
#define RTI_SOCKET_LISTENER_THREAD 1
#define FEDERATE_SOCKET_LISTENER_THREADS NUMBER_OF_FEDERATES
#define P2P_HANDLER_THREAD 1

#elif defined(FEDERATED) && defined(FEDERATED_CENTRALIZED)
#define RTI_SOCKET_LISTENER_THREAD 1
#define FEDERATE_SOCKET_LISTENER_THREADS 0
#define P2P_HANDLER_THREAD 0

#else
#define RTI_SOCKET_LISTENER_THREAD 0
#define FEDERATE_SOCKET_LISTENER_THREADS 0
#define P2P_HANDLER_THREAD 0
#endif

#if defined(FEDERATED) && defined(_LF_CLOCK_SYNC_ON)
#define CLOCK_SYNC_THREAD 1
#else
#define CLOCK_SYNC_THREAD 0
#endif

#ifndef WORKERS_NEEDED_FOR_FEDERATE
#define WORKERS_NEEDED_FOR_FEDERATE 0
#endif

#define NUMBER_OF_THREADS (NUMBER_OF_WORKERS \
+ WORKERS_NEEDED_FOR_FEDERATE \
+ RTI_SOCKET_LISTENER_THREAD \
+ FEDERATE_SOCKET_LISTENER_THREADS \
+ P2P_HANDLER_THREAD \
+ CLOCK_SYNC_THREAD \
+ USER_THREADS)

K_MUTEX_DEFINE(thread_mutex);
Expand Down
5 changes: 5 additions & 0 deletions include/core/federated/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#ifdef PLATFORM_ARDUINO
#error To be implemented. No support for federation on Arduino yet.
#elif PLATFORM_ZEPHYR
#warning Federated support on Zephyr is still experimental.
#else
#include <sys/socket.h>
#include <regex.h>
Expand Down Expand Up @@ -345,6 +347,7 @@ bool validate_host(const char* host);
*/
bool validate_user(const char* user);

#ifndef PLATFORM_ZEPHYR

/**
* Extract one match group from the rti_addr regex .
Expand All @@ -359,6 +362,8 @@ bool extract_match_group(const char* rti_addr, char* dest, regmatch_t group,
*/
bool extract_match_groups(const char* rti_addr, char** rti_addr_strs, bool** rti_addr_flags, regmatch_t* group_array, int* gids, int* max_lens, int* min_lens, const char** err_msgs);

#endif

/**
* Extract the host, port and user from rti_addr.
*/
Expand Down
2 changes: 2 additions & 0 deletions include/core/platform/lf_zephyr_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <stdlib.h> //malloc, calloc, free, realloc

#include <zephyr/kernel.h>
#include <zephyr/posix/sys/socket.h>
#include <zephyr/posix/netdb.h>

#define NO_TTY
#define _LF_TIMEOUT 1
Expand Down