aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
authorCory Fields <cory-nospam-@coryfields.com>2016-09-12 20:00:33 -0400
committerPieter Wuille <pieter.wuille@gmail.com>2016-11-03 13:32:09 -0700
commit3e32cd09f643bf7d4344d3bb2e1136f186f3d109 (patch)
tree315c62f9fa5df102dce6f211083bfa682c3794be /src/net.cpp
parentb98c14c4e362a8e59d7a3b021651a19ea61b29dd (diff)
connman is in charge of pushing messages
The changes here are dense and subtle, but hopefully all is more explicit than before. - CConnman is now in charge of sending data rather than the nodes themselves. This is necessary because many decisions need to be made with all nodes in mind, and a model that requires the nodes calling up to their manager quickly turns to spaghetti. - The per-node-serializer (ssSend) has been replaced with a (quasi-)const send-version. Since the send version for serialization can only change once per connection, we now explicitly tag messages with INIT_PROTO_VERSION if they are sent before the handshake. With this done, there's no need to lock for access to nSendVersion. Also, a new stream is used for each message, so there's no need to lock during the serialization process. - This takes care of accounting for optimistic sends, so the nOptimisticBytesWritten hack can be removed. - -dropmessagestest and -fuzzmessagestest have not been preserved, as I suspect they haven't been used in years.
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp91
1 files changed, 69 insertions, 22 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 18d25cbcd1..f271aed242 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -394,6 +394,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
+
+ PushVersion(pnode, GetTime());
+
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
pnode->AddRef();
@@ -415,6 +418,24 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
return NULL;
}
+void CConnman::PushVersion(CNode* pnode, int64_t nTime)
+{
+ ServiceFlags nLocalNodeServices = pnode->GetLocalServices();
+ CAddress addrYou = (pnode->addr.IsRoutable() && !IsProxy(pnode->addr) ? pnode->addr : CAddress(CService(), pnode->addr.nServices));
+ CAddress addrMe = CAddress(CService(), nLocalNodeServices);
+ uint64_t nonce = pnode->GetLocalNonce();
+ int nNodeStartingHeight = pnode->nMyStartingHeight;
+ NodeId id = pnode->GetId();
+
+ PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
+ nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes);
+
+ if (fLogIPs)
+ LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
+ else
+ LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), id);
+}
+
void CConnman::DumpBanlist()
{
SweepBanned(); // clean unused entries (if bantime has expired)
@@ -450,23 +471,6 @@ void CNode::CloseSocketDisconnect()
vRecvMsg.clear();
}
-void CNode::PushVersion()
-{
- int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime());
- CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
- CAddress addrMe = CAddress(CService(), nLocalServices);
- if (fLogIPs)
- LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
- else
- LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), id);
- PushMessage(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalServices, nTime, addrYou, addrMe,
- nLocalHostNonce, strSubVersion, nMyStartingHeight, ::fRelayTxes);
-}
-
-
-
-
-
void CConnman::ClearBanned()
{
{
@@ -2530,7 +2534,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
filterInventoryKnown(50000, 0.000001),
nLocalHostNonce(nLocalHostNonceIn),
nLocalServices(nLocalServicesIn),
- nMyStartingHeight(nMyStartingHeightIn)
+ nMyStartingHeight(nMyStartingHeightIn),
+ nSendVersion(0)
{
nServices = NODE_NONE;
nServicesExpected = NODE_NONE;
@@ -2587,10 +2592,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
LogPrint("net", "Added connection to %s peer=%d\n", addrName, id);
else
LogPrint("net", "Added connection peer=%d\n", id);
-
- // Be shy and don't send version until we hear
- if (hSocket != INVALID_SOCKET && !fInbound)
- PushVersion();
}
CNode::~CNode()
@@ -2696,6 +2697,52 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
LEAVE_CRITICAL_SECTION(cs_vSend);
}
+CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
+{
+ return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
+}
+
+void CConnman::EndMessage(CDataStream& strm)
+{
+ // Set the size
+ assert(strm.size () >= CMessageHeader::HEADER_SIZE);
+ unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
+ WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
+ // Set the checksum
+ uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
+ memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
+
+}
+
+void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
+{
+ if(strm.empty())
+ return;
+
+ unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
+ LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
+
+ size_t nBytesSent = 0;
+ {
+ LOCK(pnode->cs_vSend);
+ if(pnode->hSocket == INVALID_SOCKET) {
+ return;
+ }
+ bool optimisticSend(pnode->vSendMsg.empty());
+ pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
+
+ //log total amount of bytes per command
+ pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
+ pnode->nSendSize += strm.size();
+
+ // If write queue empty, attempt "optimistic write"
+ if (optimisticSend == true)
+ nBytesSent = SocketSendData(pnode);
+ }
+ if (nBytesSent)
+ RecordBytesSent(nBytesSent);
+}
+
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;