aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp157
1 files changed, 93 insertions, 64 deletions
diff --git a/src/net.cpp b/src/net.cpp
index bf2beb7740..b275bdd809 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -437,11 +437,6 @@ void CNode::CloseSocketDisconnect()
LogPrint("net", "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
}
-
- // in case this fails, we'll empty the recv buffer when the CNode is deleted
- TRY_LOCK(cs_vRecvMsg, lockRecv);
- if (lockRecv)
- vRecvMsg.clear();
}
void CConnman::ClearBanned()
@@ -621,6 +616,7 @@ void CNode::copyStats(CNodeStats &stats)
X(nVersion);
X(cleanSubVer);
X(fInbound);
+ X(fAddnode);
X(nStartingHeight);
X(nSendBytes);
X(mapSendBytesPerMsgCmd);
@@ -649,16 +645,18 @@ void CNode::copyStats(CNodeStats &stats)
}
#undef X
-// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
complete = false;
+ int64_t nTimeMicros = GetTimeMicros();
+ nLastRecv = nTimeMicros / 1000000;
+ nRecvBytes += nBytes;
while (nBytes > 0) {
// get current incomplete message, or create a new one
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
- vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion));
+ vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));
CNetMessage& msg = vRecvMsg.back();
@@ -690,7 +688,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
assert(i != mapRecvBytesPerMsgCmd.end());
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
- msg.nTime = GetTimeMicros();
+ msg.nTime = nTimeMicros;
complete = true;
}
}
@@ -763,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend)
-size_t SocketSendData(CNode *pnode)
+size_t CConnman::SocketSendData(CNode *pnode)
{
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
@@ -780,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
+ pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
@@ -1051,8 +1050,7 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
- if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
+ if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -1082,13 +1080,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv)
fDelete = true;
- }
}
}
if (fDelete)
@@ -1148,15 +1142,10 @@ void CConnman::ThreadSocketHandler()
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is no (complete) message in the receive buffer,
- // or there is space left in the buffer, select() for receiving data.
- // * (if neither of the above applies, there is certainly one message
- // in the receiver buffer ready to be processed).
- // Together, that means that at least one of the following is always possible,
- // so we don't deadlock:
- // * We send some data.
- // * We wait for data to be received (and disconnect after timeout).
- // * We process a message in the buffer (message handler thread).
+ // * Otherwise, if there is space left in the receive buffer, select() for
+ // receiving data.
+ // * Hand off all complete messages to the processor, to be handled without
+ // blocking here.
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
@@ -1167,10 +1156,7 @@ void CConnman::ThreadSocketHandler()
}
}
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv && (
- pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
- pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
+ if (!pnode->fPauseRecv)
FD_SET(pnode->hSocket, &fdsetRecv);
}
}
@@ -1229,8 +1215,6 @@ void CConnman::ThreadSocketHandler()
continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
{
{
// typical socket buffer is 8K-64K
@@ -1241,11 +1225,23 @@ void CConnman::ThreadSocketHandler()
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
- if(notify)
- condMsgProc.notify_one();
- pnode->nLastRecv = GetTime();
- pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
+ if (notify) {
+ size_t nSizeAdded = 0;
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
+ }
+ {
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ pnode->nProcessQueueSize += nSizeAdded;
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
+ }
+ WakeMessageHandler();
+ }
}
else if (nBytes == 0)
{
@@ -1279,8 +1275,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
size_t nBytes = SocketSendData(pnode);
- if (nBytes)
+ if (nBytes) {
RecordBytesSent(nBytes);
+ }
}
}
@@ -1320,8 +1317,14 @@ void CConnman::ThreadSocketHandler()
}
}
-
-
+void CConnman::WakeMessageHandler()
+{
+ {
+ std::lock_guard<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = true;
+ }
+ condMsgProc.notify_one();
+}
@@ -1631,7 +1634,12 @@ void CConnman::ThreadOpenConnections()
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes) {
- if (!pnode->fInbound) {
+ if (!pnode->fInbound && !pnode->fAddnode) {
+ // Netgroups for inbound and addnode peers are not excluded because our goal here
+ // is to not use multiple of our limited outbound slots on a single netgroup
+ // but inbound and addnode peers do not use our outbound slots. Inbound peers
+ // also have the added issue that they're attacker controlled and could be used
+ // to prevent us from connecting to particular hosts if we used them here.
setConnected.insert(pnode->addr.GetGroup());
nOutbound++;
}
@@ -1776,27 +1784,35 @@ void CConnman::ThreadOpenAddedConnections()
vAddedNodes = mapMultiArgs.at("-addnode");
}
- for (unsigned int i = 0; true; i++)
+ while (true)
{
+ CSemaphoreGrant grant(*semAddnode);
std::vector<AddedNodeInfo> vInfo = GetAddedNodeInfo();
+ bool tried = false;
for (const AddedNodeInfo& info : vInfo) {
if (!info.fConnected) {
- CSemaphoreGrant grant(*semOutbound);
+ if (!grant.TryAcquire()) {
+ // If we've used up our semaphore and need a new one, lets not wait here since while we are waiting
+ // the addednodeinfo state might change.
+ break;
+ }
// If strAddedNode is an IP/port, decode it immediately, so
// OpenNetworkConnection can detect existing connections to that IP/port.
+ tried = true;
CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort()));
- OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false);
+ OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false, false, true);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
}
}
- if (!interruptNet.sleep_for(std::chrono::minutes(2)))
+ // Retry every 60 seconds if a connection was attempted, otherwise two seconds
+ if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
return;
}
}
// if successful, this moves the passed grant to the constructed node
-bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot, bool fFeeler)
+bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot, bool fFeeler, bool fAddnode)
{
//
// Initiate outbound network connection
@@ -1825,6 +1841,8 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
pnode->fOneShot = true;
if (fFeeler)
pnode->fFeeler = true;
+ if (fAddnode)
+ pnode->fAddnode = true;
return true;
}
@@ -1842,7 +1860,7 @@ void CConnman::ThreadMessageHandler()
}
}
- bool fSleep = true;
+ bool fMoreWork = false;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
@@ -1850,22 +1868,8 @@ void CConnman::ThreadMessageHandler()
continue;
// Receive messages
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
- pnode->CloseSocketDisconnect();
-
- if (pnode->nSendSize < GetSendBufferSize())
- {
- if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
- {
- fSleep = false;
- }
- }
- }
- }
+ bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
@@ -1885,10 +1889,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}
- if (fSleep) {
- std::unique_lock<std::mutex> lock(mutexMsgProc);
- condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ if (!fMoreWork) {
+ condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
+ fMsgProcWake = false;
}
}
@@ -2076,8 +2081,10 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
nSendBufferMaxSize = 0;
nReceiveFloodSize = 0;
semOutbound = NULL;
+ semAddnode = NULL;
nMaxConnections = 0;
nMaxOutbound = 0;
+ nMaxAddnode = 0;
nBestHeight = 0;
clientInterface = NULL;
flagInterruptMsgProc = false;
@@ -2099,10 +2106,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
nLocalServices = connOptions.nLocalServices;
nMaxConnections = connOptions.nMaxConnections;
nMaxOutbound = std::min((connOptions.nMaxOutbound), nMaxConnections);
+ nMaxAddnode = connOptions.nMaxAddnode;
nMaxFeeler = connOptions.nMaxFeeler;
nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
- nReceiveFloodSize = connOptions.nSendBufferMaxSize;
+ nReceiveFloodSize = connOptions.nReceiveFloodSize;
nMaxOutboundLimit = connOptions.nMaxOutboundLimit;
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
@@ -2151,6 +2159,10 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
// initialize semaphore
semOutbound = new CSemaphore(std::min((nMaxOutbound + nMaxFeeler), nMaxConnections));
}
+ if (semAddnode == NULL) {
+ // initialize semaphore
+ semAddnode = new CSemaphore(nMaxAddnode);
+ }
//
// Start threads
@@ -2159,6 +2171,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;
+ {
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = false;
+ }
+
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
@@ -2227,6 +2244,10 @@ void CConnman::Stop()
if (threadSocketHandler.joinable())
threadSocketHandler.join();
+ if (semAddnode)
+ for (int i=0; i<nMaxAddnode; i++)
+ semOutbound->post();
+
if (fAddressesInitialized)
{
DumpData();
@@ -2254,6 +2275,8 @@ void CConnman::Stop()
vhListenSocket.clear();
delete semOutbound;
semOutbound = NULL;
+ delete semAddnode;
+ semAddnode = NULL;
}
void CConnman::DeleteNode(CNode* pnode)
@@ -2554,6 +2577,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
strSubVer = "";
fWhitelisted = false;
fOneShot = false;
+ fAddnode = false;
fClient = false; // set by version message
fFeeler = false;
fSuccessfullyConnected = false;
@@ -2583,6 +2607,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
+ fPauseRecv = false;
+ fPauseSend = false;
+ nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
@@ -2662,6 +2689,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;
+ if (pnode->nSendSize > nSendBufferMaxSize)
+ pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));