在第一篇中,我们构建了一个mochiweb comet程序(不太实用),每10秒向客户端发送一条消息。我们调整了Linux内核的若干参数,并且为了测试性能和内存的使用量,写了一个发起大量连接的工具。我们发现,每个连接约占45KB内存。
第二篇,我们将调整程序,使其实用,且能节省内存。
通过login/logout/send API实现一个消息路由器 - 更新mochiweb应用,使其可以从路由器接收消息
- 建立一套分布式的erlang系统,我们可以在不同的节点/主机运行该路由器
- 写个工具,向路由器发送大量消息
- 图形化显示24小时内存实用情况,优化mochiweb应用,使其节省内存开销
这意味着我们正在吧消息发送逻辑从mochiweb应用中剥离出来。通过第一部分的floodtest工具,我们
可以进行近乎产品化得性能测试。
实现消息路由器
路由器API只有3个函数:
注意,通过设计,一个进程可能会以多个不同得Id登陆。
下面的路由器模块使用了2张表存放Pid和Id之间的双向映射关系(#state record中的pid2id和id2pid)。
router.erl:
-
-module(router).
-
-behaviour(gen_server).
-
-
-export([start_link/0]).
-
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-
terminate/2, code_change/3]).
-
-
-export([send/2, login/2, logout/1]).
-
-
-define(SERVER, global:whereis_name(?MODULE)).
-
-
% will hold bidirectional mapping between id <–> pid
-
-record(state, {pid2id, id2pid}).
-
-
start_link() ->
-
gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
-
-
% sends Msg to anyone logged in as Id
-
send(Id, Msg) ->
-
gen_server:call(?SERVER, {send, Id, Msg}).
-
-
login(Id, Pid) when is_pid(Pid) ->
-
gen_server:call(?SERVER, {login, Id, Pid}).
-
-
logout(Pid) when is_pid(Pid) ->
-
gen_server:call(?SERVER, {logout, Pid}).
-
-
%%
-
-
init([]) ->
-
% set this so we can catch death of logged in pids:
-
process_flag(trap_exit, true),
-
% use ets for routing tables
-
{ok, #state{
-
pid2id = ets:new(?MODULE, [bag]),
-
id2pid = ets:new(?MODULE, [bag])
-
}
-
}.
-
-
handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
-
ets:insert(State#state.pid2id, {Pid, Id}),
-
ets:insert(State#state.id2pid, {Id, Pid}),
-
link(Pid), % tell us if they exit, so we can log them out
-
io:format("~w logged in as ~w\n",[Pid, Id]),
-
{reply, ok, State};
-
-
handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
-
unlink(Pid),
-
PidRows = ets:lookup(State#state.pid2id, Pid),
-
case PidRows of
-
[] ->
-
ok;
-
_ ->
-
IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples
-
% delete all pid->id entries
-
ets:delete(State#state.pid2id, Pid),
-
% and all id->pid
-
[ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ]
-
end,
-
io:format("pid ~w logged out\n",[Pid]),
-
{reply, ok, State};
-
-
handle_call({send, Id, Msg}, _From, State) ->
-
% get pids who are logged in as this Id
-
Pids = [ P || { _Id, P } <- ets:lookup(State#state.id2pid, Id) ],
-
% send Msg to them all
-
M = {router_msg, Msg},
-
[ Pid ! M || Pid <- Pids ],
-
{reply, ok, State}.
-
-
% handle death and cleanup of logged in processes
-
handle_info(Info, State) ->
-
case Info of
-
{‘EXIT’, Pid, _Why} ->
-
% force logout:
-
handle_call({logout, Pid}, blah, State);
-
Wtf ->
-
io:format("Caught unhandled message: ~w\n", [Wtf])
-
end,
-
{noreply, State}.
-
-
handle_cast(_Msg, State) ->
-
{noreply, State}.
-
terminate(_Reason, _State) ->
-
ok.
-
code_change(_OldVsn, State, _Extra) ->
-
{ok, State}.
更新mochiweb应用
让我们假设每个用户以基于连向mochiweb的URL的整形Id,用这个Id在消息路由器中注册。不是阻塞10秒钟,然后发送消息,而是mochiweb循环阻塞着从路由器接收消息。把路由器发送过来得每条消息包装成http块发送给客户端。
这里是更新后得mochiconntest_web.erl:
-
-module(mochiconntest_web).
-
-
-export([start/1, stop/0, loop/2]).
-
-
%% External API
-
-
start(Options) ->
-
{DocRoot, Options1} = get_option(docroot, Options),
-
Loop = fun (Req) ->
-
?MODULE:loop(Req, DocRoot)
-
end,
-
% we’ll set our maximum to 1 million connections. (default: 2048)
-
mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).
-
-
stop() ->
-
mochiweb_http:stop(?MODULE).
-
-
loop(Req, DocRoot) ->
-
"/" ++ Path = Req:get(path),
-
case Req:get(method) of
-
Method when Method =:= ‘GET’; Method =:= ‘HEAD’ ->
-
case Path of
-
"test/" ++ Id ->
-
Response = Req:ok({"text/html; charset=utf-8",
-
[{"Server","Mochiweb-Test"}],
-
chunked}),
-
% login using an integer rather than a string
-
{IdInt, _} = string:to_integer(Id),
-
router:login(IdInt, self()),
-
feed(Response, IdInt, 1);
-
_ ->
-
Req:not_found()
-
end;
-
‘POST’ ->
-
case Path of
-
_ ->
-
Req:not_found()
-
end;
-
_ ->
-
Req:respond({501, [], []})
-
end.
-
-
feed(Response, Id, N) ->
-
receive
-
{router_msg, Msg} ->
-
Html = io_lib:format("Recvd msg #~w: ‘~s’", [N, Msg]),
-
Response:write_chunk(Html)
-
end,
-
feed(Response, Id, N+1).
-
-
%% Internal API
-
-
get_option(Option, Options) ->
-
{proplists:get_value(Option, Options), proplists:delete(Option, Options)}.
运行起来吧
现在让它们跑起来吧,我们将使用两个erlang shell,一个是mochiweb,另一个是路由器。编辑用来启动mochiweb的脚本start-dev.sh,给erl添加下列额外的参数:
-
-sname n1 命名erlang节点为'n1'
-
+K true 启用kernel-poll,处理大量连接时,似乎没有理由不启用这个选项
-
+P 134217727 默认的最大进程数目为32768;考虑到我们每个连接需要一个进程,我建议把它设为可以的最大值。通过'man erl'我们知道134,217,727是最大值。
现在运行make && ./start-dev.sh,你会看到这样的提示:(
n1@localhost)1>,你得mochiweb程序正在运行,并且这个erlang节点有个名字。
现在运行另一个erlang shell:erl -sname n2
目前,那两个erlang实例彼此不知道对方,修正它:
(n2@localhost)1> nodes().
[]
(n2@localhost)2> net_adm:ping(n1@localhost).
pong
(n2@localhost)3> nodes().
[n1@localhost]
现在从这个shell中编译、启动路由器:
(n2@localhost)4> c(router).
{ok,router}
(n2@localhost)5> router:start_link().
{ok,<0.38.0>}
你现在可以向路由器发送消息,并能在你的浏览器中看到。目前为止,只可以发送字符串,因为在feed函数中我们使用io_lib:format的~s格式化它们,atoms会使程序崩溃。
n2@localhost)6> router:send(123, "Hello World").
(n2@localhost)7> router:send(123, "Why not open another browser window too?").
(n2@localhost)8> router:send(456, "This message will go into the void unless you are connected as /test/456 too").
看一下你的浏览器,comet程序已经正常运行。
运行在分布式erlang系统中
在不同机器上运行路由器和mochiweb前端是有意义的。假定你有几台闲置的电脑进行测试,你应该以分布式节点启动erlang shell。比如使用-name
n1@host1.example.com替代-sname n1(同样对于n2)。通过net_adm:ping(...)确保它们互相可见。
注意在router.erl的16行,路由器进程的名字('router')是被全局注册的,因为在gen_server中我们使用了下列宏来识别/定位路由器,这已经适用于分布式系统:
-define(SERVER, global:whereis_name(?MODULE)).
在分布式系统中,进程的全局命名注册键是erlang天生由来的。
产生大量消息
在真实环境中,我们可能会看到长尾一样的使用模式,少些活跃用户加上大量的非活跃用户。然而,在这个测试用,我们不分青红皂白地给随机用户发送测试消息。
mesgen.erl:
-
-module(msggen).
-
-export([start/3]).
-
-
start(0, _, _) -> ok;
-
start(Num, Interval, Max) ->
-
Id = random:uniform(Max),
-
router:send(Id, "Fake message Num = " ++ Num),
-
receive after Interval -> start(Num -1, Interval, Max) end.
它会给1到Max之间的随机的用户Id发送Num条消息,每次发送之间会等待Interval毫秒。
erl -sname test
(test@localhost)1> net_adm:ping(n1@localhost).
pong
(test@localhost)2> c(msggen).
{ok,msggen}
(test@localhost)3> msggen:start(20, 10, 5).
ok
它给1-5之间的随机Id发送20条消息,每条消息之间有10毫秒的等待。
我们甚至可以并行地运行它们,模拟多个消息源。下面的例子会产生10个进程,给1-5之间的ids发送20条消息,每条消息之间等待100毫秒。
[ spawn(fun() -> msggen:start(20, 100, 5), io:format("~w finished.\n", [self()]) end) || _ <- lists:seq(1,10) ]. [<0.97.0>,<0.98.0>,<0.99.0>,<0.100.0>,<0.101.0>,<0.102.0>,
<0.103.0>,<0.104.0>,<0.105.0>,<0.106.0>]
<0.101.0> finished.
<0.105.0> finished.
<0.106.0> finished.
<0.104.0> finished.
<0.102.0> finished.
<0.98.0> finished.
<0.99.0> finished.
<0.100.0> finished.
<0.103.0> finished.
<0.97.0> finished.
再次CK10
我想我们需要进一步大规模的测试,客户端连接mochiweb程序,并被注册到消息路由器中。我们可以在路由器上产生大量测试消息,发往任何注册的客户端。让我们再测一遍1万个并发用户的情况,但是这一次我们让客户端多连一会。
假定你按照第一篇中的指令对内核进行了调节,增加最大文件限制等等,这很容易的。你已经运行了mochiweb程序和路由器。接着,让我们记录所有的通讯。
在没有客户端连接的情况下,mochiweb beam进程使用大约40MB内存(常驻的):
$ ps -o rss= -p `pgrep -f 'sname n1'`
40156
我写一行可怕的代码,每隔60秒打印一次:时间戳、mochiweb当前内存实用情况(KB单位)以及当前的连接数。在一个空闲的终端跑这段脚本:
$ MOCHIPID=`pgrep -f 'name n1'`; while [ 1 ] ; do NUMCON=`netstat -n | awk ‘/ESTABLISHED/ && $4==”127.0.0.1:8000″‘ | wc -l`; MEM=`ps -o rss= -p $MOCHIPID`; echo -e “`date`\t`date +%s`\t$MEM\t$NUMCON”; sleep 60; done | tee -a mochimem.log
现在在新的erl shell中启动第一篇中的floodtest工具:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
它将每秒建立10条连接的频率建立1万条连接。
你会看到,很快到达1万条连接:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
Stats: {825,0,0}
Stats: {1629,0,0}
Stats: {2397,0,0}
Stats: {3218,0,0}
Stats: {4057,0,0}
Stats: {4837,0,0}
Stats: {5565,0,0}
Stats: {6295,0,0}
Stats: {7022,0,0}
Stats: {7727,0,0}
Stats: {8415,0,0}
Stats: {9116,0,0}
Stats: {9792,0,0}
Stats: {10000,0,0}
...
请看那行脚本的输出:
Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001
1万条连接(加上我在firefox中打开的一条连接),大约占用90MB(90964KB)。
现在开始发送消息:
erl> [ spawn(fun() -> msggen:start(1000000, 100, 10000) end) || _ <- lists:seq(1,100) ]. [<0.65.0>,<0.66.0>,<0.67.0>,<0.68.0>,<0.69.0>,<0.70.0>,
<0.71.0>,<0.72.0>,<0.73.0>,<0.74.0>,<0.75.0>,<0.76.0>,
<0.77.0>,<0.78.0>,<0.79.0>,<0.80.0>,<0.81.0>,<0.82.0>,
<0.83.0>,<0.84.0>,<0.85.0>,<0.86.0>,<0.87.0>,<0.88.0>,
<0.89.0>,<0.90.0>,<0.91.0>,<0.92.0>,<0.93.0>|...]
100个进程,每个进程以每秒10条的频率随机地给1到1万之间的id发送1百万条消息。这意味着,路由器每秒发送1000条消息,平均每个客户端每10秒接收一条消息。
看一下floodtest的输出,你会看到客户端正在接收http块(记住这是{已连接的连接数,被关闭的连接数,接收到的数据块的数量}):
...
Stats: {10000,0,5912}
Stats: {10000,0,15496}
Stats: {10000,0,25145}
Stats: {10000,0,34755}
Stats: {10000,0,44342}
...
每个进程以每秒10条消息的频率发送1百万条消息,需要27个小时才能完成。
这里是10分钟后的内存使用情况:
Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001
Mon Oct 20 17:05:55 BST 2008 1224518755 90980 10001
Mon Oct 20 17:07:00 BST 2008 1224518820 91120 10001
Mon Oct 20 17:08:05 BST 2008 1224518885 98664 10001
Mon Oct 20 17:09:10 BST 2008 1224518950 106752 10001
Mon Oct 20 17:10:15 BST 2008 1224519015 114044 10001
Mon Oct 20 17:11:20 BST 2008 1224519080 119468 10001
Mon Oct 20 17:12:25 BST 2008 1224519145 125360 10001
当连上1万个客户端后,内存使用量就从40MB增加到90MB,当运行一段时间后,变为125MB。
值得指出的是,floodtest工具占用了大部分的CPU,msggen使用了2%的CPU,而路由器和mochiweb不到1%(只有客户端模拟程序占用大量的CPU,而服务器程序本身没怎么占用CPU)。这表明我们需要在多台机器或者多核CPU机器上跑测试。
24小时后的结果
这个测试跑了24个小时,同时把内存使用情况记录在mochimem.log文件中。这是1万个客户端,每秒1000条消息随机发送给客户端。
下面的bash/awk脚本调用gnuplot把mochimem.log文件中的数据图形化:
(echo -e "set terminal png size 500,300\nset xlabel \"Minutes Elapsed\"\nset ylabel \"Mem (KB)\"\nset title \"Mem usage with 10k active connections, 1000 msg/sec\"\nplot \"-\" using 1:2 with lines notitle" ; awk 'BEGIN{FS="\t";} NR%10==0 {if(!t){t=$2} mins=($2-t)/60; printf("%d %d\n",mins,$3)}' mochimem.log ; echo -e "end" ) | gnuplot > mochimem.png
图片展示了内存使用情况,24小时内基本上稳定在250MB。两个大的落差,一个是在开始的时候,另一个是在结束的时候,是因为我在mochiweb erlang进程中运行了:
erl> [erlang:garbage_collect(P) || P <- erlang:processes()].
这强制所有进程垃圾回收,释放了将近100MB的内存。接下来,我们一起研究如何节省内存开销。
在mochiweb中降低内存开销
mochiweb程序只是发送消息,并不会保存。内存使用量不会因为发送的消息数量增多而增多。
对与erlang的内存管理,我是新手。但我觉得时常地垃圾回收,会降低一部分的内存开销,最终让我们用最少的内存服务更多的用户。我们可能会牺牲一部分的CPU,但这是可以接受的。
erlang:system_flag(fullsweep_after, Number)
Number is a non-negative integer which indicates how many times generational garbages collections can be done without forcing a fullsweep collection. The value applies to new processes; processes already running are not affected.
In low-memory systems (especially without virtual memory), setting the value to 0 can help to conserve memory.
An alternative way to set this value is through the (operating system) environment variable ERL_FULLSWEEP_AFTER.
看起来不错,不过它只适用于新的进程,并且会影响到VM中的所有进程,而不是只有我们的mochiweb进程。
接着:
erlang:system_flag(min_heap_size, MinHeapSize)
Sets the default minimum heap size for processes. The size is given in words. The new min_heap_size only effects processes spawned after the change of min_heap_size has been made. The min_heap_size can be set for individual processes by use of spawn_opt/N or process_flag/2.
可能会有用,不过我确信mochiweb进程需要一个更大的堆。如果可以,我会尽可能避免修改mochiweb的源代码,增加spawn选项。
接着,这个映入我的眼帘:
erlang:hibernate(Module, Function, Args)
Puts the calling process into a wait state where its memory allocation has been reduced as much as possible, which is useful if the process does not expect to receive any messages in the near future.
The process will be awaken when a message is sent to it, and control will resume in Module:Function with the arguments given by Args with the call stack emptied, meaning that the process will terminate when that function returns. Thus erlang:hibernate/3 will never return to its caller.
If the process has any message in its message queue, the process will be awaken immediately in the same way as described above.
In more technical terms, what erlang:hibernate/3 does is the following. It discards the call stack for the process. Then it garbage collects the process. After the garbage collection, all live data is in one continuous heap. The heap is then shrunken to the exact same size as the live data which it holds (even if that size is less than the minimum heap size for the process).
If the size of the live data in the process is less than the minimum heap size, the first garbage collection occurring after the process has been awaken will ensure that the heap size is changed to a size not smaller than the minimum heap size.
Note that emptying the call stack means that any surrounding catch is removed and has to be re-inserted after hibernation. One effect of this is that processes started using proc_lib (also indirectly, such as gen_server processes), should use proc_lib:hibernate/3 instead to ensure that the exception handler continues to work when the process wakes up.
看上去是有道理的,每条消息后进行睡眠,看看会发生什么变化。
编辑mochiconntest_web.erl,进行如下更改:
-
最后一行的fee(Response, Id, N)函数将调用hibernate而不是自身
-
登录路由器后立即调用hibernate,而不是调用feed,然后阻塞着接收
-
记住导出feed/3,那样hibernate在醒来后可以回调它
更新mochiconntest_web.erl:
-
-module(mochiconntest_web).
-
-
-export([start/1, stop/0, loop/2, feed/3]).
-
-
%% External API
-
-
start(Options) ->
-
{DocRoot, Options1} = get_option(docroot, Options),
-
Loop = fun (Req) ->
-
?MODULE:loop(Req, DocRoot)
-
end,
-
% we’ll set our maximum to 1 million connections. (default: 2048)
-
mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).
-
-
stop() ->
-
mochiweb_http:stop(?MODULE).
-
-
loop(Req, DocRoot) ->
-
"/" ++ Path = Req:get(path),
-
case Req:get(method) of
-
Method when Method =:= ‘GET’; Method =:= ‘HEAD’ ->
-
case Path of
-
"test/" ++ IdStr ->
-
Response = Req:ok({"text/html; charset=utf-8",
-
[{"Server","Mochiweb-Test"}],
-
chunked}),
-
{Id, _} = string:to_integer(IdStr),
-
router:login(Id, self()),
-
% Hibernate this process until it receives a message:
-
proc_lib:hibernate(?MODULE, feed, [Response, Id, 1]);
-
_ ->
-
-
Req:not_found()
-
end;
-
‘POST’ ->
-
case Path of
-
_ ->
-
Req:not_found()
-
end;
-
_ ->
-
Req:respond({501, [], []})
-
end.
-
-
feed(Response, Id, N) ->
-
receive
-
{router_msg, Msg} ->
-
Html = io_lib:format("Recvd msg #~w: ‘~w’
", [N, Msg]),
-
Response:write_chunk(Html)
-
end,
-
% Hibernate this process until it receives a message:
-
proc_lib:hibernate(?MODULE, feed, [Response, Id, N+1]).
-
-
-
%% Internal API
-
-
get_option(Option, Options) ->
-
{proplists:get_value(Option, Options), proplists:delete(Option, Options)}.
更改后,重新编译mochiweb,然后重新进行相同的c10k测试。
使用proc_lib:hibernate()后的结果
使用hibernate后,内存使用量仅为78MB,明显好于第一篇中的450MB。CPU使用量没有明显提高。
总结
我们基于mochiweb实现了comet程序,让我们可以通过ID任意地给客户端发送消息。经过24小时不间断地1000 msg/秒发送消息,1万个连接,使用了80MB内存,也就是每条连接占用8KB。我们还制作了精美的图表。
对于第一篇中的每条连接45KB来说,这是一个进步,而这一切归功于我们以一种更真实的方式进行测试,并且在消息之间使用了hibernate。
下一步
在第三篇中(不久发布),我将进行1百万连接的测试,测试程序部署在多cpu的64位服务器上,有足够的内存。
看看有什么区别。我也会列出一些窍门,用来模拟1百万个客户端。
程序将以一种pub-sub系统呈现,订阅将和用户id有关,并被程序保存,而不是由客户端在连接的时候提供。我们将加载典型的社交网数据:朋友。这将允许用户用他们的id登录,并能自动接受朋友们的事件。