Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of TURN TCP transport between client and server #111

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ set(LIBJUICE_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/timestamp.c
${CMAKE_CURRENT_SOURCE_DIR}/src/turn.c
${CMAKE_CURRENT_SOURCE_DIR}/src/udp.c
${CMAKE_CURRENT_SOURCE_DIR}/src/transport.c
)

set(LIBJUICE_HEADERS
Expand Down
8 changes: 8 additions & 0 deletions include/juice/juice.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ extern "C" {
#define JUICE_MAX_CANDIDATE_SDP_STRING_LEN 256
#define JUICE_MAX_SDP_STRING_LEN 4096

typedef enum juice_transport {
JUICE_TRANSPORT_NONE,
JUICE_TRANSPORT_UDP,
JUICE_TRANSPORT_TCP,
JUICE_TRANSPORT_TLS
} juice_transport_t;

typedef struct juice_agent juice_agent_t;

typedef enum juice_state {
Expand All @@ -72,6 +79,7 @@ typedef struct juice_turn_server {
const char *username;
const char *password;
uint16_t port;
juice_transport_t transport;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an issue now as it breaks the library interface and would require releasing a new major version.

} juice_turn_server_t;

typedef struct juice_config {
Expand Down
179 changes: 153 additions & 26 deletions src/agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "stun.h"
#include "turn.h"
#include "udp.h"
#include "transport.h"

#include <assert.h>
#include <inttypes.h>
Expand Down Expand Up @@ -408,7 +409,7 @@ int agent_send(juice_agent_t *agent, const char *data, size_t size, int ds) {
return ret;
}

return agent_direct_send(agent, &selected_entry->record, data, size, ds);
return agent_direct_send2(agent, selected_entry, data, size, ds);
}

int agent_direct_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
Expand Down Expand Up @@ -436,6 +437,35 @@ int agent_direct_send(juice_agent_t *agent, const addr_record_t *dst, const char
return ret;
}

int agent_direct_send2(juice_agent_t *agent, const agent_stun_entry_t *entry, const char *data,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name agent_direct_send2 is not very explicit, is there a reason for not modifying agent_direct_send() since they have the same signature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modify the parameters of the function, but I don't want to break other code that relies on the original function

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you changed dst to entry, my bad. You could call it something more explicit then, like agent_transport_send.

size_t size, int ds) {
mutex_lock(&agent->send_mutex);

if (agent->send_ds >= 0 && agent->send_ds != ds) {
JLOG_VERBOSE("Setting Differentiated Services field to 0x%X", ds);
if (udp_set_diffserv(agent->sock, ds) == 0)
agent->send_ds = ds;
else
agent->send_ds = -1; // disable for next time
}

JLOG_VERBOSE("Sending datagram, size=%d", size);

int ret;
if (entry->sock == INVALID_SOCKET) {
ret = udp_sendto(agent->sock, data, size, &entry->record);
}
else {
ret = transport_sendto(entry->sock, data, size, &entry->record);
}

if (ret < 0 && sockerrno != SEAGAIN && sockerrno != SEWOULDBLOCK)
JLOG_WARN("Send failed, errno=%d", sockerrno);

mutex_unlock(&agent->send_mutex);
return ret;
}

int agent_relay_send(juice_agent_t *agent, agent_stun_entry_t *entry, const addr_record_t *dst,
const char *data, size_t size, int ds) {
if (!entry->turn) {
Expand Down Expand Up @@ -467,8 +497,11 @@ int agent_relay_send(juice_agent_t *agent, agent_stun_entry_t *entry, const addr
JLOG_ERROR("STUN message write failed");
return -1;
}

return agent_direct_send(agent, &entry->record, buffer, size, ds);
if (agent_direct_send2(agent, entry, buffer, size, ds) < 0) {
JLOG_WARN("STUN message send failed, errno=%d", sockerrno);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cleaned up the sockerrno after agent_direct_send() as it's a bit clumsy (the function could internally make a last system call modifying errno), so you should do the same here.

return -1;
}
return 0;
}

int agent_channel_send(juice_agent_t *agent, agent_stun_entry_t *entry, const addr_record_t *record,
Expand All @@ -493,8 +526,11 @@ int agent_channel_send(juice_agent_t *agent, agent_stun_entry_t *entry, const ad
JLOG_ERROR("TURN ChannelData wrapping failed");
return -1;
}

return agent_direct_send(agent, &entry->record, buffer, len, ds);
if (agent_direct_send2(agent, entry, buffer, len, ds) < 0) {
JLOG_WARN("ChannelData message send failed, errno=%d", sockerrno);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about sockerrno.

return -1;
}
return 0;
}

juice_state_t agent_get_state(juice_agent_t *agent) {
Expand Down Expand Up @@ -580,6 +616,34 @@ void agent_run(juice_agent_t *agent) {
snprintf(entry->turn->credentials.username, STUN_MAX_USERNAME_LEN, "%s",
turn_server->username);
entry->turn->password = turn_server->password;

transport_socket_config_t socket_config;
memset(&socket_config, 0, sizeof(socket_config));

entry->transport = turn_server->transport;
if (entry->transport) {
socket_config.bind_address = agent->config.bind_address;
socket_config.port_begin = agent->config.local_port_range_begin;
socket_config.port_end = agent->config.local_port_range_end;
switch (turn_server->transport) {
case JUICE_TRANSPORT_UDP:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does UDP transport mean for the user? If I get it correctly, it opens a dedicated UDP socket to reach the server instead of using the one of the agent. What is the point of this mechanism?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my mind, the ideal is to replace the udp .c completely with transport .c, UDP, TCP and TLS are supported.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. While this is smart for factoring the code, it will cause issues, for instance reflexive candidates from TURN UDP servers will be incorrect, as they'll reflect the external address for the transport UDP socket instead of the agent one. I think the only way to keep them working is limiting the transport to TCP and use the agent socket for UDP, sadly.

socket_config.type = SOCKET_UDP;
break;
case JUICE_TRANSPORT_TCP:
socket_config.type = SOCKET_TCP;
break;
default:
JLOG_ERROR("Unknown transport type");
}

socket_config.family = record->addr.ss_family;
entry->sock = transport_create_socket(&socket_config);
transport_connect(entry->sock, record);
}
else {
entry->sock = INVALID_SOCKET;
}

juice_random(entry->transaction_id, STUN_TRANSACTION_ID_SIZE);
++agent->entries_count;

Expand Down Expand Up @@ -619,8 +683,11 @@ void agent_run(juice_agent_t *agent) {
entry->state = AGENT_STUN_ENTRY_STATE_PENDING;
entry->pair = NULL;
entry->record = records[i];
entry->transport = JUICE_TRANSPORT_NONE;
entry->sock = INVALID_SOCKET;
juice_random(entry->transaction_id, STUN_TRANSACTION_ID_SIZE);
++agent->entries_count;
entry->sock = INVALID_SOCKET;

agent_arm_transmission(agent, entry, STUN_PACING_TIME * i);
}
Expand All @@ -647,6 +714,14 @@ void agent_run(juice_agent_t *agent) {
FD_ZERO(&readfds);
FD_SET(agent->sock, &readfds);
int n = SOCKET_TO_INT(agent->sock) + 1;
for (int i = 0; i < agent->entries_count; ++i) {
agent_stun_entry_t *entry = agent->entries + i;
if (entry->sock == INVALID_SOCKET) continue;

int n2 = SOCKET_TO_INT(entry->sock) + 1;
if (n2 > n) n = n2;
FD_SET(entry->sock, &readfds);
}

JLOG_VERBOSE("Entering select");
mutex_unlock(&agent->mutex);
Expand All @@ -669,21 +744,52 @@ void agent_run(juice_agent_t *agent) {
}

if (FD_ISSET(agent->sock, &readfds)) {
if (agent_recv(agent) < 0)
if (agent_recv(agent, (agent_stun_entry_t *)NULL) < 0)
break;
}

for (int i = 0; i < agent->entries_count; ++i) {
agent_stun_entry_t *entry = agent->entries + i;
if (entry->sock == INVALID_SOCKET) continue;
if (FD_ISSET(entry->sock, &readfds)) {
if (agent_recv(agent, entry) < 0) {
entry->state = AGENT_STUN_ENTRY_STATE_FAILED;
entry->sock = INVALID_SOCKET;
}
}
}
}
JLOG_DEBUG("Leaving agent thread");
agent_change_state(agent, JUICE_STATE_DISCONNECTED);
mutex_unlock(&agent->mutex);
}

int agent_recv(juice_agent_t *agent) {
int agent_recv(juice_agent_t *agent, agent_stun_entry_t *entry) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be a dedicated function to receive from entry transport as the logic looks quite decorelated.

JLOG_VERBOSE("Receiving datagrams");

addr_record_t record;
if (entry && entry->transport == JUICE_TRANSPORT_TCP) {
// len should be initialized to indicate the amount of space
record.len = sizeof(struct sockaddr_storage);
transport_getpeername(entry->sock, &record);
}

while (true) {
char buffer[BUFFER_SIZE];
addr_record_t record;
int len = udp_recvfrom(agent->sock, buffer, BUFFER_SIZE, &record);
int len;

if (!entry) {
len = udp_recvfrom(agent->sock, buffer, BUFFER_SIZE, &record);
}
else {
if (entry->transport == JUICE_TRANSPORT_TCP) {
len = transport_recv(entry->sock, buffer, BUFFER_SIZE);
}
else {
len = transport_recvfrom(entry->sock, buffer, BUFFER_SIZE, &record);
}
}

if (len < 0) {
if (sockerrno == SEAGAIN || sockerrno == SEWOULDBLOCK) {
JLOG_VERBOSE("No more datagrams to receive");
Expand All @@ -708,7 +814,8 @@ int agent_input(juice_agent_t *agent, char *buf, size_t len, const addr_record_t
const addr_record_t *relayed) {
JLOG_VERBOSE("Received datagram, size=%d", len);

if (is_stun_datagram(buf, len)) {
for (;;) {
if (!is_stun_datagram2(buf, len)) break;
if (JLOG_DEBUG_ENABLED) {
char src_str[ADDR_MAX_STRING_LEN];
addr_record_to_string(src, src_str, ADDR_MAX_STRING_LEN);
Expand All @@ -721,11 +828,17 @@ int agent_input(juice_agent_t *agent, char *buf, size_t len, const addr_record_t
}
}
stun_message_t msg;
if (stun_read(buf, len, &msg) < 0) {
int msg_size = stun_read(buf, len, &msg);
if (msg_size < 0) {
JLOG_ERROR("STUN message reading failed");
return -1;
}
return agent_dispatch_stun(agent, buf, len, &msg, src, relayed);
int ret = agent_dispatch_stun(agent, buf, len, &msg, src, relayed);
if (ret != 0) return ret;

buf = buf + msg_size;
len = len - msg_size;
if (len == 0) return 0;
}

if (JLOG_DEBUG_ENABLED) {
Expand Down Expand Up @@ -815,6 +928,9 @@ int agent_bookkeeping(juice_agent_t *agent, timestamp_t *next_timestamp) {
continue;

if (entry->retransmissions >= 0) {
if (entry->sock != INVALID_SOCKET && entry->transport == JUICE_TRANSPORT_TCP)
transport_wait_for_connected(entry->sock, NULL);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think blocking the whole thread until the transport connects is not acceptable, it would be better to asynchronously wait for the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I'll think about ways to do it asynchronously.


if (JLOG_DEBUG_ENABLED) {
char record_str[ADDR_MAX_STRING_LEN];
addr_record_to_string(&entry->record, record_str, ADDR_MAX_STRING_LEN);
Expand Down Expand Up @@ -1575,8 +1691,8 @@ int agent_send_stun_binding(juice_agent_t *agent, agent_stun_entry_t *entry, stu
}

// Direct send
if (agent_direct_send(agent, &entry->record, buffer, size, 0) < 0) {
JLOG_WARN("STUN message send failed");
if (agent_direct_send2(agent, entry, buffer, size, 0) < 0) {
JLOG_WARN("STUN message send failed, errno=%d", sockerrno);
return -1;
}
return 0;
Expand All @@ -1586,7 +1702,6 @@ int agent_process_turn_allocate(juice_agent_t *agent, const stun_message_t *msg,
agent_stun_entry_t *entry) {
if (msg->msg_method != STUN_METHOD_ALLOCATE && msg->msg_method != STUN_METHOD_REFRESH)
return -1;

if (entry->type != AGENT_STUN_ENTRY_TYPE_RELAY) {
JLOG_WARN("Received TURN %s message for a non-relay entry, ignoring",
msg->msg_method == STUN_METHOD_ALLOCATE ? "Allocate" : "Refresh");
Expand Down Expand Up @@ -1633,9 +1748,12 @@ int agent_process_turn_allocate(juice_agent_t *agent, const stun_message_t *msg,
JLOG_INFO("Got STUN mapped address %s from TURN server", mapped_str);
}

if (agent_add_local_reflexive_candidate(agent, ICE_CANDIDATE_TYPE_SERVER_REFLEXIVE,
&msg->mapped)) {
JLOG_WARN("Failed to add local peer reflexive candidate from TURN mapped address");
if (entry->transport != JUICE_TRANSPORT_TCP) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if the entry uses a secondary UDP socket for some reason, the reflexive candidate is useless anyway.

if (agent_add_local_reflexive_candidate(agent, ICE_CANDIDATE_TYPE_SERVER_REFLEXIVE,
&msg->mapped)) {
JLOG_WARN("Failed to add local peer reflexive candidate from",
"TURN mapped address");
}
}
}

Expand Down Expand Up @@ -1757,8 +1875,8 @@ int agent_send_turn_allocate_request(juice_agent_t *agent, const agent_stun_entr
JLOG_ERROR("STUN message write failed");
return -1;
}
if (agent_direct_send(agent, &entry->record, buffer, size, 0) < 0) {
JLOG_WARN("STUN message send failed");
if (agent_direct_send2(agent, entry, buffer, size, 0) < 0) {
JLOG_WARN("STUN message send failed, errno=%d", sockerrno);
return -1;
}
return 0;
Expand Down Expand Up @@ -1853,8 +1971,8 @@ int agent_send_turn_create_permission_request(juice_agent_t *agent, agent_stun_e
JLOG_ERROR("STUN message write failed");
return -1;
}
if (agent_direct_send(agent, &entry->record, buffer, size, ds) < 0) {
JLOG_WARN("STUN message send failed");
if (agent_direct_send2(agent, entry, buffer, size, ds) < 0) {
JLOG_WARN("STUN message send failed, errno=%d", sockerrno);
return -1;
}
return 0;
Expand Down Expand Up @@ -1960,8 +2078,8 @@ int agent_send_turn_channel_bind_request(juice_agent_t *agent, agent_stun_entry_
JLOG_ERROR("STUN message write failed");
return -1;
}
if (agent_direct_send(agent, &entry->record, buffer, size, ds) < 0) {
JLOG_WARN("STUN message send failed");
if (agent_direct_send2(agent, entry, buffer, size, ds) < 0) {
JLOG_WARN("STUN message send failed, errno=%d", sockerrno);
return -1;
}
return 0;
Expand Down Expand Up @@ -2183,6 +2301,8 @@ int agent_add_candidate_pair(juice_agent_t *agent, ice_candidate_t *local, // lo
entry->pair = pos;
entry->record = pos->remote->resolved;
entry->relay_entry = relay_entry;
entry->transport = JUICE_TRANSPORT_NONE;
entry->sock = INVALID_SOCKET;
juice_random(entry->transaction_id, STUN_TRANSACTION_ID_SIZE);
++agent->entries_count;

Expand Down Expand Up @@ -2266,8 +2386,15 @@ void agent_arm_transmission(juice_agent_t *agent, agent_stun_entry_t *entry, tim
bool limit = agent->selected_pair &&
(agent->selected_pair->nominated || (agent->selected_pair != entry->pair &&
agent->mode == AGENT_MODE_CONTROLLING));
entry->retransmissions = limit ? 1 : MAX_STUN_RETRANSMISSION_COUNT;
entry->retransmission_timeout = MIN_STUN_RETRANSMISSION_TIMEOUT;
if (entry->sock != INVALID_SOCKET && entry->transport != JUICE_TRANSPORT_TCP) {
entry->retransmissions = limit ? 1 : MAX_STUN_RETRANSMISSION_COUNT;
entry->retransmission_timeout = MIN_STUN_RETRANSMISSION_TIMEOUT;
}
else {
entry->retransmissions = 1;
entry->retransmission_timeout = MIN_STUN_RETRANSMISSION_TIMEOUT *
MAX_STUN_RETRANSMISSION_COUNT;
}
}

// Find a time slot
Expand Down
7 changes: 6 additions & 1 deletion src/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ typedef struct agent_stun_entry {
#else
atomic_flag armed;
#endif

juice_transport_t transport;
socket_t sock;
} agent_stun_entry_t;

struct juice_agent {
Expand Down Expand Up @@ -169,6 +172,8 @@ int agent_set_remote_gathering_done(juice_agent_t *agent);
int agent_send(juice_agent_t *agent, const char *data, size_t size, int ds);
int agent_direct_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
int ds);
int agent_direct_send2(juice_agent_t *agent, const agent_stun_entry_t *entry, const char *data,
size_t size, int ds);
int agent_relay_send(juice_agent_t *agent, agent_stun_entry_t *entry, const addr_record_t *dst,
const char *data, size_t size, int ds);
int agent_channel_send(juice_agent_t *agent, agent_stun_entry_t *entry, const addr_record_t *dst,
Expand All @@ -178,7 +183,7 @@ int agent_get_selected_candidate_pair(juice_agent_t *agent, ice_candidate_t *loc
ice_candidate_t *remote);

void agent_run(juice_agent_t *agent);
int agent_recv(juice_agent_t *agent);
int agent_recv(juice_agent_t *agent, agent_stun_entry_t *entry);
int agent_input(juice_agent_t *agent, char *buf, size_t len, const addr_record_t *src,
const addr_record_t *relayed); // relayed may be NULL
int agent_interrupt(juice_agent_t *agent);
Expand Down
Loading