aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp485
1 files changed, 277 insertions, 208 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 37e7dfed4c..cf94faf854 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -44,10 +44,15 @@
// We add a random period time (0 to 1 seconds) to feeler connections to prevent synchronization.
#define FEELER_SLEEP_WINDOW 1
-#if !defined(HAVE_MSG_NOSIGNAL) && !defined(MSG_NOSIGNAL)
+#if !defined(HAVE_MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0
#endif
+// MSG_DONTWAIT is not available on some platforms, if it doesn't exist define it as 0
+#if !defined(HAVE_MSG_DONTWAIT)
+#define MSG_DONTWAIT 0
+#endif
+
// Fix for ancient MinGW versions, that don't have defined these in ws2tcpip.h.
// Todo: Can be removed when our pull-tester is upgraded to a modern MinGW version.
#ifdef WIN32
@@ -164,8 +169,9 @@ int GetnScore(const CService& addr)
// Is our peer's addrLocal potentially useful as an external IP source?
bool IsPeerAddrLocalGood(CNode *pnode)
{
- return fDiscover && pnode->addr.IsRoutable() && pnode->addrLocal.IsRoutable() &&
- !IsLimited(pnode->addrLocal.GetNetwork());
+ CService addrLocal = pnode->GetAddrLocal();
+ return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() &&
+ !IsLimited(addrLocal.GetNetwork());
}
// pushes our own address to a peer
@@ -180,11 +186,11 @@ void AdvertiseLocal(CNode *pnode)
if (IsPeerAddrLocalGood(pnode) && (!addrLocal.IsRoutable() ||
GetRand((GetnScore(addrLocal) > LOCAL_MANUAL) ? 8:2) == 0))
{
- addrLocal.SetIP(pnode->addrLocal);
+ addrLocal.SetIP(pnode->GetAddrLocal());
}
if (addrLocal.IsRoutable())
{
- LogPrint("net", "AdvertiseLocal: advertising address %s\n", addrLocal.ToString());
+ LogPrint(BCLog::NET, "AdvertiseLocal: advertising address %s\n", addrLocal.ToString());
FastRandomContext insecure_rand;
pnode->PushAddress(addrLocal, insecure_rand);
}
@@ -307,9 +313,11 @@ CNode* CConnman::FindNode(const CSubNet& subNet)
CNode* CConnman::FindNode(const std::string& addrName)
{
LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->addrName == addrName)
+ BOOST_FOREACH(CNode* pnode, vNodes) {
+ if (pnode->GetAddrName() == addrName) {
return (pnode);
+ }
+ }
return NULL;
}
@@ -342,13 +350,13 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
CNode* pnode = FindNode((CService)addrConnect);
if (pnode)
{
- pnode->AddRef();
- return pnode;
+ LogPrintf("Failed to open new connection, already connected\n");
+ return NULL;
}
}
/// debug print
- LogPrint("net", "trying connection %s lastseen=%.1fhrs\n",
+ LogPrint(BCLog::NET, "trying connection %s lastseen=%.1fhrs\n",
pszDest ? pszDest : addrConnect.ToString(),
pszDest ? 0.0 : (double)(GetAdjustedTime() - addrConnect.nTime)/3600.0);
@@ -369,18 +377,14 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
// In that case, drop the connection that was just created, and return the existing CNode instead.
// Also store the name we used to connect in that CNode, so that future FindNode() calls to that
// name catch this early.
+ LOCK(cs_vNodes);
CNode* pnode = FindNode((CService)addrConnect);
if (pnode)
{
- pnode->AddRef();
- {
- LOCK(cs_vNodes);
- if (pnode->addrName.empty()) {
- pnode->addrName = std::string(pszDest);
- }
- }
+ pnode->MaybeSetAddrName(std::string(pszDest));
CloseSocket(hSocket);
- return pnode;
+ LogPrintf("Failed to open new connection, already connected\n");
+ return NULL;
}
}
@@ -391,13 +395,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
- pnode->nTimeConnected = GetTime();
pnode->AddRef();
- GetNodeSignals().InitializeNode(pnode, *this);
- {
- LOCK(cs_vNodes);
- vNodes.push_back(pnode);
- }
return pnode;
} else if (!proxyConnectionFailed) {
@@ -425,23 +423,19 @@ void CConnman::DumpBanlist()
if (!bandb.Write(banmap))
SetBannedSetDirty(true);
- LogPrint("net", "Flushed %d banned node ips/subnets to banlist.dat %dms\n",
+ LogPrint(BCLog::NET, "Flushed %d banned node ips/subnets to banlist.dat %dms\n",
banmap.size(), GetTimeMillis() - nStart);
}
void CNode::CloseSocketDisconnect()
{
fDisconnect = true;
+ LOCK(cs_hSocket);
if (hSocket != INVALID_SOCKET)
{
- LogPrint("net", "disconnecting peer=%d\n", id);
+ LogPrint(BCLog::NET, "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
}
-
- // in case this fails, we'll empty the recv buffer when the CNode is deleted
- TRY_LOCK(cs_vRecvMsg, lockRecv);
- if (lockRecv)
- vRecvMsg.clear();
}
void CConnman::ClearBanned()
@@ -571,7 +565,7 @@ void CConnman::SweepBanned()
{
setBanned.erase(it++);
setBannedIsDirty = true;
- LogPrint("net", "%s: Removed banned node ip/subnet from banlist.dat: %s\n", __func__, subNet.ToString());
+ LogPrint(BCLog::NET, "%s: Removed banned node ip/subnet from banlist.dat: %s\n", __func__, subNet.ToString());
}
else
++it;
@@ -605,6 +599,33 @@ void CConnman::AddWhitelistedRange(const CSubNet &subnet) {
vWhitelistedRange.push_back(subnet);
}
+
+std::string CNode::GetAddrName() const {
+ LOCK(cs_addrName);
+ return addrName;
+}
+
+void CNode::MaybeSetAddrName(const std::string& addrNameIn) {
+ LOCK(cs_addrName);
+ if (addrName.empty()) {
+ addrName = addrNameIn;
+ }
+}
+
+CService CNode::GetAddrLocal() const {
+ LOCK(cs_addrLocal);
+ return addrLocal;
+}
+
+void CNode::SetAddrLocal(const CService& addrLocalIn) {
+ LOCK(cs_addrLocal);
+ if (addrLocal.IsValid()) {
+ error("Addr local already set for node: %i. Refusing to change from %s to %s", id, addrLocal.ToString(), addrLocalIn.ToString());
+ } else {
+ addrLocal = addrLocalIn;
+ }
+}
+
#undef X
#define X(name) stats.name = name
void CNode::copyStats(CNodeStats &stats)
@@ -612,21 +633,33 @@ void CNode::copyStats(CNodeStats &stats)
stats.nodeid = this->GetId();
X(nServices);
X(addr);
- X(fRelayTxes);
+ {
+ LOCK(cs_filter);
+ X(fRelayTxes);
+ }
X(nLastSend);
X(nLastRecv);
X(nTimeConnected);
X(nTimeOffset);
- X(addrName);
+ stats.addrName = GetAddrName();
X(nVersion);
- X(cleanSubVer);
+ {
+ LOCK(cs_SubVer);
+ X(cleanSubVer);
+ }
X(fInbound);
X(fAddnode);
X(nStartingHeight);
- X(nSendBytes);
- X(mapSendBytesPerMsgCmd);
- X(nRecvBytes);
- X(mapRecvBytesPerMsgCmd);
+ {
+ LOCK(cs_vSend);
+ X(mapSendBytesPerMsgCmd);
+ X(nSendBytes);
+ }
+ {
+ LOCK(cs_vRecv);
+ X(mapRecvBytesPerMsgCmd);
+ X(nRecvBytes);
+ }
X(fWhitelisted);
// It is common for nodes with good ping times to suddenly become lagged,
@@ -646,20 +679,24 @@ void CNode::copyStats(CNodeStats &stats)
stats.dPingWait = (((double)nPingUsecWait) / 1e6);
// Leave string empty if addrLocal invalid (not filled in yet)
- stats.addrLocal = addrLocal.IsValid() ? addrLocal.ToString() : "";
+ CService addrLocalUnlocked = GetAddrLocal();
+ stats.addrLocal = addrLocalUnlocked.IsValid() ? addrLocalUnlocked.ToString() : "";
}
#undef X
-// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
complete = false;
+ int64_t nTimeMicros = GetTimeMicros();
+ LOCK(cs_vRecv);
+ nLastRecv = nTimeMicros / 1000000;
+ nRecvBytes += nBytes;
while (nBytes > 0) {
// get current incomplete message, or create a new one
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
- vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion));
+ vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));
CNetMessage& msg = vRecvMsg.back();
@@ -674,7 +711,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
return false;
if (msg.in_data && msg.hdr.nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) {
- LogPrint("net", "Oversized message from peer=%i, disconnecting\n", GetId());
+ LogPrint(BCLog::NET, "Oversized message from peer=%i, disconnecting\n", GetId());
return false;
}
@@ -691,7 +728,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
assert(i != mapRecvBytesPerMsgCmd.end());
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
- msg.nTime = GetTimeMicros();
+ msg.nTime = nTimeMicros;
complete = true;
}
}
@@ -699,6 +736,33 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
return true;
}
+void CNode::SetSendVersion(int nVersionIn)
+{
+ // Send version may only be changed in the version message, and
+ // only one version message is allowed per session. We can therefore
+ // treat this value as const and even atomic as long as it's only used
+ // once a version message has been successfully processed. Any attempt to
+ // set this twice is an error.
+ if (nSendVersion != 0) {
+ error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn);
+ } else {
+ nSendVersion = nVersionIn;
+ }
+}
+
+int CNode::GetSendVersion() const
+{
+ // The send version should always be explicitly set to
+ // INIT_PROTO_VERSION rather than using this value until SetSendVersion
+ // has been called.
+ if (nSendVersion == 0) {
+ error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION);
+ return INIT_PROTO_VERSION;
+ }
+ return nSendVersion;
+}
+
+
int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
{
// copy data to temporary parsing buffer
@@ -764,7 +828,7 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend)
-size_t SocketSendData(CNode *pnode)
+size_t CConnman::SocketSendData(CNode *pnode) const
{
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
@@ -772,15 +836,22 @@ size_t SocketSendData(CNode *pnode)
while (it != pnode->vSendMsg.end()) {
const auto &data = *it;
assert(data.size() > pnode->nSendOffset);
- int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ int nBytes = 0;
+ {
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ break;
+ nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ }
if (nBytes > 0) {
- pnode->nLastSend = GetTime();
+ pnode->nLastSend = GetSystemTimeInSeconds();
pnode->nSendBytes += nBytes;
pnode->nSendOffset += nBytes;
nSentSize += nBytes;
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
+ pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
@@ -1016,7 +1087,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
{
if (!AttemptToEvictConnection()) {
// No connection to evict, disconnect the new connection
- LogPrint("net", "failed to find an eviction candidate - connection dropped (full)\n");
+ LogPrint(BCLog::NET, "failed to find an eviction candidate - connection dropped (full)\n");
CloseSocket(hSocket);
return;
}
@@ -1030,7 +1101,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
pnode->fWhitelisted = whitelisted;
GetNodeSignals().InitializeNode(pnode, *this);
- LogPrint("net", "connection from %s accepted\n", addr.ToString());
+ LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString());
{
LOCK(cs_vNodes);
@@ -1052,8 +1123,7 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
- if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
+ if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -1076,24 +1146,18 @@ void CConnman::ThreadSocketHandler()
BOOST_FOREACH(CNode* pnode, vNodesDisconnectedCopy)
{
// wait until threads are done using it
- if (pnode->GetRefCount() <= 0)
- {
+ if (pnode->GetRefCount() <= 0) {
bool fDelete = false;
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend)
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- TRY_LOCK(pnode->cs_inventory, lockInv);
- if (lockInv)
- fDelete = true;
+ TRY_LOCK(pnode->cs_inventory, lockInv);
+ if (lockInv) {
+ TRY_LOCK(pnode->cs_vSend, lockSend);
+ if (lockSend) {
+ fDelete = true;
}
}
}
- if (fDelete)
- {
+ if (fDelete) {
vNodesDisconnected.remove(pnode);
DeleteNode(pnode);
}
@@ -1137,42 +1201,38 @@ void CConnman::ThreadSocketHandler()
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
{
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- FD_SET(pnode->hSocket, &fdsetError);
- hSocketMax = std::max(hSocketMax, pnode->hSocket);
- have_fds = true;
-
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is no (complete) message in the receive buffer,
- // or there is space left in the buffer, select() for receiving data.
- // * (if neither of the above applies, there is certainly one message
- // in the receiver buffer ready to be processed).
- // Together, that means that at least one of the following is always possible,
- // so we don't deadlock:
- // * We send some data.
- // * We wait for data to be received (and disconnect after timeout).
- // * We process a message in the buffer (message handler thread).
+ // * Otherwise, if there is space left in the receive buffer, select() for
+ // receiving data.
+ // * Hand off all complete messages to the processor, to be handled without
+ // blocking here.
+
+ bool select_recv = !pnode->fPauseRecv;
+ bool select_send;
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend) {
- if (!pnode->vSendMsg.empty()) {
- FD_SET(pnode->hSocket, &fdsetSend);
- continue;
- }
- }
+ LOCK(pnode->cs_vSend);
+ select_send = !pnode->vSendMsg.empty();
}
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv && (
- pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
- pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
- FD_SET(pnode->hSocket, &fdsetRecv);
+
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+
+ FD_SET(pnode->hSocket, &fdsetError);
+ hSocketMax = std::max(hSocketMax, pnode->hSocket);
+ have_fds = true;
+
+ if (select_send) {
+ FD_SET(pnode->hSocket, &fdsetSend);
+ continue;
+ }
+ if (select_recv) {
+ FD_SET(pnode->hSocket, &fdsetRecv);
}
}
}
@@ -1226,33 +1286,59 @@ void CConnman::ThreadSocketHandler()
//
// Receive
//
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
+ bool recvSet = false;
+ bool sendSet = false;
+ bool errorSet = false;
+ {
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+ recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
+ sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
+ errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
+ }
+ if (recvSet || errorSet)
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
{
{
// typical socket buffer is 8K-64K
char pchBuf[0x10000];
- int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
+ int nBytes = 0;
+ {
+ LOCK(pnode->cs_hSocket);
+ if (pnode->hSocket == INVALID_SOCKET)
+ continue;
+ nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
+ }
if (nBytes > 0)
{
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
- if(notify)
- condMsgProc.notify_one();
- pnode->nLastRecv = GetTime();
- pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
+ if (notify) {
+ size_t nSizeAdded = 0;
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
+ }
+ {
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ pnode->nProcessQueueSize += nSizeAdded;
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
+ }
+ WakeMessageHandler();
+ }
}
else if (nBytes == 0)
{
// socket closed gracefully
- if (!pnode->fDisconnect)
- LogPrint("net", "socket closed\n");
+ if (!pnode->fDisconnect) {
+ LogPrint(BCLog::NET, "socket closed\n");
+ }
pnode->CloseSocketDisconnect();
}
else if (nBytes < 0)
@@ -1273,27 +1359,24 @@ void CConnman::ThreadSocketHandler()
//
// Send
//
- if (pnode->hSocket == INVALID_SOCKET)
- continue;
- if (FD_ISSET(pnode->hSocket, &fdsetSend))
+ if (sendSet)
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend) {
- size_t nBytes = SocketSendData(pnode);
- if (nBytes)
- RecordBytesSent(nBytes);
+ LOCK(pnode->cs_vSend);
+ size_t nBytes = SocketSendData(pnode);
+ if (nBytes) {
+ RecordBytesSent(nBytes);
}
}
//
// Inactivity checking
//
- int64_t nTime = GetTime();
+ int64_t nTime = GetSystemTimeInSeconds();
if (nTime - pnode->nTimeConnected > 60)
{
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
{
- LogPrint("net", "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->id);
+ LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->id);
pnode->fDisconnect = true;
}
else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
@@ -1311,6 +1394,11 @@ void CConnman::ThreadSocketHandler()
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
pnode->fDisconnect = true;
}
+ else if (!pnode->fSuccessfullyConnected)
+ {
+ LogPrintf("version handshake timeout from %d\n", pnode->id);
+ pnode->fDisconnect = true;
+ }
}
}
{
@@ -1321,8 +1409,14 @@ void CConnman::ThreadSocketHandler()
}
}
-
-
+void CConnman::WakeMessageHandler()
+{
+ {
+ std::lock_guard<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = true;
+ }
+ condMsgProc.notify_one();
+}
@@ -1541,7 +1635,7 @@ void CConnman::DumpAddresses()
CAddrDB adb;
adb.Write(addrman);
- LogPrint("net", "Flushed %d addresses to peers.dat %dms\n",
+ LogPrint(BCLog::NET, "Flushed %d addresses to peers.dat %dms\n",
addrman.size(), GetTimeMillis() - nStart);
}
@@ -1714,7 +1808,7 @@ void CConnman::ThreadOpenConnections()
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep)))
return;
- LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString());
+ LogPrint(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToString());
}
OpenNetworkConnection(addrConnect, (int)setConnected.size() >= std::min(nMaxConnections - 1, 2), &grant, NULL, false, fFeeler);
@@ -1744,8 +1838,9 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo()
if (pnode->addr.IsValid()) {
mapConnected[pnode->addr] = pnode->fInbound;
}
- if (!pnode->addrName.empty()) {
- mapConnectedByName[pnode->addrName] = std::make_pair(pnode->fInbound, static_cast<const CService&>(pnode->addr));
+ std::string addrName = pnode->GetAddrName();
+ if (!addrName.empty()) {
+ mapConnectedByName[std::move(addrName)] = std::make_pair(pnode->fInbound, static_cast<const CService&>(pnode->addr));
}
}
}
@@ -1842,6 +1937,12 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
if (fAddnode)
pnode->fAddnode = true;
+ GetNodeSignals().InitializeNode(pnode, *this);
+ {
+ LOCK(cs_vNodes);
+ vNodes.push_back(pnode);
+ }
+
return true;
}
@@ -1858,7 +1959,7 @@ void CConnman::ThreadMessageHandler()
}
}
- bool fSleep = true;
+ bool fMoreWork = false;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
@@ -1866,30 +1967,15 @@ void CConnman::ThreadMessageHandler()
continue;
// Receive messages
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
- pnode->CloseSocketDisconnect();
-
- if (pnode->nSendSize < GetSendBufferSize())
- {
- if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
- {
- fSleep = false;
- }
- }
- }
- }
+ bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
// Send messages
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend)
- GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
+ LOCK(pnode->cs_sendProcessing);
+ GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
}
if (flagInterruptMsgProc)
return;
@@ -1901,10 +1987,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}
- if (fSleep) {
- std::unique_lock<std::mutex> lock(mutexMsgProc);
- condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ if (!fMoreWork) {
+ condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
+ fMsgProcWake = false;
}
}
@@ -2064,9 +2151,7 @@ void Discover(boost::thread_group& threadGroup)
void CConnman::SetNetworkActive(bool active)
{
- if (fDebug) {
- LogPrint("net", "SetNetworkActive: %s\n", active);
- }
+ LogPrint(BCLog::NET, "SetNetworkActive: %s\n", active);
if (!active) {
fNetworkActive = false;
@@ -2121,7 +2206,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
nMaxFeeler = connOptions.nMaxFeeler;
nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
- nReceiveFloodSize = connOptions.nSendBufferMaxSize;
+ nReceiveFloodSize = connOptions.nReceiveFloodSize;
nMaxOutboundLimit = connOptions.nMaxOutboundLimit;
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
@@ -2129,8 +2214,9 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
SetBestHeight(connOptions.nBestHeight);
clientInterface = connOptions.uiInterface;
- if (clientInterface)
- clientInterface->InitMessage(_("Loading addresses..."));
+ if (clientInterface) {
+ clientInterface->InitMessage(_("Loading P2P addresses..."));
+ }
// Load addresses from peers.dat
int64_t nStart = GetTimeMillis();
{
@@ -2154,7 +2240,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
SetBannedSetDirty(false); // no need to write down, just read data
SweepBanned(); // sweep out unused entries
- LogPrint("net", "Loaded %d banned node ips/subnets from banlist.dat %dms\n",
+ LogPrint(BCLog::NET, "Loaded %d banned node ips/subnets from banlist.dat %dms\n",
banmap.size(), GetTimeMillis() - nStart);
} else {
LogPrintf("Invalid or missing banlist.dat; recreating\n");
@@ -2182,6 +2268,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;
+ {
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = false;
+ }
+
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
@@ -2201,7 +2292,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
// Dump network addresses
- scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
+ scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
return true;
}
@@ -2232,9 +2323,17 @@ void CConnman::Interrupt()
interruptNet();
InterruptSocks5(true);
- if (semOutbound)
- for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
+ if (semOutbound) {
+ for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) {
semOutbound->post();
+ }
+ }
+
+ if (semAddnode) {
+ for (int i=0; i<nMaxAddnode; i++) {
+ semAddnode->post();
+ }
+ }
}
void CConnman::Stop()
@@ -2250,10 +2349,6 @@ void CConnman::Stop()
if (threadSocketHandler.joinable())
threadSocketHandler.join();
- if (semAddnode)
- for (int i=0; i<nMaxAddnode; i++)
- semOutbound->post();
-
if (fAddressesInitialized)
{
DumpData();
@@ -2262,8 +2357,7 @@ void CConnman::Stop()
// Close sockets
BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->hSocket != INVALID_SOCKET)
- CloseSocket(pnode->hSocket);
+ pnode->CloseSocketDisconnect();
BOOST_FOREACH(ListenSocket& hListenSocket, vhListenSocket)
if (hListenSocket.socket != INVALID_SOCKET)
if (!CloseSocket(hListenSocket.socket))
@@ -2316,11 +2410,6 @@ void CConnman::MarkAddressGood(const CAddress& addr)
addrman.Good(addr);
}
-void CConnman::AddNewAddress(const CAddress& addr, const CAddress& addrFrom, int64_t nTimePenalty)
-{
- addrman.Add(addr, addrFrom, nTimePenalty);
-}
-
void CConnman::AddNewAddresses(const std::vector<CAddress>& vAddr, const CAddress& addrFrom, int64_t nTimePenalty)
{
addrman.Add(vAddr, addrFrom, nTimePenalty);
@@ -2376,32 +2465,14 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats)
vstats.reserve(vNodes.size());
for(std::vector<CNode*>::iterator it = vNodes.begin(); it != vNodes.end(); ++it) {
CNode* pnode = *it;
- CNodeStats stats;
- pnode->copyStats(stats);
- vstats.push_back(stats);
- }
-}
-
-bool CConnman::DisconnectAddress(const CNetAddr& netAddr)
-{
- if (CNode* pnode = FindNode(netAddr)) {
- pnode->fDisconnect = true;
- return true;
+ vstats.emplace_back();
+ pnode->copyStats(vstats.back());
}
- return false;
-}
-
-bool CConnman::DisconnectSubnet(const CSubNet& subNet)
-{
- if (CNode* pnode = FindNode(subNet)) {
- pnode->fDisconnect = true;
- return true;
- }
- return false;
}
bool CConnman::DisconnectNode(const std::string& strNode)
{
+ LOCK(cs_vNodes);
if (CNode* pnode = FindNode(strNode)) {
pnode->fDisconnect = true;
return true;
@@ -2420,16 +2491,6 @@ bool CConnman::DisconnectNode(NodeId id)
return false;
}
-void CConnman::RelayTransaction(const CTransaction& tx)
-{
- CInv inv(MSG_TX, tx.GetHash());
- LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- {
- pnode->PushInventory(inv);
- }
-}
-
void CConnman::RecordBytesRecv(uint64_t bytes)
{
LOCK(cs_totalBytesRecv);
@@ -2557,6 +2618,7 @@ unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; }
unsigned int CConnman::GetSendBufferSize() const{ return nSendBufferMaxSize; }
CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const std::string& addrNameIn, bool fInboundIn) :
+ nTimeConnected(GetSystemTimeInSeconds()),
addr(addrIn),
fInbound(fInboundIn),
id(idIn),
@@ -2576,7 +2638,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
nLastRecv = 0;
nSendBytes = 0;
nRecvBytes = 0;
- nTimeConnected = GetTime();
nTimeOffset = 0;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
nVersion = 0;
@@ -2613,15 +2674,19 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
+ fPauseRecv = false;
+ fPauseSend = false;
+ nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
mapRecvBytesPerMsgCmd[NET_MESSAGE_COMMAND_OTHER] = 0;
- if (fLogIPs)
- LogPrint("net", "Added connection to %s peer=%d\n", addrName, id);
- else
- LogPrint("net", "Added connection peer=%d\n", id);
+ if (fLogIPs) {
+ LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", addrName, id);
+ } else {
+ LogPrint(BCLog::NET, "Added connection peer=%d\n", id);
+ }
}
CNode::~CNode()
@@ -2648,7 +2713,7 @@ void CNode::AskFor(const CInv& inv)
nRequestTime = it->second;
else
nRequestTime = 0;
- LogPrint("net", "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id);
+ LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id);
// Make sure not to reuse time indexes to keep things in the same order
int64_t nNow = GetTimeMicros() - 1000000;
@@ -2666,11 +2731,16 @@ void CNode::AskFor(const CInv& inv)
mapAskFor.insert(std::make_pair(nRequestTime, inv));
}
+bool CConnman::NodeFullyConnected(const CNode* pnode)
+{
+ return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
+}
+
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
size_t nMessageSize = msg.data.size();
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
- LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command.c_str()), nMessageSize, pnode->id);
+ LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command.c_str()), nMessageSize, pnode->id);
std::vector<unsigned char> serializedHeader;
serializedHeader.reserve(CMessageHeader::HEADER_SIZE);
@@ -2683,15 +2753,14 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
- if(pnode->hSocket == INVALID_SOCKET) {
- return;
- }
bool optimisticSend(pnode->vSendMsg.empty());
//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;
+ if (pnode->nSendSize > nSendBufferMaxSize)
+ pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));
@@ -2714,19 +2783,19 @@ bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
break;
}
}
- return found != nullptr && func(found);
+ return found != nullptr && NodeFullyConnected(found) && func(found);
}
int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) {
return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5);
}
-CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id)
+CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const
{
return CSipHasher(nSeed0, nSeed1).Write(id);
}
-uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad)
+uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const
{
std::vector<unsigned char> vchNetGroup(ad.GetGroup());