Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module load and unload support #59

Merged
merged 4 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 136 additions & 40 deletions src/module-xrdp-sink.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,21 @@ PA_MODULE_USAGE(
"format=<sample format> "
"rate=<sample rate> "
"channels=<number of channels> "
"channel_map=<channel map>");
"channel_map=<channel map> "
"xrdp_socket_path=<path to XRDP sockets> "
"xrdp_pulse_sink_socket=<name of sink socket>");

#define DEFAULT_SINK_NAME "xrdp-sink"
#define BLOCK_USEC 30000
//#define BLOCK_USEC (PA_USEC_PER_SEC * 2)
#define UNUSED_VAR(x) ((void) (x))

/* support for the set_state_in_io_thread callback was added in 11.99.1 */
#if defined(PA_CHECK_VERSION) && PA_CHECK_VERSION(11, 99, 1)
#define USE_SET_STATE_IN_IO_THREAD_CB
#else
#undef USE_SET_STATE_IN_IO_THREAD_CB
#endif

struct userdata {
pa_core *core;
Expand All @@ -102,10 +112,9 @@ struct userdata {
pa_usec_t last_send_time;

int fd; /* unix domain socket connection to xrdp chansrv */
int display_num;
int skip_bytes;
int got_max_latency;

char *sink_socket;
};

static const char* const valid_modargs[] = {
Expand All @@ -115,6 +124,8 @@ static const char* const valid_modargs[] = {
"rate",
"channels",
"channel_map",
"xrdp_socket_path",
"xrdp_pulse_sink_socket",
NULL
};

Expand All @@ -127,27 +138,30 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data,
pa_usec_t now;
long lat;

pa_log_debug("sink_process_msg: code %d", code);

switch (code) {

case PA_SINK_MESSAGE_SET_VOLUME: /* 3 */
case PA_SINK_MESSAGE_SET_VOLUME:
pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_SET_VOLUME");
break;

case PA_SINK_MESSAGE_SET_MUTE: /* 6 */
case PA_SINK_MESSAGE_SET_MUTE:
pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_SET_MUTE");
break;

case PA_SINK_MESSAGE_GET_LATENCY: /* 7 */
case PA_SINK_MESSAGE_GET_LATENCY:
pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_GET_LATENCY");
now = pa_rtclock_now();
lat = u->timestamp > now ? u->timestamp - now : 0ULL;
pa_log_debug("sink_process_msg: lat %ld", lat);
*((pa_usec_t*) data) = lat;
return 0;

case PA_SINK_MESSAGE_GET_REQUESTED_LATENCY: /* 8 */
case PA_SINK_MESSAGE_GET_REQUESTED_LATENCY:
pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_GET_REQUESTED_LATENCY");
break;

case PA_SINK_MESSAGE_SET_STATE: /* 9 */
case PA_SINK_MESSAGE_SET_STATE:
pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_SET_STATE");
if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) /* 0 */ {
pa_log("sink_process_msg: running");

Expand All @@ -158,11 +172,40 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data,
}
break;

default:
pa_log_debug("sink_process_msg: code %d", code);

}

return pa_sink_process_msg(o, code, data, offset, chunk);
}

#ifdef USE_SET_STATE_IN_IO_THREAD_CB
/* Called from the IO thread. */
static int sink_set_state_in_io_thread_cb(pa_sink *s,
pa_sink_state_t new_state,
pa_suspend_cause_t new_suspend_cause)
{
struct userdata *u;

UNUSED_VAR(new_suspend_cause);

pa_assert(s);
pa_assert_se(u = s->userdata);

if (s->thread_info.state == PA_SINK_SUSPENDED || s->thread_info.state == PA_SINK_INIT)
{
if (PA_SINK_IS_OPENED(new_state))
{
pa_log_debug("sink_set_state_in_io_thread_cb: set timestamp");
u->timestamp = pa_rtclock_now();
}
}

return 0;
}
#endif /* USE_SET_STATE_IN_IO_THREAD_CB */

static void sink_update_requested_latency_cb(pa_sink *s) {
struct userdata *u;
size_t nbytes;
Expand All @@ -176,15 +219,52 @@ static void sink_update_requested_latency_cb(pa_sink *s) {
u->block_usec = s->thread_info.max_latency;
}
nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
pa_sink_set_max_rewind_within_thread(s, nbytes);
pa_sink_set_max_request_within_thread(s, nbytes);
}

static void process_rewind(struct userdata *u, pa_usec_t now) {
size_t rewind_nbytes, in_buffer;
pa_usec_t delay;

pa_assert(u);

rewind_nbytes = u->sink->thread_info.rewind_nbytes;

if (!PA_SINK_IS_OPENED(u->sink->thread_info.state) || rewind_nbytes <= 0)
goto do_nothing;

pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) rewind_nbytes);

if (u->timestamp <= now)
goto do_nothing;

delay = u->timestamp - now;
in_buffer = pa_usec_to_bytes(delay, &u->sink->sample_spec);

if (in_buffer <= 0)
goto do_nothing;

if (rewind_nbytes > in_buffer)
rewind_nbytes = in_buffer;

pa_sink_process_rewind(u->sink, rewind_nbytes);
u->timestamp -= pa_bytes_to_usec(rewind_nbytes, &u->sink->sample_spec);

pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes);
return;

do_nothing:

pa_sink_process_rewind(u->sink, 0);
}

struct header {
int code;
int bytes;
};

