Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop a test if one of the threads terminated because of an error #1654

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ struct iperf_stream
int remote_port;
int socket;
int id;
int thread_number;
int sender;
/* XXX: is settings just a pointer to the same struct in iperf_test? if not,
should it be? */
Expand Down
1 change: 1 addition & 0 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -4457,6 +4457,7 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)
return NULL;
}
sp->pending_size = 0;
sp->thread_number = 0;

/* Set socket */
sp->socket = s;
Expand Down
1 change: 1 addition & 0 deletions src/iperf_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ enum {
IEPTHREADJOIN=152, // Unable to join thread (check perror)
IEPTHREADATTRINIT=153, // Unable to initialize thread attribute (check perror)
IEPTHREADATTRDESTROY=154, // Unable to destroy thread attribute (check perror)
IEPTHREADNOTRUNNING=155, // A thread stopped running unexpectedly
/* Stream errors */
IECREATESTREAM = 200, // Unable to create a new stream (check herror/perror)
IEINITSTREAM = 201, // Unable to initialize stream (check herror/perror)
Expand Down
141 changes: 94 additions & 47 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
#endif /* TCP_CA_NAME_MAX */
#endif /* HAVE_TCP_CONGESTION */

// variable for number of active threads count (for checking if any failed)
static volatile int running_threads = 0;
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;

void *
iperf_client_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
Expand All @@ -75,6 +79,12 @@ iperf_client_worker_run(void *s) {
return NULL;

cleanup_and_fail:
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(sp->test, "Thread number %d FD %d terminated unexpectedly\n", sp->thread_number, sp->socket);
}
pthread_mutex_lock(&running_mutex);
running_threads--; // Indicate that the thread failed
pthread_mutex_unlock(&running_mutex);
return NULL;
}

Expand Down Expand Up @@ -545,6 +555,7 @@ iperf_run_client(struct iperf_test * test)
int64_t timeout_us;
int64_t rcv_timeout_us;
int i_errno_save;
int total_num_streams = 0;

if (NULL == test)
{
Expand Down Expand Up @@ -678,13 +689,23 @@ iperf_run_client(struct iperf_test * test)
goto cleanup_and_fail;
}

pthread_mutex_lock(&running_mutex);
running_threads = 0;
total_num_streams = 0;
pthread_mutex_unlock(&running_mutex);
SLIST_FOREACH(sp, &test->streams, streams) {
pthread_mutex_lock(&running_mutex);
running_threads++; // Count running threads
sp->thread_number = running_threads;
pthread_mutex_unlock(&running_mutex);
total_num_streams++;

if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_run, sp) != 0) {
i_errno = IEPTHREADCREATE;
goto cleanup_and_fail;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d created\n", sp->socket);
iperf_printf(test, "Thread number %d using FD %d created\n", sp->thread_number, sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
Expand Down Expand Up @@ -721,22 +742,28 @@ iperf_run_client(struct iperf_test * test)
if (sp->sender) {
int rc;
sp->done = 1;
rc = pthread_cancel(sp->thr);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "sender cancel in pthread_cancel - %s", iperf_strerror(i_errno));
goto cleanup_and_fail;
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "sender cancel in pthread_join - %s", iperf_strerror(i_errno));
goto cleanup_and_fail;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
if (sp->thread_number > 0) { // if thread was created
rc = pthread_cancel(sp->thr);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "sender cancel in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
goto cleanup_and_fail;
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "sender cancel in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
goto cleanup_and_fail;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket);
}
sp->thread_number = 0;
} else {
if (test->debug_level >= DEBUG_LEVEL_INFO)
iperf_printf(test, "Not stopping thread for FD %d as it was not created\n", sp->socket);
}
}
}
Expand All @@ -751,6 +778,14 @@ iperf_run_client(struct iperf_test * test)
if (iperf_set_send_state(test, TEST_END) != 0)
goto cleanup_and_fail;
}

/* Terminate if one of the threads failed */
if (running_threads != total_num_streams) {
i_errno = IEPTHREADNOTRUNNING;
iperf_err(test, "Number of running threads is %d but expected %d", running_threads, test->num_streams);
goto cleanup_and_fail;
}

}
}

