#include #include "ChatRecordMng.h" #include "ChatRecord.h" #include "MemNode.h" #include "MemPool.h" #define PRIVATE_MSG_FILE "PrivateMsg.dat" #define WORLD_MSG_FILE "WorldMsg.dat" #define GUILD_MSG_FILE "GuildMsg.dat" #define RACE_MSG_FILE "RaceMsg.dat" #define GROUP_MSG_FILE "GroupMsg.dat" #define COMM_MSG_FILE "CommMsg.dat" #define ADV_MSG_FILE "AdvMsg.dat" #define SYS_MSG_FILE "SysMsg.dat" #define CHATGROUP_MSG_FILE "ChatGroupMsg.dat" #define SAMECITY_MSG_FILE "SameCityMsg.dat" #define LOCAL_MSG_FILE "LocalMsg.dat" #define LOG_THREAD_NUM 1 CChatRecordMng::CChatRecordMng(void) { m_MemPool = new CMemPool(1024, 1024 * 1024); m_bIsStart = false; m_pChatRecEvent = NULL; } CChatRecordMng::~CChatRecordMng(void) { if (m_MemPool) { delete m_MemPool; } } int CChatRecordMng::StartLogger(const char* szPath, IChatRecEvent* pEvent) { if (m_bIsStart) { StopLogger(); } m_pChatRecEvent = pEvent; m_Iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, LOG_THREAD_NUM); if (m_Iocp == NULL) { return -1; } m_pThread = new HANDLE[LOG_THREAD_NUM]; for (int nIndex = 0; nIndex < LOG_THREAD_NUM; nIndex++) { m_pThread[nIndex] = reinterpret_cast(_beginthreadex(NULL, 0, WorkThread, this, 0, NULL)); } m_pWorldRecord = new CChatRecord(*this); m_pWorldRecord->SetLogPath(szPath); m_pWorldRecord->Open(WORLD_MSG_FILE); m_pRaceRecord = new CChatRecord(*this); m_pRaceRecord->SetLogPath(szPath); m_pRaceRecord->Open(RACE_MSG_FILE); m_pGuildRecord = new CChatRecord(*this); m_pGuildRecord->SetLogPath(szPath); m_pGuildRecord->Open(GUILD_MSG_FILE); m_pPrivateRecord = new CChatRecord(*this); m_pPrivateRecord->SetLogPath(szPath); m_pPrivateRecord->Open(PRIVATE_MSG_FILE); m_pGroupRecord = new CChatRecord(*this); m_pGroupRecord->SetLogPath(szPath); m_pGroupRecord->Open(GROUP_MSG_FILE); m_pCommRecord = new CChatRecord(*this); m_pCommRecord->SetLogPath(szPath); m_pCommRecord->Open(COMM_MSG_FILE); m_pAdvise = new CChatRecord(*this); m_pAdvise->SetLogPath(szPath); m_pAdvise->Open(ADV_MSG_FILE); m_pSystem = new CChatRecord(*this); m_pSystem->SetLogPath(szPath); m_pSystem->Open(SYS_MSG_FILE); m_pChatGroup = new CChatRecord(*this); m_pChatGroup->SetLogPath(szPath); m_pChatGroup->Open(CHATGROUP_MSG_FILE); m_pSameCity = new CChatRecord(*this); m_pSameCity->SetLogPath(szPath); m_pSameCity->Open(SAMECITY_MSG_FILE); m_pLocal = new CChatRecord(*this); m_pLocal->SetLogPath(szPath); m_pLocal->Open(LOCAL_MSG_FILE); m_bIsStart = true; return 0; } int CChatRecordMng::StopLogger() { if (!m_bIsStart) { return 0; } for (int nIndex = 0; nIndex < LOG_THREAD_NUM; nIndex++) { PostQueuedCompletionStatus(m_Iocp, -1, -1, NULL); } WaitForMultipleObjects(LOG_THREAD_NUM, m_pThread, TRUE, INFINITE); for (int nIndex = 0; nIndex < LOG_THREAD_NUM; nIndex++) { CloseHandle(m_pThread[nIndex]); } delete [] m_pThread; delete m_pWorldRecord; delete m_pRaceRecord; delete m_pGuildRecord; delete m_pPrivateRecord; delete m_pGroupRecord; delete m_pCommRecord; delete m_pAdvise; delete m_pSystem; delete m_pChatGroup; delete m_pSameCity; delete m_pLocal; m_bIsStart = false; return 0; } unsigned __stdcall CChatRecordMng::WorkThread(void* pThreadParams) { CChatRecordMng* pChatRecordMng = reinterpret_cast(pThreadParams); if (pChatRecordMng == NULL) { return 0xdead; } DWORD dwRecvData; ULONG_PTR dwKey; CChatRecord* pRecord = NULL; LPOVERLAPPED pOverLapped = NULL; CMemNode* pMemNode = NULL; while (true) { BOOL bRet; //等待IO完成端口得到通知 bRet = GetQueuedCompletionStatus(pChatRecordMng->m_Iocp, &dwRecvData, &dwKey, &pOverLapped, INFINITE); if (!bRet) { continue; } if (dwKey == -1) { break; } pRecord = reinterpret_cast(dwKey); pMemNode = static_cast(pOverLapped); switch(pMemNode->GetBufType()) { case IO_WRITE_REQ: { pMemNode->SetBufType(IO_WRITE); pMemNode->Offset = GetFileSize(pRecord->m_FileHandle, NULL); if (!WriteFile(pRecord->m_FileHandle, pMemNode->GetBufferPrt(), pMemNode->GetUsedSize(), NULL, pMemNode)) { if (ERROR_IO_PENDING != GetLastError()) { DWORD dwErrorCode = GetLastError(); if (ERROR_IO_PENDING != dwErrorCode) { continue; } } } } break; case IO_WRITE: { if (dwRecvData != pMemNode->GetUsedSize()) { pMemNode->RemoveData(dwRecvData); pMemNode->Offset = GetFileSize(pRecord->m_FileHandle, NULL); WriteFile(pRecord->m_FileHandle, pMemNode->GetBufferPrt(), pMemNode->GetUsedSize(), NULL, pMemNode); } else { pChatRecordMng->m_MemPool->ReleaseMemNode(pMemNode); } } break; case IO_READ: { pMemNode->SetUseSize(dwRecvData); pRecord->m_CurPos += dwRecvData; pRecord->OnData(pMemNode); CMemNode* pNewMemNode = pChatRecordMng->m_MemPool->NewMemNode(); pNewMemNode->Offset = pRecord->m_CurPos; pNewMemNode->SetBufType(IO_READ); BOOL bResult = ReadFile(pRecord->m_FileHandle, pNewMemNode->GetBufferPrt(), pNewMemNode->GetSize(), NULL, pNewMemNode); if (!bResult) { DWORD dwError = GetLastError(); switch(dwError) { //已经到了文件尾 case ERROR_HANDLE_EOF: { pNewMemNode->SetUseSize(0); pRecord->OnData(pNewMemNode); } break; case ERROR_IO_PENDING: { } break; default: { pChatRecordMng->m_MemPool->ReleaseMemNode(pNewMemNode); } } } } break; default: break; } } return 0; } int CChatRecordMng::WriteChatLog(CHAT_TYPE ct, bool bMe, const char* szRoleName, const char* szChatObj, const char* szChat) { CChatRecord* pRecord = NULL; pRecord = GetRecord(ct); if (pRecord == NULL) { return 0; } string strBody; //聊天对象角色名 strBody.append(szChatObj, strlen(szChatObj) + 1); //自己角色名 strBody.append(szRoleName, strlen(szRoleName) + 1); CHATDIRECT cd; int nsize = sizeof(cd); cd = bMe ? LOCAL_SEND : PEER_SEND; strBody.append((char*)&cd, sizeof(cd)); unsigned char ucClientType; ucClientType = ct; strBody.append((char*)&ucClientType, sizeof(ucClientType)); SYSTEMTIME sysTime; GetLocalTime(&sysTime); strBody.append((char*)&sysTime, sizeof(sysTime)); strBody.append(szChat, strlen(szChat) + 1); FILEHEADER msgHeader; msgHeader.m_Cmd = ct; msgHeader.m_DataType = 0xFF; msgHeader.m_DataLen = strBody.length() + sizeof(msgHeader); string strMsg; strMsg.append((char*)&msgHeader, sizeof(msgHeader)); strMsg.append(strBody); pRecord->Write(strMsg.c_str(), strMsg.length()); return 0; } int CChatRecordMng::ReadChatLog(CChatRecordMng::CHAT_TYPE ct) { CChatRecord* pRecord = NULL; pRecord = GetRecord(ct); if (pRecord == NULL) { return 0; } pRecord->Read(); return 0; } int CChatRecordMng::AddRecord(CChatRecord* pRecord) { if (m_Iocp != CreateIoCompletionPort(pRecord->m_FileHandle, m_Iocp, reinterpret_cast(pRecord), 0)) { return GetLastError(); } return 0; } CChatRecord* CChatRecordMng::GetRecord(CChatRecordMng::CHAT_TYPE ct) { CChatRecord* pRecord = NULL; switch(ct) { case CT_PRIVATE_CHAT: case CT_PRIVATE_IRC_CHAT: { pRecord = m_pPrivateRecord; } break; case CT_WORLD_CHAT: case CT_WORLD_IRC_CHAT: { pRecord = m_pWorldRecord; } break; case CT_UNION_CHAT: case CT_UNION_IRC_CHAT: { pRecord = m_pGuildRecord; } break; case CT_SCOPE_CHAT: { pRecord = m_pCommRecord; } break; case CT_TEAM_CHAT: { pRecord = m_pGuildRecord; } break; case CT_CUST_GROUP: { pRecord = m_pChatGroup; } break; case CT_SYS_EVENT_NOTI: case CT_SYS_WARNING: { pRecord = m_pSystem; } break; case CT_CUST_BULLETIN: { pRecord = m_pAdvise; } break; default: break; } return pRecord; } int CChatRecordMng::ClearChat() { m_pWorldRecord->Clear(); m_pRaceRecord->Clear(); m_pGuildRecord->Clear(); m_pPrivateRecord->Clear(); m_pGroupRecord->Clear(); m_pCommRecord->Clear(); m_pAdvise->Clear(); m_pSystem->Clear(); m_pChatGroup->Clear(); m_pSameCity->Clear(); m_pLocal->Clear(); return 0; }