#include "stdafx.h" #include "ClientManager.h" #include "SysException.h" #include "SystemInfo.h" #include "ClientSession.h" #include "MemPool.h" #include "MemNode.h" #define QUIT_KEY -1 #define MAX_CLIENT_NUM 1024 CClientManager::CClientManager(void) :m_Iocp(NULL), m_pThread(NULL), m_SockInitSuccess(false) { CSystemInfo SysInfo; m_ThreadNum = SysInfo.dwNumberOfProcessors; InitializeCriticalSection(&m_CliSection); m_MemPool = new CMemPool(MAX_CLIENT_NUM, MAX_CLIENT_NUM * MAX_CLIENT_NUM); } CClientManager::~CClientManager(void) { DeleteCriticalSection(&m_CliSection); delete m_MemPool; } int CClientManager::StartCliMng() { InitWinSock(); //创建IO完成端口 m_Iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, m_ThreadNum); if (NULL == m_Iocp) { throw CSysException("CreateIOCompletionPort in StartCliMng.", ::WSAGetLastError()); } //创建工作线程池 m_pThread = new HANDLE[m_ThreadNum]; for (int nIndex = 0; nIndex < m_ThreadNum; nIndex++) { m_pThread[nIndex] = reinterpret_cast(_beginthreadex(0, 0, WorkThread, this, 0, NULL)); } return 0; } int CClientManager::StopCliMng() { CloseAll(); //通知工作线程结束 for (int nIndex = 0; nIndex < m_ThreadNum; nIndex++) { PostQueuedCompletionStatus(m_Iocp, -1, QUIT_KEY, NULL); } //等待所有线程结束 DWORD dwWaitRet; dwWaitRet = WaitForMultipleObjects(m_ThreadNum, m_pThread, TRUE, INFINITE); //删除工作线程 for (int nIndex = 0; nIndex < m_ThreadNum; nIndex++) { CloseHandle(m_pThread[nIndex]); } delete [] m_pThread; UnInitWinSock(); return 0; } unsigned __stdcall CClientManager::WorkThread(void* pThreadParams) { //获得对象指针 CClientManager* pClientManager = NULL; pClientManager = reinterpret_cast(pThreadParams); if (pClientManager == NULL) { return -1; } DWORD dwRecvData; ULONG_PTR dwKey; LPOVERLAPPED pOverLapped = NULL; CMemNode* pMemNode = NULL; while (true) { BOOL bRet; //等待IO完成端口得到通知 bRet = GetQueuedCompletionStatus(pClientManager->m_Iocp, &dwRecvData, &dwKey, &pOverLapped, INFINITE); CClientSession* pClientSession = reinterpret_cast(dwKey); if (!bRet) { DWORD dwTransfer, dwFlags; BOOL bResult = WSAGetOverlappedResult(pClientSession->GetSocket(), pOverLapped, &dwTransfer, false, &dwFlags); if (!bResult) { DWORD dwError; dwError = WSAGetLastError(); try { //发出IO请求之后连接被关闭 if (dwRecvData == 0 && pClientSession != NULL && pOverLapped != NULL) { pMemNode = static_cast(pOverLapped); if (pClientSession->m_ConnType == 0) { pClientSession->ReleaseMem(pMemNode); continue; } pClientSession->ReleaseMem(pMemNode); pClientSession->CloseSession(); } throw CSysException("Error in GetQueuedCompletionStatus - WorkThread", dwError); } catch (...) { } } continue; } if (dwKey == -1) { break; } //调用协议处理模块的接口进行处理 pMemNode = static_cast(pOverLapped) ; //连接已经关闭 if (pMemNode->GetBufType() != IO_CONN && dwRecvData == 0) { if (NULL != pMemNode) { pClientSession->ReleaseMem(pMemNode); } //关闭连接 pClientSession->CloseSession(); SYSTEMTIME sysTime; GetLocalTime(&sysTime); char szTime[MAX_PATH]; sprintf(szTime, "%04d-%02d-%02d %02d:%02d:%02d.%03d", sysTime.wYear, sysTime.wMonth, sysTime.wDay, sysTime.wHour, sysTime.wMinute, sysTime.wSecond, sysTime.wMilliseconds); //CRobotManager::m_AppLogger.WriteLog("GameRobot.log", "%s %s-连接已经被关闭。", szTime, pClientSession->GetAccount().c_str()); continue; } switch(pMemNode->GetBufType()) { case IO_CONN: { pClientSession->m_Connected = true; pClientSession->ReleaseMem(pMemNode); //发出多个读请求 for (int nIndex = 0; nIndex < 1; nIndex++) { pClientSession->ReadData(NULL); } pClientSession->OnConnection(); } break; case IO_SEND_REQ: { pClientSession->PostData(pMemNode); } break; case IO_SEND: { if (dwRecvData != pMemNode->GetUsedSize()) { pMemNode->RemoveData(dwRecvData); pClientSession->SendData(pMemNode); continue; } pClientSession->ReleaseMem(pMemNode); } break; case IO_RECV: { pMemNode->SetUseSize(dwRecvData); try { pClientSession->ProcMemNode(pMemNode); } catch (...) { //CRobotManager::m_AppLogger.WriteLog("GameRobot.log", "%s", "throw an exception at ProcMemNode"); } if (pClientSession->m_ConnType == 1) { //pClientSession->ReadData(NULL); } } break; default: break; } } return 0; } CClientSession* CClientManager::AddClientSession(const string& strClientID) { //创建Client CLISESSIONMAP::iterator Ite; EnterCriticalSection(&m_CliSection); Ite = m_ClientSessions.find(strClientID); if (Ite != m_ClientSessions.end()) { LeaveCriticalSection(&m_CliSection); return Ite->second; } CClientSession* pClientSession = NULL; pClientSession = new CClientSession(*this); m_ClientSessions[strClientID] = pClientSession; pClientSession->SetAccount(strClientID); LeaveCriticalSection(&m_CliSection); return pClientSession; } int CClientManager::DeleteClientSession(const string& strClientID) { EnterCriticalSection(&m_CliSection); CLISESSIONMAP::iterator Ite = m_ClientSessions.find(strClientID); if (Ite == m_ClientSessions.end()) { LeaveCriticalSection(&m_CliSection); return 0; } delete Ite->second; m_ClientSessions.erase(Ite); LeaveCriticalSection(&m_CliSection); return 0; } CClientSession* CClientManager::GetClientSession(const string& strClientID) { EnterCriticalSection(&m_CliSection); CLISESSIONMAP::iterator Ite = m_ClientSessions.find(strClientID); if (Ite == m_ClientSessions.end()) { LeaveCriticalSection(&m_CliSection); return NULL; } LeaveCriticalSection(&m_CliSection); return Ite->second; } void CClientManager::ClearAll() { EnterCriticalSection(&m_CliSection); for (CLISESSIONMAP::iterator Ite = m_ClientSessions.begin(); Ite != m_ClientSessions.end(); Ite++) { delete Ite->second; } m_ClientSessions.clear(); LeaveCriticalSection(&m_CliSection); } void CClientManager::InitWinSock() { if (m_SockInitSuccess) { return; } WSADATA wsaData; DWORD dwRet = WSAStartup(MAKEWORD(2, 2), &wsaData); if (dwRet != 0) { m_SockInitSuccess = false; throw CSysException("throw an exception in InitWinSock.", WSAGetLastError()); } if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { WSACleanup(); throw CAppException("throw an exception in InitWinSock.", "incorrect version of winsock."); } m_SockInitSuccess = true; } void CClientManager::UnInitWinSock() { if (m_SockInitSuccess) { WSACleanup(); } } void CClientManager::AddToIocp(CClientSession* pClientSession) { //绑定到完成端口 if (m_Iocp != CreateIoCompletionPort(reinterpret_cast(pClientSession->GetSocket()), m_Iocp, reinterpret_cast(pClientSession), 0)) { throw CSysException("AddClientSession-CreateIoCompletionPort", GetLastError()); } } void CClientManager::CloseAll() { for (CLISESSIONMAP::iterator Ite = m_ClientSessions.begin(); Ite != m_ClientSessions.end(); Ite++) { CClientSession* pClientSession = Ite->second; if (pClientSession->IsConnected()) { pClientSession->CloseSession(); } } }