diff options
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 192 |
1 files changed, 144 insertions, 48 deletions
diff --git a/src/net.cpp b/src/net.cpp index 3406a28b0e..9ee6cb423c 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3,7 +3,6 @@ // Distributed under the MIT/X11 software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -#include "irc.h" #include "db.h" #include "net.h" #include "init.h" @@ -347,7 +346,6 @@ bool GetMyExternalIP2(const CService& addrConnect, const char* pszGet, const cha return error("GetMyExternalIP() : connection closed"); } -// We now get our external IP from the IRC server first and only use this as a backup bool GetMyExternalIP(CNetAddr& ipRet) { CService addrConnect; @@ -538,7 +536,11 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecv.clear(); + + // in case this fails, we'll empty the recv buffer when the CNode is deleted + TRY_LOCK(cs_vRecvMsg, lockRecv); + if (lockRecv) + vRecvMsg.clear(); } } @@ -630,15 +632,128 @@ void CNode::copyStats(CNodeStats &stats) } #undef X +// requires LOCK(cs_vRecvMsg) +bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) +{ + while (nBytes > 0) { + + // get current incomplete message, or create a new one + if (vRecvMsg.empty() || + vRecvMsg.back().complete()) + vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); + + CNetMessage& msg = vRecvMsg.back(); + + // absorb network data + int handled; + if (!msg.in_data) + handled = msg.readHeader(pch, nBytes); + else + handled = msg.readData(pch, nBytes); + + if (handled < 0) + return false; + + pch += handled; + nBytes -= handled; + } + + return true; +} + +int CNetMessage::readHeader(const char *pch, unsigned int nBytes) +{ + // copy data to temporary parsing buffer + unsigned int nRemaining = 24 - nHdrPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&hdrbuf[nHdrPos], pch, nCopy); + nHdrPos += nCopy; + + // if header incomplete, exit + if (nHdrPos < 24) + return nCopy; + // deserialize to CMessageHeader + try { + hdrbuf >> hdr; + } + catch (std::exception &e) { + return -1; + } + + // reject messages larger than MAX_SIZE + if (hdr.nMessageSize > MAX_SIZE) + return -1; + // switch state to reading message data + in_data = true; + vRecv.resize(hdr.nMessageSize); + + return nCopy; +} +int CNetMessage::readData(const char *pch, unsigned int nBytes) +{ + unsigned int nRemaining = hdr.nMessageSize - nDataPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&vRecv[nDataPos], pch, nCopy); + nDataPos += nCopy; + + return nCopy; +} + + + +// requires LOCK(cs_vSend) +void SocketSendData(CNode *pnode) +{ + std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin(); + + while (it != pnode->vSendMsg.end()) { + const CSerializeData &data = *it; + assert(data.size() > pnode->nSendOffset); + int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) { + pnode->nLastSend = GetTime(); + pnode->nSendOffset += nBytes; + if (pnode->nSendOffset == data.size()) { + pnode->nSendOffset = 0; + pnode->nSendSize -= data.size(); + it++; + } else { + // could not send full message; stop sending more + break; + } + } else { + if (nBytes < 0) { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } + // couldn't send anything at all + break; + } + } + + if (it == pnode->vSendMsg.end()) { + assert(pnode->nSendOffset == 0); + assert(pnode->nSendSize == 0); + } + pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); +} + void ThreadSocketHandler(void* parg) { // Make this thread recognisable as the networking thread @@ -678,7 +793,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -710,7 +825,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { TRY_LOCK(pnode->cs_inventory, lockInv); @@ -761,14 +876,18 @@ void ThreadSocketHandler2(void* parg) { if (pnode->hSocket == INVALID_SOCKET) continue; - FD_SET(pnode->hSocket, &fdsetRecv); - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = max(hSocketMax, pnode->hSocket); - have_fds = true; { TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend && !pnode->vSend.empty()) - FD_SET(pnode->hSocket, &fdsetSend); + if (lockSend) { + // do not read, if draining write queue + if (!pnode->vSendMsg.empty()) + FD_SET(pnode->hSocket, &fdsetSend); + else + FD_SET(pnode->hSocket, &fdsetRecv); + FD_SET(pnode->hSocket, &fdsetError); + hSocketMax = max(hSocketMax, pnode->hSocket); + have_fds = true; + } } } } @@ -875,15 +994,12 @@ void ThreadSocketHandler2(void* parg) continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - CDataStream& vRecv = pnode->vRecv; - unsigned int nPos = vRecv.size(); - - if (nPos > ReceiveBufferSize()) { + if (pnode->GetTotalRecvSize() > ReceiveFloodSize()) { if (!pnode->fDisconnect) - printf("socket recv flood control disconnect (%"PRIszu" bytes)\n", vRecv.size()); + printf("socket recv flood control disconnect (%u bytes)\n", pnode->GetTotalRecvSize()); pnode->CloseSocketDisconnect(); } else { @@ -892,8 +1008,8 @@ void ThreadSocketHandler2(void* parg) int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); if (nBytes > 0) { - vRecv.resize(nPos + nBytes); - memcpy(&vRecv[nPos], pchBuf, nBytes); + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) + pnode->CloseSocketDisconnect(); pnode->nLastRecv = GetTime(); } else if (nBytes == 0) @@ -927,34 +1043,13 @@ void ThreadSocketHandler2(void* parg) { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - { - CDataStream& vSend = pnode->vSend; - if (!vSend.empty()) - { - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); - } - } - } - } + SocketSendData(pnode); } // // Inactivity checking // - if (pnode->vSend.empty()) + if (pnode->vSendMsg.empty()) pnode->nLastSendEmpty = GetTime(); if (GetTime() - pnode->nTimeConnected > 60) { @@ -1155,6 +1250,7 @@ static const char *strMainNetDNSSeed[][2] = { static const char *strTestNetDNSSeed[][2] = { {"bitcoin.petertodd.org", "testnet-seed.bitcoin.petertodd.org"}, + {"bluematt.me", "testnet-seed.bluematt.me"}, {NULL, NULL} }; @@ -1692,11 +1788,15 @@ void ThreadMessageHandler2(void* parg) pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; BOOST_FOREACH(CNode* pnode, vNodesCopy) { + if (pnode->fDisconnect) + continue; + // Receive messages { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) - ProcessMessages(pnode); + if (!ProcessMessages(pnode)) + pnode->CloseSocketDisconnect(); } if (fShutdown) return; @@ -1928,10 +2028,6 @@ void StartNode(void* parg) if (fUseUPnP) MapPort(); - // Get addresses from IRC and advertise ours - if (!NewThread(ThreadIRCSeed, NULL)) - printf("Error: NewThread(ThreadIRCSeed) failed\n"); - // Send and receive from sockets, accept connections if (!NewThread(ThreadSocketHandler, NULL)) printf("Error: NewThread(ThreadSocketHandler) failed\n"); |