aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp461
1 files changed, 280 insertions, 181 deletions
diff --git a/src/net.cpp b/src/net.cpp
index bf2beb7740..4434793c4c 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -164,8 +164,9 @@ int GetnScore(const CService& addr)
// Is our peer's addrLocal potentially useful as an external IP source?
bool IsPeerAddrLocalGood(CNode *pnode)
{
- return fDiscover && pnode->addr.IsRoutable() && pnode->addrLocal.IsRoutable() &&
- !IsLimited(pnode->addrLocal.GetNetwork());
+ CService addrLocal = pnode->GetAddrLocal();
+ return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() &&
+ !IsLimited(addrLocal.GetNetwork());
}
// pushes our own address to a peer
@@ -180,7 +181,7 @@ void AdvertiseLocal(CNode *pnode)
if (IsPeerAddrLocalGood(pnode) && (!addrLocal.IsRoutable() ||
GetRand((GetnScore(addrLocal) > LOCAL_MANUAL) ? 8:2) == 0))
{
- addrLocal.SetIP(pnode->addrLocal);
+ addrLocal.SetIP(pnode->GetAddrLocal());
}
if (addrLocal.IsRoutable())
{
@@ -307,9 +308,11 @@ CNode* CConnman::FindNode(const CSubNet& subNet)
CNode* CConnman::FindNode(const std::string& addrName)
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->addrName == addrName)
+ BOOST_FOREACH(CNode* pnode, vNodes) {
+ if (pnode->GetAddrName() == addrName) {
return (pnode);
+ }
+ }
return NULL;
}
@@ -342,8 +345,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
CNode* pnode = FindNode((CService)addrConnect);
if (pnode)
{
- pnode->AddRef();
- return pnode;
+ LogPrintf("Failed to open new connection, already connected\n");
+ return NULL;
}
}
@@ -369,18 +372,14 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
// In that case, drop the connection that was just created, and return the existing CNode instead.
// Also store the name we used to connect in that CNode, so that future FindNode() calls to that
// name catch this early.
+ LOCK(cs_vNodes);
CNode* pnode = FindNode((CService)addrConnect);
if (pnode)
{
- pnode->AddRef();
- {
- LOCK(cs_vNodes);
- if (pnode->addrName.empty()) {
- pnode->addrName = std::string(pszDest);
- }
- }
+ pnode->MaybeSetAddrName(std::string(pszDest));
CloseSocket(hSocket);
- return pnode;
+ LogPrintf("Failed to open new connection, already connected\n");
+ return NULL;
}
}
@@ -391,13 +390,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
- pnode->nTimeConnected = GetTime();
pnode->AddRef();
- GetNodeSignals().InitializeNode(pnode, *this);
- {
- LOCK(cs_vNodes);
- vNodes.push_back(pnode);
- }
return pnode;
} else if (!proxyConnectionFailed) {
@@ -432,16 +425,12 @@ void CConnman::DumpBanlist()
void CNode::CloseSocketDisconnect()
{
fDisconnect = true;
+ LOCK(cs_hSocket);
if (hSocket != INVALID_SOCKET)
{
LogPrint("net", "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
}
-
- // in case this fails, we'll empty the recv buffer when the CNode is deleted
- TRY_LOCK(cs_vRecvMsg, lockRecv);
- if (lockRecv)
- vRecvMsg.clear();
}
void CConnman::ClearBanned()
@@ -605,6 +594,33 @@ void CConnman::AddWhitelistedRange(const CSubNet &subnet) {
vWhitelistedRange.push_back(subnet);
}
+
+std::string CNode::GetAddrName() const {
+ LOCK(cs_addrName);
+ return addrName;
+}
+
+void CNode::MaybeSetAddrName(const std::string& addrNameIn) {
+ LOCK(cs_addrName);
+ if (addrName.empty()) {
+ addrName = addrNameIn;
+ }
+}
+
+CService CNode::GetAddrLocal() const {
+ LOCK(cs_addrLocal);
+ return addrLocal;
+}
+
+void CNode::SetAddrLocal(const CService& addrLocalIn) {
+ LOCK(cs_addrLocal);
+ if (addrLocal.IsValid()) {
+ error("Addr local already set for node: %i. Refusing to change from %s to %s", id, addrLocal.ToString(), addrLocalIn.ToString());
+ } else {
+ addrLocal = addrLocalIn;
+ }
+}
+
#undef X
#define X(name) stats.name = name
void CNode::copyStats(CNodeStats &stats)
@@ -612,20 +628,33 @@ void CNode::copyStats(CNodeStats &stats)
stats.nodeid = this->GetId();
X(nServices);
X(addr);
- X(fRelayTxes);
+ {
+ LOCK(cs_filter);
+ X(fRelayTxes);
+ }
X(nLastSend);
X(nLastRecv);
X(nTimeConnected);
X(nTimeOffset);
- X(addrName);
+ stats.addrName = GetAddrName();
X(nVersion);
- X(cleanSubVer);
+ {
+ LOCK(cs_SubVer);
+ X(cleanSubVer);
+ }
X(fInbound);
+ X(fAddnode);
X(nStartingHeight);
- X(nSendBytes);
- X(mapSendBytesPerMsgCmd);
- X(nRecvBytes);
- X(mapRecvBytesPerMsgCmd);
+ {
+ LOCK(cs_vSend);
+ X(mapSendBytesPerMsgCmd);
+ X(nSendBytes);
+ }
+ {
+ LOCK(cs_vRecv);
+ X(mapRecvBytesPerMsgCmd);
+ X(nRecvBytes);
+ }
X(fWhitelisted);
// It is common for nodes with good ping times to suddenly become lagged,
@@ -645,20 +674,24 @@ void CNode::copyStats(CNodeStats &stats)
stats.dPingWait = (((double)nPingUsecWait) / 1e6);
// Leave string empty if addrLocal invalid (not filled in yet)
- stats.addrLocal = addrLocal.IsValid() ? addrLocal.ToString() : "";
+ CService addrLocalUnlocked = GetAddrLocal();
+ stats.addrLocal = addrLocalUnlocked.IsValid() ? addrLocalUnlocked.ToString() : "";
}
#undef X
-// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
complete = false;
+ int64_t nTimeMicros = GetTimeMicros();
+ LOCK(cs_vRecv);
+ nLastRecv = nTimeMicros / 1000000;
+ nRecvBytes += nBytes;
while (nBytes > 0) {
// get current incomplete message, or create a new one
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
- vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion));
+ vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));
CNetMessage& msg = vRecvMsg.back();
@@ -690,7 +723,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
assert(i != mapRecvBytesPerMsgCmd.end());
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
- msg.nTime = GetTimeMicros();
+ msg.nTime = nTimeMicros;
complete = true;
}
}
@@ -698,6 +731,33 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
return true;
}
+void CNode::SetSendVersion(int nVersionIn)
+{
+ // Send version may only be changed in the version message, and
+ // only one version message is allowed per session. We can therefore
+ // treat this value as const and even atomic as long as it's only used
+ // once a version message has been successfully processed. Any attempt to
+ // set this twice is an error.
+ if (nSendVersion != 0) {
+ error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn);
+ } else {
+ nSendVersion = nVersionIn;
+ }
+}
+
+int CNode::GetSendVersion() const
+{
+ // The send version should always be explicitly set to
+ // INIT_PROTO_VERSION rather than using this value until SetSendVersion
+ // has been called.
+ if (nSendVersion == 0) {
+ error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION);
+ return INIT_PROTO_VERSION;
+ }
+ return nSendVersion;
+}
+
+
int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
{
// copy data to temporary parsing buffer
@@ -763,7 +823,7 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend)
-size_t SocketSendData(CNode *pnode)
+size_t CConnman::SocketSendData(CNode *pnode) const
{
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
@@ -771,15 +831,22 @@ size_t SocketSendData(CNode *pnode)
while (it != pnode->vSendMsg.end()) {
const auto &data = *it;
assert(data.size() > pnode->nSendOffset);
- int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ int nBytes = 0;
+ {
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ break;
+ nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ }
if (nBytes > 0) {
- pnode->nLastSend = GetTime();
+ pnode->nLastSend = GetSystemTimeInSeconds();
pnode->nSendBytes += nBytes;
pnode->nSendOffset += nBytes;
nSentSize += nBytes;
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
+ pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
@@ -1051,8 +1118,7 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
- if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
+ if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -1075,24 +1141,18 @@ void CConnman::ThreadSocketHandler()
BOOST_FOREACH(CNode* pnode, vNodesDisconnectedCopy)
{
// wait until threads are done using it
- if (pnode->GetRefCount() <= 0)
- {
+ if (pnode->GetRefCount() <= 0) {
bool fDelete = false;
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend)
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- TRY_LOCK(pnode->cs_inventory, lockInv);
- if (lockInv)
- fDelete = true;
+ TRY_LOCK(pnode->cs_inventory, lockInv);
+ if (lockInv) {
+ TRY_LOCK(pnode->cs_vSend, lockSend);
+ if (lockSend) {
+ fDelete = true;
}
}
}
- if (fDelete)
- {
+ if (fDelete) {
vNodesDisconnected.remove(pnode);
DeleteNode(pnode);
}
@@ -1136,42 +1196,38 @@ void CConnman::ThreadSocketHandler()
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
{
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- FD_SET(pnode->hSocket, &fdsetError);
- hSocketMax = std::max(hSocketMax, pnode->hSocket);
- have_fds = true;
-
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is no (complete) message in the receive buffer,
- // or there is space left in the buffer, select() for receiving data.
- // * (if neither of the above applies, there is certainly one message
- // in the receiver buffer ready to be processed).
- // Together, that means that at least one of the following is always possible,
- // so we don't deadlock:
- // * We send some data.
- // * We wait for data to be received (and disconnect after timeout).
- // * We process a message in the buffer (message handler thread).
+ // * Otherwise, if there is space left in the receive buffer, select() for
+ // receiving data.
+ // * Hand off all complete messages to the processor, to be handled without
+ // blocking here.
+
+ bool select_recv = !pnode->fPauseRecv;
+ bool select_send;
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend) {
- if (!pnode->vSendMsg.empty()) {
- FD_SET(pnode->hSocket, &fdsetSend);
- continue;
- }
- }
+ LOCK(pnode->cs_vSend);
+ select_send = !pnode->vSendMsg.empty();
}
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv && (
- pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
- pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
- FD_SET(pnode->hSocket, &fdsetRecv);
+
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+
+ FD_SET(pnode->hSocket, &fdsetError);
+ hSocketMax = std::max(hSocketMax, pnode->hSocket);
+ have_fds = true;
+
+ if (select_send) {
+ FD_SET(pnode->hSocket, &fdsetSend);
+ continue;
+ }
+ if (select_recv) {
+ FD_SET(pnode->hSocket, &fdsetRecv);
}
}
}
@@ -1225,27 +1281,52 @@ void CConnman::ThreadSocketHandler()
//
// Receive
//
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
+ bool recvSet = false;
+ bool sendSet = false;
+ bool errorSet = false;
+ {
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+ recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
+ sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
+ errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
+ }
+ if (recvSet || errorSet)
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
{
{
// typical socket buffer is 8K-64K
char pchBuf[0x10000];
- int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
+ int nBytes = 0;
+ {
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+ nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
+ }
if (nBytes > 0)
{
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
- if(notify)
- condMsgProc.notify_one();
- pnode->nLastRecv = GetTime();
- pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
+ if (notify) {
+ size_t nSizeAdded = 0;
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
+ }
+ {
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ pnode->nProcessQueueSize += nSizeAdded;
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
+ }
+ WakeMessageHandler();
+ }
}
else if (nBytes == 0)
{
@@ -1272,22 +1353,19 @@ void CConnman::ThreadSocketHandler()
//
// Send
//
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- if (FD_ISSET(pnode->hSocket, &fdsetSend))
+ if (sendSet)
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend) {
- size_t nBytes = SocketSendData(pnode);
- if (nBytes)
- RecordBytesSent(nBytes);
+ LOCK(pnode->cs_vSend);
+ size_t nBytes = SocketSendData(pnode);
+ if (nBytes) {
+ RecordBytesSent(nBytes);
}
}
//
// Inactivity checking
//
- int64_t nTime = GetTime();
+ int64_t nTime = GetSystemTimeInSeconds();
if (nTime - pnode->nTimeConnected > 60)
{
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
@@ -1310,6 +1388,11 @@ void CConnman::ThreadSocketHandler()
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
pnode->fDisconnect = true;
}
+ else if (!pnode->fSuccessfullyConnected)
+ {
+ LogPrintf("version handshake timeout from %d\n", pnode->id);
+ pnode->fDisconnect = true;
+ }
}
}
{
@@ -1320,8 +1403,14 @@ void CConnman::ThreadSocketHandler()
}
}
-
-
+void CConnman::WakeMessageHandler()
+{
+ {
+ std::lock_guard<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = true;
+ }
+ condMsgProc.notify_one();
+}
@@ -1631,7 +1720,12 @@ void CConnman::ThreadOpenConnections()
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes) {
- if (!pnode->fInbound) {
+ if (!pnode->fInbound && !pnode->fAddnode) {
+ // Netgroups for inbound and addnode peers are not excluded because our goal here
+ // is to not use multiple of our limited outbound slots on a single netgroup
+ // but inbound and addnode peers do not use our outbound slots. Inbound peers
+ // also have the added issue that they're attacker controlled and could be used
+ // to prevent us from connecting to particular hosts if we used them here.
setConnected.insert(pnode->addr.GetGroup());
nOutbound++;
}
@@ -1738,8 +1832,9 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo()
if (pnode->addr.IsValid()) {
mapConnected[pnode->addr] = pnode->fInbound;
}
- if (!pnode->addrName.empty()) {
- mapConnectedByName[pnode->addrName] = std::make_pair(pnode->fInbound, static_cast<const CService&>(pnode->addr));
+ std::string addrName = pnode->GetAddrName();
+ if (!addrName.empty()) {
+ mapConnectedByName[std::move(addrName)] = std::make_pair(pnode->fInbound, static_cast<const CService&>(pnode->addr));
}
}
}
@@ -1776,27 +1871,35 @@ void CConnman::ThreadOpenAddedConnections()
vAddedNodes = mapMultiArgs.at("-addnode");
}
- for (unsigned int i = 0; true; i++)
+ while (true)
{
+ CSemaphoreGrant grant(*semAddnode);
std::vector<AddedNodeInfo> vInfo = GetAddedNodeInfo();
+ bool tried = false;
for (const AddedNodeInfo& info : vInfo) {
if (!info.fConnected) {
- CSemaphoreGrant grant(*semOutbound);
+ if (!grant.TryAcquire()) {
+ // If we've used up our semaphore and need a new one, lets not wait here since while we are waiting
+ // the addednodeinfo state might change.
+ break;
+ }
// If strAddedNode is an IP/port, decode it immediately, so
// OpenNetworkConnection can detect existing connections to that IP/port.
+ tried = true;
CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort()));
- OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false);
+ OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false, false, true);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
}
}
- if (!interruptNet.sleep_for(std::chrono::minutes(2)))
+ // Retry every 60 seconds if a connection was attempted, otherwise two seconds
+ if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
return;
}
}
// if successful, this moves the passed grant to the constructed node
-bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot, bool fFeeler)
+bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, bool fOneShot, bool fFeeler, bool fAddnode)
{
//
// Initiate outbound network connection
@@ -1825,6 +1928,14 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
pnode->fOneShot = true;
if (fFeeler)
pnode->fFeeler = true;
+ if (fAddnode)
+ pnode->fAddnode = true;
+
+ GetNodeSignals().InitializeNode(pnode, *this);
+ {
+ LOCK(cs_vNodes);
+ vNodes.push_back(pnode);
+ }
return true;
}
@@ -1842,7 +1953,7 @@ void CConnman::ThreadMessageHandler()
}
}
- bool fSleep = true;
+ bool fMoreWork = false;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
@@ -1850,30 +1961,15 @@ void CConnman::ThreadMessageHandler()
continue;
// Receive messages
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
- pnode->CloseSocketDisconnect();
-
- if (pnode->nSendSize < GetSendBufferSize())
- {
- if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
- {
- fSleep = false;
- }
- }
- }
- }
+ bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
// Send messages
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend)
- GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
+ LOCK(pnode->cs_sendProcessing);
+ GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
}
if (flagInterruptMsgProc)
return;
@@ -1885,10 +1981,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}
- if (fSleep) {
- std::unique_lock<std::mutex> lock(mutexMsgProc);
- condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ if (!fMoreWork) {
+ condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
+ fMsgProcWake = false;
}
}
@@ -2076,8 +2173,10 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
nSendBufferMaxSize = 0;
nReceiveFloodSize = 0;
semOutbound = NULL;
+ semAddnode = NULL;
nMaxConnections = 0;
nMaxOutbound = 0;
+ nMaxAddnode = 0;
nBestHeight = 0;
clientInterface = NULL;
flagInterruptMsgProc = false;
@@ -2099,10 +2198,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
nLocalServices = connOptions.nLocalServices;
nMaxConnections = connOptions.nMaxConnections;
nMaxOutbound = std::min((connOptions.nMaxOutbound), nMaxConnections);
+ nMaxAddnode = connOptions.nMaxAddnode;
nMaxFeeler = connOptions.nMaxFeeler;
nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
- nReceiveFloodSize = connOptions.nSendBufferMaxSize;
+ nReceiveFloodSize = connOptions.nReceiveFloodSize;
nMaxOutboundLimit = connOptions.nMaxOutboundLimit;
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
@@ -2110,8 +2210,9 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
SetBestHeight(connOptions.nBestHeight);
clientInterface = connOptions.uiInterface;
- if (clientInterface)
- clientInterface->InitMessage(_("Loading addresses..."));
+ if (clientInterface) {
+ clientInterface->InitMessage(_("Loading P2P addresses..."));
+ }
// Load addresses from peers.dat
int64_t nStart = GetTimeMillis();
{
@@ -2151,6 +2252,10 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
// initialize semaphore
semOutbound = new CSemaphore(std::min((nMaxOutbound + nMaxFeeler), nMaxConnections));
}
+ if (semAddnode == NULL) {
+ // initialize semaphore
+ semAddnode = new CSemaphore(nMaxAddnode);
+ }
//
// Start threads
@@ -2159,6 +2264,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;
+ {
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = false;
+ }
+
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
@@ -2178,7 +2288,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
// Dump network addresses
- scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
+ scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
return true;
}
@@ -2209,9 +2319,17 @@ void CConnman::Interrupt()
interruptNet();
InterruptSocks5(true);
- if (semOutbound)
- for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
+ if (semOutbound) {
+ for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) {
semOutbound->post();
+ }
+ }
+
+ if (semAddnode) {
+ for (int i=0; i<nMaxAddnode; i++) {
+ semAddnode->post();
+ }
+ }
}
void CConnman::Stop()
@@ -2235,8 +2353,7 @@ void CConnman::Stop()
// Close sockets
BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->hSocket != INVALID_SOCKET)
- CloseSocket(pnode->hSocket);
+ pnode->CloseSocketDisconnect();
BOOST_FOREACH(ListenSocket& hListenSocket, vhListenSocket)
if (hListenSocket.socket != INVALID_SOCKET)
if (!CloseSocket(hListenSocket.socket))
@@ -2254,6 +2371,8 @@ void CConnman::Stop()
vhListenSocket.clear();
delete semOutbound;
semOutbound = NULL;
+ delete semAddnode;
+ semAddnode = NULL;
}
void CConnman::DeleteNode(CNode* pnode)
@@ -2347,32 +2466,14 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats)
vstats.reserve(vNodes.size());
for(std::vector<CNode*>::iterator it = vNodes.begin(); it != vNodes.end(); ++it) {
CNode* pnode = *it;
- CNodeStats stats;
- pnode->copyStats(stats);
- vstats.push_back(stats);
+ vstats.emplace_back();
+ pnode->copyStats(vstats.back());
}
}
-bool CConnman::DisconnectAddress(const CNetAddr& netAddr)
-{
- if (CNode* pnode = FindNode(netAddr)) {
- pnode->fDisconnect = true;
- return true;
- }
- return false;
-}
-
-bool CConnman::DisconnectSubnet(const CSubNet& subNet)
-{
- if (CNode* pnode = FindNode(subNet)) {
- pnode->fDisconnect = true;
- return true;
- }
- return false;
-}
-
bool CConnman::DisconnectNode(const std::string& strNode)
{
+ LOCK(cs_vNodes);
if (CNode* pnode = FindNode(strNode)) {
pnode->fDisconnect = true;
return true;
@@ -2391,16 +2492,6 @@ bool CConnman::DisconnectNode(NodeId id)
return false;
}
-void CConnman::RelayTransaction(const CTransaction& tx)
-{
- CInv inv(MSG_TX, tx.GetHash());
- LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- {
- pnode->PushInventory(inv);
- }
-}
-
void CConnman::RecordBytesRecv(uint64_t bytes)
{
LOCK(cs_totalBytesRecv);
@@ -2528,6 +2619,7 @@ unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; }
unsigned int CConnman::GetSendBufferSize() const{ return nSendBufferMaxSize; }
CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const std::string& addrNameIn, bool fInboundIn) :
+ nTimeConnected(GetSystemTimeInSeconds()),
addr(addrIn),
fInbound(fInboundIn),
id(idIn),
@@ -2547,13 +2639,13 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
nLastRecv = 0;
nSendBytes = 0;
nRecvBytes = 0;
- nTimeConnected = GetTime();
nTimeOffset = 0;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
nVersion = 0;
strSubVer = "";
fWhitelisted = false;
fOneShot = false;
+ fAddnode = false;
fClient = false; // set by version message
fFeeler = false;
fSuccessfullyConnected = false;
@@ -2583,6 +2675,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
+ fPauseRecv = false;
+ fPauseSend = false;
+ nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
@@ -2636,6 +2731,11 @@ void CNode::AskFor(const CInv& inv)
mapAskFor.insert(std::make_pair(nRequestTime, inv));
}
+bool CConnman::NodeFullyConnected(const CNode* pnode)
+{
+ return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
+}
+
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
size_t nMessageSize = msg.data.size();
@@ -2653,15 +2753,14 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
- if(pnode->hSocket == INVALID_SOCKET) {
- return;
- }
bool optimisticSend(pnode->vSendMsg.empty());
//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;
+ if (pnode->nSendSize > nSendBufferMaxSize)
+ pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));
@@ -2684,19 +2783,19 @@ bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
break;
}
}
- return found != nullptr && func(found);
+ return found != nullptr && NodeFullyConnected(found) && func(found);
}
int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) {
return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5);
}
-CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id)
+CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const
{
return CSipHasher(nSeed0, nSeed1).Write(id);
}
-uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad)
+uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const
{
std::vector<unsigned char> vchNetGroup(ad.GetGroup());