通过login/logout/send API实现一个消息路由器- 更新mochiweb应用,使其可以从路由器接收消息
- 建立一套分布式的erlang系统,我们可以在不同的节点/主机运行该路由器
- 写个工具,向路由器发送大量消息
- 图形化显示24小时内存实用情况,优化mochiweb应用,使其节省内存开销
- login(Id, Pid) 注册一个进程(进程Id为Pid),为Id接收消息
- logout(Pid)停止接收消息
- send(Id, Msg)向任何以Id登陆的客户端发送消息Msg
- -module(router).
- -behaviour(gen_server).
-
- -export([start_link/0]).
- -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
- -export([send/2, login/2, logout/1]).
-
- -define(SERVER, global:whereis_name(?MODULE)).
-
- % will hold bidirectional mapping between id <–> pid
- -record(state, {pid2id, id2pid}).
-
- start_link() ->
- gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
-
- % sends Msg to anyone logged in as Id
- send(Id, Msg) ->
- gen_server:call(?SERVER, {send, Id, Msg}).
-
- login(Id, Pid) when is_pid(Pid) ->
- gen_server:call(?SERVER, {login, Id, Pid}).
-
- logout(Pid) when is_pid(Pid) ->
- gen_server:call(?SERVER, {logout, Pid}).
-
- %%
-
- init([]) ->
- % set this so we can catch death of logged in pids:
- process_flag(trap_exit, true),
- % use ets for routing tables
- {ok, #state{
- pid2id = ets:new(?MODULE, [bag]),
- id2pid = ets:new(?MODULE, [bag])
- }
- }.
-
- handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
- ets:insert(State#state.pid2id, {Pid, Id}),
- ets:insert(State#state.id2pid, {Id, Pid}),
- link(Pid), % tell us if they exit, so we can log them out
- io:format("~w logged in as ~w\n",[Pid, Id]),
- {reply, ok, State};
-
- handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
- unlink(Pid),
- PidRows = ets:lookup(State#state.pid2id, Pid),
- case PidRows of
- [] ->
- ok;
- _ ->
- IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples
- % delete all pid->id entries
- ets:delete(State#state.pid2id, Pid),
- % and all id->pid
- [ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ]
- end,
- io:format("pid ~w logged out\n",[Pid]),
- {reply, ok, State};
-
- handle_call({send, Id, Msg}, _From, State) ->
- % get pids who are logged in as this Id
- Pids = [ P || { _Id, P } <- ets:lookup(State#state.id2pid, Id) ],
- % send Msg to them all
- M = {router_msg, Msg},
- [ Pid ! M || Pid <- Pids ],
- {reply, ok, State}.
-
- % handle death and cleanup of logged in processes
- handle_info(Info, State) ->
- case Info of
- {‘EXIT’, Pid, _Why} ->
- % force logout:
- handle_call({logout, Pid}, blah, State);
- Wtf ->
- io:format("Caught unhandled message: ~w\n", [Wtf])
- end,
- {noreply, State}.
-
- handle_cast(_Msg, State) ->
- {noreply, State}.
- terminate(_Reason, _State) ->
- ok.
- code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
- 客户端通过http://localhost:8000/test/123连接mochiweb
- Mochiweb应用把该连接的pid和id '123'关联起来,注册在消息路由器
- 假如你把(目标)地址id '123'的消息发送给路由器,该消息将被转发给正确得mochiweb进程,并在那个用户的浏览器中显示。
- -module(mochiconntest_web).
-
- -export([start/1, stop/0, loop/2]).
-
- %% External API
-
- start(Options) ->
- {DocRoot, Options1} = get_option(docroot, Options),
- Loop = fun (Req) ->
- ?MODULE:loop(Req, DocRoot)
- end,
- % we’ll set our maximum to 1 million connections. (default: 2048)
- mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).
-
- stop() ->
- mochiweb_http:stop(?MODULE).
-
- loop(Req, DocRoot) ->
- "/" ++ Path = Req:get(path),
- case Req:get(method) of
- Method when Method =:= ‘GET’; Method =:= ‘HEAD’ ->
- case Path of
- "test/" ++ Id ->
- Response = Req:ok({"text/html; charset=utf-8",
- [{"Server","Mochiweb-Test"}],
- chunked}),
- % login using an integer rather than a string
- {IdInt, _} = string:to_integer(Id),
- router:login(IdInt, self()),
- feed(Response, IdInt, 1);
- _ ->
- Req:not_found()
- end;
- ‘POST’ ->
- case Path of
- _ ->
- Req:not_found()
- end;
- _ ->
- Req:respond({501, [], []})
- end.
-
- feed(Response, Id, N) ->
- receive
- {router_msg, Msg} ->
- Html = io_lib:format("Recvd msg #~w: ‘~s’", [N, Msg]),
- Response:write_chunk(Html)
- end,
- feed(Response, Id, N+1).
-
- %% Internal API
-
- get_option(Option, Options) ->
- {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.
- -sname n1 命名erlang节点为'n1'
- +K true 启用kernel-poll,处理大量连接时,似乎没有理由不启用这个选项
- +P 134217727 默认的最大进程数目为32768;考虑到我们每个连接需要一个进程,我建议把它设为可以的最大值。通过'man erl'我们知道134,217,727是最大值。
[]
(n2@localhost)2> net_adm:ping(n1@localhost).
pong
(n2@localhost)3> nodes().
[n1@localhost]
{ok,router}
(n2@localhost)5> router:start_link().
{ok,<0.38.0>}
(n2@localhost)7> router:send(123, "Why not open another browser window too?").
(n2@localhost)8> router:send(456, "This message will go into the void unless you are connected as /test/456 too").
- -module(msggen).
- -export([start/3]).
-
- start(0, _, _) -> ok;
- start(Num, Interval, Max) ->
- Id = random:uniform(Max),
- router:send(Id, "Fake message Num = " ++ Num),
- receive after Interval -> start(Num -1, Interval, Max) end.
它会给1到Max之间的随机的用户Id发送Num条消息,每次发送之间会等待Interval毫秒。
(test@localhost)1> net_adm:ping(n1@localhost).
pong
(test@localhost)2> c(msggen).
{ok,msggen}
(test@localhost)3> msggen:start(20, 10, 5).
ok
<0.103.0>,<0.104.0>,<0.105.0>,<0.106.0>]
<0.101.0> finished.
<0.105.0> finished.
<0.106.0> finished.
<0.104.0> finished.
<0.102.0> finished.
<0.98.0> finished.
<0.99.0> finished.
<0.100.0> finished.
<0.103.0> finished.
<0.97.0> finished.
40156
Stats: {825,0,0}
Stats: {1629,0,0}
Stats: {2397,0,0}
Stats: {3218,0,0}
Stats: {4057,0,0}
Stats: {4837,0,0}
Stats: {5565,0,0}
Stats: {6295,0,0}
Stats: {7022,0,0}
Stats: {7727,0,0}
Stats: {8415,0,0}
Stats: {9116,0,0}
Stats: {9792,0,0}
Stats: {10000,0,0}
...
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001
erl> [ spawn(fun() -> msggen:start(1000000, 100, 10000) end) || _ <- lists:seq(1,100) ]. [<0.65.0>,<0.66.0>,<0.67.0>,<0.68.0>,<0.69.0>,<0.70.0>,
<0.71.0>,<0.72.0>,<0.73.0>,<0.74.0>,<0.75.0>,<0.76.0>,
<0.77.0>,<0.78.0>,<0.79.0>,<0.80.0>,<0.81.0>,<0.82.0>,
<0.83.0>,<0.84.0>,<0.85.0>,<0.86.0>,<0.87.0>,<0.88.0>,
<0.89.0>,<0.90.0>,<0.91.0>,<0.92.0>,<0.93.0>|...]
100个进程,每个进程以每秒10条的频率随机地给1到1万之间的id发送1百万条消息。这意味着,路由器每秒发送1000条消息,平均每个客户端每10秒接收一条消息。
看一下floodtest的输出,你会看到客户端正在接收http块(记住这是{已连接的连接数,被关闭的连接数,接收到的数据块的数量}):
...
Stats: {10000,0,5912}
Stats: {10000,0,15496}
Stats: {10000,0,25145}
Stats: {10000,0,34755}
Stats: {10000,0,44342}
...
每个进程以每秒10条消息的频率发送1百万条消息,需要27个小时才能完成。
这里是10分钟后的内存使用情况:
Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001
Mon Oct 20 17:05:55 BST 2008 1224518755 90980 10001
Mon Oct 20 17:07:00 BST 2008 1224518820 91120 10001
Mon Oct 20 17:08:05 BST 2008 1224518885 98664 10001
Mon Oct 20 17:09:10 BST 2008 1224518950 106752 10001
Mon Oct 20 17:10:15 BST 2008 1224519015 114044 10001
Mon Oct 20 17:11:20 BST 2008 1224519080 119468 10001
Mon Oct 20 17:12:25 BST 2008 1224519145 125360 10001
当连上1万个客户端后,内存使用量就从40MB增加到90MB,当运行一段时间后,变为125MB。
值得指出的是,floodtest工具占用了大部分的CPU,msggen使用了2%的CPU,而路由器和mochiweb不到1%(只有客户端模拟程序占用大量的CPU,而服务器程序本身没怎么占用CPU)。这表明我们需要在多台机器或者多核CPU机器上跑测试。
24小时后的结果
(echo -e "set terminal png size 500,300\nset xlabel \"Minutes Elapsed\"\nset ylabel \"Mem (KB)\"\nset title \"Mem usage with 10k active connections, 1000 msg/sec\"\nplot \"-\" using 1:2 with lines notitle" ; awk 'BEGIN{FS="\t";} NR%10==0 {if(!t){t=$2} mins=($2-t)/60; printf("%d %d\n",mins,$3)}' mochimem.log ; echo -e "end" ) | gnuplot > mochimem.png
erlang:system_flag(fullsweep_after, Number)
Number is a non-negative integer which indicates how many times generational garbages collections can be done without forcing a fullsweep collection. The value applies to new processes; processes already running are not affected.
In low-memory systems (especially without virtual memory), setting the value to 0 can help to conserve memory.
An alternative way to set this value is through the (operating system) environment variable ERL_FULLSWEEP_AFTER.
erlang:system_flag(min_heap_size, MinHeapSize)
Sets the default minimum heap size for processes. The size is given in words. The new min_heap_size only effects processes spawned after the change of min_heap_size has been made. The min_heap_size can be set for individual processes by use of spawn_opt/N or process_flag/2.
erlang:hibernate(Module, Function, Args)
Puts the calling process into a wait state where its memory allocation has been reduced as much as possible, which is useful if the process does not expect to receive any messages in the near future.
The process will be awaken when a message is sent to it, and control will resume in Module:Function with the arguments given by Args with the call stack emptied, meaning that the process will terminate when that function returns. Thus erlang:hibernate/3 will never return to its caller.
If the process has any message in its message queue, the process will be awaken immediately in the same way as described above.
In more technical terms, what erlang:hibernate/3 does is the following. It discards the call stack for the process. Then it garbage collects the process. After the garbage collection, all live data is in one continuous heap. The heap is then shrunken to the exact same size as the live data which it holds (even if that size is less than the minimum heap size for the process).
If the size of the live data in the process is less than the minimum heap size, the first garbage collection occurring after the process has been awaken will ensure that the heap size is changed to a size not smaller than the minimum heap size.
Note that emptying the call stack means that any surrounding catch is removed and has to be re-inserted after hibernation. One effect of this is that processes started using proc_lib (also indirectly, such as gen_server processes), should use proc_lib:hibernate/3 instead to ensure that the exception handler continues to work when the process wakes up.
- 最后一行的fee(Response, Id, N)函数将调用hibernate而不是自身
- 登录路由器后立即调用hibernate,而不是调用feed,然后阻塞着接收
- 记住导出feed/3,那样hibernate在醒来后可以回调它
- -module(mochiconntest_web).
-
- -export([start/1, stop/0, loop/2, feed/3]).
-
- %% External API
-
- start(Options) ->
- {DocRoot, Options1} = get_option(docroot, Options),
- Loop = fun (Req) ->
- ?MODULE:loop(Req, DocRoot)
- end,
- % we’ll set our maximum to 1 million connections. (default: 2048)
- mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).
-
- stop() ->
- mochiweb_http:stop(?MODULE).
-
- loop(Req, DocRoot) ->
- "/" ++ Path = Req:get(path),
- case Req:get(method) of
- Method when Method =:= ‘GET’; Method =:= ‘HEAD’ ->
- case Path of
- "test/" ++ IdStr ->
- Response = Req:ok({"text/html; charset=utf-8",
- [{"Server","Mochiweb-Test"}],
- chunked}),
- {Id, _} = string:to_integer(IdStr),
- router:login(Id, self()),
- % Hibernate this process until it receives a message:
- proc_lib:hibernate(?MODULE, feed, [Response, Id, 1]);
- _ ->
-
- Req:not_found()
- end;
- ‘POST’ ->
- case Path of
- _ ->
- Req:not_found()
- end;
- _ ->
- Req:respond({501, [], []})
- end.
-
- feed(Response, Id, N) ->
- receive
- {router_msg, Msg} ->
- Html = io_lib:format("Recvd msg #~w: ‘~w’
", [N, Msg]), - Response:write_chunk(Html)
- end,
- % Hibernate this process until it receives a message:
- proc_lib:hibernate(?MODULE, feed, [Response, Id, N+1]).
-
-
- %% Internal API
-
- get_option(Option, Options) ->
- {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.
总结