Skip to content

hibernate sockjs_session process #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -111,10 +111,10 @@ simple. It has just a couple of methods:
after the client was last connected (in ms).
* `{response_limit, integer()}` - the maximum size of a single
http streaming response (in bytes).
* `{hib_timeout, integer() | hibernate}` - hibernate websocket
process after hib_timeout milliseconds of inactivity (5000 by
default) to reduce memory footprint. Set to 'hibernate' atom to
hibernate always (may be inefficient). (implementation is
* `{hib_timeout, integer() | hibernate | infinity}` - hibernate
websocket process after hib_timeout milliseconds of inactivity
(5000 by default) to reduce memory footprint. Set to 'hibernate'
atom to hibernate always (may be inefficient). (implementation is
incomplete, see #15)
* `{logger, fun/3}` - a function called on every request, used
to print request to the logs (or on the screen by default).
45 changes: 24 additions & 21 deletions src/sockjs_action.erl
Original file line number Diff line number Diff line change
@@ -189,27 +189,8 @@ chunk_start(Req, Headers, ContentType) ->
reply_loop(Req, SessionId, ResponseLimit, Fmt, Service) ->
Req0 = sockjs_http:hook_tcp_close(Req),
case sockjs_session:reply(SessionId) of
wait -> receive
%% In Cowboy we need to capture async
%% messages from the tcp connection -
%% ie: {active, once}.
{tcp_closed, _} ->
Req0;
%% In Cowboy we may in theory get real
%% http requests, this is bad.
{tcp, _S, Data} ->
error_logger:error_msg(
"Received unexpected data on a "
"long-polling http connection: ~p. "
"Connection aborted.~n",
[Data]),
Req1 = sockjs_http:abruptly_kill(Req),
Req1;
go ->
Req1 = sockjs_http:unhook_tcp_close(Req0),
reply_loop(Req1, SessionId, ResponseLimit,
Fmt, Service)
end;
{wait, hibernate} -> catch erlang:hibernate(?MODULE, reply_loop_wait, [Req0, SessionId, ResponseLimit, Fmt, Service, infinity]);
{wait, Timeout} -> reply_loop_wait(Req0, SessionId, ResponseLimit, Fmt, Service, Timeout);
session_in_use -> Frame = sockjs_util:encode_frame({close, ?STILL_OPEN}),
chunk_end(Req0, Frame, Fmt);
{close, Frame} -> Frame1 = sockjs_util:encode_frame(Frame),
@@ -222,6 +203,28 @@ reply_loop(Req, SessionId, ResponseLimit, Fmt, Service) ->
Fmt, Service)
end.

reply_loop_wait(Req0, SessionId, ResponseLimit, Fmt, Service, Timeout) ->
receive
%% In Cowboy we need to capture async
%% messages from the tcp connection -
%% ie: {active, once}.
{tcp_closed, _} -> Req0;
%% In Cowboy we may in theory get real
%% http requests, this is bad.
{tcp, _S, Data} ->
error_logger:error_msg(
"Received unexpected data on a "
"long-polling http connection: ~p. "
"Connection aborted.~n",
[Data]),
sockjs_http:abruptly_kill(Req0);
go ->
Req1 = sockjs_http:unhook_tcp_close(Req0),
reply_loop(Req1, SessionId, ResponseLimit,
Fmt, Service)
after
Timeout -> catch erlang:hibernate(?MODULE, reply_loop_wait, [Req0, SessionId, ResponseLimit, Fmt, Service, infinity])
end.
reply_loop0(Req, _SessionId, ResponseLimit, _Fmt, _Service) when ResponseLimit =< 0 ->
chunk_end(Req);
reply_loop0(Req, SessionId, ResponseLimit, Fmt, Service) ->
41 changes: 15 additions & 26 deletions src/sockjs_cowboy_handler.erl
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ terminate(_Req, _Service) ->
%% --------------------------------------------------------------------------

websocket_init(_TransportName, Req,
Service = #service{logger = Logger, hib_timeout = HibTimeout}) ->
Service = #service{logger = Logger}) ->
Req0 = Logger(Service, {cowboy, Req}, websocket),

{Info, Req1} = sockjs_handler:extract_info(Req0),
@@ -43,43 +43,32 @@ websocket_init(_TransportName, Req,
{WS, Req2}
end,
self() ! go,
mh({ok, Req3, {RawWebsocket, SessionPid, {undefined, HibTimeout}}}).
{ok, Req3, {RawWebsocket, SessionPid}}.

websocket_handle({text, Data}, Req, {RawWebsocket, SessionPid, _HT} = S) ->
websocket_handle({text, Data}, Req, {RawWebsocket, SessionPid} = S) ->
case sockjs_ws_handler:received(RawWebsocket, SessionPid, Data) of
ok -> mh({ok, Req, S});
shutdown -> {shutdown, Req, S}
{ok, hibernate} -> {ok, Req, S, hibernate};
{ok, _Timeout} -> {ok, Req, S};
shutdown -> {shutdown, Req, S}
end;
websocket_handle(_Unknown, Req, S) ->
{shutdown, Req, S}.

websocket_info(go, Req, {RawWebsocket, SessionPid, _HT} = S) ->
websocket_info(go, Req, {RawWebsocket, SessionPid} = S) ->
case sockjs_ws_handler:reply(RawWebsocket, SessionPid) of
wait -> mh({ok, Req, S});
{ok, Data} -> self() ! go,
{reply, {text, Data}, Req, S};
{close, <<>>} -> {shutdown, Req, S};
{close, Data} -> self() ! shutdown,
{reply, {text, Data}, Req, S}
{wait, hibernate} -> {ok, Req, S, hibernate};
{wait, _Timeout} -> {ok, Req, S};
{ok, Data} -> self() ! go,
{reply, {text, Data}, Req, S};
{close, <<>>} -> {shutdown, Req, S};
{close, Data} -> self() ! shutdown,
{reply, {text, Data}, Req, S}
end;
websocket_info(shutdown, Req, S) ->
{shutdown, Req, S};
websocket_info(hibernate_triggered, Req, S) ->
{ok, Req, S, hibernate}.

websocket_terminate(_Reason, _Req, {RawWebsocket, SessionPid, _HT}) ->
websocket_terminate(_Reason, _Req, {RawWebsocket, SessionPid}) ->
sockjs_ws_handler:close(RawWebsocket, SessionPid),
ok.

%% --------------------------------------------------------------------------

mh({ok, Req, {RawWebsocket, SessionPid, {TRef, hibernate}}}) ->
{ok, Req, {RawWebsocket, SessionPid, {TRef, hibernate}}, hibernate};

mh({ok, Req, {RawWebsocket, SessionPid, {TRef, HibTimeout}}}) ->
case TRef of
undefined -> ok;
_ -> sockjs_util:cancel_send_after(TRef, hibernate_triggered)
end,
TRef2 = erlang:send_after(HibTimeout, self(), hibernate_triggered),
{ok, Req, {RawWebsocket, SessionPid, {TRef2, HibTimeout}}}.
2 changes: 1 addition & 1 deletion src/sockjs_internal.hrl
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
disconnect_delay :: non_neg_integer(),
heartbeat_delay :: non_neg_integer(),
response_limit :: non_neg_integer(),
hib_timeout :: non_neg_integer() | hibernate,
hib_timeout :: non_neg_integer() | hibernate | infinity,
logger :: logger()
}).

56 changes: 44 additions & 12 deletions src/sockjs_session.erl
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
disconnect_delay = 5000 :: non_neg_integer(),
heartbeat_tref :: reference() | triggered,
heartbeat_delay = 25000 :: non_neg_integer(),
hibernate_tref :: reference(),
hibernate_delay :: non_neg_integer() | hibernate | infinity,
ready_state = connecting :: connecting | open | closed,
close_msg :: {non_neg_integer(), string()},
callback,
@@ -56,8 +58,8 @@ maybe_create(SessionId, Service, Info) ->
-spec received(list(iodata()), session_or_pid()) -> ok.
received(Messages, SessionPid) when is_pid(SessionPid) ->
case gen_server:call(SessionPid, {received, Messages}, infinity) of
ok -> ok;
error -> throw(no_session)
{ok, Timeout} -> {ok, Timeout};
error -> throw(no_session)
%% TODO: should we respond 404 when session is closed?
end;
received(Messages, SessionId) ->
@@ -166,28 +168,48 @@ emit(What, State = #session{callback = Callback,
ok -> State
end.

mh(#session{hibernate_delay = hibernate} = State) -> {State, hibernate};

mh(#session{hibernate_delay = infinity} = State) -> {State, infinity};

mh(#session{hibernate_delay = HibTimeout, heartbeat_delay = HeartbeatDelay} = State) when HibTimeout >= HeartbeatDelay -> {State, infinity};

mh(#session{hibernate_delay = HibTimeout, hibernate_tref = TRef} = State) ->
case TRef of
undefined -> ok;
_ -> sockjs_util:cancel_send_after(TRef, hibernate_triggered)
end,
TRef2 = erlang:send_after(HibTimeout, self(), hibernate_triggered),
{State#session{hibernate_tref = TRef2}, infinity}.

%% --------------------------------------------------------------------------

-spec init({session_or_undefined(), service(), info()}) -> {ok, #session{}}.
-spec init({session_or_undefined(), service(), info()}) -> {ok, #session{}, infinity | hibernate}.
init({SessionId, #service{callback = Callback,
state = UserState,
disconnect_delay = DisconnectDelay,
hib_timeout = HibTimeout,
heartbeat_delay = HeartbeatDelay}, Info}) ->
case SessionId of
undefined -> ok;
_Else -> ets:insert(?ETS, {SessionId, self()})
end,
process_flag(trap_exit, true),
TRef = erlang:send_after(DisconnectDelay, self(), session_timeout),
{ok, #session{id = SessionId,
State = #session{id = SessionId,
callback = Callback,
state = UserState,
response_pid = undefined,
disconnect_tref = TRef,
disconnect_delay = DisconnectDelay,
heartbeat_tref = undefined,
heartbeat_delay = HeartbeatDelay,
handle = {?MODULE, {self(), Info}}}}.
hibernate_tref = undefined,
hibernate_delay = HibTimeout,
handle = {?MODULE, {self(), Info}}},
{State2, Timeout} = mh(State),
{ok, State2, Timeout}.



handle_call({reply, Pid, _Multiple}, _From, State = #session{
@@ -212,10 +234,10 @@ handle_call({reply, Pid, _Multiple}, _From, State = #session{
{reply, session_in_use, State};

handle_call({reply, Pid, Multiple}, _From, State = #session{
ready_state = open,
response_pid = RPid,
heartbeat_tref = HeartbeatTRef,
outbound_queue = Q})
ready_state = open,
response_pid = RPid,
heartbeat_tref = HeartbeatTRef,
outbound_queue = Q})
when RPid == undefined orelse RPid == Pid ->
{Messages, Q1} = case Multiple of
true -> {queue:to_list(Q), queue:new()};
@@ -228,7 +250,8 @@ handle_call({reply, Pid, Multiple}, _From, State = #session{
{[], triggered} -> State1 = unmark_waiting(Pid, State),
{reply, {ok, {heartbeat, nil}}, State1};
{[], _TRef} -> State1 = mark_waiting(Pid, State),
{reply, wait, State1};
{State2, Timeout} = mh(State1),
{reply, {wait, Timeout}, State2, Timeout};
_More -> State1 = unmark_waiting(Pid, State),
{reply, {ok, {data, Messages}},
State1#session{outbound_queue = Q1}}
@@ -238,7 +261,8 @@ handle_call({received, Messages}, _From, State = #session{ready_state = open}) -
State2 = lists:foldl(fun(Msg, State1) ->
emit({recv, iolist_to_binary(Msg)}, State1)
end, State, Messages),
{reply, ok, State2};
{State3, Timeout} = mh(State2),
{reply, {ok, Timeout}, State3, Timeout};

handle_call({received, _Data}, _From, State = #session{ready_state = _Any}) ->
{reply, error, State};
@@ -281,9 +305,17 @@ handle_info(force_shutdown, State) ->
handle_info(session_timeout, State = #session{response_pid = undefined}) ->
{stop, normal, State};

handle_info(hibernate_triggered, #session{response_pid = RPid} = State) ->
case RPid of
undefined -> ok;
_ -> RPid ! hibernate_triggered
end,
{noreply, State#session{hibernate_tref = undefined}, hibernate};

handle_info(heartbeat_triggered, State = #session{response_pid = RPid}) when RPid =/= undefined ->
RPid ! go,
{noreply, State#session{heartbeat_tref = triggered}};
{State2, Timeout} = mh(State),
{noreply, State2#session{heartbeat_tref = triggered}, Timeout};

handle_info(Info, State) ->
{stop, {odd_info, Info}, State}.
12 changes: 6 additions & 6 deletions src/sockjs_ws_handler.erl
Original file line number Diff line number Diff line change
@@ -25,9 +25,9 @@ received(rawwebsocket, SessionPid, Data) ->

session_received(Messages, SessionPid) ->
try sockjs_session:received(Messages, SessionPid) of
ok -> ok
{ok, Timeout} -> {ok, Timeout}
catch
no_session -> shutdown
no_session -> shutdown
end.

-spec reply(websocket|rawwebsocket, pid()) -> {close|open, binary()} | wait.
@@ -36,8 +36,8 @@ reply(websocket, SessionPid) ->
{W, Frame} when W =:= ok orelse W =:= close->
Frame1 = sockjs_util:encode_frame(Frame),
{W, iolist_to_binary(Frame1)};
wait ->
wait
{wait, Timeout} ->
{wait, Timeout}
end;
reply(rawwebsocket, SessionPid) ->
case sockjs_session:reply(SessionPid, false) of
@@ -48,8 +48,8 @@ reply(rawwebsocket, SessionPid) ->
{data, [Msg]} -> {ok, iolist_to_binary(Msg)};
{heartbeat, nil} -> reply(rawwebsocket, SessionPid)
end;
wait ->
wait
{wait, Timeout} ->
{wait, Timeout}
end.

-spec close(websocket|rawwebsocket, pid()) -> ok.