aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPieter Wuille <pieter.wuille@gmail.com>2013-03-24 16:52:24 +0100
committerPieter Wuille <sipa@ulyssis.org>2013-03-29 23:56:26 +0100
commit41b052ad87633d5a8a989c512c8710b875f2ba88 (patch)
tree0337b36bb1ced74a3f1400d840c191f9c738dd3d /src
parent967f24590b43f0f84148f669d886b40fe45aa978 (diff)
Use per-message send buffer, rather than per connection
Diffstat (limited to 'src')
-rw-r--r--src/main.cpp13
-rw-r--r--src/net.cpp59
-rw-r--r--src/net.h65
-rw-r--r--src/protocol.h3
-rw-r--r--src/serialize.h8
5 files changed, 86 insertions, 62 deletions
diff --git a/src/main.cpp b/src/main.cpp
index 0c80f23df9..77061fabd6 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -3104,7 +3104,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
// Change version
pfrom->PushMessage("verack");
- pfrom->vSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
+ pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
if (!pfrom->fInbound)
{
@@ -3722,9 +3722,9 @@ bool ProcessMessages(CNode* pfrom)
bool fOk = true;
std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
- while (it != pfrom->vRecvMsg.end()) {
+ while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
- if (pfrom->vSend.size() >= SendBufferSize())
+ if (pfrom->nSendSize >= SendBufferSize())
break;
// get next message
@@ -3811,7 +3811,10 @@ bool ProcessMessages(CNode* pfrom)
printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize);
}
- pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
+ // In case the connection got shut down, its receive buffer was wiped
+ if (!pfrom->fDisconnect)
+ pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
+
return fOk;
}
@@ -3826,7 +3829,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
// Keep-alive ping. We send a nonce of zero because we don't use it anywhere
// right now.
- if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSend.empty()) {
+ if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) {
uint64 nonce = 0;
if (pto->nVersion > BIP0031_VERSION)
pto->PushMessage("ping", nonce);
diff --git a/src/net.cpp b/src/net.cpp
index 1016d5d9f0..9ee6cb423c 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -715,26 +715,43 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes)
// requires LOCK(cs_vSend)
void SocketSendData(CNode *pnode)
{
- CDataStream& vSend = pnode->vSend;
- if (vSend.empty())
- return;
-
- int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT);
- if (nBytes > 0)
- {
- vSend.erase(vSend.begin(), vSend.begin() + nBytes);
- pnode->nLastSend = GetTime();
- }
- else if (nBytes < 0)
- {
- // error
- int nErr = WSAGetLastError();
- if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
- {
- printf("socket send error %d\n", nErr);
- pnode->CloseSocketDisconnect();
+ std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
+
+ while (it != pnode->vSendMsg.end()) {
+ const CSerializeData &data = *it;
+ assert(data.size() > pnode->nSendOffset);
+ int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (nBytes > 0) {
+ pnode->nLastSend = GetTime();
+ pnode->nSendOffset += nBytes;
+ if (pnode->nSendOffset == data.size()) {
+ pnode->nSendOffset = 0;
+ pnode->nSendSize -= data.size();
+ it++;
+ } else {
+ // could not send full message; stop sending more
+ break;
+ }
+ } else {
+ if (nBytes < 0) {
+ // error
+ int nErr = WSAGetLastError();
+ if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
+ {
+ printf("socket send error %d\n", nErr);
+ pnode->CloseSocketDisconnect();
+ }
+ }
+ // couldn't send anything at all
+ break;
}
}
+
+ if (it == pnode->vSendMsg.end()) {
+ assert(pnode->nSendOffset == 0);
+ assert(pnode->nSendSize == 0);
+ }
+ pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it);
}
void ThreadSocketHandler(void* parg)
@@ -776,7 +793,7 @@ void ThreadSocketHandler2(void* parg)
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty()))
+ (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty()))
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -863,7 +880,7 @@ void ThreadSocketHandler2(void* parg)
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
// do not read, if draining write queue
- if (!pnode->vSend.empty())
+ if (!pnode->vSendMsg.empty())
FD_SET(pnode->hSocket, &fdsetSend);
else
FD_SET(pnode->hSocket, &fdsetRecv);
@@ -1032,7 +1049,7 @@ void ThreadSocketHandler2(void* parg)
//
// Inactivity checking
//
- if (pnode->vSend.empty())
+ if (pnode->vSendMsg.empty())
pnode->nLastSendEmpty = GetTime();
if (GetTime() - pnode->nTimeConnected > 60)
{
diff --git a/src/net.h b/src/net.h
index d779265b79..9805e39f18 100644
--- a/src/net.h
+++ b/src/net.h
@@ -173,7 +173,10 @@ public:
// socket
uint64 nServices;
SOCKET hSocket;
- CDataStream vSend;
+ CDataStream ssSend;
+ size_t nSendSize; // total size of all vSendMsg entries
+ size_t nSendOffset; // offset inside the first vSendMsg already sent
+ std::deque<CSerializeData> vSendMsg;
CCriticalSection cs_vSend;
std::deque<CNetMessage> vRecvMsg;
@@ -184,8 +187,6 @@ public:
int64 nLastRecv;
int64 nLastSendEmpty;
int64 nTimeConnected;
- int nHeaderStart;
- unsigned int nMessageStart;
CAddress addr;
std::string addrName;
CService addrLocal;
@@ -233,7 +234,7 @@ public:
CCriticalSection cs_inventory;
std::multimap<int64, CInv> mapAskFor;
- CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION)
+ CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION)
{
nServices = 0;
hSocket = hSocketIn;
@@ -242,8 +243,6 @@ public:
nLastRecv = 0;
nLastSendEmpty = GetTime();
nTimeConnected = GetTime();
- nHeaderStart = -1;
- nMessageStart = -1;
addr = addrIn;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
nVersion = 0;
@@ -256,6 +255,8 @@ public:
fDisconnect = false;
nRefCount = 0;
nReleaseTime = 0;
+ nSendSize = 0;
+ nSendOffset = 0;
hashContinue = 0;
pindexLastGetBlocksBegin = 0;
hashLastGetBlocksEnd = 0;
@@ -387,11 +388,8 @@ public:
void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend)
{
ENTER_CRITICAL_SECTION(cs_vSend);
- if (nHeaderStart != -1)
- AbortMessage();
- nHeaderStart = vSend.size();
- vSend << CMessageHeader(pszCommand, 0);
- nMessageStart = vSend.size();
+ assert(ssSend.size() == 0);
+ ssSend << CMessageHeader(pszCommand, 0);
if (fDebug)
printf("sending: %s ", pszCommand);
}
@@ -399,11 +397,8 @@ public:
// TODO: Document the precondition of this function. Is cs_vSend locked?
void AbortMessage() UNLOCK_FUNCTION(cs_vSend)
{
- if (nHeaderStart < 0)
- return;
- vSend.resize(nHeaderStart);
- nHeaderStart = -1;
- nMessageStart = -1;
+ ssSend.clear();
+
LEAVE_CRITICAL_SECTION(cs_vSend);
if (fDebug)
@@ -420,30 +415,32 @@ public:
return;
}
- if (nHeaderStart < 0)
+ if (ssSend.size() == 0)
return;
// Set the size
- unsigned int nSize = vSend.size() - nMessageStart;
- memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize));
+ unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE;
+ memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize));
// Set the checksum
- uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end());
+ uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end());
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
- assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
- memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum));
+ assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
+ memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum));
if (fDebug) {
printf("(%d bytes)\n", nSize);
}
+ std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData());
+ ssSend.GetAndClear(*it);
+ nSendSize += (*it).size();
+
// If write queue empty, attempt "optimistic write"
- if (nHeaderStart == 0)
+ if (it == vSendMsg.begin())
SocketSendData(this);
- nHeaderStart = -1;
- nMessageStart = -1;
LEAVE_CRITICAL_SECTION(cs_vSend);
}
@@ -470,7 +467,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1;
+ ssSend << a1;
EndMessage();
}
catch (...)
@@ -486,7 +483,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2;
+ ssSend << a1 << a2;
EndMessage();
}
catch (...)
@@ -502,7 +499,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3;
+ ssSend << a1 << a2 << a3;
EndMessage();
}
catch (...)
@@ -518,7 +515,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4;
+ ssSend << a1 << a2 << a3 << a4;
EndMessage();
}
catch (...)
@@ -534,7 +531,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5;
+ ssSend << a1 << a2 << a3 << a4 << a5;
EndMessage();
}
catch (...)
@@ -550,7 +547,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6;
+ ssSend << a1 << a2 << a3 << a4 << a5 << a6;
EndMessage();
}
catch (...)
@@ -566,7 +563,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
+ ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
EndMessage();
}
catch (...)
@@ -582,7 +579,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
+ ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
EndMessage();
}
catch (...)
@@ -598,7 +595,7 @@ public:
try
{
BeginMessage(pszCommand);
- vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
+ ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
EndMessage();
}
catch (...)
diff --git a/src/protocol.h b/src/protocol.h
index f5c162054e..4998425070 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -56,7 +56,8 @@ class CMessageHeader
CHECKSUM_SIZE=sizeof(int),
MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE,
- CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE
+ CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE,
+ HEADER_SIZE=MESSAGE_START_SIZE+COMMAND_SIZE+MESSAGE_SIZE_SIZE+CHECKSUM_SIZE
};
char pchMessageStart[MESSAGE_START_SIZE];
char pchCommand[COMMAND_SIZE];
diff --git a/src/serialize.h b/src/serialize.h
index f2626281c1..e3d9939bcc 100644
--- a/src/serialize.h
+++ b/src/serialize.h
@@ -789,6 +789,7 @@ struct ser_streamplaceholder
+typedef std::vector<char, zero_after_free_allocator<char> > CSerializeData;
/** Double ended buffer combining vector and stream-like interfaces.
*
@@ -798,7 +799,7 @@ struct ser_streamplaceholder
class CDataStream
{
protected:
- typedef std::vector<char, zero_after_free_allocator<char> > vector_type;
+ typedef CSerializeData vector_type;
vector_type vch;
unsigned int nReadPos;
short state;
@@ -1095,6 +1096,11 @@ public:
::Unserialize(*this, obj, nType, nVersion);
return (*this);
}
+
+ void GetAndClear(CSerializeData &data) {
+ vch.swap(data);
+ CSerializeData().swap(vch);
+ }
};