Expand All @@ -759,22 +794,28 @@ iperf_run_client(struct iperf_test * test)
if (!sp->sender) {
int rc;
sp->done = 1;
rc = pthread_cancel(sp->thr);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "receiver cancel in pthread_cancel - %s", iperf_strerror(i_errno));
goto cleanup_and_fail;
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "receiver cancel in pthread_join - %s", iperf_strerror(i_errno));
goto cleanup_and_fail;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
if (sp->thread_number > 0) { // if thread was created
rc = pthread_cancel(sp->thr);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "receiver cancel in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
goto cleanup_and_fail;
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "receiver cancel in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
goto cleanup_and_fail;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket);
}
sp->thread_number = 0;
} else {
if (test->debug_level >= DEBUG_LEVEL_INFO)
iperf_printf(test, "Not stopping thread for FD %d as it was not created\n", sp->socket);
}
}
}
Expand All @@ -800,20 +841,26 @@ iperf_run_client(struct iperf_test * test)
SLIST_FOREACH(sp, &test->streams, streams) {
sp->done = 1;
int rc;
rc = pthread_cancel(sp->thr);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "cleanup_and_fail in pthread_cancel - %s", iperf_strerror(i_errno));
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "cleanup_and_fail in pthread_join - %s", iperf_strerror(i_errno));
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
if (sp->thread_number > 0) { // if thread was created
rc = pthread_cancel(sp->thr);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "cleanup_and_fail in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "cleanup_and_fail in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket);
}
sp->thread_number = 0;
} else {
if (test->debug_level >= DEBUG_LEVEL_INFO)
iperf_printf(test, "Not stopping thread for FD %d as it was not created\n", sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
Expand Down
4 changes: 4 additions & 0 deletions src/iperf_error.c
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ iperf_strerror(int int_errno)
snprintf(errstr, len, "unable to destroy thread attributes");
perr = 1;
break;
case IEPTHREADNOTRUNNING:
snprintf(errstr, len, "a thread stopped running unexpectedly");
perr = 1;
break;
default:
snprintf(errstr, len, "int_errno=%d", int_errno);
perr = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/iperf_sctp.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ iperf_sctp_recv(struct iperf_stream *sp)
sp->result->bytes_received_this_interval += r;
}
else {
if (sp->test->debug)
if (sp->test->debug_level >= DEBUG_LEVEL_DEBUG)
printf("Late receive, state = %d\n", sp->test->state);
}

Expand Down
66 changes: 51 additions & 15 deletions src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
#endif /* TCP_CA_NAME_MAX */
#endif /* HAVE_TCP_CONGESTION */

// variable for number of active threads count
static volatile int running_threads = 0;
static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;

void *
iperf_server_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
Expand All @@ -88,6 +92,12 @@ iperf_server_worker_run(void *s) {
return NULL;

cleanup_and_fail:
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(sp->test, "Thread number %d FD %d terminated unexpectedly\n", sp->thread_number, sp->socket);
}
pthread_mutex_lock(&running_mutex);
running_threads--; // Indicate that the thread failed
pthread_mutex_unlock(&running_mutex);
return NULL;
}

Expand Down Expand Up @@ -418,20 +428,26 @@ cleanup_server(struct iperf_test *test)
SLIST_FOREACH(sp, &test->streams, streams) {
int rc;
sp->done = 1;
rc = pthread_cancel(sp->thr);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "cleanup_server in pthread_cancel - %s", iperf_strerror(i_errno));
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "cleanup_server in pthread_join - %s", iperf_strerror(i_errno));
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
if (sp->thread_number > 0) { // if thread was created
rc = pthread_cancel(sp->thr);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADCANCEL;
errno = rc;
iperf_err(test, "cleanup_server in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
}
rc = pthread_join(sp->thr, NULL);
if (rc != 0 && rc != ESRCH) {
i_errno = IEPTHREADJOIN;
errno = rc;
iperf_err(test, "cleanup_server in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno));
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket);
}
sp->thread_number = 0;
} else {
if (test->debug_level >= DEBUG_LEVEL_INFO)
iperf_printf(test, "Not stopping thread for FD %d as it was not created\n", sp->socket);
}
}
i_errno = i_errno_save;
Expand Down Expand Up @@ -509,6 +525,7 @@ iperf_run_server(struct iperf_test *test)
int64_t t_usecs;
int64_t timeout_us;
int64_t rcv_timeout_us;
int total_num_streams = 0;

if (test->logfile)
if (iperf_open_logfile(test) < 0)
Expand Down Expand Up @@ -870,14 +887,24 @@ iperf_run_server(struct iperf_test *test)
cleanup_server(test);
};

pthread_mutex_lock(&running_mutex);
running_threads = 0;
total_num_streams = 0;
pthread_mutex_unlock(&running_mutex);
SLIST_FOREACH(sp, &test->streams, streams) {
pthread_mutex_lock(&running_mutex);
running_threads++; // Count running threads
sp->thread_number = running_threads;
pthread_mutex_unlock(&running_mutex);
total_num_streams++;

if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_run, sp) != 0) {
i_errno = IEPTHREADCREATE;
cleanup_server(test);
return -1;
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d created\n", sp->socket);
iperf_printf(test, "Thread number %d FD %d created\n", sp->thread_number, sp->socket);
}
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
Expand All @@ -891,6 +918,15 @@ iperf_run_server(struct iperf_test *test)
}
}

/* Terminate if any thread failed */
if (test->state == TEST_RUNNING) {
if (running_threads != total_num_streams) {
i_errno = IEPTHREADNOTRUNNING;
iperf_err(test, "Number of running threads is %d but expected %d", running_threads, test->num_streams);
cleanup_server(test);
}
}

if (result == 0 ||
(timeout != NULL && timeout->tv_sec == 0 && timeout->tv_usec == 0)) {
/* Run the timers. */
Expand Down
2 changes: 1 addition & 1 deletion src/iperf_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ iperf_tcp_recv(struct iperf_stream *sp)
sp->result->bytes_received_this_interval += r;
}
else {
if (sp->test->debug)
if (sp->test->debug_level >= DEBUG_LEVEL_DEBUG)
printf("Late receive, state = %d\n", sp->test->state);
}

Expand Down
2 changes: 1 addition & 1 deletion src/iperf_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ iperf_udp_recv(struct iperf_stream *sp)
sp->jitter += (d - sp->jitter) / 16.0;
}
else {
if (sp->test->debug)
if (sp->test->debug_level >= DEBUG_LEVEL_DEBUG)
printf("Late receive, state = %d\n", sp->test->state);
}

Expand Down
Loading