static int get_display_num_from_display(char *display_text) {
static int get_display_num_from_display(const char *display_text) {
int index;
int mode;
int host_index;
Expand Down Expand Up @@ -248,8 +328,6 @@ static int lsend(int fd, char *data, int bytes) {

static int data_send(struct userdata *u, pa_memchunk *chunk) {
char *data;
char *socket_dir;
char *sink_socket;
int bytes;
int sent;
int fd;
Expand All @@ -265,26 +343,7 @@ static int data_send(struct userdata *u, pa_memchunk *chunk) {
fd = socket(PF_LOCAL, SOCK_STREAM, 0);
memset(&s, 0, sizeof(s));
s.sun_family = AF_UNIX;
bytes = sizeof(s.sun_path) - 1;
socket_dir = getenv("XRDP_SOCKET_PATH");
if (socket_dir == NULL || socket_dir[0] == '\0')
{
socket_dir = "/tmp/.xrdp";
}
sink_socket = getenv("XRDP_PULSE_SINK_SOCKET");
if (sink_socket == NULL || sink_socket[0] == '\0')
{
pa_log_debug("Could not obtain sink_socket from environment.");
/* usually it doesn't reach here. if the socket name is not given
via environment variable, use hardcoded name as fallback */
snprintf(s.sun_path, bytes,
"%s/xrdp_chansrv_audio_out_socket_%d", socket_dir, u->display_num);
}
else
{
pa_log_debug("Obtained sink_socket from environment.");
snprintf(s.sun_path, bytes, "%s/%s", socket_dir, sink_socket);
}
pa_strlcpy(s.sun_path, u->sink_socket, sizeof(s.sun_path));
pa_log_debug("trying to connect to %s", s.sun_path);

if (connect(fd, (struct sockaddr *)&s,
Expand Down Expand Up @@ -353,9 +412,6 @@ static void process_render(struct userdata *u, pa_usec_t now) {
int request_bytes;

pa_assert(u);
if (u->got_max_latency) {
return;
}
pa_log_debug("process_render: u->block_usec %llu", (unsigned long long) u->block_usec);
while (u->timestamp < now + u->block_usec) {
request_bytes = u->sink->thread_info.max_request;
Expand All @@ -372,8 +428,6 @@ static void process_render(struct userdata *u, pa_usec_t now) {
static void thread_func(void *userdata) {

struct userdata *u = userdata;
int ret;
pa_usec_t now;

pa_assert(u);

Expand All @@ -391,7 +445,7 @@ static void thread_func(void *userdata) {
now = pa_rtclock_now();
}
if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) {
pa_sink_process_rewind(u->sink, 0);
process_rewind(u, now);
}
/* Render some data and write it to the socket */
if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
Expand Down Expand Up @@ -428,6 +482,35 @@ static void thread_func(void *userdata) {
pa_log_debug("Thread shutting down");
}

static void set_sink_socket(pa_modargs *ma, struct userdata *u) {
const char *socket_dir;
const char *socket_name;
char default_socket_name[64];
size_t nbytes;

socket_dir = pa_modargs_get_value(ma, "xrdp_socket_path",
getenv("XRDP_SOCKET_PATH"));
if (socket_dir == NULL || socket_dir[0] == '\0') {
socket_dir = "/tmp/.xrdp";
}

socket_name = pa_modargs_get_value(ma, "xrdp_pulse_sink_socket",
getenv("XRDP_PULSE_SINK_SOCKET"));
if (socket_name == NULL || socket_name[0] == '\0')
{
int display_num = get_display_num_from_display(getenv("DISPLAY"));

pa_log_debug("Could not obtain sink_socket from environment.");
snprintf(default_socket_name, sizeof(default_socket_name),
"xrdp_chansrv_audio_out_socket_%d", display_num);
socket_name = default_socket_name;
}

nbytes = strlen(socket_dir) + 1 + strlen(socket_name) + 1;
u->sink_socket = pa_xmalloc(nbytes);
snprintf(u->sink_socket, nbytes, "%s/%s", socket_dir, socket_name);
}

int pa__init(pa_module*m) {
struct userdata *u = NULL;
pa_sample_spec ss;
Expand Down Expand Up @@ -484,6 +567,9 @@ int pa__init(pa_module*m) {
}

u->sink->parent.process_msg = sink_process_msg;
#ifdef USE_SET_STATE_IN_IO_THREAD_CB
u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
#endif
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->userdata = u;

Expand All @@ -493,9 +579,10 @@ int pa__init(pa_module*m) {
u->block_usec = BLOCK_USEC;
pa_log_debug("3 block_usec %llu", (unsigned long long) u->block_usec);
nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
pa_sink_set_max_rewind(u->sink, nbytes);
pa_sink_set_max_request(u->sink, nbytes);

u->display_num = get_display_num_from_display(getenv("DISPLAY"));
set_sink_socket(ma, u);

u->fd = -1;

Expand All @@ -512,6 +599,8 @@ int pa__init(pa_module*m) {
goto fail;
}

pa_sink_set_latency_range(u->sink, 0, BLOCK_USEC);

pa_sink_put(u->sink);

pa_modargs_free(ma);
Expand Down Expand Up @@ -566,5 +655,12 @@ void pa__done(pa_module*m) {
pa_rtpoll_free(u->rtpoll);
}

if (u->fd >= 0)
{
close(u->fd);
u->fd = -1;
}

pa_xfree(u->sink_socket);
pa_xfree(u);
}
Loading