Skip to content

Commit 975112c

Browse files
committed
Complete exercise 26.6
1 parent afcf39e commit 975112c

File tree

3 files changed

+118
-3
lines changed

3 files changed

+118
-3
lines changed

chapter_26/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ erl -sname holmes
101101
In a third shell call the `run` function and pass in the list of nodes:
102102

103103
```
104-
erl
105-
1> web_profiler:run(['watson@trevor-ThinkPad-E550', 'holmes@trevor-ThinkPad-E550'], 10000).
104+
erl -sname profiler
105+
1> web_profiler:run(['watson@localhost', 'holmes@localhost'], 10000).
106106
182777
107107
```
108108

chapter_26/exercise_3/web_profiler.erl

-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ run(Timeout) ->
1717
% Return the time it took to execute all of the requests
1818
Time.
1919

20-
2120
pmap(F, L) ->
2221
S = self(),
2322
Ref = make_ref(),
+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
-module(web_profiler).
2+
3+
-export([run/2, ping/2, receive_response/2, do_f/4]).
4+
5+
-define(LIST_OF_WEBSITES, [
6+
"http://stratus3d.com/",
7+
"http://lobste.rs/",
8+
"http://news.ycombinator.com/",
9+
"http://stackoverflow.com/"
10+
]).
11+
12+
% Since there is so much state that must be passed around for the lists:foldl/3
13+
% state I defined a record to make things easier.
14+
-record(pmap_state, {
15+
function,
16+
ref,
17+
max,
18+
nodes,
19+
results = [],
20+
pids = [],
21+
running_processes = 0
22+
}).
23+
24+
25+
run(Nodes, Timeout) ->
26+
% Map over the list
27+
{Time, Result} = timer:tc(fun() ->
28+
pmap(fun(Url) -> ping(Url, Timeout) end, ?LIST_OF_WEBSITES, 2, Nodes)
29+
end),
30+
31+
io:format("Result: ~p~n", [Result]),
32+
33+
% Return the time it took to execute all of the requests
34+
Time.
35+
36+
do_f(Parent, Ref, F, I) ->
37+
Parent ! {self(), Ref, (catch F(I))}.
38+
39+
gather(Pid, Ref) ->
40+
receive
41+
{Pid, Ref, Ret} -> Ret
42+
end.
43+
44+
pmap(F, L, Max, Nodes) ->
45+
Ref = make_ref(),
46+
InitialState = #pmap_state{max = Max, ref = Ref, function = F, nodes = Nodes},
47+
#pmap_state{pids = Pids, results = Result} = lists:foldl(fun map_element/2, InitialState, L),
48+
49+
% Receive the remaining `Max` number results after the foldl
50+
RemainingResults = lists:map(fun(Pid) ->
51+
gather(Pid, Ref)
52+
end, Pids),
53+
54+
% Return the final results
55+
lists:reverse(RemainingResults ++ Result).
56+
57+
map_element(Element, #pmap_state{
58+
running_processes = RunningProcesses,
59+
max = Max,
60+
pids = Pids,
61+
ref = Ref,
62+
function = F,
63+
nodes = Nodes
64+
} = State) when RunningProcesses < Max ->
65+
% Spawn process per item until limit is reached
66+
{Node, NewNodes} = next_node(Nodes),
67+
io:format("spawning~n"),
68+
Pid = spawn_on_node(Node, Ref, F, Element),
69+
State#pmap_state{running_processes = RunningProcesses + 1, pids = [Pid|Pids], nodes = NewNodes};
70+
71+
map_element(Element, #pmap_state{pids = Pids, results = Results, ref = Ref, function = F, nodes = Nodes} = State) ->
72+
% When limit is reached wait until we receive a message from one of the workers
73+
[Pid|RestPids] = lists:reverse(Pids),
74+
io:format("waiting for message back~n"),
75+
Result = gather(Pid, Ref),
76+
77+
% When message is received spawn process for next item in list if one remains
78+
{Node, NewNodes} = next_node(Nodes),
79+
NewPid = spawn_on_node(Node, Ref, F, Element),
80+
State#pmap_state{results = [Result|Results], pids = [NewPid|lists:reverse(RestPids)], nodes = NewNodes}.
81+
82+
spawn_on_node(Node, Ref, F, Element) ->
83+
Self = self(),
84+
spawn(Node, ?MODULE, do_f, [Self, Ref, F, Element]).
85+
86+
next_node([Node|Nodes]) ->
87+
{Node, Nodes ++ [Node]}.
88+
89+
ping(URL, Timeout) ->
90+
{_Protocol, Host, Port, Path} = parse_url(URL),
91+
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}]),
92+
% Send the request
93+
ok = gen_tcp:send(Socket, io_lib:format("HEAD ~s HTTP/1.0\r\n\r\n", [Path])),
94+
% Time the response
95+
{Time, Result} = timer:tc(fun receive_response/2, [Socket, Timeout]),
96+
97+
% Format the return value of the function
98+
case Result of
99+
timeout ->
100+
timeout;
101+
_ ->
102+
{time, Time}
103+
end.
104+
105+
receive_response(Socket, Timeout) ->
106+
% And receive the response
107+
case gen_tcp:recv(Socket, 0, Timeout) of
108+
{ok, Packet} -> Packet;
109+
{error, timeout} -> timeout
110+
end.
111+
112+
parse_url(Url) ->
113+
{ok, Parsed} = http_uri:parse(Url),
114+
% We ignore the query string for simplicity here
115+
{Protocol, _, Host, Port, Path, _Query} = Parsed,
116+
{Protocol, Host, Port, Path}.

0 commit comments

Comments
 (0)