aboutsummaryrefslogtreecommitdiff
path: root/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'net.cpp')
-rw-r--r--net.cpp342
1 files changed, 220 insertions, 122 deletions
diff --git a/net.cpp b/net.cpp
index 0569604bab..71295d5dee 100644
--- a/net.cpp
+++ b/net.cpp
@@ -13,7 +13,6 @@ bool OpenNetworkConnection(const CAddress& addrConnect);
-
//
// Global state variables
//
@@ -25,6 +24,7 @@ uint64 nLocalHostNonce = 0;
bool fShutdown = false;
array<int, 10> vnThreadsRunning;
SOCKET hListenSocket = INVALID_SOCKET;
+int64 nThreadSocketHandlerHeartbeat = INT64_MAX;
vector<CNode*> vNodes;
CCriticalSection cs_vNodes;
@@ -65,7 +65,7 @@ bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet)
if (fProxy)
{
- printf("Proxy connecting %s\n", addrConnect.ToStringLog().c_str());
+ printf("proxy connecting %s\n", addrConnect.ToStringLog().c_str());
char pszSocks4IP[] = "\4\1\0\0\0\0\0\0user";
memcpy(pszSocks4IP + 2, &addrConnect.port, 2);
memcpy(pszSocks4IP + 4, &addrConnect.ip, 4);
@@ -87,9 +87,11 @@ bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet)
if (pchRet[1] != 0x5a)
{
closesocket(hSocket);
- return error("Proxy returned error %d", pchRet[1]);
+ if (pchRet[1] != 0x5b)
+ printf("ERROR: Proxy returned error %d\n", pchRet[1]);
+ return false;
}
- printf("Proxy connection established %s\n", addrConnect.ToStringLog().c_str());
+ printf("proxy connected %s\n", addrConnect.ToStringLog().c_str());
}
hSocketRet = hSocket;
@@ -219,6 +221,7 @@ bool AddAddress(CAddrDB& addrdb, CAddress addr, bool fCurrentlyOnline)
if (it == mapAddresses.end())
{
// New address
+ printf("AddAddress(%s)\n", addr.ToStringLog().c_str());
mapAddresses.insert(make_pair(addr.GetKey(), addr));
addrdb.WriteAddress(addr);
return true;
@@ -256,7 +259,7 @@ void AddressCurrentlyConnected(const CAddress& addr)
if (it != mapAddresses.end())
{
CAddress& addrFound = (*it).second;
- int64 nUpdateInterval = 60 * 60;
+ int64 nUpdateInterval = 20 * 60;
if (addrFound.nTime < GetAdjustedTime() - nUpdateInterval)
{
// Periodically update most recently seen time
@@ -417,7 +420,13 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
}
/// debug print
- printf("trying connection %s\n", addrConnect.ToStringLog().c_str());
+ printf("trying connection %s lastseen=%.1fhrs lasttry=%.1fhrs\n",
+ addrConnect.ToStringLog().c_str(),
+ (double)(addrConnect.nTime - GetAdjustedTime())/3600.0,
+ (double)(addrConnect.nLastTry - GetAdjustedTime())/3600.0);
+
+ CRITICAL_BLOCK(cs_mapAddresses)
+ mapAddresses[addrConnect.GetKey()].nLastTry = GetAdjustedTime();
// Connect
SOCKET hSocket;
@@ -428,7 +437,7 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
// Set to nonblocking
#ifdef __WXMSW__
- u_long nOne = 1;
+ u_long nOne = 1;
if (ioctlsocket(hSocket, FIONBIO, &nOne) == SOCKET_ERROR)
printf("ConnectSocket() : ioctlsocket nonblocking setting failed, error %d\n", WSAGetLastError());
#else
@@ -445,29 +454,23 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
CRITICAL_BLOCK(cs_vNodes)
vNodes.push_back(pnode);
- CRITICAL_BLOCK(cs_mapAddresses)
- mapAddresses[addrConnect.GetKey()].nLastFailed = 0;
+ pnode->nTimeConnected = GetTime();
return pnode;
}
else
{
- CRITICAL_BLOCK(cs_mapAddresses)
- mapAddresses[addrConnect.GetKey()].nLastFailed = GetAdjustedTime();
return NULL;
}
}
void CNode::DoDisconnect()
{
+ if (fDebug)
+ printf("%s ", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
printf("disconnecting node %s\n", addr.ToStringLog().c_str());
closesocket(hSocket);
- // If outbound and never got version message, mark address as failed
- if (!fInbound && !fSuccessfullyConnected)
- CRITICAL_BLOCK(cs_mapAddresses)
- mapAddresses[addr.GetKey()].nLastFailed = GetAdjustedTime();
-
// All of a nodes broadcasts and subscriptions are automatically torn down
// when it goes down, so a node has to stay up to keep its broadcast going.
@@ -508,7 +511,7 @@ void ThreadSocketHandler(void* parg)
PrintException(&e, "ThreadSocketHandler()");
} catch (...) {
vnThreadsRunning[0]--;
- PrintException(NULL, "ThreadSocketHandler()");
+ throw; // support pthread_cancel()
}
printf("ThreadSocketHandler exiting\n");
@@ -531,15 +534,18 @@ void ThreadSocketHandler2(void* parg)
vector<CNode*> vNodesCopy = vNodes;
foreach(CNode* pnode, vNodesCopy)
{
- if (pnode->ReadyToDisconnect() && pnode->vRecv.empty() && pnode->vSend.empty())
+ if (pnode->fDisconnect ||
+ (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty()))
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
+
+ // close socket
pnode->DoDisconnect();
// hold in disconnected pool until all refs are released
pnode->nReleaseTime = max(pnode->nReleaseTime, GetTime() + 5 * 60);
- if (pnode->fNetworkNode)
+ if (pnode->fNetworkNode || pnode->fInbound)
pnode->Release();
vNodesDisconnected.push_back(pnode);
}
@@ -582,8 +588,10 @@ void ThreadSocketHandler2(void* parg)
fd_set fdsetRecv;
fd_set fdsetSend;
+ fd_set fdsetError;
FD_ZERO(&fdsetRecv);
FD_ZERO(&fdsetSend);
+ FD_ZERO(&fdsetError);
SOCKET hSocketMax = 0;
FD_SET(hListenSocket, &fdsetRecv);
hSocketMax = max(hSocketMax, hListenSocket);
@@ -592,6 +600,7 @@ void ThreadSocketHandler2(void* parg)
foreach(CNode* pnode, vNodes)
{
FD_SET(pnode->hSocket, &fdsetRecv);
+ FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = max(hSocketMax, pnode->hSocket);
TRY_CRITICAL_BLOCK(pnode->cs_vSend)
if (!pnode->vSend.empty())
@@ -600,30 +609,21 @@ void ThreadSocketHandler2(void* parg)
}
vnThreadsRunning[0]--;
- int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, NULL, &timeout);
+ int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
vnThreadsRunning[0]++;
if (fShutdown)
return;
if (nSelect == SOCKET_ERROR)
{
int nErr = WSAGetLastError();
- printf("select failed: %d\n", nErr);
+ printf("socket select error %d\n", nErr);
for (int i = 0; i <= hSocketMax; i++)
- {
FD_SET(i, &fdsetRecv);
- FD_SET(i, &fdsetSend);
- }
+ FD_ZERO(&fdsetSend);
+ FD_ZERO(&fdsetError);
Sleep(timeout.tv_usec/1000);
}
- //// debug print
- //foreach(CNode* pnode, vNodes)
- //{
- // printf("vRecv = %-5d ", pnode->vRecv.size());
- // printf("vSend = %-5d ", pnode->vSend.size());
- //}
- //printf("\n");
-
//
// Accept new connections
@@ -641,7 +641,7 @@ void ThreadSocketHandler2(void* parg)
if (hSocket == INVALID_SOCKET)
{
if (WSAGetLastError() != WSAEWOULDBLOCK)
- printf("ERROR ThreadSocketHandler accept failed: %d\n", WSAGetLastError());
+ printf("socket error accept failed: %d\n", WSAGetLastError());
}
else
{
@@ -669,7 +669,7 @@ void ThreadSocketHandler2(void* parg)
//
// Receive
//
- if (FD_ISSET(hSocket, &fdsetRecv))
+ if (FD_ISSET(hSocket, &fdsetRecv) || FD_ISSET(hSocket, &fdsetError))
{
TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
{
@@ -677,25 +677,29 @@ void ThreadSocketHandler2(void* parg)
unsigned int nPos = vRecv.size();
// typical socket buffer is 8K-64K
- const unsigned int nBufSize = 0x10000;
- vRecv.resize(nPos + nBufSize);
- int nBytes = recv(hSocket, &vRecv[nPos], nBufSize, 0);
- vRecv.resize(nPos + max(nBytes, 0));
- if (nBytes == 0)
+ char pchBuf[0x10000];
+ int nBytes = recv(hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
+ if (nBytes > 0)
+ {
+ vRecv.resize(nPos + nBytes);
+ memcpy(&vRecv[nPos], pchBuf, nBytes);
+ pnode->nLastRecv = GetTime();
+ }
+ else if (nBytes == 0)
{
// socket closed gracefully
if (!pnode->fDisconnect)
- printf("recv: socket closed\n");
+ printf("socket closed\n");
pnode->fDisconnect = true;
}
else if (nBytes < 0)
{
- // socket error
+ // error
int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
{
if (!pnode->fDisconnect)
- printf("recv failed: %d\n", nErr);
+ printf("socket recv error %d\n", nErr);
pnode->fDisconnect = true;
}
}
@@ -712,28 +716,63 @@ void ThreadSocketHandler2(void* parg)
CDataStream& vSend = pnode->vSend;
if (!vSend.empty())
{
- int nBytes = send(hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL);
+ int nBytes = send(hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT);
if (nBytes > 0)
{
vSend.erase(vSend.begin(), vSend.begin() + nBytes);
+ pnode->nLastSend = GetTime();
}
- else if (nBytes == 0)
- {
- if (pnode->ReadyToDisconnect())
- pnode->vSend.clear();
- }
- else
+ else if (nBytes < 0)
{
- printf("send error %d\n", nBytes);
- if (pnode->ReadyToDisconnect())
- pnode->vSend.clear();
+ // error
+ int nErr = WSAGetLastError();
+ if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
+ {
+ printf("socket send error %d\n", nErr);
+ pnode->fDisconnect = true;
+ }
}
}
}
}
+
+ //
+ // Inactivity checking
+ //
+ if (pnode->vSend.empty())
+ pnode->nLastSendEmpty = GetTime();
+ if (GetTime() - pnode->nTimeConnected > 60)
+ {
+ if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
+ {
+ printf("socket no message in first 60 seconds, %d %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0);
+ pnode->fDisconnect = true;
+ }
+ else if (GetTime() - pnode->nLastSend > 10 * 60 && GetTime() - pnode->nLastSendEmpty > 10 * 60)
+ {
+ printf("socket not sending\n");
+ pnode->fDisconnect = true;
+ }
+ else if (GetTime() - pnode->nLastRecv > (pnode->nVersion >= 107 ? 15*60 : 90*60))
+ {
+ printf("socket inactivity timeout\n");
+ pnode->fDisconnect = true;
+ }
+ }
+ }
+
+
+ //// debug heartbeat
+ static int64 nHeartbeat1;
+ if (GetTime() - nHeartbeat1 >= 5 * 60)
+ {
+ printf("%s sendrecv\n", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
+ nHeartbeat1 = GetTime();
+ fDebug = true;
}
+ nThreadSocketHandlerHeartbeat = GetTime();
Sleep(10);
}
}
@@ -772,15 +811,20 @@ void ThreadOpenConnections2(void* parg)
{
printf("ThreadOpenConnections started\n");
- // Connect to one specified address
+ // Connect to specific addresses
while (mapArgs.count("-connect"))
{
- OpenNetworkConnection(CAddress(mapArgs["-connect"]));
- for (int i = 0; i < 10; i++)
+ foreach(string strAddr, mapMultiArgs["-connect"])
{
- Sleep(1000);
- if (fShutdown)
- return;
+ CAddress addr(strAddr, NODE_NETWORK);
+ if (addr.IsValid())
+ OpenNetworkConnection(addr);
+ for (int i = 0; i < 10; i++)
+ {
+ Sleep(1000);
+ if (fShutdown)
+ return;
+ }
}
}
@@ -821,12 +865,7 @@ void ThreadOpenConnections2(void* parg)
// Choose an address to connect to based on most recently seen
//
CAddress addrConnect;
- int64 nBestTime = 0;
- int64 nDelay = ((60 * 60) << vNodes.size());
- if (vNodes.size() >= 3)
- nDelay *= 4;
- if (nGotIRCAddresses > 0)
- nDelay *= 100;
+ int64 nBest = INT64_MIN;
// Do this here so we don't have to critsect vNodes inside mapAddresses critsect
set<unsigned int> setConnected;
@@ -841,24 +880,51 @@ void ThreadOpenConnections2(void* parg)
const CAddress& addr = item.second;
if (!addr.IsIPv4() || !addr.IsValid() || setConnected.count(addr.ip))
continue;
+ int64 nSinceLastSeen = GetAdjustedTime() - addr.nTime;
+ int64 nSinceLastTry = GetAdjustedTime() - addr.nLastTry;
// Randomize the order in a deterministic way, putting the standard port first
- int64 nRandomizer = (uint64)(addr.nLastFailed * 9567851 + addr.ip * 7789) % (1 * 60 * 60);
+ int64 nRandomizer = (uint64)(addr.nLastTry * 9567851 + addr.ip * 7789) % (30 * 60);
if (addr.port != DEFAULT_PORT)
- nRandomizer += 1 * 60 * 60;
+ nRandomizer += 30 * 60;
+
+ // Last seen Base retry frequency
+ // <1 hour 10 min
+ // 1 hour 1 hour
+ // 4 hours 2 hours
+ // 24 hours 5 hours
+ // 48 hours 7 hours
+ // 7 days 13 hours
+ // 30 days 27 hours
+ // 90 days 46 hours
+ // 365 days 93 hours
+ int64 nDelay = 3600.0 * sqrt(fabs(nSinceLastSeen) / 3600.0) + nRandomizer;
+
+ // Fast reconnect for one hour after last seen
+ if (nSinceLastSeen < 60 * 60)
+ nDelay = 10 * 60;
// Limit retry frequency
- if (GetAdjustedTime() < addr.nLastFailed + nDelay + nRandomizer)
+ if (nSinceLastTry < nDelay)
continue;
- // Try again only after all addresses had a first attempt
- int64 nTime = addr.nTime - nRandomizer;
- if (addr.nLastFailed > addr.nTime)
- nTime -= 365 * 24 * 60 * 60;
+ // If we have IRC, we'll be notified when they first come online,
+ // and again every 24 hours by the refresh broadcast.
+ if (nGotIRCAddresses > 0 && vNodes.size() >= 2 && nSinceLastSeen > 24 * 60 * 60)
+ continue;
- if (nTime > nBestTime)
+ // Only try the old stuff if we don't have enough connections
+ if (vNodes.size() >= 2 && nSinceLastSeen > 7 * 24 * 60 * 60)
+ continue;
+ if (vNodes.size() >= 4 && nSinceLastSeen > 24 * 60 * 60)
+ continue;
+
+ // If multiple addresses are ready, prioritize by time since
+ // last seen and time since last tried.
+ int64 nScore = min(nSinceLastTry, (int64)24 * 60 * 60) - nSinceLastSeen - nRandomizer;
+ if (nScore > nBest)
{
- nBestTime = nTime;
+ nBest = nScore;
addrConnect = addr;
}
}
@@ -941,7 +1007,7 @@ void ThreadMessageHandler(void* parg)
void ThreadMessageHandler2(void* parg)
{
printf("ThreadMessageHandler started\n");
- SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
+ SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL);
loop
{
// Poll the connected nodes for messages
@@ -1063,39 +1129,31 @@ bool BindListenPort(string& strError)
return true;
}
-bool StartNode(string& strError)
+void StartNode(void* parg)
{
- strError = "";
if (pnodeLocalHost == NULL)
pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress("127.0.0.1", nLocalServices));
#ifdef __WXMSW__
// Get local host ip
- char pszHostName[255];
- if (gethostname(pszHostName, sizeof(pszHostName)) == SOCKET_ERROR)
+ char pszHostName[1000] = "";
+ if (gethostname(pszHostName, sizeof(pszHostName)) != SOCKET_ERROR)
{
- strError = strprintf("Error: Unable to get IP address of this computer (gethostname returned error %d)", WSAGetLastError());
- printf("%s\n", strError.c_str());
- return false;
- }
- struct hostent* phostent = gethostbyname(pszHostName);
- if (!phostent)
- {
- strError = strprintf("Error: Unable to get IP address of this computer (gethostbyname returned error %d)", WSAGetLastError());
- printf("%s\n", strError.c_str());
- return false;
- }
-
- // Take the first IP that isn't loopback 127.x.x.x
- for (int i = 0; phostent->h_addr_list[i] != NULL; i++)
- printf("host ip %d: %s\n", i, CAddress(*(unsigned int*)phostent->h_addr_list[i]).ToStringIP().c_str());
- for (int i = 0; phostent->h_addr_list[i] != NULL; i++)
- {
- CAddress addr(*(unsigned int*)phostent->h_addr_list[i], DEFAULT_PORT, nLocalServices);
- if (addr.IsValid() && addr.GetByte(3) != 127)
+ struct hostent* phostent = gethostbyname(pszHostName);
+ if (phostent)
{
- addrLocalHost = addr;
- break;
+ // Take the first IP that isn't loopback 127.x.x.x
+ for (int i = 0; phostent->h_addr_list[i] != NULL; i++)
+ printf("host ip %d: %s\n", i, CAddress(*(unsigned int*)phostent->h_addr_list[i]).ToStringIP().c_str());
+ for (int i = 0; phostent->h_addr_list[i] != NULL; i++)
+ {
+ CAddress addr(*(unsigned int*)phostent->h_addr_list[i], DEFAULT_PORT, nLocalServices);
+ if (addr.IsValid() && addr.GetByte(3) != 127)
+ {
+ addrLocalHost = addr;
+ break;
+ }
+ }
}
}
#else
@@ -1145,45 +1203,85 @@ bool StartNode(string& strError)
}
else
{
- if (addrIncoming.ip)
+ if (addrIncoming.IsValid())
addrLocalHost.ip = addrIncoming.ip;
if (GetMyExternalIP(addrLocalHost.ip))
{
addrIncoming = addrLocalHost;
CWalletDB().WriteSetting("addrIncoming", addrIncoming);
+ printf("addrLocalHost = %s\n", addrLocalHost.ToString().c_str());
}
}
- // Get addresses from IRC and advertise ours
- if (_beginthread(ThreadIRCSeed, 0, NULL) == -1)
- printf("Error: _beginthread(ThreadIRCSeed) failed\n");
-
//
// Start threads
//
- if (_beginthread(ThreadSocketHandler, 0, NULL) == -1)
- {
- strError = "Error: _beginthread(ThreadSocketHandler) failed";
- printf("%s\n", strError.c_str());
- return false;
- }
- if (_beginthread(ThreadOpenConnections, 0, NULL) == -1)
- {
- strError = "Error: _beginthread(ThreadOpenConnections) failed";
- printf("%s\n", strError.c_str());
- return false;
- }
+ // Get addresses from IRC and advertise ours
+ if (!CreateThread(ThreadIRCSeed, NULL))
+ printf("Error: CreateThread(ThreadIRCSeed) failed\n");
+
+ // Send and receive from sockets, accept connections
+ pthread_t hThreadSocketHandler = CreateThread(ThreadSocketHandler, NULL, true);
+
+ // Initiate outbound connections
+ if (!CreateThread(ThreadOpenConnections, NULL))
+ printf("Error: CreateThread(ThreadOpenConnections) failed\n");
+
+ // Process messages
+ if (!CreateThread(ThreadMessageHandler, NULL))
+ printf("Error: CreateThread(ThreadMessageHandler) failed\n");
- if (_beginthread(ThreadMessageHandler, 0, NULL) == -1)
+ // Generate coins in the background
+ GenerateBitcoins(fGenerateBitcoins);
+
+ //
+ // Thread monitoring
+ //
+ loop
{
- strError = "Error: _beginthread(ThreadMessageHandler) failed";
- printf("%s\n", strError.c_str());
- return false;
+ Sleep(15000);
+ if (GetTime() - nThreadSocketHandlerHeartbeat > 4 * 60)
+ {
+ // First see if closing sockets will free it
+ printf("*** ThreadSocketHandler is stopped ***\n");
+ CRITICAL_BLOCK(cs_vNodes)
+ {
+ foreach(CNode* pnode, vNodes)
+ {
+ bool fGot = false;
+ TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
+ TRY_CRITICAL_BLOCK(pnode->cs_vSend)
+ fGot = true;
+ if (!fGot)
+ {
+ printf("*** closing socket\n");
+ closesocket(pnode->hSocket);
+ pnode->fDisconnect = true;
+ }
+ }
+ }
+ Sleep(10000);
+ if (GetTime() - nThreadSocketHandlerHeartbeat < 60)
+ continue;
+
+ // Hopefully it never comes to this.
+ // We know it'll always be hung in the recv or send call.
+ // cs_vRecv or cs_vSend may be left permanently unreleased,
+ // but we always only use TRY_CRITICAL_SECTION on them.
+ printf("*** Restarting ThreadSocketHandler ***\n");
+ TerminateThread(hThreadSocketHandler, 0);
+ #ifdef __WXMSW__
+ CloseHandle(hThreadSocketHandler);
+ #endif
+ vnThreadsRunning[0] = 0;
+
+ // Restart
+ hThreadSocketHandler = CreateThread(ThreadSocketHandler, NULL, true);
+ nThreadSocketHandlerHeartbeat = GetTime();
+ }
}
-
- return true;
}
bool StopNode()