fastdo  0.5.12
eiennet_websocket.hpp
浏览该文件的文档.
1 #pragma once
2 
3 namespace eiennet
4 {
5 
7 namespace ws
8 {
32  /*
33  一个未分片的消息由单个帧组成(FIN为1,opcode非0)
34  一个分片的消息由起始帧:
35  (FIN为0,opcode非0),
36  若干(0个或多个)帧(FIN为0,opcode为0),
37  结束帧(FIN为1,opcode为0)
38  */
39  /*
40  HTTP升级为websocket连接的请求
41  GET /xxx HTTP/1.1
42  Host: hostname[:port]
43  Upgrade: websocket
44  Connection: Upgrade
45  Sec-WebSocket-Key: 随机生成的16字节的值用BASE64编码后的字符串
46  Origin: 其值为请求发起页面的scheme://hostname (因为中间可能经过代理才到达服务器,服务器可以根据该字段选择是否和客户端建立连接)
47  Sec-WebSocket-Version: 13
48  [Sec-WebSocket-Protocol: 其值为由逗号分隔的子协议的名字,按优先度排序,每个名字必须唯一]
49  [Sec-WebSocket-Extensions: 表示协议级别的扩展]
50 
51  连接建立的合法响应
52  HTTP/1.1 101 xxx
53  Upgrade: websocket
54  Connection: Upgrade
55  Sec-WebSocket-Accept: 其值为 Base64( SHA1( RequestHeader{Sec-WebSocket-Key} + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" ) )
56  [Sec-WebSocket-Protocol: 其值为请求发来的子协议中的一个或空]
57  */
58 
61 {
62  dataContinued = 0x0U,
63  dataText = 0x1U,
64  dataBinary = 0x2U,
65  dataRsv3 = 0x3U,
66  dataRsv4 = 0x4U,
67  dataRsv5 = 0x5U,
68  dataRsv6 = 0x6U,
69  dataRsv7 = 0x7U,
70  ctrlClose = 0x8U,
71  ctrlPing = 0x9U,
72  ctrlPong = 0xAU,
73  ctrlRsvB = 0xBU,
74  ctrlRsvC = 0xCU,
75  ctrlRsvD = 0xDU,
76  ctrlRsvE = 0xEU,
77  ctrlRsvF = 0xFU,
78 };
79 
82 {
85  //errCnnUpgradeFail,
86 };
87 
90 {
91  ftBasic = 0b0010,
92  ftBasicMasked = 0b0011,
93  ftExtended16 = 0b0100,
95  ftExtended64 = 0b1000,
97 };
98 
101 {
106 };
107 
108 #pragma pack(push, 1)
109 
111 struct FrameBase
112 {
118 
121 
123  {
124  memset( this, 0, sizeof(*this) );
125  }
126 };
127 
128 // WebSocket数据帧根据Payload-len来决定帧头部大小
129 // Payload-len < 126: 2bytes, 16bits
130 // Payload-len ==126: 4bytes, 32bits
131 // Payload-len ==127: 10bytes, 80bits
132 // 除此之外,如果mask位为1,则包含32bits的掩码Key,即
133 // Payload-len < 126: 6bytes, 48bits
134 // Payload-len ==126: 8bytes, 64bits
135 // Payload-len ==127: 14bytes, 112bits
136 
137 // 基本帧 payload len < 126, mask == 0
139 {
141  {
142  memset( this, 0, sizeof(*this) );
143  }
144 
145  template < typename _Ty >
146  _Ty * data()
147  {
148  return reinterpret_cast<_Ty*>( this + 1 );
149  }
150 };
151 #pragma pack(pop)
152 
153 #pragma pack(push, 2)
154 
155 // 基本掩码帧 payload len < 126, mask == 1
157 {
158  winux::uint8 maskingKey[4];
159 
161  {
162  memset( this, 0, sizeof(*this) );
163  }
164 
165  template < typename _Ty >
166  _Ty * data()
167  {
168  return reinterpret_cast<_Ty*>( this + 1 );
169  }
170 };
171 
172 // 16位数据扩展帧 payload len == 126, mask == 0
174 {
176 
178  {
179  memset( this, 0, sizeof(*this) );
180  }
181 
182  template < typename _Ty >
183  _Ty * data()
184  {
185  return reinterpret_cast<_Ty*>( this + 1 );
186  }
187 };
188 
189 // 16位数据扩展掩码帧 payload len == 126, mask == 1
191 {
193  winux::uint8 maskingKey[4];
194 
196  {
197  memset( this, 0, sizeof(*this) );
198  }
199 
200  template < typename _Ty >
201  _Ty * data()
202  {
203  return reinterpret_cast<_Ty*>( this + 1 );
204  }
205 };
206 
207 // 64位数据扩展帧 payload len == 127, mask == 0
209 {
211 
213  {
214  memset( this, 0, sizeof(*this) );
215  }
216 
217  template < typename _Ty >
218  _Ty * data()
219  {
220  return reinterpret_cast<_Ty*>( this + 1 );
221  }
222 };
223 
224 // 64位数据扩展掩码帧 payload len == 127, mask == 1
226 {
228  winux::uint8 maskingKey[4];
229 
231  {
232  memset( this, 0, sizeof(*this) );
233  }
234 
235  template < typename _Ty >
236  _Ty * data()
237  {
238  return reinterpret_cast<_Ty*>( this + 1 );
239  }
240 };
241 
242 // 关闭帧载荷数据
244 {
246  template < typename _Ty >
247  _Ty * data()
248  {
249  return reinterpret_cast<_Ty*>( this + 1 );
250  }
251 };
252 
253 
254 #pragma pack(pop)
255 
256 // 发送WebSocket帧
257 EIENNET_FUNC_DECL(bool) SendWebSocketFrame( eiennet::Socket * sock, OpCode opcode, bool fin, bool mask, winux::byte * payloadData = nullptr, winux::uint payloadDataSize = 0 );
258 // 发送WebSocket数据
259 EIENNET_FUNC_DECL(bool) SendWebSocketBuffer( eiennet::Socket * sock, OpCode opcode, bool mask, winux::Buffer payloadData = winux::Buffer(), int perFrameMaxPayloadSize = -1 );
260 // 发送WebSocket数据
261 EIENNET_FUNC_DECL(bool) SendWebSocketAnsi( eiennet::Socket * sock, OpCode opcode, bool mask, winux::AnsiString payloadData = winux::AnsiString(), int perFrameMaxPayloadSize = -1 );
262 
263 
265 
267 {
// Binds this wrapper to a client context by storing the given pointer in the clientCtx member.
268  WebSocketWrapper( ClientCtx * clientCtx ) : clientCtx(clientCtx)
269  {
270  }
272  winux::AnsiString messageBuf; // websocket消息缓冲区,用于接收多帧数据(一条消息可能分片)
273  int messageType = 0; // 消息类型,1文本;2二进制
274  ws::ReadyState state = ws::stateConnecting; // 连接状态
275 
276  bool close( winux::uint16 errCode = -1, winux::AnsiString const & errStr = "" )
277  {
278  winux::GrowBuffer data;
279  if ( errCode != (winux::uint16)-1 )
280  {
281  errCode = htont(errCode);
282  data.append( &errCode, sizeof(errCode) );
283  data.append(errStr);
284  }
285  if ( !SendWebSocketBuffer( clientCtx->clientSockPtr.get(), ws::ctrlClose, false, std::move(data) ) ) return false;
286 
287  if ( this->state == ws::stateClosing )
288  this->state = ws::stateClosed;
289  else
290  this->state = ws::stateClosing;
291  return true;
292  }
293 
294  bool send( winux::AnsiString const & data, ws::OpCode opcode = ws::dataText )
295  {
296  return SendWebSocketAnsi( clientCtx->clientSockPtr.get(), opcode, false, data, 65535 );
297  }
298 };
299 
302 {
303 public:
305  : ClientCtx( clientId, clientEpStr, clientSockPtr ), url(http::Url::urlSimple), websocket(this)
306  {
307  }
309  {
310  ColorOutput( winux::fgPurple, "Client[", clientId, "]客户场景`", clientEpStr, "`析构" );
311  }
312  DataRecvSendCtx forClient; // 数据收发场景
313  winux::AnsiString requestHeaderStr; // 请求头字符串
315  winux::GrowBuffer requestBody; // 请求体数据
316  http::Url url; // URL
317 
318  bool isWebSocketWrapper = false; // 是否启用websocket包装
320 };
321 
323 template < class _ClientCtx = WsHttpClientCtx >
324 class WsHttpServer : public Server<_ClientCtx>
325 {
326 public:
328 
329  using OpenHandlerFunction = std::function< void( ClientCtxSharedPointer clientCtxPtr ) >;
330  using MessageHandlerFunction = std::function< void( ClientCtxSharedPointer clientCtxPtr, winux::AnsiString const & data, int messageType ) >;
331  using CloseHandlerFunction = std::function< void( ClientCtxSharedPointer clientCtxPtr, winux::uint16 errCode, winux::AnsiString const & errStr ) >;
332  using ErrorHandlerFunction = std::function< void( ClientCtxSharedPointer clientCtxPtr, WebSocketErrorCode ec ) >;
333 
// Constructs the server: builds an ip::EndPoint from serverIp and port and forwards it, together
// with threadCount, listenBacklog and durationSec, to the Server<_ClientCtx> base constructor.
334  WsHttpServer( winux::String const & serverIp, winux::ushort port, int threadCount = 10, int listenBacklog = 10, double durationSec = 0.1 )
335  : Server<_ClientCtx>( ip::EndPoint( serverIp, port ), threadCount, listenBacklog, durationSec )
336  {
337  }
338 
339  void onOpenHandler( OpenHandlerFunction handler ) { _openHandler = handler; }
340  void onMessageHandler( MessageHandlerFunction handler ) { _messageHandler = handler; }
341  void onCloseHandler( CloseHandlerFunction handler ) { _closeHandler = handler; }
342  void onErrorHandler( ErrorHandlerFunction handler ) { _errorHandler = handler; }
343 
344 protected:
345  // 业务启动函数
346  virtual void onStartup( ClientCtxSharedPointer clientCtxPtr ) override
347  {
348  ColorOutput( winux::fgYellow, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`到来" );
349  this->_pool.task( &WsHttpServer::_doRecvRequestHeaderTask, this, clientCtxPtr );
350  }
351 
352  // 尝试接收一个请求头的任务
354  {
355  int rcWait;
356  int rc = clientCtxPtr->clientSockPtr->recvWaitUntilTarget(
357  "\r\n\r\n",
358  &clientCtxPtr->forClient.data,
359  &clientCtxPtr->forClient.extraData,
360  &clientCtxPtr->forClient.hadBytes,
361  &clientCtxPtr->forClient.startpos,
362  &clientCtxPtr->forClient.pos,
363  this->_pool.getDurationSec(),
364  &rcWait,
365  [&clientCtxPtr] ( int had, void* ) {
366  clientCtxPtr->forClient.retryCount = 0; // 重置次数
367  }
368  );
369 
370  if ( clientCtxPtr->forClient.pos != -1 ) // 接收成功
371  {
372  // 解析请求头
373  clientCtxPtr->requestHeaderStr = clientCtxPtr->forClient.data.toAnsi();
374  clientCtxPtr->requestHeader.clear();
375  clientCtxPtr->requestHeader.parse(clientCtxPtr->requestHeaderStr);
376 
377  clientCtxPtr->forClient.resetStatus(); // 重置收发状态
378  clientCtxPtr->forClient.data = std::move(clientCtxPtr->forClient.extraData); // 把额外接收的数据当作请求体数据
379  // 从Content-Length获取请求体大小
380  int contentLength = static_cast<http::Header&>(clientCtxPtr->requestHeader).getHeader<int>("Content-Length");
381  clientCtxPtr->forClient.targetBytes = contentLength - clientCtxPtr->forClient.data.getSize(); // 需要接收的目标大小
382  if ( clientCtxPtr->forClient.targetBytes > 0 )
383  {
384  // 投递尝试接收请求体
385  this->_pool.task( &WsHttpServer::_doRecvRequestBodyTask, this, clientCtxPtr );
386  }
387  else
388  {
389  // 投递处理请求任务
390  this->_pool.task( &WsHttpServer::_doRequestTask, this, clientCtxPtr );
391  }
392  }
393  else if ( rcWait == 0 )
394  {
395  clientCtxPtr->forClient.retryCount++;
396  if ( clientCtxPtr->forClient.retryCount < DataRecvSendCtx::RetryCount )
397  {
398  // 重投尝试接收请求头
399  this->_pool.task( &WsHttpServer::_doRecvRequestHeaderTask, this, clientCtxPtr );
400  }
401  else
402  {
403  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]从客户`", clientCtxPtr->clientEpStr, "`接收一个请求头时长时间没有数据到来" );
404  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
405  this->removeClient(clientCtxPtr->clientId);
406  }
407  }
408  else
409  {
410  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]从客户`", clientCtxPtr->clientEpStr, "`接收头部出错" );
411  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
412  this->removeClient(clientCtxPtr->clientId);
413  }
414  }
415 
416  // 尝试接收一个请求体的任务
418  {
419  int rcWait;
420  int rc = clientCtxPtr->clientSockPtr->recvWaitUntilSize(
421  clientCtxPtr->forClient.targetBytes,
422  &clientCtxPtr->forClient.data,
423  &clientCtxPtr->forClient.hadBytes,
424  this->_pool.getDurationSec(),
425  &rcWait,
426  [&clientCtxPtr] ( int had, void* ) {
427  clientCtxPtr->forClient.retryCount = 0;
428  }
429  );
430 
431  if ( clientCtxPtr->forClient.hadBytes == clientCtxPtr->forClient.targetBytes ) // 接收完成
432  {
433  // 保存请求体数据
434  clientCtxPtr->requestBody = std::move(clientCtxPtr->forClient.data);
435  clientCtxPtr->forClient.resetStatus(); // 重置收发状态
436 
437  // 投递处理请求任务
438  this->_pool.task( &WsHttpServer::_doRequestTask, this, clientCtxPtr );
439  }
440  else if ( rcWait == 0 )
441  {
442  clientCtxPtr->forClient.retryCount++;
443  if ( clientCtxPtr->forClient.retryCount < DataRecvSendCtx::RetryCount )
444  {
445  // 重投尝试接收请求体
446  this->_pool.task( &WsHttpServer::_doRecvRequestBodyTask, this, clientCtxPtr );
447  }
448  else
449  {
450  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]从客户`", clientCtxPtr->clientEpStr, "`接收一个请求体时长时间没有数据到来" );
451  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
452  this->removeClient(clientCtxPtr->clientId);
453  }
454  }
455  else
456  {
457  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]从客户`", clientCtxPtr->clientEpStr, "`接收请求体出错" );
458  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
459  this->removeClient(clientCtxPtr->clientId);
460  }
461  }
462 
463  // 处理一个请求的任务
465  {
466  ColorOutput( winux::fgGreen, "Client[", clientCtxPtr->clientId, "]从客户`", clientCtxPtr->clientEpStr, "`接收请求头(bytes:", clientCtxPtr->requestHeaderStr.size(), ")" );
467  std::cout << clientCtxPtr->requestHeaderStr;
468  ColorOutput( winux::fgAtrovirens, "Client[", clientCtxPtr->clientId, "]从客户`", clientCtxPtr->clientEpStr, "`接收请求体(bytes:", clientCtxPtr->requestBody.getSize(), ")\n" );
469 
470  // 构造响应
471  std::stringbuf rspBuf;
472  std::ostream rspOut(&rspBuf);
473  http::Header rspHdr;
474  rspHdr.setResponseLine( "HTTP/1.1 200 OK", false );
475  rspHdr["Content-Type"] = "text/html";
476 
477  clientCtxPtr->url.clear();
478  clientCtxPtr->url.parse( clientCtxPtr->requestHeader.getUrl() );
479 
480  // 调用具体业务处理
481  this->_webProcess( clientCtxPtr, clientCtxPtr->requestHeader, clientCtxPtr->url, rspHdr, rspOut );
482 
483  // 输出响应
484  winux::AnsiString rspStr = rspBuf.str();
485  if ( rspStr.size() > 0 )
486  rspHdr("Content-Length") << rspStr.size();
487 
488  // 发送响应
489  if ( clientCtxPtr->clientSockPtr->sendUntil( rspHdr.toString() + rspStr ) )
490  {
491  // 如果不保活就删除连接,否则继续投请求任务
492  if ( winux::StrLower( clientCtxPtr->requestHeader["Connection"] ) == "keep-alive" )
493  {
494  // 尝试接收下一个请求头
495  this->_pool.task( &WsHttpServer::_doRecvRequestHeaderTask, this, clientCtxPtr );
496  }
497  else if ( clientCtxPtr->isWebSocketWrapper ) // 是否启用websocket包装,投递websocket处理任务
498  {
499  // 已经成功建立websocket连接
500  clientCtxPtr->websocket.state = ws::stateOpen;
501  // 调用onopen事件虚函数
502  this->onOpen(clientCtxPtr);
503 
504  // 投递接收websocket帧任务
505  this->_pool.task( &WsHttpServer::_doRecvWebSocketFrameTask, this, clientCtxPtr );
506  }
507  else
508  {
509  ColorOutput( winux::fgAqua, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`不保活,通信完毕即关闭" );
510  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
511  this->removeClient(clientCtxPtr->clientId);
512  }
513  }
514  else // 发送响应失败
515  {
516  if ( clientCtxPtr->isWebSocketWrapper )
517  {
518  // websocket出错,连接意外失效
519  this->onError( clientCtxPtr, errCnnUnexpectedInvalid );
520  }
521  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`发送响应失败" );
522  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
523  this->removeClient(clientCtxPtr->clientId);
524  }
525  }
526 
527  // 具体业务处理
528  void _webProcess( ClientCtxSharedPointer & clientCtxPtr, http::Header const & reqHdr, http::Url const & url, http::Header & rspHdr, std::ostream & rspOut )
529  {
530  // 是否升级为websocket连接
531  if (
532  reqHdr.hasHeader("Upgrade") && reqHdr.getHeader("Upgrade") == "websocket" &&
533  reqHdr.hasHeader("Connection") && reqHdr.getHeader("Connection") == "Upgrade" &&
534  reqHdr.hasHeader("Sec-WebSocket-Key")
535  )
536  {
537  rspHdr.clear();
538  rspHdr.setResponseLine( "HTTP/1.1 101 Switching Protocols", false );
539  rspHdr["Upgrade"] = "websocket";
540  rspHdr["Connection"] = "Upgrade";
541  winux::String secWebsocketAccept = Base64Encode( winux::Sha1( reqHdr.getHeader("Sec-WebSocket-Key") + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" ) );
542  rspHdr["Sec-WebSocket-Accept"] = secWebsocketAccept;
543 
544  ColorOutput( winux::fgAqua, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`升级为websocket连接" );
545  std::cout << rspHdr.toString();
546 
547  // 正在升级为websocket
548  clientCtxPtr->websocket.state = ws::stateConnecting;
549 
550  // 启用WebSocket包装
551  clientCtxPtr->isWebSocketWrapper = true;
552  }
553  else
554  {
555  http::Vars get( url.getRawQueryStr().c_str() ); // GET变量
556  winux::String urlPathStr = url.getPath();
557  if ( 1 ) //urlPathStr == "" )
558  {
559  rspOut << "This is a websocket server!";
560  }
561  }
562  }
563 
564  // 接收一个WebSocket帧的任务
566  {
567  winux::GrowBuffer frameBuf;
568  ws::FrameBase * frame0;
569  winux::int64 wantBytes = 0;
570  winux::uint8 frameType = 0;
571 
572  // 先读取2字节的帧基础
573  if ( !clientCtxPtr->clientSockPtr->recvUntilSize( sizeof(ws::FrameBase), &frameBuf ) )
574  {
575  // websocket出错,连接意外失效
576  this->onError( clientCtxPtr, errCnnUnexpectedInvalid );
577  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`读取帧基础失败" );
578  goto RecvFrameFailure;
579  }
580 
581  frame0 = frameBuf.getBuf<ws::FrameBase>();
582 
583  // 判断并读取载荷数据长度
584  if ( frame0->payloadLen < 126 )
585  {
586  frameType |= ws::ftBasic;
587  auto frame1 = frameBuf.getBuf<ws::FrameBasic>();
588  wantBytes += frame1->payloadLen;
589  }
590  else if ( frame0->payloadLen == 126 )
591  {
592  frameType |= ws::ftExtended16;
593  if ( !clientCtxPtr->clientSockPtr->recvUntilSize( sizeof(ws::FrameExtended16::extendedPayloadLen), &frameBuf ) )
594  {
595  // websocket出错,连接意外失效
596  this->onError( clientCtxPtr, errCnnUnexpectedInvalid );
597  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`读取16位帧载荷长度失败" );
598  goto RecvFrameFailure;
599  }
600  auto len = ntoht( frameBuf.getBuf<ws::FrameExtended16>()->extendedPayloadLen ); // `extendedPayloadLen` is network byte order
601  wantBytes += len;
602  }
603  else // 127
604  {
605  frameType |= ws::ftExtended64;
606  if ( !clientCtxPtr->clientSockPtr->recvUntilSize( sizeof(ws::FrameExtended64::extendedPayloadLen), &frameBuf ) )
607  {
608  // websocket出错,连接意外失效
609  this->onError( clientCtxPtr, errCnnUnexpectedInvalid );
610  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`读取64位帧载荷长度失败" );
611  goto RecvFrameFailure;
612  }
613  auto len = ntoht( frameBuf.getBuf<ws::FrameExtended64>()->extendedPayloadLen ); // `extendedPayloadLen` is network byte order
614  wantBytes += len;
615  }
616 
617  // 是否有掩码
618  if ( frame0->mask )
619  {
620  frameType |= 1;
621  // 读取掩码
622  if ( !clientCtxPtr->clientSockPtr->recvUntilSize( 4, &frameBuf ) )
623  {
624  // websocket出错,连接意外失效
625  this->onError( clientCtxPtr, errCnnUnexpectedInvalid );
626  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`读取帧掩码失败" );
627  goto RecvFrameFailure;
628  }
629  }
630 
631  // 读取载荷数据
632  if ( !clientCtxPtr->clientSockPtr->recvUntilSize( (int)wantBytes, &frameBuf ) )
633  {
634  // websocket出错,连接意外失效
635  this->onError( clientCtxPtr, errCnnUnexpectedInvalid );
636  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`读取载荷数据失败" );
637  goto RecvFrameFailure;
638  }
639 
640  // 进入帧处理
641  switch ( frameType )
642  {
643  case ws::ftBasic:
644  {
645  auto frame = frameBuf.getBuf<ws::FrameBasic>();
646  // 得到数据
647  char * data = frame->data<char>();
648  size_t len = frame->payloadLen;
649 
650  // 投递处理WebSocket帧任务
651  this->_pool.task( &WsHttpServer::_doProcessWebSocketFrameTask, this, clientCtxPtr, (bool)frame->fin, (winux::uint)frame->opcode, winux::Buffer( data, len ) );
652  }
653  break;
654  case ws::ftBasicMasked:
655  {
656  auto frame = frameBuf.getBuf<ws::FrameBasicMasked>();
657  // 得到数据
658  char * data = frame->data<char>();
659  int len = frame->payloadLen;
660 
661  // 掩码算法
662  for ( int i = 0; i < len; ++i )
663  data[i] ^= frame->maskingKey[ i % 4 ];
664 
665  // 投递处理WebSocket帧任务
666  this->_pool.task( &WsHttpServer::_doProcessWebSocketFrameTask, this, clientCtxPtr, (bool)frame->fin, (winux::uint)frame->opcode, winux::Buffer( data, len ) );
667  }
668  break;
669  case ws::ftExtended16:
670  {
671  auto frame = frameBuf.getBuf<ws::FrameExtended16>();
672  // 得到数据
673  char * data = frame->data<char>();
674  int len = ntoht(frame->extendedPayloadLen);
675 
676  // 投递处理WebSocket帧任务
677  this->_pool.task( &WsHttpServer::_doProcessWebSocketFrameTask, this, clientCtxPtr, (bool)frame->fin, (winux::uint)frame->opcode, winux::Buffer( data, len ) );
678  }
679  break;
681  {
682  auto frame = frameBuf.getBuf<ws::FrameExtended16Masked>();
683  // 得到数据
684  char * data = frame->data<char>();
685  int len = ntoht(frame->extendedPayloadLen);
686 
687  // 掩码算法
688  for ( int i = 0; i < len; ++i )
689  data[i] ^= frame->maskingKey[ i % 4 ];
690 
691  // 投递处理WebSocket帧任务
692  this->_pool.task( &WsHttpServer::_doProcessWebSocketFrameTask, this, clientCtxPtr, (bool)frame->fin, (winux::uint)frame->opcode, winux::Buffer( data, (winux::uint)len ) );
693  }
694  break;
695  case ws::ftExtended64:
696  {
697  auto frame = frameBuf.getBuf<ws::FrameExtended64>();
698  // 得到数据
699  char * data = frame->data<char>();
700  winux::int64 len = ntoht(frame->extendedPayloadLen);
701 
702  // 投递处理WebSocket帧任务
703  this->_pool.task( &WsHttpServer::_doProcessWebSocketFrameTask, this, clientCtxPtr, (bool)frame->fin, (winux::uint)frame->opcode, winux::Buffer( data, (winux::uint)len ) );
704  }
705  break;
707  {
708  auto frame = frameBuf.getBuf<ws::FrameExtended64Masked>();
709  // 得到数据
710  char * data = frame->data<char>();
711  winux::int64 len = ntoht(frame->extendedPayloadLen);
712 
713  // 掩码算法
714  for ( winux::int64 i = 0; i < len; ++i )
715  data[i] ^= frame->maskingKey[ i % 4 ];
716 
717  // 投递处理WebSocket帧任务
718  this->_pool.task( &WsHttpServer::_doProcessWebSocketFrameTask, this, clientCtxPtr, (bool)frame->fin, (winux::uint)frame->opcode, winux::Buffer( data, (winux::uint)len ) );
719  }
720  break;
721  }
722 
723  return;
724 
725  RecvFrameFailure:
726  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
727  this->removeClient(clientCtxPtr->clientId);
728  }
729 
730  // 处理WebSocket帧的任务
731  void _doProcessWebSocketFrameTask( ClientCtxSharedPointer clientCtxPtr, bool fin, winux::uint opcode, winux::Buffer & payloadData )
732  {
733  ColorOutput( winux::fgGreen, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`收到一个帧{fin:", (winux::uint)fin, ",opcode:", opcode, ",datasize:", payloadData.getSize(), "}" );
734 
735  switch ( opcode )
736  {
737  case ws::dataContinued:
738  {
739  if ( fin )
740  {
741  clientCtxPtr->websocket.messageBuf.append( payloadData.getBuf<char>(), payloadData.getSize() );
742  this->onMessage( clientCtxPtr, clientCtxPtr->websocket.messageBuf, clientCtxPtr->websocket.messageType );
743  clientCtxPtr->websocket.messageBuf.clear();
744  clientCtxPtr->websocket.messageType = 0;
745  }
746  else
747  {
748  clientCtxPtr->websocket.messageBuf.append( payloadData.getBuf<char>(), payloadData.getSize() );
749  }
750 
751  // 投递接收下一个websocket帧的任务
752  this->_pool.task( &WsHttpServer::_doRecvWebSocketFrameTask, this, clientCtxPtr );
753  }
754  break;
755  case ws::dataText:
756  case ws::dataBinary:
757  case ws::dataRsv3:
758  case ws::dataRsv4:
759  case ws::dataRsv5:
760  case ws::dataRsv6:
761  case ws::dataRsv7:
762  {
763  if ( fin )
764  {
765  clientCtxPtr->websocket.messageBuf.append( payloadData.getBuf<char>(), payloadData.getSize() );
766  clientCtxPtr->websocket.messageType = opcode;
767  this->onMessage( clientCtxPtr, clientCtxPtr->websocket.messageBuf, clientCtxPtr->websocket.messageType );
768  clientCtxPtr->websocket.messageBuf.clear();
769  clientCtxPtr->websocket.messageType = 0;
770  }
771  else
772  {
773  clientCtxPtr->websocket.messageBuf.append( payloadData.getBuf<char>(), payloadData.getSize() );
774  clientCtxPtr->websocket.messageType = opcode;
775  }
776 
777  // 投递接收下一个websocket帧的任务
778  this->_pool.task( &WsHttpServer::_doRecvWebSocketFrameTask, this, clientCtxPtr );
779  }
780  break;
781  case ws::ctrlClose:
782  {
783  // 收到一个close帧
784  // 如果有载荷
785  winux::uint16 errCode = 0;
786  winux::AnsiString errStr;
787  if ( payloadData.getSize() > 0 )
788  {
789  auto closeData = payloadData.getBuf<ws::CloseFramePayloadData>();
790  errCode = ntoht(closeData->code);
791  errStr.assign( closeData->data<char>(), payloadData.getSize() - sizeof(ws::CloseFramePayloadData::code) );
792  ColorOutput( winux::fgMaroon, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`收到一个Close帧{errcode:", errCode, ",errstr:", errStr, "}" );
793  }
794  else // 没有载荷数据
795  {
796  ColorOutput( winux::fgMaroon, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`收到一个Close帧" );
797  }
798 
799  // 收到一个close帧,如果处在closing状态,应立即关闭连接并改为closed状态;如果不处在closing状态则改为closing,发送一个close帧到对方后改为closed。
800  if ( clientCtxPtr->websocket.state == ws::stateClosing )
801  {
802  clientCtxPtr->clientSockPtr->close();
803  clientCtxPtr->websocket.state = ws::stateClosed;
804 
805  this->onClose( clientCtxPtr, errCode, errStr ); // 处理onclose事件
806 
807  // 删除场景
808  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
809  this->removeClient(clientCtxPtr->clientId);
810  }
811  else
812  {
813  clientCtxPtr->websocket.state = ws::stateClosing;
814 
815  if ( clientCtxPtr->websocket.close( errCode, errStr ) )
816  {
817  ColorOutput( winux::fgAqua, "服务器回了一个close帧" );
818  clientCtxPtr->clientSockPtr->close();
819  clientCtxPtr->websocket.state = ws::stateClosed;
820 
821  this->onClose( clientCtxPtr, errCode, errStr ); // 处理onclose事件
822 
823  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
824  this->removeClient(clientCtxPtr->clientId);
825  }
826  }
827  }
828  break;
829  case ws::ctrlPing:
830  {
831  // 收到一个ping帧,回一个pong帧
832  SendWebSocketAnsi( clientCtxPtr->clientSockPtr.get(), ws::ctrlPong, false );
833 
834  // 投递接收下一个websocket帧任务
835  this->_pool.task( &WsHttpServer::_doRecvWebSocketFrameTask, this, clientCtxPtr );
836  }
837  break;
838  case ws::ctrlPong:
839  {
840  // 收到pong帧,无视
841 
842  // 投递接收下一个websocket帧任务
843  this->_pool.task( &WsHttpServer::_doRecvWebSocketFrameTask, this, clientCtxPtr );
844  }
845  break;
846  default:
847  // 意外帧
848  ColorOutput( winux::fgRed, "Client[", clientCtxPtr->clientId, "]客户`", clientCtxPtr->clientEpStr, "`收到一个意外帧{fin:",(winux::uint)fin,",opcode:",opcode,",datasize:",payloadData.getSize(),"}" );
849  ColorOutput( winux::fgFuchsia, "Client[", clientCtxPtr->clientId, "]移除客户`", clientCtxPtr->clientEpStr, "`,还剩", this->_clients.size() - 1, "客户" );
850  this->removeClient(clientCtxPtr->clientId);
851  break;
852  }
853  }
854 
855  virtual void onOpen( ClientCtxSharedPointer clientCtxPtr )
856  {
857  if ( _openHandler ) _openHandler(clientCtxPtr);
858  }
859 
860  virtual void onMessage( ClientCtxSharedPointer clientCtxPtr, winux::AnsiString const & data, int messageType )
861  {
862  if ( _messageHandler ) _messageHandler( clientCtxPtr, data, messageType );
863  }
864 
865  virtual void onClose( ClientCtxSharedPointer clientCtxPtr, winux::uint16 errCode, winux::AnsiString const & errStr )
866  {
867  if ( _closeHandler ) _closeHandler( clientCtxPtr, errCode, errStr );
868  }
869 
870  virtual void onError( ClientCtxSharedPointer clientCtxPtr, WebSocketErrorCode ec )
871  {
872  if ( _errorHandler ) _errorHandler( clientCtxPtr, ec );
873  }
874 
879 };
880 
881 
882 } // namespace ws
883 
884 } // namespace eiennet
#define EIENNET_FUNC_DECL(ret)
代表HTTP头部
Definition: http_misc.hpp:10
void _doRequestTask(ClientCtxSharedPointer clientCtxPtr)
void * getBuf() const
暴露缓冲区指针
Definition: utilities.hpp:485
客户场景类基础
String StrLower(String const &str)
bool SendWebSocketFrame(eiennet::Socket *sock, OpCode opcode, bool fin, bool mask, winux::byte *payloadData=nullptr, winux::uint payloadDataSize=0)
virtual void onMessage(ClientCtxSharedPointer clientCtxPtr, winux::AnsiString const &data, int messageType)
void clear()
清空
std::basic_string< char > AnsiString
Definition: utilities.hpp:165
MessageHandlerFunction _messageHandler
winux::String getHeader(winux::String const &name, winux::String const &defval=winux::String()) const
Definition: http_misc.hpp:76
winux::String toString(bool isAddCrlfAtEnd=true) const
virtual void onStartup(ClientCtxSharedPointer clientCtxPtr) override
virtual void onError(ClientCtxSharedPointer clientCtxPtr, WebSocketErrorCode ec)
static void ColorOutput(winux::ConsoleAttr const &ca, _ArgType &&...arg)
Definition: console.hpp:178
void onCloseHandler(CloseHandlerFunction handler)
WsHttpServer(winux::String const &serverIp, winux::ushort port, int threadCount=10, int listenBacklog=10, double durationSec=0.1)
void _doRecvRequestHeaderTask(ClientCtxSharedPointer clientCtxPtr)
std::function< void(ClientCtxSharedPointer clientCtxPtr, winux::AnsiString const &data, int messageType) > MessageHandlerFunction
void _doRecvRequestBodyTask(ClientCtxSharedPointer clientCtxPtr)
端点基类(套接字地址对象基类)
OpenHandlerFunction _openHandler
Buffer Sha1(void const *buf, size_t size)
将数据进行sha1编码,返回二进制数据
http协议的相关简单类封装
Definition: http.hpp:32
std::function< void(ClientCtxSharedPointer clientCtxPtr, WebSocketErrorCode ec) > ErrorHandlerFunction
void onOpenHandler(OpenHandlerFunction handler)
std::function< void(ClientCtxSharedPointer clientCtxPtr, winux::uint16 errCode, winux::AnsiString const &errStr) > CloseHandlerFunction
winux::String const & getRawQueryStr() const
获取未解析的查询段字符串.不以&#39;?&#39;开头.
Definition: http_url.hpp:90
URL类
Definition: http_url.hpp:8
WsHttpClientCtx(winux::uint64 clientId, winux::String clientEpStr, winux::SharedPointer< ip::tcp::Socket > clientSockPtr)
virtual void onOpen(ClientCtxSharedPointer clientCtxPtr)
std::function< void(ClientCtxSharedPointer clientCtxPtr) > OpenHandlerFunction
void _webProcess(ClientCtxSharedPointer &clientCtxPtr, http::Header const &reqHdr, http::Url const &url, http::Header &rspHdr, std::ostream &rspOut)
bool SendWebSocketBuffer(eiennet::Socket *sock, OpCode opcode, bool mask, winux::Buffer payloadData=winux::Buffer(), int perFrameMaxPayloadSize=-1)
void onMessageHandler(MessageHandlerFunction handler)
解析和设置get/post变量
Definition: http_misc.hpp:263
ReadyState
就绪状态
uint getSize() const
获取数据大小
Definition: utilities.hpp:494
套接字基类
缓冲区,表示内存中一块2进制数据(利用malloc/realloc进行内存分配)
Definition: utilities.hpp:436
数据收发场景,存放数据收发过程中的一些变量
ErrorHandlerFunction _errorHandler
bool hasHeader(winux::String const &name) const
Definition: http_misc.hpp:79
unsigned int uint
Definition: utilities.hpp:128
服务器类基础
FrameType
数据帧类型
void setResponseLine(winux::String const &responseLine, bool setStatus=true)
设置响应行 格式: HttpVersion StatusCode StatusStr eg. HTTP/1.1 200 OK
unsigned char byte
Definition: utilities.hpp:159
网络通信库
String Base64Encode(void const *buf, int size)
Base64编码
高效的可增长缓冲区,1.33倍冗余量
Definition: utilities.hpp:531
void _doRecvWebSocketFrameTask(ClientCtxSharedPointer clientCtxPtr)
_Ty ntoht(_Ty v)
unsigned short ushort
Definition: utilities.hpp:131
typename Server< _ClientCtx >::ClientCtxSharedPointer ClientCtxSharedPointer
unsigned short uint16
Definition: utilities.hpp:131
_Ty htont(_Ty v)
WebSocketErrorCode
WebSocket错误码
winux::String getPath() const
获取路径.
virtual void onClose(ClientCtxSharedPointer clientCtxPtr, winux::uint16 errCode, winux::AnsiString const &errStr)
CloseHandlerFunction _closeHandler
unsigned long long uint64
Definition: utilities.hpp:146
bool close(winux::uint16 errCode=-1, winux::AnsiString const &errStr="")
WebSocketWrapper(ClientCtx *clientCtx)
unsigned char uint8
Definition: utilities.hpp:133
winux::SharedPointer< ip::tcp::Socket > clientSockPtr
void onErrorHandler(ErrorHandlerFunction handler)
bool send(winux::AnsiString const &data, ws::OpCode opcode=ws::dataText)
std::basic_string< tchar > String
Definition: utilities.hpp:162
void append(void const *data, uint size)
添加数据
bool SendWebSocketAnsi(eiennet::Socket *sock, OpCode opcode, bool mask, winux::AnsiString payloadData=winux::AnsiString(), int perFrameMaxPayloadSize=-1)
void _doProcessWebSocketFrameTask(ClientCtxSharedPointer clientCtxPtr, bool fin, winux::uint opcode, winux::Buffer &payloadData)
long long int64
Definition: utilities.hpp:148