中间件底层,websocket

NetClient.cpp 8.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. #include "stdafx.h"
  2. #include <boost\bind.hpp>
  3. #include <chrono>
  4. #include <thread>
  5. #include <boost/asio.hpp>
  6. #include "NetClient.h"
  7. #include "PduEntity.h"
  8. #include "PduEntityHead.h"
  9. #include "PduDataFormat.h"
  10. #include "Logger.h"
  11. CNetClient::CNetClient(): thread_(nullptr), m_pSchedulerHeart(nullptr), m_pSchedulerAutoConn(nullptr), m_IsConnSuccess(false)
  12. {
  13. }
  14. CNetClient::~CNetClient()
  15. {
  16. }
  17. void CNetClient::setLinkStateChanged(LinkStateChangedFuncCB funcCB)
  18. {
  19. if (funcCB != nullptr&&this->m_LinkStateChangedFuncCB == nullptr) {
  20. this->m_LinkStateChangedFuncCB = funcCB;
  21. }
  22. }
  23. void CNetClient::setRecvCommand(RecvComdFuncCB funcCB)
  24. {
  25. if (funcCB != nullptr&&this->m_RecvComdFuncCB == nullptr) {
  26. this->m_RecvComdFuncCB = funcCB;
  27. }
  28. }
  29. bool CNetClient::CreatePduClient(CString a_strFarIp, int a_nFarPort, PDU_DEV_TYPE a_nLocalType, int a_nLocalId, bool IsAutoReconnect)
  30. {
  31. char logName[128];
  32. snprintf(logName, 128, "wsClient_%s", devTypeToStr(a_nLocalType).c_str());
  33. if (Logger::GetInstance()->Init("./log/", "wsClient_" + devTypeToStr(a_nLocalType) + "_log")) {
  34. Logger::GetInstance()->Start();
  35. }
  36. if (!CPduDataFormat::getInstance()->Load("./PDUFormat.ini")) {
  37. LOG_DEBUG("PDU配置文件加载失败");
  38. }
  39. init();
  40. m_IsAutoReconnect = IsAutoReconnect;
  41. return ConnectToServer(a_strFarIp, a_nFarPort, a_nLocalType, a_nLocalId);
  42. }
  43. bool CNetClient::ClosePduClient(PDU_DEV_TYPE a_nFarType, int a_nFarId)
  44. {
  45. m_IsAutoReconnect = false;
  46. KillTimerListen();
  47. KillTimerConn();
  48. close();
  49. terminate();
  50. LOG_DEBUG("Pdu Client关闭完成");
  51. return true;
  52. }
  53. bool CNetClient::Send(CPduEntity * a_pCmd, PDU_DEV_TYPE a_nDestType, int a_nDestId)
  54. {
  55. a_pCmd->SetLocalDevInfo(m_DevType, m_DevId); // 填充发送方信息
  56. a_pCmd->SetPeerDevInfo(a_nDestType, a_nDestId); // 填充接收方信息
  57. // 生成要发送的数据缓冲区
  58. UCHAR szBuf[PDU_HEAD_LEN + PDU_MAX_DATA_LEN];
  59. ZeroMemory(szBuf, PDU_HEAD_LEN + PDU_MAX_DATA_LEN);
  60. int nLen = a_pCmd->CreatePackge(szBuf);
  61. return __SendBuf(szBuf, nLen);
  62. }
  63. void CNetClient::SetLinkContent(PDU_LINK_STATE a_nLinkInfo)
  64. {
  65. PduLinkContent linkContent;
  66. memset(&linkContent, 0, sizeof(PduLinkContent));
  67. linkContent.nLocalType = m_DevType;
  68. linkContent.nLocalId = m_DevId;
  69. linkContent.nLinkState = a_nLinkInfo;
  70. lstrcpy(linkContent.szFarIp, uri.c_str());
  71. if (this->m_LinkStateChangedFuncCB != nullptr) {
  72. this->m_LinkStateChangedFuncCB(linkContent);
  73. }
  74. }
  75. void CNetClient::SetTimerListen()
  76. {
  77. // 注册成功启用心跳
  78. if (m_pSchedulerHeart == nullptr) {
  79. m_pSchedulerHeart = std::make_unique<CScheduler>(ios);
  80. m_pSchedulerHeart->setOnTimeFuncCB(std::bind(&this_type::onTime, this, std::placeholders::_1));
  81. }
  82. m_pSchedulerHeart->setTimer(PDU_LISTEN_TIMER, VAL_LISTEN_INTERVAL);
  83. LOG_DEBUG("ws开启心跳");
  84. }
  85. void CNetClient::KillTimerListen()
  86. {
  87. if (m_pSchedulerHeart != nullptr) {
  88. m_pSchedulerHeart->killTimer();
  89. }
  90. LOG_DEBUG("ws关闭心跳");
  91. }
  92. void CNetClient::SetTimerConn()
  93. {
  94. if (m_pSchedulerAutoConn == nullptr) {
  95. m_pSchedulerAutoConn = std::make_unique<CScheduler>(ios);
  96. m_pSchedulerAutoConn->setOnTimeFuncCB(std::bind(&this_type::onTime, this, std::placeholders::_1));
  97. }
  98. m_pSchedulerAutoConn->setTimer(PDU_RECONN_TIMER, VAL_RECONNECT_INTERVAL);
  99. LOG_DEBUG("ws开启自动连接");
  100. }
  101. void CNetClient::KillTimerConn()
  102. {
  103. if (m_pSchedulerAutoConn != nullptr) {
  104. m_pSchedulerAutoConn->killTimer();
  105. }
  106. LOG_DEBUG("ws关闭自动连接");
  107. }
  108. void CNetClient::on_open(websocketpp::connection_hdl hdl)
  109. {
  110. std::cout << "连接成功" << std::endl;
  111. m_IsConnSuccess.store(true);
  112. SetLinkContent(PDU_LINK_STATE_SUCCESSED); // 连接状态通知
  113. KillTimerConn(); // 关闭自动连接
  114. // 生成注册命令
  115. CPduEntity reg(PDU_CMD_REG);
  116. Send(&reg);
  117. LOG_DEBUG("ws连接成功并发送注册命令");
  118. }
  119. void CNetClient::on_fail(websocketpp::connection_hdl hdl)
  120. {
  121. std::cout << "连接失败" << std::endl;
  122. m_IsConnSuccess.store(false);
  123. SetLinkContent(PDU_LINK_STATE_FAILED);
  124. LOG_DEBUG("ws连接失败");
  125. if (m_IsAutoReconnect) { // 自动尝试重连
  126. SetTimerConn();
  127. }
  128. }
  129. void CNetClient::on_close(websocketpp::connection_hdl hdl)
  130. {
  131. std::cout << "连接关闭" << std::endl;
  132. m_IsConnSuccess.store(false);
  133. LOG_DEBUG("ws连接关闭");
  134. KillTimerListen(); // 定时心跳关闭
  135. SetLinkContent(PDU_LINK_STATE_DISCONNECTED);
  136. if (m_IsAutoReconnect) { // 自动尝试重连
  137. SetTimerConn();
  138. }
  139. }
  140. void CNetClient::on_message(websocketpp::connection_hdl hdl, message_ptr msg)
  141. {
  142. auto str = msg->get_payload();
  143. UCHAR m_szPdu[PDU_HEAD_LEN + PDU_MAX_DATA_LEN];
  144. memcpy(m_szPdu, &str[0], str.size());
  145. CPduEntityHead pduHead;
  146. // 解析PDU数据包头
  147. if (!pduHead.Decode(m_szPdu)) {
  148. memset(m_szPdu, 0, PDU_HEAD_LEN + PDU_MAX_DATA_LEN);
  149. }
  150. else {
  151. // 拷贝当前要处理的数据到临时缓冲区
  152. UCHAR szTmpBuf[PDU_MAX_DATA_LEN];
  153. ZeroMemory(szTmpBuf, PDU_MAX_DATA_LEN);
  154. memcpy_s(szTmpBuf, PDU_MAX_DATA_LEN, m_szPdu, PDU_HEAD_LEN + pduHead.GetDataLen());
  155. // 对数据进行处理
  156. CPduEntity pduEntity(&pduHead, &szTmpBuf[PDU_HEAD_LEN]);
  157. PDU_CMD_TYPE CmdType = pduEntity.GetCmdType();
  158. if (PDU_CMD_REG == CmdType) { // 注册
  159. if (pduEntity.GetIsExecReturn()) { // 注册返回命令
  160. if (pduEntity.GetDataBool(0)) { // 注册成功
  161. std::cout << "设备注册成功..." << std::endl;
  162. SetLinkContent(PDU_LINK_STATE_REG_OK);
  163. SetTimerListen();
  164. LOG_DEBUG("设备注册成功");
  165. }
  166. else { // 注册失败
  167. SetLinkContent(PDU_LINK_STATE_REG_FAILED);
  168. LOG_DEBUG("设备注册失败");
  169. }
  170. }
  171. }
  172. else {
  173. auto peerType = devTypeToStr(pduEntity.GetPeerDevType());
  174. auto localType = devTypeToStr(pduEntity.GetLocalDevType());
  175. if (PDU_CMD_LISTEN == pduEntity.GetCmdType()) {
  176. LOG_DEBUG("设备[%s]收到心跳消息", localType.c_str());
  177. }
  178. else {
  179. LOG_DEBUG("设备[%s]收到[%s]的消息[%d]", localType.c_str(), peerType.c_str(), pduEntity.GetCmdType());
  180. }
  181. if (this->m_RecvComdFuncCB != nullptr) {
  182. this->m_RecvComdFuncCB(&pduEntity);
  183. }
  184. }
  185. }
  186. }
  187. int CNetClient::init()
  188. {
  189. c.set_access_channels(websocketpp::log::alevel::none ^ websocketpp::log::alevel::none);
  190. c.set_error_channels(websocketpp::log::alevel::none ^ websocketpp::log::alevel::none);
  191. c.init_asio(&ios);
  192. //c.init_asio();
  193. c.set_message_handler(websocketpp::lib::bind(&this_type::on_message, this, ::_1, ::_2));
  194. c.set_fail_handler(websocketpp::lib::bind(&this_type::on_fail, this, ::_1));
  195. c.set_open_handler(websocketpp::lib::bind(&this_type::on_open, this, ::_1));
  196. c.set_close_handler(websocketpp::lib::bind(&this_type::on_close, this, ::_1));
  197. c.set_reuse_addr(true);
  198. c.start_perpetual();
  199. if (thread_ == nullptr)
  200. thread_ = websocketpp::lib::make_shared<websocketpp::lib::thread>(boost::bind(&boost::asio::io_service::run, &ios));
  201. return 1;
  202. }
  203. bool CNetClient::ConnectToServer(const CString & a_strFarIp, int a_nFarPort, PDU_DEV_TYPE a_nLocalType, int a_nLocalId)
  204. {
  205. CString urlStr;
  206. urlStr.Format("ws://%s:%d", a_strFarIp, a_nFarPort);
  207. uri = urlStr.GetBuffer(0);
  208. urlStr.ReleaseBuffer();
  209. m_DevType = a_nLocalType;
  210. LOG_DEBUG("ws连接开始连接[%s]", uri.c_str());
  211. return connect();
  212. }
  213. bool CNetClient::connect()
  214. {
  215. websocketpp::lib::error_code ec;
  216. client::connection_ptr con = c.get_connection(uri, ec);
  217. if (ec)
  218. {
  219. LOG_DEBUG("ws连接失败,失败信息[%s]",ec.message().c_str());
  220. std::cout << "could not create connection because: " << ec.message() << std::endl;
  221. return false;
  222. }
  223. hdl_ = con->get_handle();
  224. c.connect(con);
  225. //thread1_ = websocketpp::lib::make_shared<websocketpp::lib::thread>(boost::bind(&client::run, &c));
  226. return true;
  227. }
  228. void CNetClient::close()
  229. {
  230. try
  231. {
  232. c.close(hdl_, websocketpp::close::status::normal, "");
  233. }
  234. catch (const std::exception& e) {
  235. std::cout << e.what() << std::endl;
  236. }
  237. }
  238. void CNetClient::terminate()
  239. {
  240. c.stop_perpetual();
  241. if (thread_&&thread_->joinable())
  242. thread_->join();
  243. }
  244. bool CNetClient::__SendBuf(unsigned char a_szBuf[], std::uint32_t a_nBufLen)
  245. {
  246. try
  247. {
  248. std::string msg((char*)a_szBuf);
  249. c.send(hdl_, a_szBuf, a_nBufLen, websocketpp::frame::opcode::BINARY);
  250. return true;
  251. }
  252. catch (websocketpp::exception const &)
  253. {
  254. return false;
  255. }
  256. }
  257. void CNetClient::onTime(const std::int32_t& TimerId)
  258. {
  259. if (TimerId == PDU_LISTEN_TIMER) { // 心跳定时器
  260. CPduEntity Monitor(PDU_CMD_LISTEN);
  261. Send(&Monitor, PDU_DEV_TYPE_UNKNOWN, 0);
  262. static std::int64_t nSecond = 0;
  263. if (nSecond == 0) {
  264. std::cout << "定时器开始..." << std::endl;
  265. }
  266. else {
  267. std::cout << "定时器运行中:" << time(NULL) - nSecond << std::endl;
  268. }
  269. nSecond = time(NULL);
  270. }
  271. else if (PDU_RECONN_TIMER == TimerId && !m_IsConnSuccess.load()) { // 自动重连
  272. LOG_DEBUG("ws即将自动重新连接");
  273. connect();
  274. }
  275. }
  276. inline std::string CNetClient::devTypeToStr(PDU_DEV_TYPE devType)
  277. {
  278. std::string devTypeStr = "未知";
  279. if (PDU_DEV_TYPE_CTI == devType) {
  280. devTypeStr = "CTI";
  281. }
  282. else if (PDU_DEV_TYPE_IVR == devType) {
  283. devTypeStr = "IVR";
  284. }
  285. else if (PDU_DEV_TYPE_ACD == devType) {
  286. devTypeStr = "ACD";
  287. }
  288. else if (PDU_DEV_TYPE_SERVER == devType) {
  289. devTypeStr = "HpServer";
  290. }
  291. return devTypeStr;
  292. }
  293. CNetClient CNetClient::instance;
  294. CNetInterface* CNetInterface::getNetInstance() { return CNetClient::getInstance(); };