1. The simplest socket server.
1. var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
2. socket.Bind(new IPEndPoint(IPAddress.Parse("Any"), 10086));
3. socket.Listen(10);
4.
5. while (true)
6. {
7. //blocked until accept an new client.
8. var client = socket.Accept();
9.
10. //create a new thread to receive message from client.
11. Task.Factory.StartNew(() =>
12. {
13. var buffer = new byte[1024];
14. while (true)
15. {
16. //blocked until receive the message.
17. int len = client.Receive(buffer);
18.
19. //...
20. }
21. });
22. }
这是最简单的阻塞式Socket服务器,服务端socket会一直阻塞在Accept方法,直到一个客户端socket链接接入。
Receive方法为阻塞方法,故需要开启线程循环监听数据,并进行处理。
该方式性能有限,不能满足高并发,高容量,高时效性的场景。
2. The simple async socket server.
1. var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
2. socket.Bind(new IPEndPoint(IPAddress.Parse("Any"), 10086));
3. socket.Listen(10);
4.
5. socket.BeginAccept(sender =>
6. {
7. var client = (Socket)sender.AsyncState;
8. var buffer = new byte[1024];
9. client.BeginReceive(buffer, 0, 1024, SocketFlags.None, sender1 =>
10. {
11. //do something.
12. //call begin receive again.
13. //Need use EndReceive Method to complete this action.
14. }, client);
15.
16. //call begin accept again.
17. }, socket);
使用Begin与End完成简单的异步服务,这种方式也成为APM模式。
该方式相比于上述阻塞式服务有较大的性能提升,但在高并发情况下,每个Begin/End方法都会创建一个IAsyncResult实例,造成巨大的额外开销。
3. The High-I/O async socket server.
1. var endPoint = new IPEndPoint(IPAddress.Parse("Any"), 10086);
2.
3. var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
4. socket.Bind(endPoint);
5. socket.Listen(100);
6.
7. var acceptAsyncProxy = new SocketAsyncEventArgs
8. {
9. RemoteEndPoint = endPoint,
10. UserToken = socket
11. };
12. acceptAsyncProxy.Completed += (sender, e) =>
13. {
14. var client = (Socket)sender;
15. var buffer = new byte[1024];
16. var receiveAsyncProxy = new SocketAsyncEventArgs
17. {
18. RemoteEndPoint = client.RemoteEndPoint,
19. UserToken = client
20. };
21. receiveAsyncProxy.SetBuffer(buffer, 0, 1024);
22. receiveAsyncProxy.Completed += (sender1, e1) =>
23. {
24. var buf = e1.Buffer;
25.
26. //do something.
27. client.ReceiveAsync(e1);
28. };
29. client.ReceiveAsync(receiveAsyncProxy);
30.
31. socket.AcceptAsync(e);
32. };
33. socket.AcceptAsync(acceptAsyncProxy);;
该方式相较于上述APM模式,使用 SocketAsyncEventArgs(以下简称 SAEA)替代了Begin/End方法过程中不必要的开销,做到了复用。
当我们能够预测并发量,有针对性地预先分配好SAEA实例,在发送与接收数据时能够复用,则能很好地提升服务器的性能。
通常情况下,我们会给每一个客户端Socket分配一个接收与发送的SAEA实例,并根据需要预先分配数个接收客户端Socket链接的SAEA实例。
4. Enhance
在使用SAEA的基础上,还可以进一步进行优化。
主要的优化点在于接收与发送的Buffer,在大量的数据收发中,如果频繁创建buffer用于接收或者发送,则会产生巨大的开销,内存溢出甚至宕机。
以下针对接收与发送分别作出说明。
4.1接收端
根据业务需求,我们可以预先分配好一个总的buffer,使得每个SAEA都能够分配使用其中的一块区域。当一个客户端Socket断开时,我们认为该SAEA之前持有的这部分区域被释放,可以给下一个客户端Socket连入时使用。
1. _bufferManager = new BufferManager(bufferSize * MaxConnectionNumber, bufferSize);
2. // preallocate pool of SocketAsyncEventArgs objects
3. SocketAsyncEventArgs socketEventArg;
4.
5. var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(MaxConnectionNumber);
6.
7. for (int i = 0; i < MaxConnectionNumber; i++)
8. {
9. //Pre-allocate a set of reusable SocketAsyncEventArgs
10. socketEventArg = new SocketAsyncEventArgs();
11. _bufferManager.SetBuffer(socketEventArg);
12.
13. socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg));
14. }
通常情况下,我们根据预设客户端Socket的链接数设置总buffer的大小,而每一个客户端Socket分配的缓冲区buffer的大小需要根据实际业务调整。
上面代码中的Buffer Manager类,管理着所有接收SAEA的缓冲区Buffer。
4.2发送端
类似的,我们也会为每一个SAEA分配一个用于维护发送数据的Sending Queue发送队列(以下简称SQ),其遵循先进先出的规则。
1. if (_sendingQueuePool == null)
2. _sendingQueuePool = ((SocketServerBase)((ISocketServerAccessor)appSession.AppServer).SocketServer).SendingQueuePool;
3.
4. SendingQueue queue;
5. if (_sendingQueuePool.TryGet(out queue))
6. {
7. _sendingQueue = queue;
8. queue.StartEnqueue();
9. }
但有一点不同的是,考虑到吞吐量的因素,需要对发送的数据进行管理再发送。我们需要创建一个Sending Queue Pool(以下简称SP)用于管理SQ。
通常,每一个SAEA持有一个SQ发送队列。当吞吐量较大,当前SQ正被SAEA占用时,后续需要发送的数据需要创建一个新的SQ来进行维护。直到当前的数据都发送完成时,数据都由新的SQ进行管理。
1. SendingQueue newQueue;
2.
3. if (!_sendingQueuePool.TryGet(out newQueue))
4. {
5. OnSendEnd(CloseReason.InternalError, true);
6. AppSession.Logger.Error("There is no enougth sending queue can be used.");
7. return;
8. }
9.
10. var oldQueue = Interlocked.CompareExchange(ref _sendingQueue, newQueue, queue);
11.
12. if (!ReferenceEquals(oldQueue, queue))
13. {
14. if (newQueue != null)
15. _sendingQueuePool.Push(newQueue);
16.
17. if (IsInClosingOrClosed)
18. {
19. OnSendEnd();
20. }
21. else
22. {
23. OnSendEnd(CloseReason.InternalError, true);
24. AppSession.Logger.Error("Failed to switch the sending queue.");
25. }
26.
27. return;
28. }
29.
30. //Start to allow enqueue
31. newQueue.StartEnqueue();
32. queue.StopEnqueue();
值得注意的是,为了避免正在发送中的数据被篡改,将当前的SQ置于只读状态。SP也不能无限制地创建新的SQ,当超过最大SQ数量时,便返回失败。因为此时服务器已经达到了瓶颈,继续创建也只是在不停往内存中写入数据,严重时会造成宕机。
当一个SAEA持有多个SQ时,发送完毕后需要将其进行回收,以便复用。
1. //Initialize SocketAsyncEventArgs for sending
2. _socketEventArgSend = new SocketAsyncEventArgs();
3. _socketEventArgSend.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendingCompleted);
4.
5. _socketEventArgSend.UserToken = queue;
6.
7. if (queue.Count > 1)
8. _socketEventArgSend.BufferList = queue;
9. else
10. {
11. var item = queue[0];
12. _socketEventArgSend.SetBuffer(item.Array, item.Offset, item.Count);
13. }
14.
15. var client = Client;
16.
17. if (client == null)
18. {
19. OnSendError(queue, CloseReason.SocketError);
20. return;
21. }
22.
23. if (!client.SendAsync(_socketEventArgSend))
24. OnSendingCompleted(client, _socketEventArgSend);
无论发送是否成功,我们都需要调用其发送完成的回调函数,回调函数中将再次取得该客户端Socket对应的SQ,再一次调用发送方法,如果此时有待发送的数据的话。如此以达成递归发送数据。
4. Enhance – Bonus
再进一步的优化。
请参考 Buffer Manage,Sending Queue,Smart Pool的实现,以便获得更深的理解。
4.1 接收端
1)不同于发送端,接收端需要考虑分包的因素。故需要设定一个协议,来让服务器知道一个数据包是否已经接收完成。这里只提供几种思路。
最简单的做法是使用特殊分隔符作为数据结束的标识;比较安全的方法是使用特殊请求头,在首个数据包里申明包的长度。也可以在每个数据包里都加上请求头,以及序号。更为保险地,可以在每个数据包结尾加入Crc等校验。当然,后两种做法无疑会产生额外开销。
2)与发送端类似,在数据较大时,需要进行分包处理。类似于SQ的原理,将接收到的数据进行缓存,处理完毕后进行释放。
4.2 发送端
1)建议预先分配1/6的预估最大客户端链接数的数量的SQ;最多可维护链接数的2倍的SQ,即每个SAEA可持有一个正在发送的SQ,与一个等待发送并持续收集发送数据的SQ。更多的SQ不利于维护,利用率也低。
2)再次注意,SQ的数据发送完成后,需要进行释放,将其维护的数据集合清空。如果当前存在等待SQ,还需要把当前执行的SQ进行回收,返回至SP中,以便复用。
3)设置单个SQ的数据容量上限,较多的数据发送耗时会非常久,有可能造成服务器上待发送数据积压,内存溢出。
4)复用同一个数据。当一条数据需要推送至所有客户端Socket时,反复为其开辟一块区域开销非常大,在使用时需注意。
4.3 架构
如图所示,这只是一种应用架构,能较好地隔离开Socket的维护部分,专注于业务开发。
Socket Session维护着Socket,初始化时向SP申请SQ,并通过SAEA实现高性能的接收发送方法。
Socket Server为Socket监听服务,当客户端Socket连入时创建与之对应的Socket Session,为其分配数据缓冲区以及数据接收所需的SAEA,并且通知App Server。
App Session为Socket Session业务层次上的实现,无需关注Socket,提供发送方法,以及数据过滤方法。当处理完成一个数据时,通知App Server。
Receive Filter类为具体的数据处理类,依据自定义协议,转化为基础数据类型Request Info。
Receive Filter Factory为创建Receive Filter的工厂方法类。针对不同业务场景创建具体的Receive Filter。
App Server为核心中枢,其创建和维护Socket Server,并且维护所有的App Session。在创建App Session时,将Socket Session与之绑定,并通过指定的Receive Filter Factory为其创建相应的过滤器Receive Filter。在接收到App Session处理后的基础数据Request Info时,根据其属性Key加载对应的Command。
Command中可实现具体业务,对数据Request Info进行处理。
Command Loader为Command帮助类,通过反射遍历并获取Command,在通过其Name与基础数据的Key判别,加载相应的Command。
本文暂时没有评论,来添加一个吧(●'◡'●)