aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Garzik <jgarzik@exmulti.com>2013-03-30 21:05:22 -0700
committerJeff Garzik <jgarzik@exmulti.com>2013-03-30 21:05:22 -0700
commite3c063b31598e1ffa83a978fd2f4e0551ead7261 (patch)
tree433fb59877457ddd5c51863c15ccda529e886368
parentea83336f4eceecbc046e465509b792dd203327bc (diff)
parentc7f039b674b43b741f20bf7521eb8a68426f4275 (diff)
downloadbitcoin-e3c063b31598e1ffa83a978fd2f4e0551ead7261.tar.xz
Merge pull request #2409 from sipa/txoptim
Network optimalizations
-rw-r--r--src/main.cpp290
-rw-r--r--src/net.cpp185
-rw-r--r--src/net.h137
-rw-r--r--src/protocol.h3
-rw-r--r--src/serialize.h8
5 files changed, 405 insertions, 218 deletions
diff --git a/src/main.cpp b/src/main.cpp
index 22baf0f3eb..b29091b4fe 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -3029,6 +3029,115 @@ bool static AlreadyHave(const CInv& inv)
unsigned char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 };
+void static ProcessGetData(CNode* pfrom)
+{
+ std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
+
+ vector<CInv> vNotFound;
+
+ while (it != pfrom->vRecvGetData.end()) {
+ // Don't bother if send buffer is too full to respond anyway
+ if (pfrom->nSendSize >= SendBufferSize())
+ break;
+
+ const CInv &inv = *it;
+ {
+ if (fShutdown)
+ break;
+ it++;
+
+ if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK)
+ {
+ // Send block from disk
+ map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.find(inv.hash);
+ if (mi != mapBlockIndex.end())
+ {
+ CBlock block;
+ block.ReadFromDisk((*mi).second);
+ if (inv.type == MSG_BLOCK)
+ pfrom->PushMessage("block", block);
+ else // MSG_FILTERED_BLOCK)
+ {
+ LOCK(pfrom->cs_filter);
+ if (pfrom->pfilter)
+ {
+ CMerkleBlock merkleBlock(block, *pfrom->pfilter);
+ pfrom->PushMessage("merkleblock", merkleBlock);
+ // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
+ // This avoids hurting performance by pointlessly requiring a round-trip
+ // Note that there is currently no way for a node to request any single transactions we didnt send here -
+ // they must either disconnect and retry or request the full block.
+ // Thus, the protocol spec specified allows for us to provide duplicate txn here,
+ // however we MUST always provide at least what the remote peer needs
+ typedef std::pair<unsigned int, uint256> PairType;
+ BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn)
+ if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second)))
+ pfrom->PushMessage("tx", block.vtx[pair.first]);
+ }
+ // else
+ // no response
+ }
+
+ // Trigger them to send a getblocks request for the next batch of inventory
+ if (inv.hash == pfrom->hashContinue)
+ {
+ // Bypass PushInventory, this must send even if redundant,
+ // and we want it right after the last block so they don't
+ // wait for other stuff first.
+ vector<CInv> vInv;
+ vInv.push_back(CInv(MSG_BLOCK, hashBestChain));
+ pfrom->PushMessage("inv", vInv);
+ pfrom->hashContinue = 0;
+ }
+ }
+ }
+ else if (inv.IsKnownType())
+ {
+ // Send stream from relay memory
+ bool pushed = false;
+ {
+ LOCK(cs_mapRelay);
+ map<CInv, CDataStream>::iterator mi = mapRelay.find(inv);
+ if (mi != mapRelay.end()) {
+ pfrom->PushMessage(inv.GetCommand(), (*mi).second);
+ pushed = true;
+ }
+ }
+ if (!pushed && inv.type == MSG_TX) {
+ LOCK(mempool.cs);
+ if (mempool.exists(inv.hash)) {
+ CTransaction tx = mempool.lookup(inv.hash);
+ CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+ ss.reserve(1000);
+ ss << tx;
+ pfrom->PushMessage("tx", ss);
+ pushed = true;
+ }
+ }
+ if (!pushed) {
+ vNotFound.push_back(inv);
+ }
+ }
+
+ // Track requests for our stuff.
+ Inventory(inv.hash);
+ }
+ }
+
+ pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it);
+
+ if (!vNotFound.empty()) {
+ // Let the peer know that we didn't find what it asked for, so it doesn't
+ // have to wait around forever. Currently only SPV clients actually care
+ // about this message: it's needed when they are recursively walking the
+ // dependencies of relevant unconfirmed transactions. SPV clients want to
+ // do that because they want to know about (and store and rebroadcast and
+ // risk analyze) the dependencies of transactions relevant to them, without
+ // having to download the entire memory pool.
+ pfrom->PushMessage("notfound", vNotFound);
+ }
+}
+
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
{
RandAddSeedPerfmon();
@@ -3104,7 +3213,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
// Change version
pfrom->PushMessage("verack");
- pfrom->vSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
+ pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
if (!pfrom->fInbound)
{
@@ -3168,7 +3277,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
else if (strCommand == "verack")
{
- pfrom->vRecv.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
+ pfrom->SetRecvVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
}
@@ -3302,101 +3411,11 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
if (fDebugNet || (vInv.size() != 1))
printf("received getdata (%"PRIszu" invsz)\n", vInv.size());
- vector<CInv> vNotFound;
- BOOST_FOREACH(const CInv& inv, vInv)
- {
- if (fShutdown)
- return true;
- if (fDebugNet || (vInv.size() == 1))
- printf("received getdata for: %s\n", inv.ToString().c_str());
+ if ((fDebugNet && vInv.size() > 0) || (vInv.size() == 1))
+ printf("received getdata for: %s\n", vInv[0].ToString().c_str());
- if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK)
- {
- // Send block from disk
- map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.find(inv.hash);
- if (mi != mapBlockIndex.end())
- {
- CBlock block;
- block.ReadFromDisk((*mi).second);
- if (inv.type == MSG_BLOCK)
- pfrom->PushMessage("block", block);
- else // MSG_FILTERED_BLOCK)
- {
- LOCK(pfrom->cs_filter);
- if (pfrom->pfilter)
- {
- CMerkleBlock merkleBlock(block, *pfrom->pfilter);
- pfrom->PushMessage("merkleblock", merkleBlock);
- // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
- // This avoids hurting performance by pointlessly requiring a round-trip
- // Note that there is currently no way for a node to request any single transactions we didnt send here -
- // they must either disconnect and retry or request the full block.
- // Thus, the protocol spec specified allows for us to provide duplicate txn here,
- // however we MUST always provide at least what the remote peer needs
- typedef std::pair<unsigned int, uint256> PairType;
- BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn)
- if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second)))
- pfrom->PushMessage("tx", block.vtx[pair.first]);
- }
- // else
- // no response
- }
-
- // Trigger them to send a getblocks request for the next batch of inventory
- if (inv.hash == pfrom->hashContinue)
- {
- // Bypass PushInventory, this must send even if redundant,
- // and we want it right after the last block so they don't
- // wait for other stuff first.
- vector<CInv> vInv;
- vInv.push_back(CInv(MSG_BLOCK, hashBestChain));
- pfrom->PushMessage("inv", vInv);
- pfrom->hashContinue = 0;
- }
- }
- }
- else if (inv.IsKnownType())
- {
- // Send stream from relay memory
- bool pushed = false;
- {
- LOCK(cs_mapRelay);
- map<CInv, CDataStream>::iterator mi = mapRelay.find(inv);
- if (mi != mapRelay.end()) {
- pfrom->PushMessage(inv.GetCommand(), (*mi).second);
- pushed = true;
- }
- }
- if (!pushed && inv.type == MSG_TX) {
- LOCK(mempool.cs);
- if (mempool.exists(inv.hash)) {
- CTransaction tx = mempool.lookup(inv.hash);
- CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
- ss.reserve(1000);
- ss << tx;
- pfrom->PushMessage("tx", ss);
- pushed = true;
- }
- }
- if (!pushed) {
- vNotFound.push_back(inv);
- }
- }
-
- // Track requests for our stuff.
- Inventory(inv.hash);
-
- if (!vNotFound.empty()) {
- // Let the peer know that we didn't find what it asked for, so it doesn't
- // have to wait around forever. Currently only SPV clients actually care
- // about this message: it's needed when they are recursively walking the
- // dependencies of relevant unconfirmed transactions. SPV clients want to
- // do that because they want to know about (and store and rebroadcast and
- // risk analyze) the dependencies of transactions relevant to them, without
- // having to download the entire memory pool.
- pfrom->PushMessage("notfound", vNotFound);
- }
- }
+ pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end());
+ ProcessGetData(pfrom);
}
@@ -3705,13 +3724,11 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return true;
}
+// requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom)
{
- CDataStream& vRecv = pfrom->vRecv;
- if (vRecv.empty())
- return true;
//if (fDebug)
- // printf("ProcessMessages(%u bytes)\n", vRecv.size());
+ // printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size());
//
// Message format
@@ -3721,33 +3738,41 @@ bool ProcessMessages(CNode* pfrom)
// (4) checksum
// (x) data
//
+ bool fOk = true;
- loop
- {
+ if (!pfrom->vRecvGetData.empty())
+ ProcessGetData(pfrom);
+
+ std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
+ while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
- if (pfrom->vSend.size() >= SendBufferSize())
+ if (pfrom->nSendSize >= SendBufferSize())
+ break;
+
+ // get next message
+ CNetMessage& msg = *it;
+
+ //if (fDebug)
+ // printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n",
+ // msg.hdr.nMessageSize, msg.vRecv.size(),
+ // msg.complete() ? "Y" : "N");
+
+ // end, if an incomplete message is found
+ if (!msg.complete())
break;
+ // at this point, any failure means we can delete the current message
+ it++;
+
// Scan for message start
- CDataStream::iterator pstart = search(vRecv.begin(), vRecv.end(), BEGIN(pchMessageStart), END(pchMessageStart));
- int nHeaderSize = vRecv.GetSerializeSize(CMessageHeader());
- if (vRecv.end() - pstart < nHeaderSize)
- {
- if ((int)vRecv.size() > nHeaderSize)
- {
- printf("\n\nPROCESSMESSAGE MESSAGESTART NOT FOUND\n\n");
- vRecv.erase(vRecv.begin(), vRecv.end() - nHeaderSize);
- }
+ if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) {
+ printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n");
+ fOk = false;
break;
}
- if (pstart - vRecv.begin() > 0)
- printf("\n\nPROCESSMESSAGE SKIPPED %"PRIpdd" BYTES\n\n", pstart - vRecv.begin());
- vRecv.erase(vRecv.begin(), pstart);
// Read header
- vector<char> vHeaderSave(vRecv.begin(), vRecv.begin() + nHeaderSize);
- CMessageHeader hdr;
- vRecv >> hdr;
+ CMessageHeader& hdr = msg.hdr;
if (!hdr.IsValid())
{
printf("\n\nPROCESSMESSAGE: ERRORS IN HEADER %s\n\n\n", hdr.GetCommand().c_str());
@@ -3757,19 +3782,9 @@ bool ProcessMessages(CNode* pfrom)
// Message size
unsigned int nMessageSize = hdr.nMessageSize;
- if (nMessageSize > MAX_SIZE)
- {
- printf("ProcessMessages(%s, %u bytes) : nMessageSize > MAX_SIZE\n", strCommand.c_str(), nMessageSize);
- continue;
- }
- if (nMessageSize > vRecv.size())
- {
- // Rewind and wait for rest of message
- vRecv.insert(vRecv.begin(), vHeaderSave.begin(), vHeaderSave.end());
- break;
- }
// Checksum
+ CDataStream& vRecv = msg.vRecv;
uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize);
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
@@ -3780,20 +3795,16 @@ bool ProcessMessages(CNode* pfrom)
continue;
}
- // Copy message to its own buffer
- CDataStream vMsg(vRecv.begin(), vRecv.begin() + nMessageSize, vRecv.nType, vRecv.nVersion);
- vRecv.ignore(nMessageSize);
-
// Process message
bool fRet = false;
try
{
{
LOCK(cs_main);
- fRet = ProcessMessage(pfrom, strCommand, vMsg);
+ fRet = ProcessMessage(pfrom, strCommand, vRecv);
}
if (fShutdown)
- return true;
+ break;
}
catch (std::ios_base::failure& e)
{
@@ -3822,8 +3833,11 @@ bool ProcessMessages(CNode* pfrom)
printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize);
}
- vRecv.Compact();
- return true;
+ // In case the connection got shut down, its receive buffer was wiped
+ if (!pfrom->fDisconnect)
+ pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
+
+ return fOk;
}
@@ -3837,7 +3851,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
// Keep-alive ping. We send a nonce of zero because we don't use it anywhere
// right now.
- if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSend.empty()) {
+ if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) {
uint64 nonce = 0;
if (pto->nVersion > BIP0031_VERSION)
pto->PushMessage("ping", nonce);
diff --git a/src/net.cpp b/src/net.cpp
index 6c8fe3ffc9..9ee6cb423c 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -536,7 +536,11 @@ void CNode::CloseSocketDisconnect()
printf("disconnecting node %s\n", addrName.c_str());
closesocket(hSocket);
hSocket = INVALID_SOCKET;
- vRecv.clear();
+
+ // in case this fails, we'll empty the recv buffer when the CNode is deleted
+ TRY_LOCK(cs_vRecvMsg, lockRecv);
+ if (lockRecv)
+ vRecvMsg.clear();
}
}
@@ -628,15 +632,128 @@ void CNode::copyStats(CNodeStats &stats)
}
#undef X
+// requires LOCK(cs_vRecvMsg)
+bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes)
+{
+ while (nBytes > 0) {
+
+ // get current incomplete message, or create a new one
+ if (vRecvMsg.empty() ||
+ vRecvMsg.back().complete())
+ vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion));
+
+ CNetMessage& msg = vRecvMsg.back();
+
+ // absorb network data
+ int handled;
+ if (!msg.in_data)
+ handled = msg.readHeader(pch, nBytes);
+ else
+ handled = msg.readData(pch, nBytes);
+
+ if (handled < 0)
+ return false;
+
+ pch += handled;
+ nBytes -= handled;
+ }
+
+ return true;
+}
+
+int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
+{
+ // copy data to temporary parsing buffer
+ unsigned int nRemaining = 24 - nHdrPos;
+ unsigned int nCopy = std::min(nRemaining, nBytes);
+
+ memcpy(&hdrbuf[nHdrPos], pch, nCopy);
+ nHdrPos += nCopy;
+
+ // if header incomplete, exit
+ if (nHdrPos < 24)
+ return nCopy;
+ // deserialize to CMessageHeader
+ try {
+ hdrbuf >> hdr;
+ }
+ catch (std::exception &e) {
+ return -1;
+ }
+
+ // reject messages larger than MAX_SIZE
+ if (hdr.nMessageSize > MAX_SIZE)
+ return -1;
+ // switch state to reading message data
+ in_data = true;
+ vRecv.resize(hdr.nMessageSize);
+
+ return nCopy;
+}
+int CNetMessage::readData(const char *pch, unsigned int nBytes)
+{
+ unsigned int nRemaining = hdr.nMessageSize - nDataPos;
+ unsigned int nCopy = std::min(nRemaining, nBytes);
+
+ memcpy(&vRecv[nDataPos], pch, nCopy);
+ nDataPos += nCopy;
+
+ return nCopy;
+}
+
+
+
+// requires LOCK(cs_vSend)
+void SocketSendData(CNode *pnode)
+{
+ std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
+
+ while (it != pnode->vSendMsg.end()) {
+ const CSerializeData &data = *it;
+ assert(data.size() > pnode->nSendOffset);
+ int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (nBytes > 0) {
+ pnode->nLastSend = GetTime();
+ pnode->nSendOffset += nBytes;
+ if (pnode->nSendOffset == data.size()) {
+ pnode->nSendOffset = 0;
+ pnode->nSendSize -= data.size();
+ it++;
+ } else {
+ // could not send full message; stop sending more
+ break;
+ }
+ } else {
+ if (nBytes < 0) {
+ // error
+ int nErr = WSAGetLastError();
+ if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
+ {
+ printf("socket send error %d\n", nErr);
+ pnode->CloseSocketDisconnect();
+ }
+ }
+ // couldn't send anything at all
+ break;
+ }
+ }
+
+ if (it == pnode->vSendMsg.end()) {
+ assert(pnode->nSendOffset == 0);
+ assert(pnode->nSendSize == 0);
+ }
+ pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it);
+}
+
void ThreadSocketHandler(void* parg)
{
// Make this thread recognisable as the networking thread
@@ -676,7 +793,7 @@ void ThreadSocketHandler2(void* parg)
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty()))
+ (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty()))
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -708,7 +825,7 @@ void ThreadSocketHandler2(void* parg)
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
{
- TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
TRY_LOCK(pnode->cs_inventory, lockInv);
@@ -759,14 +876,18 @@ void ThreadSocketHandler2(void* parg)
{
if (pnode->hSocket == INVALID_SOCKET)
continue;
- FD_SET(pnode->hSocket, &fdsetRecv);
- FD_SET(pnode->hSocket, &fdsetError);
- hSocketMax = max(hSocketMax, pnode->hSocket);
- have_fds = true;
{
TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend && !pnode->vSend.empty())
- FD_SET(pnode->hSocket, &fdsetSend);
+ if (lockSend) {
+ // do not read, if draining write queue
+ if (!pnode->vSendMsg.empty())
+ FD_SET(pnode->hSocket, &fdsetSend);
+ else
+ FD_SET(pnode->hSocket, &fdsetRecv);
+ FD_SET(pnode->hSocket, &fdsetError);
+ hSocketMax = max(hSocketMax, pnode->hSocket);
+ have_fds = true;
+ }
}
}
}
@@ -873,15 +994,12 @@ void ThreadSocketHandler2(void* parg)
continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{
- TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
- CDataStream& vRecv = pnode->vRecv;
- unsigned int nPos = vRecv.size();
-
- if (nPos > ReceiveBufferSize()) {
+ if (pnode->GetTotalRecvSize() > ReceiveFloodSize()) {
if (!pnode->fDisconnect)
- printf("socket recv flood control disconnect (%"PRIszu" bytes)\n", vRecv.size());
+ printf("socket recv flood control disconnect (%u bytes)\n", pnode->GetTotalRecvSize());
pnode->CloseSocketDisconnect();
}
else {
@@ -890,8 +1008,8 @@ void ThreadSocketHandler2(void* parg)
int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
if (nBytes > 0)
{
- vRecv.resize(nPos + nBytes);
- memcpy(&vRecv[nPos], pchBuf, nBytes);
+ if (!pnode->ReceiveMsgBytes(pchBuf, nBytes))
+ pnode->CloseSocketDisconnect();
pnode->nLastRecv = GetTime();
}
else if (nBytes == 0)
@@ -925,34 +1043,13 @@ void ThreadSocketHandler2(void* parg)
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
- {
- CDataStream& vSend = pnode->vSend;
- if (!vSend.empty())
- {
- int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT);
- if (nBytes > 0)
- {
- vSend.erase(vSend.begin(), vSend.begin() + nBytes);
- pnode->nLastSend = GetTime();
- }
- else if (nBytes < 0)
- {
- // error
- int nErr = WSAGetLastError();
- if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
- {
- printf("socket send error %d\n", nErr);
- pnode->CloseSocketDisconnect();
- }
- }
- }
- }
+ SocketSendData(pnode);
}
//
// Inactivity checking
//
- if (pnode->vSend.empty())
+ if (pnode->vSendMsg.empty())
pnode->nLastSendEmpty = GetTime();
if (GetTime() - pnode->nTimeConnected > 60)
{
@@ -1691,11 +1788,15 @@ void ThreadMessageHandler2(void* parg)
pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())];
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
+ if (pnode->fDisconnect)
+ continue;
+
// Receive messages
{
- TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
- ProcessMessages(pnode);
+ if (!ProcessMessages(pnode))
+ pnode->CloseSocketDisconnect();
}
if (fShutdown)
return;
diff --git a/src/net.h b/src/net.h
index 3b46523cd9..368e4cd4bb 100644
--- a/src/net.h
+++ b/src/net.h
@@ -27,7 +27,7 @@ extern int nBestHeight;
-inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); }
+inline unsigned int ReceiveFloodSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); }
inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 1*1000); }
void AddOneShot(std::string strDest);
@@ -42,6 +42,7 @@ unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError=REF(std::string()));
void StartNode(void* parg);
bool StopNode();
+void SocketSendData(CNode *pnode);
enum
{
@@ -126,6 +127,44 @@ public:
+class CNetMessage {
+public:
+ bool in_data; // parsing header (false) or data (true)
+
+ CDataStream hdrbuf; // partially received header
+ CMessageHeader hdr; // complete header
+ unsigned int nHdrPos;
+
+ CDataStream vRecv; // received message data
+ unsigned int nDataPos;
+
+ CNetMessage(int nTypeIn, int nVersionIn) : hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) {
+ hdrbuf.resize(24);
+ in_data = false;
+ nHdrPos = 0;
+ nDataPos = 0;
+ }
+
+ bool complete() const
+ {
+ if (!in_data)
+ return false;
+ return (hdr.nMessageSize == nDataPos);
+ }
+
+ void SetVersion(int nVersionIn)
+ {
+ hdrbuf.SetVersion(nVersionIn);
+ vRecv.SetVersion(nVersionIn);
+ }
+
+ int readHeader(const char *pch, unsigned int nBytes);
+ int readData(const char *pch, unsigned int nBytes);
+};
+
+
+
+
/** Information about a peer */
class CNode
@@ -134,16 +173,21 @@ public:
// socket
uint64 nServices;
SOCKET hSocket;
- CDataStream vSend;
- CDataStream vRecv;
+ CDataStream ssSend;
+ size_t nSendSize; // total size of all vSendMsg entries
+ size_t nSendOffset; // offset inside the first vSendMsg already sent
+ std::deque<CSerializeData> vSendMsg;
CCriticalSection cs_vSend;
- CCriticalSection cs_vRecv;
+
+ std::deque<CInv> vRecvGetData;
+ std::deque<CNetMessage> vRecvMsg;
+ CCriticalSection cs_vRecvMsg;
+ int nRecvVersion;
+
int64 nLastSend;
int64 nLastRecv;
int64 nLastSendEmpty;
int64 nTimeConnected;
- int nHeaderStart;
- unsigned int nMessageStart;
CAddress addr;
std::string addrName;
CService addrLocal;
@@ -191,16 +235,15 @@ public:
CCriticalSection cs_inventory;
std::multimap<int64, CInv> mapAskFor;
- CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION), vRecv(SER_NETWORK, MIN_PROTO_VERSION)
+ CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION)
{
nServices = 0;
hSocket = hSocketIn;
+ nRecvVersion = MIN_PROTO_VERSION;
nLastSend = 0;
nLastRecv = 0;
nLastSendEmpty = GetTime();
nTimeConnected = GetTime();
- nHeaderStart = -1;
- nMessageStart = -1;
addr = addrIn;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
nVersion = 0;
@@ -213,6 +256,8 @@ public:
fDisconnect = false;
nRefCount = 0;
nReleaseTime = 0;
+ nSendSize = 0;
+ nSendOffset = 0;
hashContinue = 0;
pindexLastGetBlocksBegin = 0;
hashLastGetBlocksEnd = 0;
@@ -250,6 +295,26 @@ public:
return std::max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0);
}
+ // requires LOCK(cs_vRecvMsg)
+ unsigned int GetTotalRecvSize()
+ {
+ unsigned int total = 0;
+ BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
+ total += msg.vRecv.size() + 24;
+ return total;
+ }
+
+ // requires LOCK(cs_vRecvMsg)
+ bool ReceiveMsgBytes(const char *pch, unsigned int nBytes);
+
+ // requires LOCK(cs_vRecvMsg)
+ void SetRecvVersion(int nVersionIn)
+ {
+ nRecvVersion = nVersionIn;
+ BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
+ msg.SetVersion(nVersionIn);
+ }
+
CNode* AddRef(int64 nTimeout=0)
{
if (nTimeout != 0)
@@ -324,11 +389,8 @@ public:
void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend)
{
ENTER_CRITICAL_SECTION(cs_vSend);
- if (nHeaderStart != -1)
- AbortMessage();
- nHeaderStart = vSend.size();
- vSend << CMessageHeader(pszCommand, 0);
- nMessageStart = vSend.size();
+ assert(ssSend.size() == 0);
+ ssSend << CMessageHeader(pszCommand, 0);
if (fDebug)
printf("sending: %s ", pszCommand);
}
@@ -336,11 +398,8 @@ public:
// TODO: Document the precondition of this function. Is cs_vSend locked?
void AbortMessage() UNLOCK_FUNCTION(cs_vSend)
{
- if (nHeaderStart < 0)
- return;
- vSend.resize(nHeaderStart);
- nHeaderStart = -1;
- nMessageStart = -1;
+ ssSend.clear();
+
LEAVE_CRITICAL_SECTION(cs_vSend);
if (fDebug)
@@ -357,26 +416,32 @@ public:
return;
}
- if (nHeaderStart < 0)
+ if (ssSend.size() == 0)
return;
// Set the size
- unsigned int nSize = vSend.size() - nMessageStart;
- memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize));
+ unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE;
+ memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize));
// Set the checksum
- uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end());
+ uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end());
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
- assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
- memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum));
+ assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
+ memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum));
if (fDebug) {
printf("(%d bytes)\n", nSize);
}
- nHeaderStart = -1;
- nMessageStart = -1;
+ std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData());
+ ssSend.GetAndClear(*it);
+ nSendSize += (*it).size();
+
+ // If write queue empty, attempt "optimistic write"
+ if (it == vSendMsg.begin())
+ SocketSendData(this);
+
LEAVE_CRITICAL_SECTION(cs_vSend);
}
@@ -403,7 +468,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1;
+ ssSend << a1;
EndMessage();
}
catch (...)
@@ -419,7 +484,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2;
+ ssSend << a1 << a2;
EndMessage();
}
catch (...)
@@ -435,7 +500,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3;
+ ssSend << a1 << a2 << a3;
EndMessage();
}
catch (...)
@@ -451,7 +516,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4;
+ ssSend << a1 << a2 << a3 << a4;
EndMessage();
}
catch (...)
@@ -467,7 +532,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5;
+ ssSend << a1 << a2 << a3 << a4 << a5;
EndMessage();
}
catch (...)
@@ -483,7 +548,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6;
+ ssSend << a1 << a2 << a3 << a4 << a5 << a6;
EndMessage();
}
catch (...)
@@ -499,7 +564,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
+ ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
EndMessage();
}
catch (...)
@@ -515,7 +580,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
+ ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
EndMessage();
}
catch (...)
@@ -531,7 +596,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
+ ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
EndMessage();
}
catch (...)
diff --git a/src/protocol.h b/src/protocol.h
index f5c162054e..4998425070 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -56,7 +56,8 @@ class CMessageHeader
CHECKSUM_SIZE=sizeof(int),
MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE,
- CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE
+ CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE,
+ HEADER_SIZE=MESSAGE_START_SIZE+COMMAND_SIZE+MESSAGE_SIZE_SIZE+CHECKSUM_SIZE
};
char pchMessageStart[MESSAGE_START_SIZE];
char pchCommand[COMMAND_SIZE];
diff --git a/src/serialize.h b/src/serialize.h
index f2626281c1..e3d9939bcc 100644
--- a/src/serialize.h
+++ b/src/serialize.h
@@ -789,6 +789,7 @@ struct ser_streamplaceholder
+typedef std::vector<char, zero_after_free_allocator<char> > CSerializeData;
/** Double ended buffer combining vector and stream-like interfaces.
*
@@ -798,7 +799,7 @@ struct ser_streamplaceholder
class CDataStream
{
protected:
- typedef std::vector<char, zero_after_free_allocator<char> > vector_type;
+ typedef CSerializeData vector_type;
vector_type vch;
unsigned int nReadPos;
short state;
@@ -1095,6 +1096,11 @@ public:
::Unserialize(*this, obj, nType, nVersion);
return (*this);
}
+
+ void GetAndClear(CSerializeData &data) {
+ vch.swap(data);
+ CSerializeData().swap(vch);
+ }
};