中间件底层,websocket

MsgCenter.cpp 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. #include "StdAfx.h"
  2. #include "MsgCenter.h"
  3. #include "IMsgObserver.h"
  4. SINGLETON_IMPLEMENT(CMsgCenter)
  5. CMsgCenter::CMsgCenter(void) : m_Stop(false)
  6. {
  7. m_pThreadObj = AfxBeginThread(__dipatchThreadFunc, this);
  8. }
  9. CMsgCenter::~CMsgCenter(void)
  10. {
  11. m_Stop = true;
  12. if(m_pThreadObj != NULL)
  13. {
  14. DWORD ExitCode;
  15. GetExitCodeThread(m_pThreadObj->m_hThread, &ExitCode);
  16. if(ExitCode == STILL_ACTIVE)
  17. {
  18. m_ThreadSleepFlag.SetEvent();
  19. WaitForSingleObject(m_pThreadObj->m_hThread, INFINITE);
  20. }
  21. m_pThreadObj = NULL;
  22. }
  23. }
  24. /*****************************************************************
  25. **【函数名称】 __wait
  26. **【函数功能】 置线程于等待信号状态
  27. **【参数】
  28. **【返回值】
  29. ****************************************************************/
  30. void CMsgCenter::__wait( void )
  31. {
  32. m_ThreadSleepFlag.Lock();
  33. }
  34. void CMsgCenter::__releaseMsg(UINT MsgType, PARAM lpContent)
  35. {
  36. //2022-05-11
  37. if (IVR_MSG_PDU_RECIVE == MsgType)
  38. {
  39. CPduEntity* a_pPduEntity = (CPduEntity*)lpContent;
  40. if (a_pPduEntity != NULL)
  41. {
  42. delete a_pPduEntity;
  43. a_pPduEntity = NULL;
  44. }
  45. }
  46. }
  47. /*****************************************************************
  48. **【函数名称】 __dispatchMsg
  49. **【函数功能】 分派消息
  50. **【参数】
  51. **【返回值】
  52. ****************************************************************/
  53. int CMsgCenter::__dispatchMsg( void )
  54. {
  55. CSingleLock Locker(&m_RWLock, TRUE); // 互斥加锁
  56. // 获取事件队列信息
  57. if(m_MsgList.IsEmpty())
  58. return 0;
  59. MSG_INNER * pMsg = m_MsgList.RemoveHead();
  60. Locker.Unlock();
  61. // 根据实际类型分发消息
  62. __sendMsgToObserver(pMsg->MsgType, pMsg->Content);
  63. __releaseMsg(pMsg->MsgType, pMsg->Content);
  64. // 删除节点,释放内存
  65. delete pMsg;
  66. pMsg = NULL;
  67. // 返回
  68. return m_MsgList.GetCount();
  69. }
  70. /*****************************************************************
  71. **【函数名称】 __sendMsgToObserver
  72. **【函数功能】 消息发送给订阅者
  73. **【参数】
  74. **【返回值】
  75. ****************************************************************/
  76. void CMsgCenter::__sendMsgToObserver( UINT MsgType, const PARAM lpContent )
  77. {
  78. ObsvrMap::iterator itr;
  79. ObsvrMap::iterator itrEnd;
  80. itr = m_ObsvrMap.find(MsgType);
  81. if(itr == m_ObsvrMap.end())
  82. return;
  83. itrEnd = m_ObsvrMap.upper_bound(MsgType);
  84. for(; itr != itrEnd; ++itr)
  85. {
  86. itr->second->onMessage(MsgType, lpContent);
  87. }
  88. }
  89. /*****************************************************************
  90. **【函数名称】 __dipatchThreadFunc
  91. **【函数功能】 消息分发线程函数
  92. **【参数】
  93. **【返回值】
  94. ****************************************************************/
  95. UINT CMsgCenter::__dipatchThreadFunc( LPVOID pParam )
  96. {
  97. CMsgCenter* pSelf = (CMsgCenter*)pParam;
  98. while(!pSelf->m_Stop)
  99. {
  100. if(pSelf->__dispatchMsg() == 0)
  101. pSelf->__wait();
  102. } // end while
  103. return 0;
  104. }
  105. /*****************************************************************
  106. **【函数名称】 regist
  107. **【函数功能】 订阅消息
  108. **【参数】
  109. **【返回值】
  110. ****************************************************************/
  111. void CMsgCenter::regist( UINT MsgType, IMsgObserver* pObserver )
  112. {
  113. m_ObsvrMap.insert(pair<UINT, IMsgObserver*>(MsgType, pObserver));
  114. }
  115. /*****************************************************************
  116. **【函数名称】 unregist
  117. **【函数功能】 取消订阅
  118. **【参数】
  119. **【返回值】
  120. ****************************************************************/
  121. void CMsgCenter::unregist( UINT MsgType, IMsgObserver* pObserver )
  122. {
  123. ObsvrMap::iterator itr;
  124. ObsvrMap::iterator itrEnd;
  125. itr = m_ObsvrMap.find(MsgType);
  126. if(itr == m_ObsvrMap.end())
  127. return;
  128. itrEnd = m_ObsvrMap.upper_bound(MsgType);
  129. while(itr != itrEnd)
  130. {
  131. if(itr->second == pObserver)
  132. {
  133. m_ObsvrMap.erase(itr);
  134. break;
  135. }
  136. itr++;
  137. }
  138. }
  139. /*****************************************************************
  140. **【函数名称】 pushMsg
  141. **【函数功能】 压入消息
  142. **【参数】
  143. **【返回值】
  144. ****************************************************************/
  145. void CMsgCenter::pushMsg( UINT MsgType, const PARAM lpContent )
  146. {
  147. // 生成消息实体
  148. MSG_INNER* pMsg = new MSG_INNER;
  149. memset(pMsg, 0, sizeof(pMsg));
  150. pMsg->MsgType = MsgType; // 消息类型
  151. // 填充事件内容
  152. switch(MsgType)
  153. {
  154. case IVR_MSG_FLOW_STATE_UPDAET: // 流程状态变更
  155. case IVR_MSG_FLOW_CELL_UPDATE: // 流程节点更新
  156. {
  157. pMsg->Content = lpContent;
  158. }
  159. break;
  160. case IVR_MSG_PDU_RECIVE: // 2022-05-10 PDU消息
  161. {
  162. pMsg->Content = lpContent;
  163. }
  164. break;
  165. default:
  166. ASSERT(FALSE);
  167. } // end switch
  168. m_RWLock.Lock(); // 互斥加锁
  169. // 将消息插入缓冲队列
  170. m_MsgList.AddTail(pMsg);
  171. m_RWLock.Unlock();
  172. // 需要唤醒分发线程
  173. m_ThreadSleepFlag.SetEvent();
  174. }