#include "StdAfx.h" #include "NetLinkMain.h" #include "NetClient.h" #include "NetServer.h" #include "PduEntity.h" SINGLETON_IMPLEMENT(CNetLinkMain) /***************************************************************** **【函数名称】 getCommInstance **【函数功能】 得到消息通讯接口实例 **【参数】 **【返回值】 ****************************************************************/ IPduComm* CInterfaceWindow::getIocpCommInstance() { return &CNetLinkMain::GetInstance(); } /***************************************************************** **【函数名称】 getLinkInstance **【函数功能】 得到消息通讯接口实例 **【参数】 **【返回值】 ****************************************************************/ IPduLink* CInterfaceWindow::getIocpLinkInstance() { return &CNetLinkMain::GetInstance(); } CNetLinkMain::CNetLinkMain(void) : m_pServer(NULL), m_hIocp(INVALID_HANDLE_VALUE), m_phWorkerThreads(NULL), m_ThreadCount(0), m_ThreadStop(true), m_DevType(PDU_DEV_TYPE_UNKNOWN), m_DevId(0) { WSADATA wsaData; if (WSAStartup(MAKEWORD(2,2), &wsaData) != NO_ERROR) AfxMessageBox("WSAStartup执行失败"); } CNetLinkMain::~CNetLinkMain(void) { StopAll(); } /***************************************************************** **【函数名称】 __workerThread **【函数功能】 为IOCP请求服务的工作者线程 **【参数】 **【返回值】 ****************************************************************/ DWORD WINAPI CNetLinkMain::__workerThread( LPVOID lpParam ) { CNetLinkMain* pSelf = (CNetLinkMain*)lpParam; ASSERT(pSelf != NULL); OVERLAPPED* pOverlapped = NULL; CSocketBase* pNetEnd = NULL; DWORD dwBytesTransfered = 0; BOOL Res; while(!pSelf->m_ThreadStop) { // 循环处理请求,知道接收到Shutdown信息为止 Res = GetQueuedCompletionStatus(pSelf->m_hIocp, &dwBytesTransfered, (PULONG_PTR)&pNetEnd, &pOverlapped, INFINITE); // 如果收到的是退出标志,则直接退出 if(VAL_THREAD_EXIT_CODE == pNetEnd) break; // 判断是否出现了错误 if(!Res) { if(!pSelf->__handleNetEndError(pNetEnd, GetLastError())) { ASSERT(FALSE); // break; } continue; } else { if(dwBytesTransfered == 0 && !pSelf->__isClientExisted(pNetEnd)) continue; pNetEnd->doJob(pOverlapped, dwBytesTransfered); } pSelf->m_ClientContainer.clearClientDeleted(); } TRACE(_T("Thread quit @CNetLinkMain::_workerThread\r\n")); return 0; } /***************************************************************** **【函数名称】 __handleNetEndError **【函数功能】 处理网络端错误 **【参数】 **【返回值】 ****************************************************************/ bool CNetLinkMain::__handleNetEndError( CSocketBase* pNetEnd, DWORD ErrorCode ) { switch(ErrorCode) { case WAIT_TIMEOUT: // 超时,继续等待 { return true; } break; case ERROR_CONNECTION_ABORTED: // 连接被中断 { return true; // 此处pNetEnd往往是为无效野指针,故而不能调用pNetEnd->onDisconnect()。 } break; case ERROR_NETNAME_DELETED: // 客户端异常退出 case ERROR_PORT_UNREACHABLE: // 没有任何服务正在远程系统上的目标网络终结点上操作 default: { if(pNetEnd != NULL) { pNetEnd->onDisconnect(); return true; } else return false; // IO错误,返回false退出工作线程 } } } /***************************************************************** **【函数名称】 __getCountOfProcessors **【函数功能】 获得本机的处理器数量 **【参数】 **【返回值】 ****************************************************************/ int CNetLinkMain::__getCountOfProcessors( void ) { SYSTEM_INFO si; GetSystemInfo(&si); return si.dwNumberOfProcessors; } /***************************************************************** **【函数名称】 __getThreadCount **【函数功能】 获得服务器端线程数量 **【参数】 **【返回值】 ****************************************************************/ int CNetLinkMain::__getThreadCount( PDU_DEV_TYPE a_LocalType ) { int Count = 0; switch(a_LocalType) { case PDU_DEV_TYPE_CTI: case PDU_DEV_TYPE_SC_SERVER: Count = __getCountOfProcessors(); break; case PDU_DEV_TYPE_ACD: Count = VAL_THREAD_PER_PROCESSOR * __getCountOfProcessors(); break; default: Count = __getCountOfProcessors(); break; } return Count; } /***************************************************************** **【函数名称】 __setupIocp **【函数功能】 创建完成端口服务端模式 **【参数】 **【返回值】 ****************************************************************/ bool CNetLinkMain::__setupIocp( PDU_DEV_TYPE a_LocalType ) { // 建立完成端口 m_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if ( NULL == m_hIocp) return false; // 根据本机中的处理器数量,建立对应的线程数 m_ThreadCount = __getThreadCount(a_LocalType); // 为工作者线程初始化句柄 m_phWorkerThreads = new HANDLE[m_ThreadCount]; m_ThreadStop = false; // 根据计算出来的数量建立工作者线程 DWORD ThreadID; for (int i = 0; i < m_ThreadCount; i++) { m_phWorkerThreads[i] = ::CreateThread(0, 0, __workerThread, (void *)this, 0, &ThreadID); } TRACE(_T("ThreadCount: %d @CNetLinkMain::__setupIocp\r\n"), m_ThreadCount); return true; } /***************************************************************** **【函数名称】 __setLocalInfo **【函数功能】 设定PDU本端设备类型及ID **【参数】 a_nDevType 本端设备类型 a_nDevId 本端设备ID **【返回值】 ****************************************************************/ void CNetLinkMain::__setLocalInfo( PDU_DEV_TYPE a_nDevType, int a_nDevId ) { m_DevType = a_nDevType; m_DevId = a_nDevId; } /***************************************************************** **【函数名称】 __setLinkContent **【函数功能】 生成连接信息内容 **【参数】 a_nLinkInfo SOCKET连接状态 a_pClient 关联的客户端 **【返回值】 ****************************************************************/ void CNetLinkMain::__setLinkContent( PduLinkContent& linkContent, PDU_LINK_STATE a_nLinkInfo, CNetClient* a_pClient ) { memset(&linkContent, 0, sizeof(PduLinkContent)); CString strIp = _T(""); linkContent.nLocalType = m_DevType; linkContent.nLocalId = m_DevId; linkContent.nLinkState = a_nLinkInfo; a_pClient->getFarLinkInfo(strIp, linkContent.nFarPort); a_pClient->getFarDevInfo(linkContent.nFarType, linkContent.nFarId); lstrcpy(linkContent.szFarIp, strIp); } /***************************************************************** **【函数名称】 __setLinkContent **【函数功能】 生成连接信息内容 **【参数】 a_nLinkInfo SOCKET连接状态 a_pClient 关联的客户端 **【返回值】 ****************************************************************/ void CNetLinkMain::__setLinkContent( PduLinkContent& linkContent, CNetClient* a_pClient ) { memset(&linkContent, 0, sizeof(PduLinkContent)); CString strIp = _T(""); linkContent.nLocalType = m_DevType; linkContent.nLocalId = m_DevId; a_pClient->getFarLinkInfo(strIp, linkContent.nFarPort); a_pClient->getFarDevInfo(linkContent.nFarType, linkContent.nFarId); lstrcpy(linkContent.szFarIp, strIp); } /***************************************************************** **【函数名称】 __isClientExisted **【函数功能】 判断客户端是否还存在 **【参数】 **【返回值】 ****************************************************************/ bool CNetLinkMain::__isClientExisted( CSocketBase* pClient ) { if(pClient == m_pServer) return true; return m_ClientContainer.getClient((CNetClient*)pClient) != NULL; } /***************************************************************** **【函数名称】 Send **【函数功能】 发送PDU命令 **【参数】 a_pCmd 要发送的命令实体 a_nDestType 消息要发送的目标设备类型 a_nDestId 消息要发送的目标设备ID **【返回值】 ****************************************************************/ BOOL CNetLinkMain::Send( CPduEntity* a_pCmd, PDU_DEV_TYPE a_nDestType, int a_nDestId ) { CNetClient* pClient = NULL; // 对注册命令的特殊处理 if(a_pCmd->GetCmdType() == PDU_CMD_REG && a_pCmd->GetIsExecReturn()) { // 查找要发送命令的连接 pClient = m_ClientContainer.getClient(a_pCmd->GetAssoSocket()); if(pClient == NULL) return FALSE; // 触发注册事件 if(a_pCmd->GetDataBool(0)) { pClient->setFarDevInfo(a_nDestType, a_nDestId); onConnRegistOK(pClient); } else { onConnRegistFailed(pClient); } // end if } else { // 查找要发送命令的连接 pClient = m_ClientContainer.getClient(a_nDestType, a_nDestId); if(pClient == NULL) return FALSE; } // end if // 发送命令 a_pCmd->SetLocalDevInfo(m_DevType, m_DevId); // 填充发送方信息 a_pCmd->SetPeerDevInfo(a_nDestType, a_nDestId); // 填充接收方信息 return pClient->sendDirectly(a_pCmd) ? TRUE : FALSE; } /***************************************************************** **【函数名称】 Send2All **【函数功能】 向所有注册过的客户端发送PDU **【参数】 **【返回值】 ****************************************************************/ void CNetLinkMain::Send2All( CPduEntity* a_pCmd ) { // 发送命令 a_pCmd->SetLocalDevInfo(m_DevType, m_DevId); // 填充发送方信息 m_ClientContainer.send2All(a_pCmd); } /***************************************************************** **【函数名称】 getLocalInfo **【函数功能】 读取本端设备类型及ID **【参数】 **【返回值】 a_DevType 本端设备类型 a_DevId 本端设备ID ****************************************************************/ void CNetLinkMain::getLocalInfo( PDU_DEV_TYPE& a_DevType, int& a_DevId ) { a_DevType = m_DevType; a_DevId = m_DevId; } /***************************************************************** **【函数名称】 GetLinkInfo **【函数功能】 获取指定连接的网络信息 **【参数】 OUT a_strIp 指定连接的对端IP OUT a_nPort 指定连接的对端PORT IN a_nType 要查询的连接类型 IN a_nId 要查询的连接ID **【返回值】 ****************************************************************/ BOOL CNetLinkMain::GetLinkInfo( CString& a_strIp, int& a_nPort, PDU_DEV_TYPE a_nType, int a_nId ) { // 查找连接 CNetClient* pClient = m_ClientContainer.getClient(a_nType, a_nId); if(pClient == NULL) return FALSE; // 获取连接的网络信息 pClient->getFarLinkInfo(a_strIp, a_nPort); return TRUE; } /***************************************************************** **【函数名称】 GetPeerIp **【函数功能】 获取PDU命令的发送方IP **【参数】 OUT a_strIp 发送当前PDU命令的对端IP IN a_pCmd 当前接收到的PDU命令 **【返回值】 ****************************************************************/ BOOL CNetLinkMain::GetPeerIp( CString& a_strIp, CPduEntity* a_pCmd ) { CNetClient* pClient = m_ClientContainer.getClient(a_pCmd->GetAssoSocket()); if(pClient == NULL) return FALSE; // 获取连接的网络信息 int nPort = 0; pClient->getFarLinkInfo(a_strIp, nPort); return TRUE; } /***************************************************************** **【函数名称】 CreatePduServer **【函数功能】 创建PDU通讯服务器 **【参数】 a_nListenPort SOCKET监听端口 a_nLocalType 服务器设备类型 a_nLocalId 服务器设备ID **【返回值】 ****************************************************************/ BOOL CNetLinkMain::CreatePduServer( int a_nListenPort, PDU_DEV_TYPE a_nLocalType, int a_nLocalId ) { if(!__setupIocp(a_nLocalType)) return FALSE; m_pServer = new CNetServer(this); ASSERT(m_pServer != NULL); // 启动SERVER if(m_pServer->create(a_nListenPort)) { __setLocalInfo(a_nLocalType, a_nLocalId); return TRUE; } else { delete m_pServer; m_pServer = NULL; } // end if return FALSE; } /***************************************************************** **【函数名称】 CreatePduClient **【函数功能】 创建PDU通讯客户端 **【参数】 a_strFarIp 要连接的服务器IP a_nFarPort 要连接的服务器端口号 a_nLocalType 本端设备类型 a_nLocalId 本端设备ID a_nFarType 对端设备类型 a_nFarId 对端设备ID **【返回值】 TRUE 启动连接成功 FALSE 启动连接失败 ****************************************************************/ BOOL CNetLinkMain::CreatePduClient( CString a_strFarIp, int a_nFarPort, PDU_DEV_TYPE a_nLocalType, int a_nLocalId, PDU_DEV_TYPE a_nFarType, int a_nFarId, bool IsAutoReconnect ) { // 如果已存在到对端的连接 CNetClient* pClient = m_ClientContainer.getClient(a_nFarType, a_nFarId); if(pClient != NULL) return FALSE; // 开始连接 pClient = new CNetClient(this, PDU_LINK_TYPE_CLIENT, IsAutoReconnect); m_ClientContainer.insertClient(pClient); __setLocalInfo(a_nLocalType, a_nLocalId); if(pClient->connect2Server(a_strFarIp, a_nFarPort, a_nFarType, a_nFarId)) { return TRUE; } else { return FALSE; } // end if } /***************************************************************** **【函数名称】 ClosePduClient **【函数功能】 关闭PDU通讯客户端 **【参数】 a_nFarType 对端设备类型 a_nFarId 对端设备ID **【返回值】 TRUE 启动连接成功 FALSE 启动连接失败 ****************************************************************/ BOOL CNetLinkMain::ClosePduClient( PDU_DEV_TYPE a_nFarType, int a_nFarId ) { return m_ClientContainer.removeClient(a_nFarType, a_nFarId) ? TRUE : FALSE; } /***************************************************************** **【函数名称】 RegistPduLinkProc **【函数功能】 订阅连接管理事件 **【参数】 a_pPduLinkProc 要对事件进行处理的接口类 bIsInsert 添加/删除定阅 **【返回值】 ****************************************************************/ BOOL CNetLinkMain::RegistPduLinkProc( IPduLinkProc* a_pPduLinkProc, BOOL bIsInsert ) { return m_EventHost.RegLinkProc(a_pPduLinkProc, bIsInsert); } /***************************************************************** **【函数名称】 RegistPduCommProc **【函数功能】 订阅消息通知事件 **【参数】 a_pPduCommProc 要对事件进行处理的接口类 bIsInsert 添加/删除定阅 **【返回值】 ****************************************************************/ BOOL CNetLinkMain::RegistPduCommProc( IPduCommProc* a_pPduCommProc, BOOL bIsInsert ) { return m_EventHost.RegCommProc(a_pPduCommProc, bIsInsert); } /***************************************************************** **【函数名称】 waitNetEnd **【函数功能】 等候网络一端的连接和数据 **【参数】 pNetEnd:网络一端 **【返回值】 ****************************************************************/ bool CNetLinkMain::waitNetEnd( CSocketBase* pNetEnd ) { ASSERT(pNetEnd != NULL); // 将用于和客户端通信的SOCKET绑定到完成端口中 return CreateIoCompletionPort((HANDLE)pNetEnd->assoSocket(), m_hIocp, (DWORD)pNetEnd, 0) != NULL; } /***************************************************************** **【函数名称】 onConnEstablished **【函数功能】 初始连接建立的后续处理 **【参数】 **【返回值】 ****************************************************************/ void CNetLinkMain::onConnEstablished( CNetClient* a_pClient ) { ASSERT(a_pClient != NULL); if(!waitNetEnd(a_pClient) || !a_pClient->ready4Recv()) { delete a_pClient; TRACE(_T("{!waitNetEnd(a_pClient) || !a_pClient->ready4Recv()} @CNetLinkMain::onConnEstablished\r\n")); return; } // 对SOCKET容器的处理 PDU_LINK_STATE nLinkState = m_ClientContainer.onConnEstablished(a_pClient); // 对事件观查者的处理 PduLinkContent linkContent; __setLinkContent(linkContent, nLinkState, a_pClient); m_EventHost.OnLinkStateChanged(linkContent); // 对SOCKET监控的处理 a_pClient->onConnEstablished(); } /***************************************************************** **【函数名称】 onConnFailed **【函数功能】 初连接断开/失败的后续处理 **【参数】 **【返回值】 ****************************************************************/ void CNetLinkMain::onConnFailed( CNetClient* a_pClient ) { // 对SOCKET监控的处理 a_pClient->onConnFailed(); PduLinkContent linkContent; __setLinkContent(linkContent, a_pClient); // 对SOCKET容器的处理 linkContent.nLinkState = m_ClientContainer.onConnFailed(a_pClient); // 对事件观查者的处理 m_EventHost.OnLinkStateChanged(linkContent); } /***************************************************************** **【函数名称】 onConnRegistOK **【函数功能】 连接注册成功的后续处理 **【参数】 **【返回值】 ****************************************************************/ void CNetLinkMain::onConnRegistOK( CNetClient* a_pClient ) { // 对SOCKET监控的处理 a_pClient->onConnRegistOK(); // 对SOCKET容器的处理 PDU_LINK_STATE nLinkState = m_ClientContainer.onConnRegistOK(a_pClient); // 对事件观查者的处理 PduLinkContent linkContent; __setLinkContent(linkContent, nLinkState, a_pClient); m_EventHost.OnLinkStateChanged(linkContent); } /***************************************************************** **【函数名称】 onConnRegistFailed **【函数功能】 连接注册失败的后续处理 **【参数】 **【返回值】 ****************************************************************/ void CNetLinkMain::onConnRegistFailed( CNetClient* a_pClient ) { // 对SOCKET监控的处理 a_pClient->onConnRegistFailed(); // 对SOCKET容器的处理 PDU_LINK_STATE nLinkState = m_ClientContainer.onConnRegistFailed(a_pClient); // 对事件观查者的处理 PduLinkContent linkContent; __setLinkContent(linkContent, nLinkState, a_pClient); m_EventHost.OnLinkStateChanged(linkContent); } /***************************************************************** **【函数名称】 onRecvCommand **【函数功能】 命令接收的处理 **【参数】 a_pCmd 当前接收到的命令实体 **【返回值】 ****************************************************************/ void CNetLinkMain::onRecvCommand( CPduEntity* a_pCmd ) { // 接收端校验 ASSERT((a_pCmd->GetPeerDevId() == (ULONG)m_DevId && a_pCmd->GetPeerDevType() == (ULONG)m_DevType)); // 对注册命令的特殊处理 if(a_pCmd->GetCmdType() == PDU_CMD_REG && a_pCmd->GetIsExecReturn()) { // 查找要接收命令的连接 CNetClient* pClient = m_ClientContainer.getClient(a_pCmd->GetAssoSocket()); if(pClient == NULL) return; // 触发注册事件 if(a_pCmd->GetDataBool(0)) { onConnRegistOK(pClient); } else { onConnRegistFailed(pClient); } // end if } // end if // 对事件观查者的处理 m_EventHost.OnRecvCommand(a_pCmd); } /***************************************************************** **【函数名称】 StopAll **【函数功能】 停止 **【参数】 **【返回值】 ****************************************************************/ void CNetLinkMain::StopAll( void ) { if(!m_ThreadStop) { m_ThreadStop = true; for (int i = 0; i < m_ThreadCount; ++i) { // 通知所有的完成端口操作退出 PostQueuedCompletionStatus(m_hIocp, 0, (DWORD)VAL_THREAD_EXIT_CODE, NULL); } // 等待所有的客户端资源退出 WaitForMultipleObjects(m_ThreadCount, m_phWorkerThreads, TRUE, INFINITE); // 释放工作者线程句柄指针 for( int i=0;i