在第一篇中,我们构建了一个mochiweb comet程序(不太实用),每10秒向客户端发送一条消息。我们调整了Linux内核的若干参数,并且为了测试性能和内存的使用量,写了一个发起大量连接的工具。我们发现,每个连接约占45KB内存。在这第二篇中,我们将:

  • 通过login/logout/send API实现一个消息路由器
  • 更新mochiweb应用,使其可以从路由器接收消息
  • 建立一套分布式的erlang系统,我们可以在不同的节点/主机运行该路由器
  • 写个工具,向路由器发送大量消息
  • 图形化显示24小时内的内存使用情况,并优化mochiweb应用以节省内存开销


路由器提供以下API:
  • login(Id, Pid):注册进程Pid,使其接收发往Id的消息
  • logout(Pid):进程Pid停止接收消息
  • send(Id, Msg):向所有以Id登录的客户端发送消息Msg
下面的路由器模块使用两张ETS表(#state记录中的pid2id和id2pid)存放Pid与Id之间的双向映射关系。
  -module(router).
  -behaviour(gen_server).

  -export([start_link/0]).
  -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
           terminate/2, code_change/3]).

  -export([send/2, login/2, logout/1]).

  %% The router is registered with `global` (see start_link/0), so the API
  %% functions look it up cluster-wide and work from any connected node.
  -define(SERVER, global:whereis_name(?MODULE)).

  %% Bidirectional mapping between ids and pids, kept in two ETS bag tables:
  %% pid2id holds {Pid, Id} rows, id2pid holds {Id, Pid} rows.
  -record(state, {pid2id, id2pid}).

  %%% API

  %% Start the router and register it globally under the module name.
  start_link() ->
      gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

  %% Send {router_msg, Msg} to every process logged in as Id.
  %% An Id with no logged-in process is silently ignored.
  send(Id, Msg) ->
      gen_server:call(?SERVER, {send, Id, Msg}).

  %% Register Pid to receive messages addressed to Id. The router links to
  %% Pid so that Pid is logged out automatically when it exits.
  login(Id, Pid) when is_pid(Pid) ->
      gen_server:call(?SERVER, {login, Id, Pid}).

  %% Stop Pid from receiving messages for every Id it was logged in as.
  logout(Pid) when is_pid(Pid) ->
      gen_server:call(?SERVER, {logout, Pid}).

  %%% gen_server callbacks

  init([]) ->
      %% Trap exits so that the death of a linked (logged-in) pid arrives as
      %% an {'EXIT', Pid, Reason} message instead of killing the router.
      process_flag(trap_exit, true),
      {ok, #state{pid2id = ets:new(?MODULE, [bag]),
                  id2pid = ets:new(?MODULE, [bag])}}.

  handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
      ets:insert(State#state.pid2id, {Pid, Id}),
      ets:insert(State#state.id2pid, {Id, Pid}),
      link(Pid), % tell us if they exit, so we can log them out
      io:format("~w logged in as ~w\n", [Pid, Id]),
      {reply, ok, State};
  handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
      do_logout(Pid, State),
      {reply, ok, State};
  handle_call({send, Id, Msg}, _From, State) ->
      Routed = {router_msg, Msg},
      lists:foreach(fun({_Id, Pid}) -> Pid ! Routed end,
                    ets:lookup(State#state.id2pid, Id)),
      {reply, ok, State}.

  handle_cast(_Msg, State) ->
      {noreply, State}.

  %% A linked (logged-in) process died: force a logout so its rows go away.
  handle_info({'EXIT', Pid, _Why}, State) ->
      do_logout(Pid, State),
      {noreply, State};
  handle_info(Other, State) ->
      io:format("Caught unhandled message: ~w\n", [Other]),
      {noreply, State}.

  terminate(_Reason, _State) ->
      ok.

  code_change(_OldVsn, State, _Extra) ->
      {ok, State}.

  %%% Internal

  %% Unlink Pid and delete every pid->id and id->pid row that refers to it.
  %% Harmless for a pid that is not logged in (nothing matches).
  do_logout(Pid, State) ->
      unlink(Pid),
      PidRows = ets:lookup(State#state.pid2id, Pid),
      ets:delete(State#state.pid2id, Pid),
      lists:foreach(fun({P, I}) -> ets:delete_object(State#state.id2pid, {I, P}) end,
                    PidRows),
      io:format("pid ~w logged out\n", [Pid]).

接下来修改mochiweb应用,使其按如下方式工作:
  • 客户端通过http://localhost:8000/test/123连接mochiweb
  • Mochiweb应用把该连接的进程pid与id '123'关联起来,并注册到消息路由器
  • 当你把目标id为'123'的消息发送给路由器时,该消息会被转发给对应的mochiweb进程,并显示在该用户的浏览器中。

  -module(mochiconntest_web).

  -export([start/1, stop/0, loop/2]).

  %% External API

  %% Start the mochiweb listener. The docroot option is split out and handed
  %% to loop/2; the remaining options go to mochiweb_http:start/1.
  start(Options) ->
      {DocRoot, Options1} = get_option(docroot, Options),
      Loop = fun(Req) -> ?MODULE:loop(Req, DocRoot) end,
      % we'll set our maximum to 1 million connections. (default: 2048)
      mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

  stop() ->
      mochiweb_http:stop(?MODULE).

  %% Handle one HTTP request. GET/HEAD /test/<Id> (Id an integer) opens a
  %% chunked response, logs this process in to the router as Id and streams
  %% routed messages to the client (feed/3 never returns). Other GET/HEAD
  %% paths, non-numeric ids and POST get 404; any other method gets 501.
  loop(Req, _DocRoot) ->
      "/" ++ Path = Req:get(path),
      case Req:get(method) of
          Method when Method =:= 'GET'; Method =:= 'HEAD' ->
              case parse_id(Path) of
                  {ok, IdInt} ->
                      Response = Req:ok({"text/html; charset=utf-8",
                                         [{"Server", "Mochiweb-Test"}],
                                         chunked}),
                      % login using an integer rather than a string
                      router:login(IdInt, self()),
                      feed(Response, IdInt, 1);
                  error ->
                      Req:not_found()
              end;
          'POST' ->
              Req:not_found();
          _ ->
              Req:respond({501, [], []})
      end.

  %% Block until the router delivers {router_msg, Msg}, write it to the
  %% client as chunk number N, then wait for the next one (loops forever).
  feed(Response, Id, N) ->
      receive
          {router_msg, Msg} ->
              %% Keep the format ASCII: socket data must be an iolist of
              %% bytes, and a character above 255 would make it invalid.
              Html = io_lib:format("Recvd msg #~w: '~s'<br/>", [N, Msg]),
              Response:write_chunk(Html)
      end,
      feed(Response, Id, N + 1).

  %% Internal API

  %% Extract the integer id from "test/<Id>". Returns error for any other
  %% path or a non-numeric id; characters after the leading digits are ignored.
  parse_id("test/" ++ IdStr) ->
      case string:to_integer(IdStr) of
          {Id, _Rest} when is_integer(Id) -> {ok, Id};
          _ -> error
      end;
  parse_id(_) ->
      error.

  %% Split Option out of a proplist: {Value | undefined, RemainingOptions}.
  get_option(Option, Options) ->
      {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.


现在让它们跑起来吧,我们将使用两个erlang shell,一个是mochiweb,另一个是路由器。编辑用来启动mochiweb的脚本,给erl添加下列额外的参数:
  • -sname n1 命名erlang节点为'n1'
  • +K true 启用kernel-poll,处理大量连接时,似乎没有理由不启用这个选项
  • +P 134217727 默认的最大进程数为32768;由于每个连接需要一个进程,我建议把它设为允许的最大值。通过'man erl'可知,134,217,727就是最大值。
现在运行make && ./start-dev.sh,你会看到这样的提示:(n1@localhost)1>,说明你的mochiweb程序正在运行,并且这个erlang节点已经有了名字。
现在运行另一个erlang shell:erl -sname n2
(n2@localhost)1> nodes().
(n2@localhost)2> net_adm:ping(n1@localhost).
(n2@localhost)3> nodes().
(n2@localhost)4> c(router).
(n2@localhost)5> router:start_link().
现在用你的浏览器访问http://localhost:8000/test/123(或者在控制台用lynx --source http://localhost:8000/test/123)。查看运行路由器的那个shell,你会看到有一位用户登录了。
(n2@localhost)6> router:send(123, "Hello World").
(n2@localhost)7> router:send(123, "Why not open another browser window too?").
(n2@localhost)8> router:send(456, "This message will go into the void unless you are connected as /test/456 too").


在不同机器上运行路由器和mochiweb前端是有意义的。假定你有几台闲置的电脑可供测试,你应该以分布式节点的方式启动erlang shell,比如用-name n1@host1.example.com代替-sname n1(n2同理),并通过net_adm:ping(...)确保它们互相可见。路由器是通过global模块注册的,router模块用下面这个宏查找它,因此任何已连接的节点都可以调用路由器的API:
%% Look the router up through the `global` name registry: router:start_link/0
%% registers it as {global, ?MODULE}, so the API works from any connected
%% node. Evaluates to `undefined` when no router is registered.
-define(SERVER, global:whereis_name(?MODULE)).



  -module(msggen).
  -export([start/3]).

  %% Send Num fake messages through the router, one every Interval ms, each
  %% addressed to a random Id in 1..Max. Returns ok once all have been sent
  %% (immediately when Num =< 0).
  start(Num, _, _) when Num =< 0 -> ok;
  start(Num, Interval, Max) ->
      %% rand seeds itself per process, so concurrently spawned generators
      %% pick different Ids; the legacy random module starts every process
      %% from the same fixed seed and would repeat the same Id sequence.
      Id = rand:uniform(Max),
      %% Num is an integer: `++` with a non-list builds an improper list,
      %% which cannot be formatted as a string (e.g. with ~s) by the receiver.
      router:send(Id, "Fake message Num = " ++ integer_to_list(Num)),
      receive after Interval -> start(Num - 1, Interval, Max) end.

erl -sname test
(test@localhost)1> net_adm:ping(n1@localhost).
(test@localhost)2> c(msggen).
(test@localhost)3> msggen:start(20, 10, 5).
(test@localhost)4> [ spawn(fun() -> msggen:start(20, 100, 5), io:format("~w finished.\n", [self()]) end) || _ <- lists:seq(1,10) ].
[<0.97.0>,<0.98.0>,<0.99.0>,<0.100.0>,<0.101.0>,<0.102.0>, ...]
<0.101.0> finished.
<0.105.0> finished.
<0.106.0> finished.
<0.104.0> finished.
<0.102.0> finished.
<0.98.0> finished.
<0.99.0> finished.
<0.100.0> finished.
<0.103.0> finished.
<0.97.0> finished.


在没有客户端连接的情况下,mochiweb beam进程大约使用40MB内存(常驻内存,RSS),可以用第一条命令查看;第二条命令每分钟把时间、内存占用和已建立的连接数追加记录到mochimem.log中:
$ ps -o rss= -p `pgrep -f 'sname n1'`
$ MOCHIPID=`pgrep -f 'name n1'`; while [ 1 ] ; do NUMCON=`netstat -n | awk '/ESTABLISHED/ && $4=="127.0.0.1:8000"' | wc -l`; MEM=`ps -o rss= -p $MOCHIPID`; echo -e "`date`\t`date +%s`\t$MEM\t$NUMCON"; sleep 60; done | tee -a mochimem.log
现在在新的erl shell中启动第一篇中的floodtest工具:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
Stats: {825,0,0}
Stats: {1629,0,0}
Stats: {2397,0,0}
Stats: {3218,0,0}
Stats: {4057,0,0}
Stats: {4837,0,0}
Stats: {5565,0,0}
Stats: {6295,0,0}
Stats: {7022,0,0}
Stats: {7727,0,0}
Stats: {8415,0,0}
Stats: {9116,0,0}
Stats: {9792,0,0}
Stats: {10000,0,0}
Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001

erl> [ spawn(fun() -> msggen:start(1000000, 100, 10000) end) || _ <- lists:seq(1,100) ].
[<0.65.0>,<0.66.0>,<0.67.0>,<0.68.0>,<0.69.0>,<0.70.0>, ...]



Stats: {10000,0,5912}
Stats: {10000,0,15496}
Stats: {10000,0,25145}
Stats: {10000,0,34755}
Stats: {10000,0,44342}



# Plot mochimem.log to mochimem.png: RSS (KB) against minutes elapsed since
# the first sampled row. awk keeps every 10th log row and emits
# "<minutes> <rss>" pairs as gnuplot inline data, terminated by "end".
{
  echo -e "set terminal png size 500,300\nset xlabel \"Minutes Elapsed\"\nset ylabel \"Mem (KB)\"\nset title \"Mem usage with 10k active connections, 1000 msg/sec\"\nplot \"-\" using 1:2 with lines notitle"
  awk 'BEGIN { FS = "\t" }
       NR % 10 == 0 {
         if (!t) { t = $2 }
         mins = ($2 - t) / 60
         printf("%d %d\n", mins, $3)
       }' mochimem.log
  echo -e "end"
} | gnuplot > mochimem.png

图中展示了内存使用情况:24小时内基本稳定在250MB左右。开始和结束时各有一次明显的下降,这是因为我在mochiweb的erlang节点上对所有进程手动执行了垃圾回收:
erl> [erlang:garbage_collect(P) || P <- erlang:processes()].


深入研究erlang文档后,我发现了以下几个相关选项:

erlang:system_flag(fullsweep_after, Number)

Number is a non-negative integer which indicates how many times generational garbage collections can be done without forcing a fullsweep collection. The value applies to new processes; processes already running are not affected.
In low-memory systems (especially without virtual memory), setting the value to 0 can help to conserve memory.
An alternative way to set this value is through the (operating system) environment variable ERL_FULLSWEEP_AFTER.


erlang:system_flag(min_heap_size, MinHeapSize)

Sets the default minimum heap size for processes. The size is given in words. The new min_heap_size only affects processes spawned after the change of min_heap_size has been made. The min_heap_size can be set for individual processes by use of spawn_opt/N or process_flag/2.


erlang:hibernate(Module, Function, Args)

Puts the calling process into a wait state where its memory allocation has been reduced as much as possible, which is useful if the process does not expect to receive any messages in the near future.

The process will be awakened when a message is sent to it, and control will resume in Module:Function with the arguments given by Args with the call stack emptied, meaning that the process will terminate when that function returns. Thus erlang:hibernate/3 will never return to its caller.

If the process has any message in its message queue, the process will be awakened immediately in the same way as described above.

In more technical terms, what erlang:hibernate/3 does is the following. It discards the call stack for the process. Then it garbage collects the process. After the garbage collection, all live data is in one continuous heap. The heap is then shrunken to the exact same size as the live data which it holds (even if that size is less than the minimum heap size for the process).

If the size of the live data in the process is less than the minimum heap size, the first garbage collection occurring after the process has been awakened will ensure that the heap size is changed to a size not smaller than the minimum heap size.

Note that emptying the call stack means that any surrounding catch is removed and has to be re-inserted after hibernation. One effect of this is that processes started using proc_lib (also indirectly, such as gen_server processes), should use proc_lib:hibernate/3 instead to ensure that the exception handler continues to work when the process wakes up.

我决定使用hibernate,为此对mochiweb应用做如下修改:
  • feed(Response, Id, N)函数的最后一行改为调用hibernate,而不是递归调用自身
  • 登录路由器后立即调用hibernate,而不是调用feed并阻塞在receive上等待消息
  • 记得导出feed/3,这样进程被唤醒后,hibernate才能回调它
  -module(mochiconntest_web).

  -export([start/1, stop/0, loop/2, feed/3]).

  %% External API

  %% Start the mochiweb listener. The docroot option is split out and handed
  %% to loop/2; the remaining options go to mochiweb_http:start/1.
  start(Options) ->
      {DocRoot, Options1} = get_option(docroot, Options),
      Loop = fun(Req) -> ?MODULE:loop(Req, DocRoot) end,
      % we'll set our maximum to 1 million connections. (default: 2048)
      mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

  stop() ->
      mochiweb_http:stop(?MODULE).

  %% Handle one HTTP request. GET/HEAD /test/<Id> (Id an integer) opens a
  %% chunked response, logs this process in to the router as Id and
  %% hibernates until a routed message arrives. Other GET/HEAD paths,
  %% non-numeric ids and POST get 404; any other method gets 501.
  loop(Req, _DocRoot) ->
      "/" ++ Path = Req:get(path),
      case Req:get(method) of
          Method when Method =:= 'GET'; Method =:= 'HEAD' ->
              case parse_id(Path) of
                  {ok, Id} ->
                      Response = Req:ok({"text/html; charset=utf-8",
                                         [{"Server", "Mochiweb-Test"}],
                                         chunked}),
                      router:login(Id, self()),
                      % Hibernate this process until it receives a message:
                      proc_lib:hibernate(?MODULE, feed, [Response, Id, 1]);
                  error ->
                      Req:not_found()
              end;
          'POST' ->
              Req:not_found();
          _ ->
              Req:respond({501, [], []})
      end.

  %% Entry point after waking from hibernation (hence exported). Writes the
  %% routed message as chunk number N, then hibernates again. Hibernation
  %% never returns, so the call stack is discarded between messages.
  feed(Response, Id, N) ->
      receive
          {router_msg, Msg} ->
              %% Keep the format ASCII: socket data must be an iolist of
              %% bytes, and a character above 255 would make it invalid.
              Html = io_lib:format("Recvd msg #~w: '~s'<br/>", [N, Msg]),
              Response:write_chunk(Html)
      end,
      % Hibernate this process until it receives a message:
      proc_lib:hibernate(?MODULE, feed, [Response, Id, N + 1]).

  %% Internal API

  %% Extract the integer id from "test/<Id>". Returns error for any other
  %% path or a non-numeric id; characters after the leading digits are ignored.
  parse_id("test/" ++ IdStr) ->
      case string:to_integer(IdStr) of
          {Id, _Rest} when is_integer(Id) -> {ok, Id};
          _ -> error
      end;
  parse_id(_) ->
      error.

  %% Split Option out of a proplist: {Value | undefined, RemainingOptions}.
  get_option(Option, Options) ->
      {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.


我们基于mochiweb实现了一个comet程序,可以通过ID向任意客户端发送消息。在1万个连接、以每秒1000条消息的速度连续发送24小时的情况下,它共使用了80MB内存,即每个连接约占8KB。我们还绘制了精美的图表。



