中间件底层,websocket

CellSocket.cpp 10.0KB


  1. #include "StdAfx.h"
  2. #include "CellSocket.h"
  3. #include "IvrFlow.h"
  4. #include "FlowDataProvider.h"
  5. #include "MarkupSTL.h"
  6. using namespace std;
  7. IMPLEMENT_CELL_AUTOCREATE(CCellSocket, CELL_NAME_SOCKET)
  8. CCellSocket::CCellSocket(void) : m_FarPort(0), m_TimeOut(0), m_SuccessPos(0), m_FailPos(0)
  9. {
  10. }
  11. CCellSocket::CCellSocket( CCellSocket & cellSocket ) : CCellBase(cellSocket)
  12. {
  13. POSITION pos = cellSocket.m_InputVarList.GetHeadPosition();
  14. while(pos != NULL)
  15. {
  16. InputVarInfo *pVarInfo = cellSocket.m_InputVarList.GetNext(pos);
  17. ASSERT(pVarInfo != NULL);
  18. InputVarInfo *pVarInfoTmp = new InputVarInfo(*pVarInfo);
  19. ASSERT(pVarInfoTmp != NULL);
  20. m_InputVarList.AddTail(pVarInfoTmp);
  21. }
  22. for (int i = 0; i < cellSocket.m_OutputVarList.GetSize(); ++i)
  23. {
  24. ASSERT(cellSocket.m_OutputVarList[i].GetLength() != 0);
  25. m_OutputVarList.Add(cellSocket.m_OutputVarList[i]);
  26. }
  27. m_OpType = cellSocket.m_OpType;
  28. m_FarAddr = cellSocket.m_FarAddr;
  29. m_FarPort = cellSocket.m_FarPort;
  30. m_TimeOut = cellSocket.m_TimeOut;
  31. m_SuccessPos = cellSocket.m_SuccessPos;
  32. m_FailPos = cellSocket.m_FailPos;
  33. }
  34. CCellSocket::~CCellSocket(void)
  35. {
  36. __release();
  37. }
  38. /*****************************************************************
  39. **【函数名称】 __release
  40. **【函数功能】 释放资源
  41. **【参数】
  42. **【返回值】
  43. ****************************************************************/
  44. void CCellSocket::__release(void)
  45. {
  46. InputVarInfo* pInputVarInfo = NULL;
  47. while (!m_InputVarList.IsEmpty())
  48. {
  49. pInputVarInfo = m_InputVarList.RemoveHead();
  50. delete pInputVarInfo;
  51. pInputVarInfo = NULL;
  52. }
  53. }
  54. /*****************************************************************
  55. **【函数名称】 __formatInputVar
  56. **【函数功能】 格式化输入字符串
  57. **【参数】
  58. **【返回值】
  59. ****************************************************************/
  60. int CCellSocket::__formatInputVar( char* InputText )
  61. {
  62. CMarkupSTL XmlInput;
  63. XmlInput.AddElem(SOCKET_INPUT_PREFIX);
  64. XmlInput.AddAttrib(SOCKET_INPUT_OP_TYPE, m_OpType);
  65. int i = 1;
  66. CString VarName;
  67. CString VarValue;
  68. POSITION Pos = m_InputVarList.GetHeadPosition();
  69. while(Pos != NULL)
  70. {
  71. InputVarInfo* pVarInfo = m_InputVarList.GetNext(Pos);
  72. ASSERT(pVarInfo != NULL);
  73. if(pVarInfo->VarType == CELL_DATA_VAR)
  74. {
  75. if(!m_pIvrFlow->findVarValue(pVarInfo->szVarValue, VarValue))
  76. {
  77. CString Info;
  78. _getCellInfo(Info);
  79. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错,变量[%s]未找到对应值"), Info, pVarInfo->szVarValue);
  80. }
  81. }
  82. else
  83. {
  84. VarValue = pVarInfo->szVarValue;
  85. }
  86. VarName.Format(_T("%s%d"), SOCKET_VAR_PREFIX, i++);
  87. XmlInput.AddChildElem(VarName, VarValue);
  88. }
  89. lstrcpy(InputText, XmlInput.GetDoc().c_str());
  90. return XmlInput.GetDoc().length();
  91. }
  92. /*****************************************************************
  93. **【函数名称】 __analyzeOutputVar
  94. **【函数功能】 解析输出字符串
  95. **【参数】
  96. **【返回值】
  97. ****************************************************************/
  98. bool CCellSocket::__analyzeOutputVar( char* OutputText )
  99. {
  100. CMarkupSTL XmlOutput;
  101. if(!XmlOutput.SetDoc(OutputText))
  102. {
  103. CString Info;
  104. _getCellInfo(Info);
  105. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错,无法加载网络返回数据, Data = %s"), Info, OutputText);
  106. return false;
  107. }
  108. if(!XmlOutput.FindElem(SOCKET_OUTPUT_PREFIX))
  109. {
  110. CString Info;
  111. _getCellInfo(Info);
  112. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错,无法解析网络返回数据, Data = %s"), Info, OutputText);
  113. return false;
  114. }
  115. CString Element;
  116. Element = XmlOutput.GetAttrib(SOCKET_INPUT_OP_TYPE).c_str();
  117. if(Element != m_OpType)
  118. {
  119. CString Info;
  120. _getCellInfo(Info);
  121. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错,网络返回的操作类型[%s]与当前类型[%s]不匹配"), Info, Element, m_OpType);
  122. return false;
  123. }
  124. for(int i = 0; i < m_OutputVarList.GetSize(); ++i)
  125. {
  126. Element.Format(_T("%s%d"), SOCKET_VAR_PREFIX, i + 1);
  127. if(XmlOutput.FindChildElem(Element))
  128. {
  129. XmlOutput.IntoElem();
  130. m_pIvrFlow->addVar(m_OutputVarList[i], XmlOutput.GetData().c_str());
  131. XmlOutput.OutOfElem();
  132. }
  133. else
  134. {
  135. CString Info;
  136. _getCellInfo(Info);
  137. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错,无法在网络数据中定位字段[%s], Data = %s"), Info, Element, OutputText);
  138. return false;
  139. }
  140. }
  141. return true;
  142. }
  143. /*****************************************************************
  144. **【函数名称】 __wait4Recv
  145. **【函数功能】 等候接收网络数据
  146. **【参数】
  147. **【返回值】
  148. ****************************************************************/
  149. bool CCellSocket::__wait4Recv(SOCKET Sock)
  150. {
  151. struct timeval tv;
  152. fd_set fds;
  153. FD_ZERO(&fds);
  154. FD_SET(Sock, &fds);
  155. tv.tv_sec = m_TimeOut;
  156. tv.tv_usec = 0;
  157. int res = select(0, &fds, NULL, NULL, &tv);
  158. if(res > 0)
  159. return FD_ISSET(Sock, &fds) > 0 ? true : false;
  160. else
  161. return false;
  162. }
  163. /*****************************************************************
  164. **【函数名称】 addInputVar
  165. **【函数功能】 添加输入变量
  166. **【参数】
  167. **【返回值】
  168. ****************************************************************/
  169. void CCellSocket::addInputVar( int VarType, const CString& VarVal )
  170. {
  171. InputVarInfo *pInputVar = new InputVarInfo(VarType, VarVal);
  172. ASSERT(pInputVar != NULL);
  173. m_InputVarList.AddTail(pInputVar);
  174. }
  175. /*****************************************************************
  176. **【函数名称】 Operate
  177. **【函数功能】 节点执行函数
  178. **【参数】
  179. **【返回值】 下一个节点编号
  180. ****************************************************************/
  181. int CCellSocket::operate( void )
  182. {
  183. if(m_pIvrFlow == NULL)
  184. return CELL_OP_ERROR;
  185. CString Info;
  186. _getCellInfo(Info);
  187. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_NORMAL, _T("{Cell}: 开始执行[%s]"), Info);
  188. SOCKET Sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  189. if(Sock == INVALID_SOCKET)
  190. {
  191. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错, 初始化套接字失败"), Info);
  192. return m_FailPos;
  193. }
  194. CHAR Buffer[VAR_LEN] = { 0 };
  195. int Len = __formatInputVar(Buffer);
  196. SOCKADDR_IN AddrFar;
  197. AddrFar.sin_family = AF_INET;
  198. AddrFar.sin_port = htons(m_FarPort);
  199. inet_pton(AF_INET, m_FarAddr, &(AddrFar.sin_addr));
  200. if(AddrFar.sin_addr.s_addr == INADDR_NONE)
  201. {
  202. hostent* pHost = gethostbyname(m_FarAddr);
  203. if(pHost == NULL)
  204. {
  205. closesocket(Sock);
  206. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错, 远端地址异常, FarAddr = %s"), Info, m_FarAddr);
  207. return m_FailPos;
  208. }
  209. memcpy((char*)&AddrFar.sin_addr, pHost->h_addr, pHost->h_length);
  210. }
  211. int AddrLen = sizeof(SOCKADDR);
  212. Len = sendto(Sock, Buffer, Len, 0, (SOCKADDR*)&AddrFar, AddrLen) ;
  213. if(Len == SOCKET_ERROR)
  214. {
  215. closesocket(Sock);
  216. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错, 发送通讯数据失败, ErrCode = %d"), Info, WSAGetLastError());
  217. return m_FailPos;
  218. }
  219. if(!__wait4Recv(Sock))
  220. {
  221. closesocket(Sock);
  222. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错, 接收数据超时"), Info);
  223. return m_FailPos;
  224. }
  225. ZeroMemory(Buffer, VAR_LEN);
  226. ZeroMemory(&AddrFar, sizeof(AddrFar));
  227. Len = recvfrom(Sock, Buffer, VAR_LEN, 0, (SOCKADDR*)&AddrFar, &AddrLen);
  228. if(Len <= 0)
  229. {
  230. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_WARNING, _T("{Cell}: 执行[%s]出错, 接收数据失败, ErrCode = %d"), Info, WSAGetLastError());
  231. closesocket(Sock);
  232. return m_FailPos;
  233. }
  234. closesocket(Sock);
  235. Buffer[Len] = 0;
  236. if(!__analyzeOutputVar(Buffer))
  237. return m_FailPos;
  238. else
  239. return m_SuccessPos;
  240. }
  241. /*****************************************************************
  242. **【函数名称】 copy
  243. **【函数功能】 拷贝自身
  244. **【参数】
  245. **【返回值】 拷贝副本
  246. ****************************************************************/
  247. CCellBase * CCellSocket::copy( void )
  248. {
  249. CCellBase *pCellBase = new CCellSocket(*this);
  250. return pCellBase;
  251. }
  252. /*****************************************************************
  253. **【函数名称】 fillData
  254. **【函数功能】 节点解析,填充数据
  255. **【参数】 Provider:数据提供器
  256. **【返回值】 成功true,失败false
  257. ****************************************************************/
  258. bool CCellSocket::fillData( IFlowDataProvider& Provider )
  259. {
  260. CString Data;
  261. do
  262. {
  263. if(!Provider.getData(CELL_ATTRIBUTE_POS, Data))
  264. {
  265. Data = _T("节点号");
  266. break;
  267. }
  268. else
  269. {
  270. sscanf_s(Data, _T("%d"), &m_Pos);
  271. if(m_Pos < 1)
  272. {
  273. Data = _T("节点号");
  274. break;
  275. }
  276. }
  277. if(!Provider.getData(CELL_ATTRIBUTE_OP_TYPE, m_OpType))
  278. {
  279. Data = _T("操作类型标识");
  280. break;
  281. }
  282. if(!Provider.getData(CELL_ATTRIBUTE_FAR_IP, m_FarAddr))
  283. {
  284. Data = _T("远端IP");
  285. break;
  286. }
  287. if(!Provider.getData(CELL_ATTRIBUTE_FAR_PORT, Data))
  288. {
  289. Data = _T("远端端口");
  290. break;
  291. }
  292. else
  293. {
  294. sscanf_s(Data, _T("%d"), &m_FarPort);
  295. if(m_FarPort <= 0)
  296. {
  297. Data = _T("远端端口");
  298. break;
  299. }
  300. }
  301. if(!Provider.getData(CELL_ATTRIBUTE_RECV_TIME_OUT, Data))
  302. {
  303. Data = _T("接收超时");
  304. break;
  305. }
  306. else
  307. {
  308. sscanf_s(Data, _T("%d"), &m_TimeOut);
  309. if(m_TimeOut <= 0)
  310. {
  311. Data = _T("接收超时");
  312. break;
  313. }
  314. }
  315. if(!Provider.getData(CELL_ATTRIBUTE_SUCCESS_POS, Data))
  316. {
  317. Data = _T("成功跳转节点");
  318. break;
  319. }
  320. else
  321. {
  322. sscanf_s(Data, _T("%d"), &m_SuccessPos);
  323. if(m_SuccessPos < 0)
  324. {
  325. Data = _T("成功跳转节点");
  326. break;
  327. }
  328. }
  329. if(!Provider.getData(CELL_ATTRIBUTE_FAIL_POS, Data))
  330. {
  331. Data = _T("失败跳转节点");
  332. break;
  333. }
  334. else
  335. {
  336. sscanf_s(Data, _T("%d"), &m_FailPos);
  337. if(m_FailPos < 0)
  338. {
  339. Data = _T("失败跳转节点");
  340. break;
  341. }
  342. }
  343. Data.Format(_T("%s[@%s='%d']/%s"), XPATH_CELL, CELL_ATTRIBUTE_POS, m_Pos, FLOW_SUB_NODE_SOCK_INPUT);
  344. if(!Provider.getFlowSocketInputVar(Data, *this))
  345. {
  346. Data = _T("输入变量");
  347. break;
  348. }
  349. Data.Format(_T("%s[@%s='%d']/%s"), XPATH_CELL, CELL_ATTRIBUTE_POS, m_Pos, FLOW_SUB_NODE_SOCK_OUTPUT);
  350. if(!Provider.getDataSet(Data, CELL_ATTRIBUTE_VAR, m_OutputVarList))
  351. {
  352. Data = _T("输出变量");
  353. break;
  354. }
  355. Provider.getData(CELL_ATTRIBUTE_NOTE, m_Note);
  356. return true;
  357. } while (false);
  358. ILogger::getInstance().log(LOG_CLASS_BUSI, LOG_LEVEL_ERROR, _T("{Cell}: 节点[%s]解析失败, '%s'错误"), CELL_NAME_SOCKET, Data);
  359. return false;
  360. }