Skip to content

Commit 78df1f7

Browse files
committed
Complete exercise 22.4
1 parent 1d9fe10 commit 78df1f7

File tree

2 files changed

+170
-0
lines changed

2 files changed

+170
-0
lines changed

chapter_22/README.md

+25
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,31 @@ Working on job 1ok
7272

7373
**4. Check for lazy workers. Change the work wanted function to return `{JobNumber, JobTime, F}` where JobTime is the number of seconds that the worker as to complete the job by. At `JobTime - 1`, the server should send a `hurry_up` message to the worker if it has not finished the job. And at time `JobTime + 1`, it should kill the worker process with an `exit(Pid, youre_fired)` call.**
7474

75+
Solution in the [exercise_4/](exercise_4/) directory.
76+
```
77+
erl
78+
1> job_centre:start_link().
79+
{ok,<0.67.0>}
80+
2> job_centre:add_job(fun() -> io:format("Working on job 1") end).
81+
1
82+
3> job_centre:add_job(fun() -> io:format("Working on job 2") end).
83+
2
84+
4> spawn(fun() ->
85+
{JobID, JobFun} = job_centre:work_wanted(),
86+
JobFun(),
87+
receive
88+
Msg ->
89+
io:format("~p", [Msg])
90+
after 3000 ->
91+
ok
92+
end
93+
end).
94+
Working on job 1<0.71.0>
95+
Worker process <0.71.0> crashed with reason test_failureDOWN message from unknown process
96+
5> job_centre:statistics().
97+
[{1,pending},{2,pending}]
98+
```
99+
75100
**5. Implement a trade union server to monitor the workers. Check that they are not fired without being sent a warning.**
76101

77102
Not going to implement this as it's not a design pattern I've seen regularly used in OTP applications.

chapter_22/exercise_4/job_centre.erl

