中间件底层,websocket

MsgCenter.cpp 7.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. #include "StdAfx.h"
  2. #include "MsgCenter.h"
  3. #include "IMsgObserver.h"
  4. SINGLETON_IMPLEMENT(CMsgCenter)
  5. CMsgCenter::CMsgCenter(void) : m_Stop(false)
  6. {
  7. m_pThreadObj = AfxBeginThread(__dipatchThreadFunc, this);
  8. }
  9. CMsgCenter::~CMsgCenter(void)
  10. {
  11. m_Stop = true;
  12. if (m_pThreadObj != NULL)
  13. {
  14. DWORD ExitCode;
  15. GetExitCodeThread(m_pThreadObj->m_hThread, &ExitCode);
  16. if (ExitCode == STILL_ACTIVE)
  17. {
  18. m_ThreadSleepFlag.SetEvent();
  19. WaitForSingleObject(m_pThreadObj->m_hThread, INFINITE);
  20. }
  21. m_pThreadObj = NULL;
  22. }
  23. }
  24. /*****************************************************************
  25. **【函数名称】 __wait
  26. **【函数功能】 置线程于等待信号状态
  27. **【参数】
  28. **【返回值】
  29. ****************************************************************/
  30. void CMsgCenter::__wait(void)
  31. {
  32. m_ThreadSleepFlag.Lock();
  33. }
  34. /*****************************************************************
  35. **【函数名称】 __releaseMsg
  36. **【函数功能】 释放消息
  37. **【参数】
  38. **【返回值】
  39. ****************************************************************/
  40. void CMsgCenter::__releaseMsg(UINT MsgType, PARAM lpContent)
  41. {
  42. // 填充事件内容
  43. switch (MsgType)
  44. {
  45. case DEV_EVENT_LOG: // 设备日志
  46. {
  47. //delete (EventLog*)lpContent;
  48. auto pEvtLog = (EventLog*)lpContent;
  49. if (pEvtLog != nullptr)
  50. {
  51. delete pEvtLog;
  52. pEvtLog = nullptr;
  53. }
  54. }
  55. break;
  56. case DEV_EVENT_RES_TYPE: // 设备资源类型
  57. {
  58. //delete (EventResType*)lpContent;
  59. auto pEvtResType = (EventResType*)lpContent;
  60. if (pEvtResType != nullptr)
  61. {
  62. delete pEvtResType;
  63. pEvtResType = nullptr;
  64. }
  65. }
  66. break;
  67. case DEV_EVENT_RES_DETAIL: // 设备资源明细
  68. {
  69. //delete (EventResDetail*)lpContent;
  70. auto pEvtResDetail = (EventResDetail*)lpContent;
  71. if (pEvtResDetail != nullptr)
  72. {
  73. delete pEvtResDetail;
  74. pEvtResDetail = nullptr;
  75. }
  76. }
  77. break;
  78. case DEV_EVENT_RES_STATUS: // 设备资源状态变化
  79. {
  80. //delete (EventResStatus*)lpContent;
  81. auto pEvtResStatus = (EventResStatus*)lpContent;
  82. if (pEvtResStatus != nullptr)
  83. {
  84. delete pEvtResStatus;
  85. pEvtResStatus = nullptr;
  86. }
  87. }
  88. break;
  89. case DEV_EVENT_LINE_OP_PROCESS: // 线路操作进展
  90. {
  91. // delete (EventOpProcess*)lpContent;
  92. auto pEvtOpProcess = (EventOpProcess*)lpContent;
  93. if (pEvtOpProcess != nullptr)
  94. {
  95. delete pEvtOpProcess;
  96. pEvtOpProcess = nullptr;
  97. }
  98. }
  99. break;
  100. case DEV_EVENT_LINE_OP_RESULT: // 线路操作执行结果
  101. {
  102. //delete (EventOpResult*)lpContent;
  103. // 2021-02-04
  104. EventOpResult* pEvtOpResult = (EventOpResult*)lpContent;
  105. if (pEvtOpResult != nullptr)
  106. {
  107. delete pEvtOpResult;
  108. pEvtOpResult = nullptr;
  109. }
  110. }
  111. break;
  112. case DEV_EVENT_DEV_OPERATOR: // 设备主动操作事件
  113. {
  114. //delete (EventDevOperation*)lpContent;
  115. auto pEvtDevOperation = (EventDevOperation*)lpContent;
  116. if (pEvtDevOperation != nullptr)
  117. {
  118. delete pEvtDevOperation;
  119. pEvtDevOperation = nullptr;
  120. }
  121. }
  122. break;
  123. } // end switch
  124. }
  125. /*****************************************************************
  126. **【函数名称】 __dispatchMsg
  127. **【函数功能】 分派消息
  128. **【参数】
  129. **【返回值】
  130. ****************************************************************/
  131. int CMsgCenter::__dispatchMsg(void)
  132. {
  133. CSingleLock Locker(&m_RWLock, TRUE);
  134. // 获取事件队列信息
  135. if (m_MsgList.IsEmpty())
  136. return 0;
  137. MSG_INNER* pMsg = m_MsgList.RemoveHead();
  138. Locker.Unlock();
  139. // 根据实际类型分发消息
  140. __sendMsgToObserver(pMsg->MsgType, pMsg->Content);
  141. __releaseMsg(pMsg->MsgType, pMsg->Content);
  142. // 删除节点,释放内存
  143. delete pMsg;
  144. pMsg = NULL;
  145. // 返回
  146. return m_MsgList.GetCount();
  147. }
  148. /*****************************************************************
  149. **【函数名称】 __sendMsgToObserver
  150. **【函数功能】 消息发送给订阅者
  151. **【参数】
  152. **【返回值】
  153. ****************************************************************/
  154. void CMsgCenter::__sendMsgToObserver(UINT MsgType, const PARAM lpContent)
  155. {
  156. ObsvrMap::iterator itr;
  157. ObsvrMap::iterator itrEnd;
  158. itr = m_ObsvrMap.find(MsgType);
  159. if (itr == m_ObsvrMap.end())
  160. return;
  161. itrEnd = m_ObsvrMap.upper_bound(MsgType);
  162. for (; itr != itrEnd; ++itr)
  163. {
  164. if (itr->second != nullptr) // 2021-02-02 增加判断是否是空,防止null造成异常
  165. {
  166. itr->second->onMessage(MsgType, lpContent);
  167. }
  168. else
  169. {
  170. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{CMsgCenter}: 消息中心任务分发失败,MsgType[%u],[%u]"), MsgType, itr->first);
  171. }
  172. }
  173. }
  174. /*****************************************************************
  175. **【函数名称】 __dipatchThreadFunc
  176. **【函数功能】 消息分发线程函数
  177. **【参数】
  178. **【返回值】
  179. ****************************************************************/
  180. UINT CMsgCenter::__dipatchThreadFunc(LPVOID pParam)
  181. {
  182. CMsgCenter* pSelf = (CMsgCenter*)pParam;
  183. while (!pSelf->m_Stop)
  184. {
  185. if (pSelf->__dispatchMsg() == 0)
  186. pSelf->__wait();
  187. } // end while
  188. return 0;
  189. }
  190. /*****************************************************************
  191. **【函数名称】 regist
  192. **【函数功能】 订阅消息
  193. **【参数】
  194. **【返回值】
  195. ****************************************************************/
  196. void CMsgCenter::regist(UINT MsgType, IMsgObserver* pObserver)
  197. {
  198. m_ObsvrMap.insert(pair<UINT, IMsgObserver*>(MsgType, pObserver));
  199. }
  200. /*****************************************************************
  201. **【函数名称】 unregist
  202. **【函数功能】 取消订阅
  203. **【参数】
  204. **【返回值】
  205. ****************************************************************/
  206. void CMsgCenter::unregist(UINT MsgType, IMsgObserver* pObserver)
  207. {
  208. ObsvrMap::iterator itr;
  209. ObsvrMap::iterator itrEnd;
  210. itr = m_ObsvrMap.find(MsgType);
  211. if (itr == m_ObsvrMap.end())
  212. return;
  213. itrEnd = m_ObsvrMap.upper_bound(MsgType);
  214. while (itr != itrEnd)
  215. {
  216. if (itr->second == pObserver)
  217. {
  218. m_ObsvrMap.erase(itr);
  219. break;
  220. }
  221. itr++;
  222. }
  223. }
  224. /*****************************************************************
  225. **【函数名称】 pushMsg
  226. **【函数功能】 压入消息
  227. **【参数】
  228. **【返回值】
  229. ****************************************************************/
  230. void CMsgCenter::pushMsg(UINT MsgType, const PARAM lpContent)
  231. {
  232. // 生成消息实体
  233. MSG_INNER* pMsg = new(std::nothrow) MSG_INNER;
  234. memset(pMsg, 0, sizeof(pMsg));
  235. pMsg->MsgType = MsgType; // 消息类型
  236. // 填充事件内容
  237. switch (MsgType)
  238. {
  239. case DEV_EVENT_LOG: // 设备日志
  240. {
  241. pMsg->Content = new(std::nothrow) EventLog;
  242. memcpy(pMsg->Content, lpContent, sizeof(EventLog));
  243. }
  244. break;
  245. case DEV_EVENT_RES_TYPE: // 设备资源类型
  246. {
  247. pMsg->Content = new(std::nothrow) EventResType;
  248. memcpy(pMsg->Content, lpContent, sizeof(EventResType));
  249. }
  250. break;
  251. case DEV_EVENT_RES_DETAIL: // 设备资源明细
  252. {
  253. pMsg->Content = new(std::nothrow) EventResDetail;
  254. memcpy(pMsg->Content, lpContent, sizeof(EventResDetail));
  255. }
  256. break;
  257. case DEV_EVENT_INIT_END: // 设备初始化结束
  258. case DEV_EVENT_DEV_CLOSED: // 设备已关闭
  259. {
  260. pMsg->Content = NULL;
  261. }
  262. break;
  263. case DEV_EVENT_RES_STATUS: // 设备资源状态变化
  264. {
  265. pMsg->Content = new(std::nothrow) EventResStatus;
  266. memcpy(pMsg->Content, lpContent, sizeof(EventResStatus));
  267. }
  268. break;
  269. case DEV_EVENT_LINE_OP_PROCESS: // 线路操作进展
  270. {
  271. pMsg->Content = new(std::nothrow) EventOpProcess;
  272. memcpy(pMsg->Content, lpContent, sizeof(EventOpProcess));
  273. }
  274. break;
  275. case DEV_EVENT_LINE_OP_RESULT: // 线路操作执行结果
  276. {
  277. pMsg->Content = new(std::nothrow) EventOpResult;
  278. memcpy(pMsg->Content, lpContent, sizeof(EventOpResult));
  279. }
  280. break;
  281. case DEV_EVENT_DEV_OPERATOR: // 设备主动操作事件
  282. {
  283. pMsg->Content = new(std::nothrow) EventDevOperation;
  284. memcpy(pMsg->Content, lpContent, sizeof(EventDevOperation));
  285. }
  286. break;
  287. } // end switch
  288. m_RWLock.Lock(); // 互斥加锁
  289. // 将消息插入缓冲队列
  290. m_MsgList.AddTail(pMsg);
  291. m_RWLock.Unlock();
  292. // 需要唤醒分发线程
  293. m_ThreadSleepFlag.SetEvent();
  294. }