中间件底层,websocket

WebSocketServer.cpp (4.6 KB)
  1. #include "stdafx.h"
  2. #include "WebSocketServer.h"
  3. #include <json/json.h>
  4. CWebSocketServer::CWebSocketServer()
  5. {
  6. endpoint_plain.clear_access_channels(websocketpp::log::alevel::all);
  7. endpoint_plain.set_access_channels(websocketpp::log::alevel::none | websocketpp::log::alevel::none);
  8. endpoint_plain.set_open_handler(bind(&CWebSocketServer::on_open<server_plain>, this, &endpoint_plain, std::placeholders::_1, WEBSOCKET_TYPE::NORMAL));
  9. endpoint_plain.set_close_handler(bind(&CWebSocketServer::on_close<server_plain>, this, &endpoint_plain, std::placeholders::_1, WEBSOCKET_TYPE::NORMAL));
  10. endpoint_plain.set_message_handler(bind(&CWebSocketServer::on_message<server_plain>, this, &endpoint_plain, std::placeholders::_1, std::placeholders::_2));
  11. endpoint_plain.init_asio(&ios);
  12. endpoint_plain.set_reuse_addr(true);
  13. endpoint_tls.clear_access_channels(websocketpp::log::alevel::all);
  14. endpoint_tls.set_access_channels(websocketpp::log::alevel::none | websocketpp::log::alevel::none);
  15. endpoint_tls.set_open_handler(bind(&CWebSocketServer::on_open<server_tls>, this, &endpoint_tls, std::placeholders::_1, WEBSOCKET_TYPE::TLS));
  16. endpoint_tls.set_close_handler(bind(&CWebSocketServer::on_close<server_tls>, this, &endpoint_tls, std::placeholders::_1, WEBSOCKET_TYPE::TLS));
  17. endpoint_tls.set_message_handler(bind(&CWebSocketServer::on_message<server_tls>, this, &endpoint_tls, std::placeholders::_1, std::placeholders::_2));
  18. // TLS endpoint has an extra handler for the tls init
  19. endpoint_tls.set_tls_init_handler(bind(&CWebSocketServer::on_tls_init, this, std::placeholders::_1));
  20. endpoint_tls.init_asio(&ios);
  21. endpoint_tls.set_reuse_addr(true);
  22. }
  23. CWebSocketServer::~CWebSocketServer()
  24. {
  25. stop();
  26. }
  27. bool CWebSocketServer::init(uint16_t port, uint16_t tlsPort, std::string sslFile, std::string sslPwd)
  28. {
  29. try
  30. {
  31. m_sslFile = sslFile;
  32. m_sslPwd = sslPwd;
  33. endpoint_plain.listen(port);
  34. endpoint_plain.start_accept();
  35. endpoint_tls.listen(tlsPort);
  36. endpoint_tls.start_accept();
  37. }
  38. catch (const std::exception& e)
  39. {
  40. std::cout << e.what() << std::endl;
  41. return false;
  42. }
  43. return true;
  44. }
  45. bool CWebSocketServer::sendMsg(websocketpp::connection_hdl hdl, std::string msg)
  46. {
  47. try
  48. {
  49. // ConvertGBKToUtf8(msg);
  50. msg = ansi_to_utf8(msg);
  51. std::unique_lock<std::mutex>lock(m_mut);
  52. auto type = m_servers[hdl.lock().get()];
  53. lock.unlock();
  54. if (WEBSOCKET_TYPE::NORMAL == type)
  55. {
  56. endpoint_plain.send(hdl, msg, websocketpp::frame::opcode::TEXT);
  57. }
  58. else if (WEBSOCKET_TYPE::TLS == type)
  59. {
  60. endpoint_tls.send(hdl, msg, websocketpp::frame::opcode::TEXT);
  61. }
  62. else
  63. {
  64. return false;
  65. }
  66. }
  67. catch (websocketpp::exception const & e) {
  68. stringstream str;
  69. str << "Echo failed because: "
  70. << "(" << e.what() << ")";
  71. std::cout << str.str() << std::endl;
  72. return false;
  73. }
  74. return true;
  75. }
  76. bool CWebSocketServer::sendMsg(long conID, std::string msg)
  77. {
  78. try
  79. {
  80. msg = ansi_to_utf8(msg);
  81. std::unique_lock<std::mutex>lock(m_mut);
  82. auto it = m_conIds.find(conID);
  83. if (it == m_conIds.end()) return false;
  84. auto type = m_servers[it->second.lock().get()];
  85. auto hdl = it->second;
  86. lock.unlock();
  87. if (WEBSOCKET_TYPE::NORMAL == type)
  88. {
  89. endpoint_plain.send(hdl, msg, websocketpp::frame::opcode::TEXT);
  90. }
  91. else if (WEBSOCKET_TYPE::TLS == type)
  92. {
  93. endpoint_tls.send(hdl, msg, websocketpp::frame::opcode::TEXT);
  94. }
  95. else
  96. {
  97. return false;
  98. }
  99. }
  100. catch (websocketpp::exception const & e) {
  101. stringstream str;
  102. str << "Echo failed because: "
  103. << "(" << e.what() << ")";
  104. std::cout << str.str() << std::endl;
  105. return false;
  106. }
  107. catch (std::exception e) {
  108. return false;
  109. }
  110. return true;
  111. }
  112. void CWebSocketServer::run()
  113. {
  114. // Start the Asio io_service run loop
  115. ios.run();
  116. }
  117. void CWebSocketServer::asyncRun()
  118. {
  119. // 独立运行client::run()的线程,主要是避免阻塞
  120. //m_Thread = websocketpp::lib::make_shared<websocketpp::lib::thread>(&websocketpp::server<websocketpp::config::asio>::run, &endpoint_plain);
  121. m_Thread = websocketpp::lib::make_shared<websocketpp::lib::thread>(boost::bind(&boost::asio::io_service::run, &ios));
  122. }
  123. void CWebSocketServer::close(websocketpp::connection_hdl hdl)
  124. {
  125. std::unique_lock<std::mutex>lock(m_mut);
  126. auto type = m_servers[hdl.lock().get()];
  127. lock.unlock();
  128. if (WEBSOCKET_TYPE::NORMAL == type)
  129. {
  130. endpoint_plain.close(hdl, websocketpp::close::status::normal, "");
  131. }
  132. else if (WEBSOCKET_TYPE::NORMAL == TLS)
  133. {
  134. endpoint_tls.close(hdl, websocketpp::close::status::normal, "");
  135. }
  136. }
  137. void CWebSocketServer::stop()
  138. {
  139. if (!ios.stopped()) {
  140. ios.stop();
  141. }
  142. if (endpoint_plain.is_listening()) {
  143. endpoint_plain.stop_listening();
  144. }
  145. if (endpoint_tls.is_listening()) {
  146. endpoint_tls.stop_listening();
  147. }
  148. if(m_Thread->joinable())
  149. m_Thread->join();
  150. }