+145
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
-module(job_centre).
2+
3+
-behaviour(gen_server).
4+
5+
-define(JOB_TIME, 2000).
6+
7+
% API
8+
-export([start_link/0, add_job/1, work_wanted/0, job_done/1, statistics/0]).
9+
10+
%% gen_server callbacks
11+
-export([init/1,
12+
handle_call/3,
13+
handle_cast/2,
14+
handle_info/2,
15+
terminate/2,
16+
code_change/3]).
17+
18+
% API functions
19+
start_link() ->
20+
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
21+
22+
add_job(Fun) ->
23+
gen_server:call(?MODULE, {add_job, Fun}).
24+
25+
work_wanted() ->
26+
gen_server:call(?MODULE, checkout_job).
27+
28+
job_done(JobId) ->
29+
gen_server:call(?MODULE, {remove_job, JobId}).
30+
31+
statistics() ->
32+
gen_server:call(?MODULE, get_statistics).
33+
34+
% gen_server callbacks
35+
init([]) ->
36+
% Initial the gen_server state with an integer for future IDs and a ets
37+
% table
38+
{ok, {0, ets:new(?MODULE,[])}}.
39+
40+
handle_call({add_job, Fun}, _From, {LastId, Tab}) ->
41+
% Generate another job ID by incrementing the last one
42+
JobId = LastId + 1,
43+
44+
% Insert the ID and work function into the ets table
45+
true = ets:insert(Tab, job(JobId, Fun)),
46+
{reply, JobId, {JobId, Tab}};
47+
handle_call(checkout_job, {FromPid, _} = _From, {_, Tab} = State) ->
48+
% Find a job
49+
case ets:match(Tab, {'$1', '$2', '_', '_', '_'}) of
50+
[] ->
51+
% If none return `no`
52+
{reply, no, State};
53+
[[JobId, JobFun]|_] ->
54+
% If one is found mark it as in_progess and return it to the caller
55+
% Monitor `From` to ensure process doesn't crash while handling job
56+
Ref = monitor(process, FromPid),
57+
ets:insert(Tab, job(JobId, JobFun, in_progress, FromPid, Ref)),
58+
59+
% Set timer
60+
_TimerRef = erlang:send_after(?JOB_TIME, self(), {almost_up, JobId}),
61+
62+
{reply, {JobId, JobFun}, State}
63+
end;
64+
handle_call({remove_job, JobId}, _From, {_, Tab} = State) ->
65+
% Mark the job as done in the ETS table
66+
case find_job(Tab, JobId) of
67+
false ->
68+
{reply, not_found, State};
69+
{JobId, JobFun, _State, _Pid, _Ref} ->
70+
ets:insert(Tab, job(JobId, JobFun, finished)),
71+
{reply, ok, State}
72+
end;
73+
handle_call(get_statistics, _From, {_, Tab} = State) ->
74+
% Dump job data from ets table
75+
Jobs = ets:tab2list(Tab),
76+
77+
% Format the data and return the state of each job
78+
Results = lists:map(fun({JobID, _JobFun, JobState, _Pid, _Ref}) ->
79+
{JobID, JobState}
80+
end, Jobs),
81+
{reply, Results, State}.
82+
83+
handle_cast(_Msg, State) ->
84+
{noreply, State}.
85+
86+
handle_info({youre_fired, JobId}, {_, Tab} = State) ->
87+
case find_job(Tab, JobId) of
88+
false -> ok;
89+
{JobId, JobFun, _State, Pid, Ref} ->
90+
% Kill the job if it hasn't already finished
91+
exit(Pid, youre_fired),
92+
demonitor(Ref),
93+
ets:insert(Tab, job(JobId, JobFun))
94+
end,
95+
{noreply, State};
96+
handle_info({almost_up, JobId}, {_, Tab} = State) ->
97+
_TimerRef = erlang:send_after(?JOB_TIME, self(), {almost_up, JobId}),
98+
case find_job(Tab, JobId) of
99+
false -> ok;
100+
{JobId, _JobFun, _State, Pid, _Ref} ->
101+
Pid ! hurry_up,
102+
_TimerRef2 = erlang:send_after(2000, self(), {youre_fired, JobId})
103+
end,
104+
{noreply, State};
105+
handle_info({'DOWN', Ref, process, Pid, Reason}, {_, Tab} = State) ->
106+
LookupByRefAndPid = [{{'_', '_', '_','$1','$2'},
107+
[{'=:=','$1',{const,Pid}},{'=:=','$2',{const,Ref}}],
108+
['$_']}],
109+
case ets:match(Tab, LookupByRefAndPid) of
110+
[] ->
111+
% Unknown down message
112+
io:format("DOWN message from unknown process");
113+
[{JobId, JobFun, _State, Pid, Ref}|_] ->
114+
io:format("Worker process ~p crashed with reason ~p", [Pid, Reason]),
115+
ets:insert(Tab, job(JobId, JobFun))
116+
end,
117+
{noreply, State};
118+
119+
handle_info(_Info, State) ->
120+
{noreply, State}.
121+
122+
terminate(_Reason, _State) ->
123+
ok.
124+
125+
code_change(_OldVsn, State, _Extra) ->
126+
{ok, State}.
127+
128+
% Helper functions for manipulating job state
129+
job(JobId, JobFun) ->
130+
job(JobId, JobFun, pending).
131+
132+
job(JobId, JobFun, State) ->
133+
job(JobId, JobFun, State, undefined, undefined).
134+
135+
job(JobId, JobFun, State, Pid, Ref) ->
136+
{JobId, JobFun, State, Pid, Ref}.
137+
138+
% Helper function for looking up jobs
139+
find_job(Tab, JobId) ->
140+
case ets:lookup(Tab, JobId) of
141+
[] ->
142+
false;
143+
[{JobId, JobFun, State, Pid, Ref}|_] ->
144+
{JobId, JobFun, State, Pid, Ref}
145+
end.

0 commit comments

Comments
 (0)