中间件底层,websocket

MsgCenter.cpp 3.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. #include "StdAfx.h"
  2. #include "MsgCenter.h"
  3. #include "IMsgObserver.h"
  4. SINGLETON_IMPLEMENT(CMsgCenter)
  5. CMsgCenter::CMsgCenter(void) : m_Stop(false)
  6. {
  7. m_pThreadObj = AfxBeginThread(__dipatchThreadFunc, this);
  8. }
  9. CMsgCenter::~CMsgCenter(void)
  10. {
  11. m_Stop = true;
  12. if(m_pThreadObj != NULL)
  13. {
  14. DWORD ExitCode;
  15. GetExitCodeThread(m_pThreadObj->m_hThread, &ExitCode);
  16. if(ExitCode == STILL_ACTIVE)
  17. {
  18. m_ThreadSleepFlag.SetEvent();
  19. WaitForSingleObject(m_pThreadObj->m_hThread, INFINITE);
  20. }
  21. m_pThreadObj = NULL;
  22. }
  23. }
  24. /*****************************************************************
  25. **【函数名称】 __wait
  26. **【函数功能】 置线程于等待信号状态
  27. **【参数】
  28. **【返回值】
  29. ****************************************************************/
  30. void CMsgCenter::__wait( void )
  31. {
  32. m_ThreadSleepFlag.Lock();
  33. }
  34. /*****************************************************************
  35. **【函数名称】 __dispatchMsg
  36. **【函数功能】 分派消息
  37. **【参数】
  38. **【返回值】
  39. ****************************************************************/
  40. int CMsgCenter::__dispatchMsg( void )
  41. {
  42. CSingleLock Locker(&m_RWLock, TRUE); // 互斥加锁
  43. // 获取事件队列信息
  44. if(m_MsgList.IsEmpty())
  45. return 0;
  46. MSG_INNER * pMsg = m_MsgList.RemoveHead();
  47. Locker.Unlock();
  48. // 根据实际类型分发消息
  49. __sendMsgToObserver(pMsg->MsgType, pMsg->Content);
  50. // 删除节点,释放内存
  51. delete pMsg;
  52. pMsg = NULL;
  53. // 返回
  54. return m_MsgList.GetCount();
  55. }
  56. /*****************************************************************
  57. **【函数名称】 __sendMsgToObserver
  58. **【函数功能】 消息发送给订阅者
  59. **【参数】
  60. **【返回值】
  61. ****************************************************************/
  62. void CMsgCenter::__sendMsgToObserver( UINT MsgType, const PARAM lpContent )
  63. {
  64. ObsvrMap::iterator itr;
  65. ObsvrMap::iterator itrEnd;
  66. itr = m_ObsvrMap.find(MsgType);
  67. if(itr == m_ObsvrMap.end())
  68. return;
  69. itrEnd = m_ObsvrMap.upper_bound(MsgType);
  70. for(; itr != itrEnd; ++itr)
  71. {
  72. itr->second->onMessage(MsgType, lpContent);
  73. }
  74. }
  75. /*****************************************************************
  76. **【函数名称】 __dipatchThreadFunc
  77. **【函数功能】 消息分发线程函数
  78. **【参数】
  79. **【返回值】
  80. ****************************************************************/
  81. UINT CMsgCenter::__dipatchThreadFunc( LPVOID pParam )
  82. {
  83. CMsgCenter* pSelf = (CMsgCenter*)pParam;
  84. while(!pSelf->m_Stop)
  85. {
  86. if(pSelf->__dispatchMsg() == 0)
  87. pSelf->__wait();
  88. } // end while
  89. return 0;
  90. }
  91. /*****************************************************************
  92. **【函数名称】 regist
  93. **【函数功能】 订阅消息
  94. **【参数】
  95. **【返回值】
  96. ****************************************************************/
  97. void CMsgCenter::regist( UINT MsgType, IMsgObserver* pObserver )
  98. {
  99. m_ObsvrMap.insert(pair<UINT, IMsgObserver*>(MsgType, pObserver));
  100. }
  101. /*****************************************************************
  102. **【函数名称】 unregist
  103. **【函数功能】 取消订阅
  104. **【参数】
  105. **【返回值】
  106. ****************************************************************/
  107. void CMsgCenter::unregist( UINT MsgType, IMsgObserver* pObserver )
  108. {
  109. ObsvrMap::iterator itr;
  110. ObsvrMap::iterator itrEnd;
  111. itr = m_ObsvrMap.find(MsgType);
  112. if(itr == m_ObsvrMap.end())
  113. return;
  114. itrEnd = m_ObsvrMap.upper_bound(MsgType);
  115. while(itr != itrEnd)
  116. {
  117. if(itr->second == pObserver)
  118. {
  119. m_ObsvrMap.erase(itr);
  120. break;
  121. }
  122. itr++;
  123. }
  124. }
  125. /*****************************************************************
  126. **【函数名称】 pushMsg
  127. **【函数功能】 压入消息
  128. **【参数】
  129. **【返回值】
  130. ****************************************************************/
  131. void CMsgCenter::pushMsg( UINT MsgType, PARAM lpContent )
  132. {
  133. // 生成消息实体
  134. MSG_INNER* pMsg = new MSG_INNER;
  135. memset(pMsg, 0, sizeof(pMsg));
  136. pMsg->MsgType = MsgType; // 消息类型
  137. pMsg->Content = lpContent;
  138. m_RWLock.Lock(); // 互斥加锁
  139. // 将消息插入缓冲队列
  140. m_MsgList.AddTail(pMsg);
  141. m_RWLock.Unlock();
  142. // 需要唤醒分发线程
  143. m_ThreadSleepFlag.SetEvent();
  144. }