Wednesday, October 29, 2008

[译]使用Mochiweb构建百万级Comet程序,第二篇

在第一篇中,我们构建了一个mochiweb comet程序(不太实用),每10秒向客户端发送一条消息。我们调整了Linux内核的若干参数,并且为了测试性能和内存的使用量,写了一个发起大量连接的工具。我们发现,每个连接约占45KB内存。
第二篇,我们将调整程序,使其实用,且能节省内存。

  • 通过login/logout/send API实现一个消息路由器
  • 更新mochiweb应用,使其可以从路由器接收消息
  • 建立一套分布式的erlang系统,我们可以在不同的节点/主机运行该路由器
  • 写个工具,向路由器发送大量消息
  • 图形化显示24小时内存实用情况,优化mochiweb应用,使其节省内存开销
这意味着我们正在吧消息发送逻辑从mochiweb应用中剥离出来。通过第一部分的floodtest工具,我们
可以进行近乎产品化得性能测试。

实现消息路由器

路由器API只有3个函数:
  • login(Id, Pid) 注册一个进程(进程Id为Pid),为Id接收消息
  • logout(Pid)停止接收消息
  • send(Id, Msg)向任何以Id登陆的客户端发送消息Msg
注意,通过设计,一个进程可能会以多个不同得Id登陆。
下面的路由器模块使用了2张表存放Pid和Id之间的双向映射关系(#state record中的pid2id和id2pid)。
router.erl:
  1. -module(router).
  2. -behaviour(gen_server).

  3. -export([start_link/0]).
  4. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  5. terminate/2, code_change/3]).

  6. -export([send/2, login/2, logout/1]).

  7. -define(SERVER, global:whereis_name(?MODULE)).

  8. % will hold bidirectional mapping between id <–> pid
  9. -record(state, {pid2id, id2pid}).

  10. start_link() ->
  11. gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

  12. % sends Msg to anyone logged in as Id
  13. send(Id, Msg) ->
  14. gen_server:call(?SERVER, {send, Id, Msg}).

  15. login(Id, Pid) when is_pid(Pid) ->
  16. gen_server:call(?SERVER, {login, Id, Pid}).

  17. logout(Pid) when is_pid(Pid) ->
  18. gen_server:call(?SERVER, {logout, Pid}).

  19. %%

  20. init([]) ->
  21. % set this so we can catch death of logged in pids:
  22. process_flag(trap_exit, true),
  23. % use ets for routing tables
  24. {ok, #state{
  25. pid2id = ets:new(?MODULE, [bag]),
  26. id2pid = ets:new(?MODULE, [bag])
  27. }
  28. }.

  29. handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
  30. ets:insert(State#state.pid2id, {Pid, Id}),
  31. ets:insert(State#state.id2pid, {Id, Pid}),
  32. link(Pid), % tell us if they exit, so we can log them out
  33. io:format("~w logged in as ~w\n",[Pid, Id]),
  34. {reply, ok, State};

  35. handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
  36. unlink(Pid),
  37. PidRows = ets:lookup(State#state.pid2id, Pid),
  38. case PidRows of
  39. [] ->
  40. ok;
  41. _ ->
  42. IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples
  43. % delete all pid->id entries
  44. ets:delete(State#state.pid2id, Pid),
  45. % and all id->pid
  46. [ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ]
  47. end,
  48. io:format("pid ~w logged out\n",[Pid]),
  49. {reply, ok, State};

  50. handle_call({send, Id, Msg}, _From, State) ->
  51. % get pids who are logged in as this Id
  52. Pids = [ P || { _Id, P } <- ets:lookup(State#state.id2pid, Id) ],
  53. % send Msg to them all
  54. M = {router_msg, Msg},
  55. [ Pid ! M || Pid <- Pids ],
  56. {reply, ok, State}.

  57. % handle death and cleanup of logged in processes
  58. handle_info(Info, State) ->
  59. case Info of
  60. {‘EXIT’, Pid, _Why} ->
  61. % force logout:
  62. handle_call({logout, Pid}, blah, State);
  63. Wtf ->
  64. io:format("Caught unhandled message: ~w\n", [Wtf])
  65. end,
  66. {noreply, State}.

  67. handle_cast(_Msg, State) ->
  68. {noreply, State}.
  69. terminate(_Reason, _State) ->
  70. ok.
  71. code_change(_OldVsn, State, _Extra) ->
  72. {ok, State}.
更新mochiweb应用

让我们假设每个用户以基于连向mochiweb的URL的整形Id,用这个Id在消息路由器中注册。不是阻塞10秒钟,然后发送消息,而是mochiweb循环阻塞着从路由器接收消息。把路由器发送过来得每条消息包装成http块发送给客户端。
  • 客户端通过http://localhost:8000/test/123连接mochiweb
  • Mochiweb应用把该连接的pid和id '123'关联起来,注册在消息路由器
  • 假如你把(目标)地址id '123'的消息发送给路由器,该消息将被转发给正确得mochiweb进程,并在那个用户的浏览器中显示。
这里是更新后得mochiconntest_web.erl:

  1. -module(mochiconntest_web).

  2. -export([start/1, stop/0, loop/2]).

  3. %% External API

  4. start(Options) ->
  5. {DocRoot, Options1} = get_option(docroot, Options),
  6. Loop = fun (Req) ->
  7. ?MODULE:loop(Req, DocRoot)
  8. end,
  9. % we’ll set our maximum to 1 million connections. (default: 2048)
  10. mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

  11. stop() ->
  12. mochiweb_http:stop(?MODULE).

  13. loop(Req, DocRoot) ->
  14. "/" ++ Path = Req:get(path),
  15. case Req:get(method) of
  16. Method when Method =:= ‘GET’; Method =:= ‘HEAD’ ->
  17. case Path of
  18. "test/" ++ Id ->
  19. Response = Req:ok({"text/html; charset=utf-8",
  20. [{"Server","Mochiweb-Test"}],
  21. chunked}),
  22. % login using an integer rather than a string
  23. {IdInt, _} = string:to_integer(Id),
  24. router:login(IdInt, self()),
  25. feed(Response, IdInt, 1);
  26. _ ->
  27. Req:not_found()
  28. end;
  29. ‘POST’ ->
  30. case Path of
  31. _ ->
  32. Req:not_found()
  33. end;
  34. _ ->
  35. Req:respond({501, [], []})
  36. end.

  37. feed(Response, Id, N) ->
  38. receive
  39. {router_msg, Msg} ->
  40. Html = io_lib:format("Recvd msg #~w: ‘~s’", [N, Msg]),
  41. Response:write_chunk(Html)
  42. end,
  43. feed(Response, Id, N+1).

  44. %% Internal API

  45. get_option(Option, Options) ->
  46. {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.

运行起来吧

现在让它们跑起来吧,我们将使用两个erlang shell,一个是mochiweb,另一个是路由器。编辑用来启动mochiweb的脚本start-dev.sh,给erl添加下列额外的参数:
  • -sname n1 命名erlang节点为'n1'
  • +K true 启用kernel-poll,处理大量连接时,似乎没有理由不启用这个选项
  • +P 134217727 默认的最大进程数目为32768;考虑到我们每个连接需要一个进程,我建议把它设为可以的最大值。通过'man erl'我们知道134,217,727是最大值。
现在运行make && ./start-dev.sh,你会看到这样的提示:(n1@localhost)1>,你得mochiweb程序正在运行,并且这个erlang节点有个名字。
现在运行另一个erlang shell:erl -sname n2
目前,那两个erlang实例彼此不知道对方,修正它:
(n2@localhost)1> nodes().
[]
(n2@localhost)2> net_adm:ping(n1@localhost).
pong
(n2@localhost)3> nodes().
[n1@localhost]
现在从这个shell中编译、启动路由器:
(n2@localhost)4> c(router).
{ok,router}
(n2@localhost)5> router:start_link().
{ok,<0.38.0>}
现在用你的浏览器访问http://localhost:8000/test/123(或者在控制台用lynx --source http://localhost:8000/test/123)。检查运行路由器得那个shell,你会看到有一位用户登录。
你现在可以向路由器发送消息,并能在你的浏览器中看到。目前为止,只可以发送字符串,因为在feed函数中我们使用io_lib:format的~s格式化它们,atoms会使程序崩溃。
n2@localhost)6> router:send(123, "Hello World").
(n2@localhost)7> router:send(123, "Why not open another browser window too?").
(n2@localhost)8> router:send(456, "This message will go into the void unless you are connected as /test/456 too").
看一下你的浏览器,comet程序已经正常运行。

运行在分布式erlang系统中

在不同机器上运行路由器和mochiweb前端是有意义的。假定你有几台闲置的电脑进行测试,你应该以分布式节点启动erlang shell。比如使用-name n1@host1.example.com替代-sname n1(同样对于n2)。通过net_adm:ping(...)确保它们互相可见。
注意在router.erl的16行,路由器进程的名字('router')是被全局注册的,因为在gen_server中我们使用了下列宏来识别/定位路由器,这已经适用于分布式系统:
-define(SERVER, global:whereis_name(?MODULE)).
在分布式系统中,进程的全局命名注册键是erlang天生由来的。

产生大量消息

在真实环境中,我们可能会看到长尾一样的使用模式,少些活跃用户加上大量的非活跃用户。然而,在这个测试用,我们不分青红皂白地给随机用户发送测试消息。
mesgen.erl:

  1. -module(msggen).
  2. -export([start/3]).

  3. start(0, _, _) -> ok;
  4. start(Num, Interval, Max) ->
  5. Id = random:uniform(Max),
  6. router:send(Id, "Fake message Num = " ++ Num),
  7. receive after Interval -> start(Num -1, Interval, Max) end.

它会给1到Max之间的随机的用户Id发送Num条消息,每次发送之间会等待Interval毫秒。
按照如下指令,你会看到整个运行情况。运行路由器和mochiweb程序,浏览器访问http://localhost:8000/test/3,然后运行:
erl -sname test
(test@localhost)1> net_adm:ping(n1@localhost).
pong
(test@localhost)2> c(msggen).
{ok,msggen}
(test@localhost)3> msggen:start(20, 10, 5).
ok
它给1-5之间的随机Id发送20条消息,每条消息之间有10毫秒的等待。
我们甚至可以并行地运行它们,模拟多个消息源。下面的例子会产生10个进程,给1-5之间的ids发送20条消息,每条消息之间等待100毫秒。
[ spawn(fun() -> msggen:start(20, 100, 5), io:format("~w finished.\n", [self()]) end) || _ <- lists:seq(1,10) ]. [<0.97.0>,<0.98.0>,<0.99.0>,<0.100.0>,<0.101.0>,<0.102.0>,
<0.103.0>,<0.104.0>,<0.105.0>,<0.106.0>]
<0.101.0> finished.
<0.105.0> finished.
<0.106.0> finished.
<0.104.0> finished.
<0.102.0> finished.
<0.98.0> finished.
<0.99.0> finished.
<0.100.0> finished.
<0.103.0> finished.
<0.97.0> finished.

再次CK10

我想我们需要进一步大规模的测试,客户端连接mochiweb程序,并被注册到消息路由器中。我们可以在路由器上产生大量测试消息,发往任何注册的客户端。让我们再测一遍1万个并发用户的情况,但是这一次我们让客户端多连一会。
假定你按照第一篇中的指令对内核进行了调节,增加最大文件限制等等,这很容易的。你已经运行了mochiweb程序和路由器。接着,让我们记录所有的通讯。
在没有客户端连接的情况下,mochiweb beam进程使用大约40MB内存(常驻的):
$ ps -o rss= -p `pgrep -f 'sname n1'`
40156
我写一行可怕的代码,每隔60秒打印一次:时间戳、mochiweb当前内存实用情况(KB单位)以及当前的连接数。在一个空闲的终端跑这段脚本:
$ MOCHIPID=`pgrep -f 'name n1'`; while [ 1 ] ; do NUMCON=`netstat -n | awk ‘/ESTABLISHED/ && $4==”127.0.0.1:8000″‘ | wc -l`; MEM=`ps -o rss= -p $MOCHIPID`; echo -e “`date`\t`date +%s`\t$MEM\t$NUMCON”; sleep 60; done | tee -a mochimem.log
现在在新的erl shell中启动第一篇中的floodtest工具:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
它将每秒建立10条连接的频率建立1万条连接。
你会看到,很快到达1万条连接:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
Stats: {825,0,0}
Stats: {1629,0,0}
Stats: {2397,0,0}
Stats: {3218,0,0}
Stats: {4057,0,0}
Stats: {4837,0,0}
Stats: {5565,0,0}
Stats: {6295,0,0}
Stats: {7022,0,0}
Stats: {7727,0,0}
Stats: {8415,0,0}
Stats: {9116,0,0}
Stats: {9792,0,0}
Stats: {10000,0,0}
...
请看那行脚本的输出:
Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001
1万条连接(加上我在firefox中打开的一条连接),大约占用90MB(90964KB)。
现在开始发送消息:

erl> [ spawn(fun() -> msggen:start(1000000, 100, 10000) end) || _ <- lists:seq(1,100) ]. [<0.65.0>,<0.66.0>,<0.67.0>,<0.68.0>,<0.69.0>,<0.70.0>,
<0.71.0>,<0.72.0>,<0.73.0>,<0.74.0>,<0.75.0>,<0.76.0>,
<0.77.0>,<0.78.0>,<0.79.0>,<0.80.0>,<0.81.0>,<0.82.0>,
<0.83.0>,<0.84.0>,<0.85.0>,<0.86.0>,<0.87.0>,<0.88.0>,
<0.89.0>,<0.90.0>,<0.91.0>,<0.92.0>,<0.93.0>|...]

100个进程,每个进程以每秒10条的频率随机地给1到1万之间的id发送1百万条消息。这意味着,路由器每秒发送1000条消息,平均每个客户端每10秒接收一条消息。

看一下floodtest的输出,你会看到客户端正在接收http块(记住这是{已连接的连接数,被关闭的连接数,接收到的数据块的数量}):

...
Stats: {10000,0,5912}
Stats: {10000,0,15496}
Stats: {10000,0,25145}
Stats: {10000,0,34755}
Stats: {10000,0,44342}
...

每个进程以每秒10条消息的频率发送1百万条消息,需要27个小时才能完成。

这里是10分钟后的内存使用情况:

Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001
Mon Oct 20 17:05:55 BST 2008 1224518755 90980 10001
Mon Oct 20 17:07:00 BST 2008 1224518820 91120 10001
Mon Oct 20 17:08:05 BST 2008 1224518885 98664 10001
Mon Oct 20 17:09:10 BST 2008 1224518950 106752 10001
Mon Oct 20 17:10:15 BST 2008 1224519015 114044 10001
Mon Oct 20 17:11:20 BST 2008 1224519080 119468 10001
Mon Oct 20 17:12:25 BST 2008 1224519145 125360 10001

当连上1万个客户端后,内存使用量就从40MB增加到90MB,当运行一段时间后,变为125MB。

值得指出的是,floodtest工具占用了大部分的CPU,msggen使用了2%的CPU,而路由器和mochiweb不到1%(只有客户端模拟程序占用大量的CPU,而服务器程序本身没怎么占用CPU)。这表明我们需要在多台机器或者多核CPU机器上跑测试。

24小时后的结果

这个测试跑了24个小时,同时把内存使用情况记录在mochimem.log文件中。这是1万个客户端,每秒1000条消息随机发送给客户端。
下面的bash/awk脚本调用gnuplot把mochimem.log文件中的数据图形化:

(echo -e "set terminal png size 500,300\nset xlabel \"Minutes Elapsed\"\nset ylabel \"Mem (KB)\"\nset title \"Mem usage with 10k active connections, 1000 msg/sec\"\nplot \"-\" using 1:2 with lines notitle" ; awk 'BEGIN{FS="\t";} NR%10==0 {if(!t){t=$2} mins=($2-t)/60; printf("%d %d\n",mins,$3)}' mochimem.log ; echo -e "end" ) | gnuplot > mochimem.png

图片展示了内存使用情况,24小时内基本上稳定在250MB。两个大的落差,一个是在开始的时候,另一个是在结束的时候,是因为我在mochiweb erlang进程中运行了:
erl> [erlang:garbage_collect(P) || P <- erlang:processes()].
这强制所有进程垃圾回收,释放了将近100MB的内存。接下来,我们一起研究如何节省内存开销。

在mochiweb中降低内存开销

mochiweb程序只是发送消息,并不会保存。内存使用量不会因为发送的消息数量增多而增多。
对与erlang的内存管理,我是新手。但我觉得时常地垃圾回收,会降低一部分的内存开销,最终让我们用最少的内存服务更多的用户。我们可能会牺牲一部分的CPU,但这是可以接受的。
深入了erlang文档 ,发现如下的选项:

erlang:system_flag(fullsweep_after, Number)

Number is a non-negative integer which indicates how many times generational garbages collections can be done without forcing a fullsweep collection. The value applies to new processes; processes already running are not affected.
In low-memory systems (especially without virtual memory), setting the value to 0 can help to conserve memory.
An alternative way to set this value is through the (operating system) environment variable ERL_FULLSWEEP_AFTER.

看起来不错,不过它只适用于新的进程,并且会影响到VM中的所有进程,而不是只有我们的mochiweb进程。
接着:

erlang:system_flag(min_heap_size, MinHeapSize)

Sets the default minimum heap size for processes. The size is given in words. The new min_heap_size only effects processes spawned after the change of min_heap_size has been made. The min_heap_size can be set for individual processes by use of spawn_opt/N or process_flag/2.

可能会有用,不过我确信mochiweb进程需要一个更大的堆。如果可以,我会尽可能避免修改mochiweb的源代码,增加spawn选项。
接着,这个映入我的眼帘:

erlang:hibernate(Module, Function, Args)

Puts the calling process into a wait state where its memory allocation has been reduced as much as possible, which is useful if the process does not expect to receive any messages in the near future.

The process will be awaken when a message is sent to it, and control will resume in Module:Function with the arguments given by Args with the call stack emptied, meaning that the process will terminate when that function returns. Thus erlang:hibernate/3 will never return to its caller.

If the process has any message in its message queue, the process will be awaken immediately in the same way as described above.

In more technical terms, what erlang:hibernate/3 does is the following. It discards the call stack for the process. Then it garbage collects the process. After the garbage collection, all live data is in one continuous heap. The heap is then shrunken to the exact same size as the live data which it holds (even if that size is less than the minimum heap size for the process).

If the size of the live data in the process is less than the minimum heap size, the first garbage collection occurring after the process has been awaken will ensure that the heap size is changed to a size not smaller than the minimum heap size.

Note that emptying the call stack means that any surrounding catch is removed and has to be re-inserted after hibernation. One effect of this is that processes started using proc_lib (also indirectly, such as gen_server processes), should use proc_lib:hibernate/3 instead to ensure that the exception handler continues to work when the process wakes up.

看上去是有道理的,每条消息后进行睡眠,看看会发生什么变化。
编辑mochiconntest_web.erl,进行如下更改:
  • 最后一行的fee(Response, Id, N)函数将调用hibernate而不是自身
  • 登录路由器后立即调用hibernate,而不是调用feed,然后阻塞着接收
  • 记住导出feed/3,那样hibernate在醒来后可以回调它
更新mochiconntest_web.erl:
  1. -module(mochiconntest_web).

  2. -export([start/1, stop/0, loop/2, feed/3]).

  3. %% External API

  4. start(Options) ->
  5. {DocRoot, Options1} = get_option(docroot, Options),
  6. Loop = fun (Req) ->
  7. ?MODULE:loop(Req, DocRoot)
  8. end,
  9. % we’ll set our maximum to 1 million connections. (default: 2048)
  10. mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

  11. stop() ->
  12. mochiweb_http:stop(?MODULE).

  13. loop(Req, DocRoot) ->
  14. "/" ++ Path = Req:get(path),
  15. case Req:get(method) of
  16. Method when Method =:= ‘GET’; Method =:= ‘HEAD’ ->
  17. case Path of
  18. "test/" ++ IdStr ->
  19. Response = Req:ok({"text/html; charset=utf-8",
  20. [{"Server","Mochiweb-Test"}],
  21. chunked}),
  22. {Id, _} = string:to_integer(IdStr),
  23. router:login(Id, self()),
  24. % Hibernate this process until it receives a message:
  25. proc_lib:hibernate(?MODULE, feed, [Response, Id, 1]);
  26. _ ->

  27. Req:not_found()
  28. end;
  29. ‘POST’ ->
  30. case Path of
  31. _ ->
  32. Req:not_found()
  33. end;
  34. _ ->
  35. Req:respond({501, [], []})
  36. end.

  37. feed(Response, Id, N) ->
  38. receive
  39. {router_msg, Msg} ->
  40. Html = io_lib:format("Recvd msg #~w: ‘~w’
    "
    , [N, Msg]),
  41. Response:write_chunk(Html)
  42. end,
  43. % Hibernate this process until it receives a message:
  44. proc_lib:hibernate(?MODULE, feed, [Response, Id, N+1]).


  45. %% Internal API

  46. get_option(Option, Options) ->
  47. {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.
更改后,重新编译mochiweb,然后重新进行相同的c10k测试。
使用proc_lib:hibernate()后的结果
使用hibernate后,内存使用量仅为78MB,明显好于第一篇中的450MB。CPU使用量没有明显提高。

总结

我们基于mochiweb实现了comet程序,让我们可以通过ID任意地给客户端发送消息。经过24小时不间断地1000 msg/秒发送消息,1万个连接,使用了80MB内存,也就是每条连接占用8KB。我们还制作了精美的图表。
对于第一篇中的每条连接45KB来说,这是一个进步,而这一切归功于我们以一种更真实的方式进行测试,并且在消息之间使用了hibernate。

下一步

在第三篇中(不久发布),我将进行1百万连接的测试,测试程序部署在多cpu的64位服务器上,有足够的内存。
看看有什么区别。我也会列出一些窍门,用来模拟1百万个客户端。
程序将以一种pub-sub系统呈现,订阅将和用户id有关,并被程序保存,而不是由客户端在连接的时候提供。我们将加载典型的社交网数据:朋友。这将允许用户用他们的id登录,并能自动接受朋友们的事件。

Sunday, October 19, 2008

深入浅出AMF3

深入浅出AMF3

By Oscar

1 不同的序列化方式

让我们先考虑这样一个需求:客户端发送两个整数给服务器,服务器接收数据,计算它们的和,然后把结果传回给客户端。

我们如何序列化这两个整型数据呢?

1.1 传统的序列化方式

首先,我们考虑传统的序列化方式,一个报头,加上具体数据,大致如下:

两个字节消息类型

两个字节消息长度

四个字节value1

四个字节value2

也就是说,以这种包装数据的方式,我们需要发送12字节。

1.2 Java对象序列化方式

接下来,我们尝试一下Java对象序列化方式,我们需要定义这样的类:

public class SumUpMessage implements Serializable

{

private static final long serialVersionUID = 3357976281271608025L;

private int value1;

private int value2;

public int getValue1()

{

return value1;

}

public void setValue1(int value1)

{

this.value1 = value1;

}

public int getValue2()

{

return value2;

}

public void setValue2(int value2)

{

this.value2 = value2;

}

}

测试发现,我们需要发送69字节。

测试程序如下:

SumUpMessage data = new SumUpMessage();

ByteArrayOutputStream baos = new ByteArrayOutputStream();

ObjectOutputStream oos = new ObjectOutputStream(baos);

oos.writeObject(data);

int storedSize = baos.size();

System.out.println(String.format("SumUpMessage: 需要占用%d字节", storedSize));

1.3 AMF3方式

最后,让我们AMF3的封包方式,我们使用BlazeDS中的amf3相关API实现如下的测试程序:

SerializationContext serializationContext = SerializationContext.getSerializationContext();

ByteArrayOutputStream baos = new ByteArrayOutputStream();

Amf3Output amf3Output = new Amf3Output(serializationContext);

AmfTrace trace = new AmfTrace();

amf3Output.setDebugTrace(trace);

amf3Output.setOutputStream(baos);

try

{

amf3Output.writeObject(new SumUpMessage());

amf3Output.flush();

int storedSize = baos.size();

System.out.println(String.format("SumUpMessage.amf3: 需要占用%d字节", storedSize));

amf3Output.close();

}

catch (IOException e)

{

e.printStackTrace();

}

我们需要发送43字节。

1.4 总结

通过上述的粗糙测试,我们对于AMF3的封包有个大概的了解。AMF3的『压缩率』对于特别在乎网络带宽的公司,可能不是那么满意。但是它能提高我们日常的开发效率。因为amf3flash默认的封包方式,对于flash程序员,可以忽略字节流的概念,所有input/output都是对象或者原子数据(primitive type)。

让我继续吧。下一章我们将深入SumUpMessage被封装的过程。

2 AMF3详细分析

上面的测试,我们并没有实现Externalizable接口,也就是说,整个封装过程都是按照默认方式进行的。

接下来,我们实现Externalizable接口。

public void writeExternal(ObjectOutput out) throws IOException

{

out.writeInt(value1);

out.writeInt(value2);

}

现在只需要33字节。

接下来,我们用writeObject替代writeInt,即

public void writeExternal(ObjectOutput out) throws IOException

{

out.writeObject(value1);

out.writeObject(value2);

}

此时,只需占用29字节。

现在的问题是,为什么这三种不同的实现方式,而带来完全不同的效果呢?

好的,让我们继续分析吧。

2.1 默认的实现


0x0A: object marker

0x23: trait信息,包含是否externalizabledynamic以及SumUpMessage类的属性数量。

XXXXXXXX XXXXXXXX XXXXXXXX XXXXDE11

E: 表示是否实现了Externalizable接口

D: 是否是动态的

X: 表示所含属性的数量

整个整型数据以29-bit编码方式进行优化处理,所以只占用一个字节。

0x2D: 类名所占字节数,2*类名的字符数目+1

0x63 6F 6D 2E 6F 73 63 61 72 2E 53 75 6D 55 70 4D 65 73 73 61 67 65: 类名的字符串

0x0D: 属性名1所占字节数,也是2*属性名的字符数目+1

0x76 61 6C 75 65 31: 属性名1的字符串

0x0D: 属性名2所占字节数,也是2*属性名的字符数目+1

0x76 61 6C 75 65 32: 属性名2的字符串

0x04: integer marker

0x00: 属性1的值

0x04: integer marker

0x00: 属性2的值

2.2 实现Externalizable接口,使用writeInt输出


0x0A: object marker

0x07: trait信息

0x2D: 类名所占字节数

0x63 6F 6D 2E 6F 73 63 61 72 2E 53 75 6D 55 70 4D 65 73 73 61 67 65: 类名的字符串

0x00 00 00 00: 我们在writeExternal中实现的,写属性1的值

0x00 00 00 00: 我们在writeExternal中实现的,写属性2的值

2.3 实现Externalizable接口,使用writeObject输出


0x0A: object marker

0x07: trait信息

0x2D: 类名所占字节数

0x63 6F 6D 2E 6F 73 63 61 72 2E 53 75 6D 55 70 4D 65 73 73 61 67 65: 类名的字符串

0x04: 我们在writeExternal中实现的,integer marker

0x00: 我们在writeExternal中实现的,属性1的值

0x04: 我们在writeExternal中实现的,integer marker

0x00: 我们在writeExternal中实现的,属性2的值

2.4 29bit编码

29bit编码的目的是尽可能使用少量的内存来表示一个integer数据。它把一个byte的最高bit用来标记下一个byte是否属于这个integer 32bit中去掉用作标记的3bit,就剩下29bit用作存放实际数据。

3 下一步

下一步,我将把AMF3序列化和反序列化功能添加到Grizzly ARP中,另外实现wireshark插件解析AMF3格式协议。敬请期待。

4 参考资料