From 21eb5adadbe3110a8708f2570185566e1f137a49 Mon Sep 17 00:00:00 2001 From: Gavin Andresen Date: Wed, 6 Mar 2013 22:31:26 -0500 Subject: Port Thread* methods to boost::thread_group --- src/bitcoinrpc.cpp | 136 ++++++------------ src/bitcoinrpc.h | 3 +- src/checkqueue.h | 18 --- src/init.cpp | 73 +++------- src/main.cpp | 12 +- src/main.h | 6 +- src/net.cpp | 354 ++++++++-------------------------------------- src/net.h | 15 +- src/qt/optionsmodel.cpp | 5 +- src/test/test_bitcoin.cpp | 6 +- 10 files changed, 134 insertions(+), 494 deletions(-) (limited to 'src') diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp index c7219920ca..53b3635753 100644 --- a/src/bitcoinrpc.cpp +++ b/src/bitcoinrpc.cpp @@ -30,13 +30,12 @@ using namespace boost; using namespace boost::asio; using namespace json_spirit; -void ThreadRPCServer2(void* parg); - static std::string strRPCUserColonPass; -const Object emptyobj; - -void ThreadRPCServer3(void* parg); +// These are created by StartRPCThreads, destroyed in StopRPCThreads +static asio::io_service* rpc_io_service = NULL; +static ssl::context* rpc_ssl_context = NULL; +static boost::thread_group* rpc_worker_group = NULL; static inline unsigned short GetDefaultRPCPort() { @@ -650,26 +649,7 @@ private: iostreams::stream< SSLIOStreamDevice > _stream; }; -void ThreadRPCServer(void* parg) -{ - // Make this thread recognisable as the RPC listener - RenameThread("bitcoin-rpclist"); - - try - { - vnThreadsRunning[THREAD_RPCLISTENER]++; - ThreadRPCServer2(parg); - vnThreadsRunning[THREAD_RPCLISTENER]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_RPCLISTENER]--; - PrintException(&e, "ThreadRPCServer()"); - } catch (...) { - vnThreadsRunning[THREAD_RPCLISTENER]--; - PrintException(NULL, "ThreadRPCServer()"); - } - printf("ThreadRPCServer exited\n"); -} +void ServiceConnection(AcceptedConnection *conn); // Forward declaration required for RPCListen template @@ -711,11 +691,8 @@ static void RPCAcceptHandler(boost::shared_ptr< basic_socket_acceptoris_open()) + if (error != asio::error::operation_aborted && acceptor->is_open()) RPCListen(acceptor, context, fUseSSL); AcceptedConnectionImpl* tcp_conn = dynamic_cast< AcceptedConnectionImpl* >(conn); @@ -729,28 +706,22 @@ static void RPCAcceptHandler(boost::shared_ptr< basic_socket_acceptorpeer.address())) + else if (tcp_conn && !ClientAllowed(tcp_conn->peer.address())) { // Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake. if (!fUseSSL) conn->stream() << HTTPReply(HTTP_FORBIDDEN, "", false) << std::flush; delete conn; } - - // start HTTP client thread - else if (!NewThread(ThreadRPCServer3, conn)) { - printf("Failed to create RPC server client thread\n"); + else { + ServiceConnection(conn); + conn->close(); delete conn; } - - vnThreadsRunning[THREAD_RPCLISTENER]--; } -void ThreadRPCServer2(void* parg) +void StartRPCThreads() { - printf("ThreadRPCServer started\n"); - strRPCUserColonPass = mapArgs["-rpcuser"] + ":" + mapArgs["-rpcpassword"]; if ((mapArgs["-rpcpassword"] == "") || (mapArgs["-rpcuser"] == mapArgs["-rpcpassword"])) @@ -781,27 +752,28 @@ void ThreadRPCServer2(void* parg) return; } - const bool fUseSSL = GetBoolArg("-rpcssl"); + assert(rpc_io_service == NULL); + rpc_io_service = new asio::io_service(); + rpc_ssl_context = new ssl::context(*rpc_io_service, ssl::context::sslv23); - asio::io_service io_service; + const bool fUseSSL = GetBoolArg("-rpcssl"); - ssl::context context(io_service, ssl::context::sslv23); if (fUseSSL) { - context.set_options(ssl::context::no_sslv2); + rpc_ssl_context->set_options(ssl::context::no_sslv2); filesystem::path pathCertFile(GetArg("-rpcsslcertificatechainfile", "server.cert")); if (!pathCertFile.is_complete()) pathCertFile = filesystem::path(GetDataDir()) / pathCertFile; - if (filesystem::exists(pathCertFile)) context.use_certificate_chain_file(pathCertFile.string()); + if (filesystem::exists(pathCertFile)) rpc_ssl_context->use_certificate_chain_file(pathCertFile.string()); else printf("ThreadRPCServer ERROR: missing server certificate file %s\n", pathCertFile.string().c_str()); filesystem::path pathPKFile(GetArg("-rpcsslprivatekeyfile", "server.pem")); if (!pathPKFile.is_complete()) pathPKFile = filesystem::path(GetDataDir()) / pathPKFile; - if (filesystem::exists(pathPKFile)) context.use_private_key_file(pathPKFile.string(), ssl::context::pem); + if (filesystem::exists(pathPKFile)) rpc_ssl_context->use_private_key_file(pathPKFile.string(), ssl::context::pem); else printf("ThreadRPCServer ERROR: missing server private key file %s\n", pathPKFile.string().c_str()); string strCiphers = GetArg("-rpcsslciphers", "TLSv1+HIGH:!SSLv2:!aNULL:!eNULL:!AH:!3DES:@STRENGTH"); - SSL_CTX_set_cipher_list(context.impl(), strCiphers.c_str()); + SSL_CTX_set_cipher_list(rpc_ssl_context->impl(), strCiphers.c_str()); } // Try a dual IPv6/IPv4 socket, falling back to separate IPv4 and IPv6 sockets @@ -809,9 +781,7 @@ void ThreadRPCServer2(void* parg) asio::ip::address bindAddress = loopback ? asio::ip::address_v6::loopback() : asio::ip::address_v6::any(); ip::tcp::endpoint endpoint(bindAddress, GetArg("-rpcport", GetDefaultRPCPort())); boost::system::error_code v6_only_error; - boost::shared_ptr acceptor(new ip::tcp::acceptor(io_service)); - - boost::signals2::signal StopRequests; + boost::shared_ptr acceptor(new ip::tcp::acceptor(*rpc_io_service)); bool fListening = false; std::string strerr; @@ -826,11 +796,7 @@ void ThreadRPCServer2(void* parg) acceptor->bind(endpoint); acceptor->listen(socket_base::max_connections); - RPCListen(acceptor, context, fUseSSL); - // Cancel outstanding listen-requests for this acceptor when shutting down - StopRequests.connect(signals2::slot( - static_cast(&ip::tcp::acceptor::close), acceptor.get()) - .track(acceptor)); + RPCListen(acceptor, *rpc_ssl_context, fUseSSL); fListening = true; } @@ -846,17 +812,13 @@ void ThreadRPCServer2(void* parg) bindAddress = loopback ? asio::ip::address_v4::loopback() : asio::ip::address_v4::any(); endpoint.address(bindAddress); - acceptor.reset(new ip::tcp::acceptor(io_service)); + acceptor.reset(new ip::tcp::acceptor(*rpc_io_service)); acceptor->open(endpoint.protocol()); acceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); acceptor->bind(endpoint); acceptor->listen(socket_base::max_connections); - RPCListen(acceptor, context, fUseSSL); - // Cancel outstanding listen-requests for this acceptor when shutting down - StopRequests.connect(signals2::slot( - static_cast(&ip::tcp::acceptor::close), acceptor.get()) - .track(acceptor)); + RPCListen(acceptor, *rpc_ssl_context, fUseSSL); fListening = true; } @@ -872,11 +834,20 @@ void ThreadRPCServer2(void* parg) return; } - vnThreadsRunning[THREAD_RPCLISTENER]--; - while (!fShutdown) - io_service.run_one(); - vnThreadsRunning[THREAD_RPCLISTENER]++; - StopRequests(); + rpc_worker_group = new boost::thread_group(); + for (int i = 0; i < GetArg("-rpcthreads", 4); i++) + rpc_worker_group->create_thread(boost::bind(&asio::io_service::run, rpc_io_service)); +} + +void StopRPCThreads() +{ + if (rpc_io_service == NULL) return; + + rpc_io_service->stop(); + rpc_worker_group->join_all(); + delete rpc_worker_group; rpc_worker_group = NULL; + delete rpc_ssl_context; rpc_ssl_context = NULL; + delete rpc_io_service; rpc_io_service = NULL; } class JSONRequest @@ -953,32 +924,11 @@ static string JSONRPCExecBatch(const Array& vReq) return write_string(Value(ret), false) + "\n"; } -static CCriticalSection cs_THREAD_RPCHANDLER; - -void ThreadRPCServer3(void* parg) +void ServiceConnection(AcceptedConnection *conn) { - // Make this thread recognisable as the RPC handler - RenameThread("bitcoin-rpchand"); - - { - LOCK(cs_THREAD_RPCHANDLER); - vnThreadsRunning[THREAD_RPCHANDLER]++; - } - AcceptedConnection *conn = (AcceptedConnection *) parg; - bool fRun = true; - loop { - if (fShutdown || !fRun) - { - conn->close(); - delete conn; - { - LOCK(cs_THREAD_RPCHANDLER); - --vnThreadsRunning[THREAD_RPCHANDLER]; - } - return; - } - + while (fRun) + { int nProto = 0; map mapHeaders; string strRequest, strMethod, strURI; @@ -1049,12 +999,6 @@ void ThreadRPCServer3(void* parg) break; } } - - delete conn; - { - LOCK(cs_THREAD_RPCHANDLER); - vnThreadsRunning[THREAD_RPCHANDLER]--; - } } json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array ¶ms) const diff --git a/src/bitcoinrpc.h b/src/bitcoinrpc.h index 6a3554aea6..315fd92383 100644 --- a/src/bitcoinrpc.h +++ b/src/bitcoinrpc.h @@ -67,7 +67,8 @@ enum RPCErrorCode json_spirit::Object JSONRPCError(int code, const std::string& message); -void ThreadRPCServer(void* parg); +void StartRPCThreads(); +void StopRPCThreads(); int CommandLineRPC(int argc, char *argv[]); /** Convert parameter values for RPC call from strings to command-specific JSON objects. */ diff --git a/src/checkqueue.h b/src/checkqueue.h index 12dde36fe7..eba424fbaa 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -33,9 +33,6 @@ private: // Master thread blocks on this when out of work boost::condition_variable condMaster; - // Quit method blocks on this until all workers are gone - boost::condition_variable condQuit; - // The queue of elements to be processed. // As the order of booleans doesn't matter, it is used as a LIFO (stack) std::vector queue; @@ -85,8 +82,6 @@ private: while (queue.empty()) { if ((fMaster || fQuit) && nTodo == 0) { nTotal--; - if (nTotal==0) - condQuit.notify_one(); bool fRet = fAllOk; // reset the status for new work later if (fMaster) @@ -151,20 +146,7 @@ public: condWorker.notify_all(); } - // Shut the queue down - void Quit() { - boost::unique_lock lock(mutex); - fQuit = true; - // No need to wake the master, as he will quit automatically when all jobs are - // done. - condWorker.notify_all(); - - while (nTotal > 0) - condQuit.wait(lock); - } - ~CCheckQueue() { - Quit(); } friend class CCheckQueueControl; diff --git a/src/init.cpp b/src/init.cpp index d61bfa8921..a2e85f2ef2 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -68,27 +68,16 @@ void Shutdown(void* parg) // Make this thread recognisable as the shutdown thread RenameThread("bitcoin-shutoff"); - - bool fFirstThread = false; - { - TRY_LOCK(cs_Shutdown, lockShutdown); - if (lockShutdown) - { - fFirstThread = !fTaken; - fTaken = true; - } - } - static bool fExit; - if (fFirstThread) + nTransactionsUpdated++; + StopRPCThreads(); + bitdb.Flush(false); + StopNode(); { fShutdown = true; fRequestShutdown = true; nTransactionsUpdated++; + StopRPCThreads(); bitdb.Flush(false); - { - LOCK(cs_main); - ThreadScriptCheckQuit(); - } StopNode(); { LOCK(cs_main); @@ -128,7 +117,7 @@ void Shutdown(void* parg) void DetectShutdownThread(boost::thread_group* threadGroup) { while (fRequestShutdown == false) - Sleep(200); + MilliSleep(200); threadGroup->interrupt_all(); } @@ -313,6 +302,7 @@ std::string HelpMessage() " -rpcport= " + _("Listen for JSON-RPC connections on (default: 8332 or testnet: 18332)") + "\n" + " -rpcallowip= " + _("Allow JSON-RPC connections from specified IP address") + "\n" + " -rpcconnect= " + _("Send commands to node running on (default: 127.0.0.1)") + "\n" + + " -rpcthreads= " + _("Use this mean threads to service RPC calls (default: 4)") + "\n" + " -blocknotify= " + _("Execute command when the best block changes (%s in cmd is replaced by block hash)") + "\n" + " -walletnotify= " + _("Execute command when a wallet transaction changes (%s in cmd is replaced by TxID)") + "\n" + " -alertnotify= " + _("Execute command when a relevant alert is received (%s in cmd is replaced by message)") + "\n" + @@ -354,22 +344,16 @@ struct CImportingNow } }; -struct CImportData { - std::vector vFiles; -}; - -void ThreadImport(void *data) { - CImportData *import = reinterpret_cast(data); +void ThreadImport(std::vector vImportFiles) +{ RenameThread("bitcoin-loadblk"); - vnThreadsRunning[THREAD_IMPORT]++; - // -reindex if (fReindex) { CImportingNow imp; int nFile = 0; - while (!fRequestShutdown) { + while (true) { CDiskBlockPos pos(nFile, 0); FILE *file = OpenBlockFile(pos, true); if (!file) @@ -378,18 +362,16 @@ void ThreadImport(void *data) { LoadExternalBlockFile(file, &pos); nFile++; } - if (!fRequestShutdown) { - pblocktree->WriteReindexing(false); - fReindex = false; - printf("Reindexing finished\n"); - // To avoid ending up in a situation without genesis block, re-try initializing (no-op if reindexing worked): - InitBlockIndex(); - } + pblocktree->WriteReindexing(false); + fReindex = false; + printf("Reindexing finished\n"); + // To avoid ending up in a situation without genesis block, re-try initializing (no-op if reindexing worked): + InitBlockIndex(); } // hardcoded $DATADIR/bootstrap.dat filesystem::path pathBootstrap = GetDataDir() / "bootstrap.dat"; - if (filesystem::exists(pathBootstrap) && !fRequestShutdown) { + if (filesystem::exists(pathBootstrap)) { FILE *file = fopen(pathBootstrap.string().c_str(), "rb"); if (file) { CImportingNow imp; @@ -401,9 +383,7 @@ void ThreadImport(void *data) { } // -loadblock= - BOOST_FOREACH(boost::filesystem::path &path, import->vFiles) { - if (fRequestShutdown) - break; + BOOST_FOREACH(boost::filesystem::path &path, vImportFiles) { FILE *file = fopen(path.string().c_str(), "rb"); if (file) { CImportingNow imp; @@ -411,10 +391,6 @@ void ThreadImport(void *data) { LoadExternalBlockFile(file); } } - - delete import; - - vnThreadsRunning[THREAD_IMPORT]--; } /** Initialize bitcoin. @@ -615,7 +591,7 @@ bool AppInit2(boost::thread_group& threadGroup) if (nScriptCheckThreads) { printf("Using %u threads for script verification\n", nScriptCheckThreads); for (int i=0; i vImportFiles; if (mapArgs.count("-loadblock")) { BOOST_FOREACH(string strFile, mapMultiArgs["-loadblock"]) - pimport->vFiles.push_back(strFile); + vImportFiles.push_back(strFile); } - NewThread(ThreadImport, pimport); + threadGroup.create_thread(boost::bind(&ThreadImport, vImportFiles)); // ********************************************************* Step 10: load peers @@ -1038,11 +1011,11 @@ bool AppInit2(boost::thread_group& threadGroup) printf("mapWallet.size() = %"PRIszu"\n", pwalletMain->mapWallet.size()); printf("mapAddressBook.size() = %"PRIszu"\n", pwalletMain->mapAddressBook.size()); - if (!NewThread(StartNode, NULL)) + if (!NewThread(StartNode, (void*)&threadGroup)) InitError(_("Error: could not start node")); if (fServer) - NewThread(ThreadRPCServer, NULL); + StartRPCThreads(); // Generate coins in the background GenerateBitcoins(GetBoolArg("-gen", false), pwalletMain); diff --git a/src/main.cpp b/src/main.cpp index 53ec554611..39a291d078 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1560,15 +1560,9 @@ bool FindUndoPos(CValidationState &state, int nFile, CDiskBlockPos &pos, unsigne static CCheckQueue scriptcheckqueue(128); -void ThreadScriptCheck(void*) { - vnThreadsRunning[THREAD_SCRIPTCHECK]++; +void ThreadScriptCheck() { RenameThread("bitcoin-scriptch"); scriptcheckqueue.Thread(); - vnThreadsRunning[THREAD_SCRIPTCHECK]--; -} - -void ThreadScriptCheckQuit() { - scriptcheckqueue.Quit(); } bool CBlock::ConnectBlock(CValidationState &state, CBlockIndex* pindex, CCoinsViewCache &view, bool fJustCheck) @@ -2867,7 +2861,9 @@ bool LoadExternalBlockFile(FILE* fileIn, CDiskBlockPos *dbp) } } uint64 nRewind = blkdat.GetPos(); - while (blkdat.good() && !blkdat.eof() && !fRequestShutdown) { + while (blkdat.good() && !blkdat.eof()) { + boost::this_thread::interruption_point(); + blkdat.SetPos(nRewind); nRewind++; // start one byte further next time, in case of failure blkdat.SetLimit(); // remove former limit diff --git a/src/main.h b/src/main.h index 43c3051e4c..c6bd77d3dc 100644 --- a/src/main.h +++ b/src/main.h @@ -150,12 +150,8 @@ CBlockIndex* FindBlockByHeight(int nHeight); bool ProcessMessages(CNode* pfrom); /** Send queued protocol messages to be sent to a give node */ bool SendMessages(CNode* pto, bool fSendTrickle); -/** Run the importer thread, which deals with reindexing, loading bootstrap.dat, and whatever is passed to -loadblock */ -void ThreadImport(void *parg); /** Run an instance of the script checking thread */ -void ThreadScriptCheck(void* parg); -/** Stop the script checking threads */ -void ThreadScriptCheckQuit(); +void ThreadScriptCheck(); /** Run the miner threads */ void GenerateBitcoins(bool fGenerate, CWallet* pwallet); /** Generate a new block, without valid proof-of-work */ diff --git a/src/net.cpp b/src/net.cpp index fe0dcd41b3..557bd6f878 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -26,14 +26,6 @@ using namespace boost; static const int MAX_OUTBOUND_CONNECTIONS = 8; -void ThreadMessageHandler2(void* parg); -void ThreadSocketHandler2(void* parg); -void ThreadOpenConnections2(void* parg); -void ThreadOpenAddedConnections2(void* parg); -#ifdef USE_UPNP -void ThreadMapPort2(void* parg); -#endif -void ThreadDNSAddressSeed2(void* parg); bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false); @@ -46,7 +38,6 @@ struct LocalServiceInfo { // Global state variables // bool fDiscover = true; -bool fUseUPnP = false; uint64 nLocalServices = NODE_NETWORK; static CCriticalSection cs_mapLocalHost; static map mapLocalHost; @@ -754,32 +745,10 @@ void SocketSendData(CNode *pnode) pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); } -void ThreadSocketHandler(void* parg) -{ - // Make this thread recognisable as the networking thread - RenameThread("bitcoin-net"); - - try - { - vnThreadsRunning[THREAD_SOCKETHANDLER]++; - ThreadSocketHandler2(parg); - vnThreadsRunning[THREAD_SOCKETHANDLER]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_SOCKETHANDLER]--; - PrintException(&e, "ThreadSocketHandler()"); - } catch (...) { - vnThreadsRunning[THREAD_SOCKETHANDLER]--; - throw; // support pthread_cancel() - } - printf("ThreadSocketHandler exited\n"); -} - static list vNodesDisconnected; -void ThreadSocketHandler2(void* parg) +void ThreadSocketHandler() { - printf("ThreadSocketHandler started\n"); unsigned int nPrevNodeCount = 0; loop { @@ -892,12 +861,10 @@ void ThreadSocketHandler2(void* parg) } } - vnThreadsRunning[THREAD_SOCKETHANDLER]--; int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - vnThreadsRunning[THREAD_SOCKETHANDLER]++; - if (fShutdown) - return; + boost::this_thread::interruption_point(); + if (nSelect == SOCKET_ERROR) { if (have_fds) @@ -984,8 +951,7 @@ void ThreadSocketHandler2(void* parg) } BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (fShutdown) - return; + boost::this_thread::interruption_point(); // // Receive @@ -1089,31 +1055,8 @@ void ThreadSocketHandler2(void* parg) #ifdef USE_UPNP -void ThreadMapPort(void* parg) +void ThreadMapPort() { - // Make this thread recognisable as the UPnP thread - RenameThread("bitcoin-UPnP"); - - try - { - vnThreadsRunning[THREAD_UPNP]++; - ThreadMapPort2(parg); - vnThreadsRunning[THREAD_UPNP]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_UPNP]--; - PrintException(&e, "ThreadMapPort()"); - } catch (...) { - vnThreadsRunning[THREAD_UPNP]--; - PrintException(NULL, "ThreadMapPort()"); - } - printf("ThreadMapPort exited\n"); -} - -void ThreadMapPort2(void* parg) -{ - printf("ThreadMapPort started\n"); - std::string port = strprintf("%u", GetListenPort()); const char * multicastif = 0; const char * minissdpdpath = 0; @@ -1154,33 +1097,9 @@ void ThreadMapPort2(void* parg) } string strDesc = "Bitcoin " + FormatFullVersion(); -#ifndef UPNPDISCOVER_SUCCESS - /* miniupnpc 1.5 */ - r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, - port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0); -#else - /* miniupnpc 1.6 */ - r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, - port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0"); -#endif - if(r!=UPNPCOMMAND_SUCCESS) - printf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n", - port.c_str(), port.c_str(), lanaddr, r, strupnperror(r)); - else - printf("UPnP Port Mapping successful.\n"); - int i = 1; - loop { - if (fShutdown || !fUseUPnP) - { - r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0); - printf("UPNP_DeletePortMapping() returned : %d\n", r); - freeUPNPDevlist(devlist); devlist = 0; - FreeUPNPUrls(&urls); - return; - } - if (i % 600 == 0) // Refresh every 20 minutes - { + try { + loop { #ifndef UPNPDISCOVER_SUCCESS /* miniupnpc 1.5 */ r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, @@ -1196,33 +1115,49 @@ void ThreadMapPort2(void* parg) port.c_str(), port.c_str(), lanaddr, r, strupnperror(r)); else printf("UPnP Port Mapping successful.\n");; + + MilliSleep(20*60*1000); // Refresh every 20 minutes } - MilliSleep(2000); - i++; + } + catch (boost::thread_interrupted) + { + r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0); + printf("UPNP_DeletePortMapping() returned : %d\n", r); + freeUPNPDevlist(devlist); devlist = 0; + FreeUPNPUrls(&urls); + throw; } } else { printf("No valid UPnP IGDs found\n"); freeUPNPDevlist(devlist); devlist = 0; if (r != 0) FreeUPNPUrls(&urls); - loop { - if (fShutdown || !fUseUPnP) - return; - MilliSleep(2000); - } } } -void MapPort() +void MapPort(bool fUseUPnP) { - if (fUseUPnP && vnThreadsRunning[THREAD_UPNP] < 1) + static boost::thread* upnp_thread = NULL; + + if (fUseUPnP) { - if (!NewThread(ThreadMapPort, NULL)) - printf("Error: ThreadMapPort(ThreadMapPort) failed\n"); + if (upnp_thread) { + upnp_thread->interrupt(); + upnp_thread->join(); + delete upnp_thread; + } + upnp_thread = new boost::thread(boost::bind(&TraceThread >, "upnp", &ThreadMapPort)); + } + else if (upnp_thread) { + upnp_thread->interrupt(); + upnp_thread->join(); + delete upnp_thread; + upnp_thread = NULL; } } + #else -void MapPort() +void MapPort(bool) { // Intentionally left blank. } @@ -1254,32 +1189,10 @@ static const char *strTestNetDNSSeed[][2] = { {NULL, NULL} }; -void ThreadDNSAddressSeed(void* parg) -{ - // Make this thread recognisable as the DNS seeding thread - RenameThread("bitcoin-dnsseed"); - - try - { - vnThreadsRunning[THREAD_DNSSEED]++; - ThreadDNSAddressSeed2(parg); - vnThreadsRunning[THREAD_DNSSEED]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_DNSSEED]--; - PrintException(&e, "ThreadDNSAddressSeed()"); - } catch (...) { - vnThreadsRunning[THREAD_DNSSEED]--; - throw; // support pthread_cancel() - } - printf("ThreadDNSAddressSeed exited\n"); -} - -void ThreadDNSAddressSeed2(void* parg) +void ThreadDNSAddressSeed() { static const char *(*strDNSSeed)[2] = fTestNet ? strTestNetDNSSeed : strMainNetDNSSeed; - printf("ThreadDNSAddressSeed started\n"); int found = 0; printf("Loading addresses from DNS seeds (could take a while)\n"); @@ -1409,57 +1322,6 @@ void DumpAddresses() addrman.size(), GetTimeMillis() - nStart); } -void ThreadDumpAddress2(void* parg) -{ - printf("ThreadDumpAddress started\n"); - - vnThreadsRunning[THREAD_DUMPADDRESS]++; - while (!fShutdown) - { - DumpAddresses(); - vnThreadsRunning[THREAD_DUMPADDRESS]--; - MilliSleep(100000); - vnThreadsRunning[THREAD_DUMPADDRESS]++; - } - vnThreadsRunning[THREAD_DUMPADDRESS]--; -} - -void ThreadDumpAddress(void* parg) -{ - // Make this thread recognisable as the address dumping thread - RenameThread("bitcoin-adrdump"); - - try - { - ThreadDumpAddress2(parg); - } - catch (std::exception& e) { - PrintException(&e, "ThreadDumpAddress()"); - } - printf("ThreadDumpAddress exited\n"); -} - -void ThreadOpenConnections(void* parg) -{ - // Make this thread recognisable as the connection opening thread - RenameThread("bitcoin-opencon"); - - try - { - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - ThreadOpenConnections2(parg); - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - PrintException(&e, "ThreadOpenConnections()"); - } catch (...) { - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - PrintException(NULL, "ThreadOpenConnections()"); - } - printf("ThreadOpenConnections exited\n"); -} - void static ProcessOneShot() { string strDest; @@ -1478,10 +1340,8 @@ void static ProcessOneShot() } } -void ThreadOpenConnections2(void* parg) +void ThreadOpenConnections() { - printf("ThreadOpenConnections started\n"); - // Connect to specific addresses if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0) { @@ -1495,8 +1355,6 @@ void ThreadOpenConnections2(void* parg) for (int i = 0; i < 10 && i < nLoop; i++) { MilliSleep(500); - if (fShutdown) - return; } } MilliSleep(500); @@ -1509,18 +1367,10 @@ void ThreadOpenConnections2(void* parg) { ProcessOneShot(); - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; MilliSleep(500); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; - - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; CSemaphoreGrant grant(*semOutbound); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; + boost::this_thread::interruption_point(); // Add seed nodes if IRC isn't working if (addrman.size()==0 && (GetTime() - nStart > 60) && !fTestNet) @@ -1600,38 +1450,15 @@ void ThreadOpenConnections2(void* parg) } } -void ThreadOpenAddedConnections(void* parg) +void ThreadOpenAddedConnections() { - // Make this thread recognisable as the connection opening thread - RenameThread("bitcoin-opencon"); - - try - { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; - ThreadOpenAddedConnections2(parg); - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - PrintException(&e, "ThreadOpenAddedConnections()"); - } catch (...) { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - PrintException(NULL, "ThreadOpenAddedConnections()"); - } - printf("ThreadOpenAddedConnections exited\n"); -} - -void ThreadOpenAddedConnections2(void* parg) -{ - printf("ThreadOpenAddedConnections started\n"); - { LOCK(cs_vAddedNodes); vAddedNodes = mapMultiArgs["-addnode"]; } if (HaveNameProxy()) { - while(!fShutdown) { + while(true) { list lAddresses(0); { LOCK(cs_vAddedNodes); @@ -1643,14 +1470,9 @@ void ThreadOpenAddedConnections2(void* parg) CSemaphoreGrant grant(*semOutbound); OpenNetworkConnection(addr, &grant, strAddNode.c_str()); MilliSleep(500); - if (fShutdown) - return; } - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; MilliSleep(120000); // Retry every 2 minutes - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; } - return; } for (unsigned int i = 0; true; i++) @@ -1695,16 +1517,8 @@ void ThreadOpenAddedConnections2(void* parg) CSemaphoreGrant grant(*semOutbound); OpenNetworkConnection(CAddress(vserv[i % vserv.size()]), &grant); MilliSleep(500); - if (fShutdown) - return; } - if (fShutdown) - return; - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; MilliSleep(120000); // Retry every 2 minutes - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; - if (fShutdown) - return; } } @@ -1714,8 +1528,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu // // Initiate outbound network connection // - if (fShutdown) - return false; + boost::this_thread::interruption_point(); if (!strDest) if (IsLocal(addrConnect) || FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect) || @@ -1724,11 +1537,9 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu if (strDest && FindNode(strDest)) return false; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; CNode* pnode = ConnectNode(addrConnect, strDest); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return false; + boost::this_thread::interruption_point(); + if (!pnode) return false; if (grantOutbound) @@ -1746,33 +1557,10 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu - -void ThreadMessageHandler(void* parg) -{ - // Make this thread recognisable as the message handling thread - RenameThread("bitcoin-msghand"); - - try - { - vnThreadsRunning[THREAD_MESSAGEHANDLER]++; - ThreadMessageHandler2(parg); - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - PrintException(&e, "ThreadMessageHandler()"); - } catch (...) { - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - PrintException(NULL, "ThreadMessageHandler()"); - } - printf("ThreadMessageHandler exited\n"); -} - -void ThreadMessageHandler2(void* parg) +void ThreadMessageHandler() { - printf("ThreadMessageHandler started\n"); SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL); - while (!fShutdown) + while (true) { vector vNodesCopy; { @@ -1798,8 +1586,7 @@ void ThreadMessageHandler2(void* parg) if (!ProcessMessages(pnode)) pnode->CloseSocketDisconnect(); } - if (fShutdown) - return; + boost::this_thread::interruption_point(); // Send messages { @@ -1807,8 +1594,7 @@ void ThreadMessageHandler2(void* parg) if (lockSend) SendMessages(pnode, pnode == pnodeTrickle); } - if (fShutdown) - return; + boost::this_thread::interruption_point(); } { @@ -1817,16 +1603,7 @@ void ThreadMessageHandler2(void* parg) pnode->Release(); } - // Wait and allow messages to bunch up. - // Reduce vnThreadsRunning so StopNode has permission to exit while - // we're sleeping, but we must always check fShutdown after doing this. - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; MilliSleep(100); - if (fRequestShutdown) - StartShutdown(); - vnThreadsRunning[THREAD_MESSAGEHANDLER]++; - if (fShutdown) - return; } } @@ -2000,6 +1777,8 @@ void static Discover() void StartNode(void* parg) { + boost::thread_group* threadGroup = (boost::thread_group*)parg; + // Make this thread recognisable as the startup thread RenameThread("bitcoin-start"); @@ -2021,38 +1800,32 @@ void StartNode(void* parg) if (!GetBoolArg("-dnsseed", true)) printf("DNS seeding disabled\n"); else - if (!NewThread(ThreadDNSAddressSeed, NULL)) - printf("Error: NewThread(ThreadDNSAddressSeed) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread >, "dnsseed", &ThreadDNSAddressSeed)); // Map ports with UPnP - if (fUseUPnP) - MapPort(); + MapPort(GetBoolArg("-upnp", USE_UPNP)); // Send and receive from sockets, accept connections - if (!NewThread(ThreadSocketHandler, NULL)) - printf("Error: NewThread(ThreadSocketHandler) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread, "net", &ThreadSocketHandler)); // Initiate outbound connections from -addnode - if (!NewThread(ThreadOpenAddedConnections, NULL)) - printf("Error: NewThread(ThreadOpenAddedConnections) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread, "addcon", &ThreadOpenAddedConnections)); // Initiate outbound connections - if (!NewThread(ThreadOpenConnections, NULL)) - printf("Error: NewThread(ThreadOpenConnections) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread, "opencon", &ThreadOpenConnections)); // Process messages - if (!NewThread(ThreadMessageHandler, NULL)) - printf("Error: NewThread(ThreadMessageHandler) failed\n"); + threadGroup->create_thread(boost::bind(&TraceThread, "msghand", &ThreadMessageHandler)); // Dump network addresses - if (!NewThread(ThreadDumpAddress, NULL)) - printf("Error; NewThread(ThreadDumpAddress) failed\n"); + threadGroup->create_thread(boost::bind(&LoopForever, "dumpaddr", &DumpAddresses, 10000)); } bool StopNode() { printf("StopNode()\n"); GenerateBitcoins(false, NULL); + MapPort(false); fShutdown = true; nTransactionsUpdated++; int64 nStart = GetTime(); @@ -2070,19 +1843,6 @@ bool StopNode() break; MilliSleep(20); } while(true); - if (vnThreadsRunning[THREAD_SOCKETHANDLER] > 0) printf("ThreadSocketHandler still running\n"); - if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n"); - if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n"); - if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n"); - if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n"); -#ifdef USE_UPNP - if (vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n"); -#endif - if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n"); - if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n"); - if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n"); - while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0) - MilliSleep(20); MilliSleep(50); DumpAddresses(); diff --git a/src/net.h b/src/net.h index 66e528acbb..1a6f7d4de3 100644 --- a/src/net.h +++ b/src/net.h @@ -37,7 +37,7 @@ void AddressCurrentlyConnected(const CService& addr); CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CService& ip); CNode* ConnectNode(CAddress addrConnect, const char *strDest = NULL, int64 nTimeout=0); -void MapPort(); +void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError=REF(std::string())); void StartNode(void* parg); @@ -72,23 +72,10 @@ CAddress GetLocalAddress(const CNetAddr *paddrPeer = NULL); /** Thread types */ enum threadId { - THREAD_SOCKETHANDLER, - THREAD_OPENCONNECTIONS, - THREAD_MESSAGEHANDLER, - THREAD_RPCLISTENER, - THREAD_UPNP, - THREAD_DNSSEED, - THREAD_ADDEDCONNECTIONS, - THREAD_DUMPADDRESS, - THREAD_RPCHANDLER, - THREAD_IMPORT, - THREAD_SCRIPTCHECK, - THREAD_MAX }; extern bool fDiscover; -extern bool fUseUPnP; extern uint64 nLocalServices; extern uint64 nLocalHostNonce; extern boost::array vnThreadsRunning; diff --git a/src/qt/optionsmodel.cpp b/src/qt/optionsmodel.cpp index cd3170a306..6b1b4e3d8e 100644 --- a/src/qt/optionsmodel.cpp +++ b/src/qt/optionsmodel.cpp @@ -219,9 +219,8 @@ bool OptionsModel::setData(const QModelIndex & index, const QVariant & value, in settings.setValue("fMinimizeToTray", fMinimizeToTray); break; case MapPortUPnP: - fUseUPnP = value.toBool(); - settings.setValue("fUseUPnP", fUseUPnP); - MapPort(); + settings.setValue("fUseUPnP", value.toBool()); + MapPort(value.toBool()); break; case MinimizeOnClose: fMinimizeOnClose = value.toBool(); diff --git a/src/test/test_bitcoin.cpp b/src/test/test_bitcoin.cpp index 1116507a34..1bf9a28acc 100644 --- a/src/test/test_bitcoin.cpp +++ b/src/test/test_bitcoin.cpp @@ -17,6 +17,7 @@ extern void noui_connect(); struct TestingSetup { CCoinsViewDB *pcoinsdbview; boost::filesystem::path pathTemp; + boost::thread_group threadGroup; TestingSetup() { fPrintToDebugger = true; // don't want to write to debug.log file @@ -35,11 +36,12 @@ struct TestingSetup { RegisterWallet(pwalletMain); nScriptCheckThreads = 3; for (int i=0; i < nScriptCheckThreads-1; i++) - NewThread(ThreadScriptCheck, NULL); + threadGroup.create_thread(&ThreadScriptCheck); } ~TestingSetup() { - ThreadScriptCheckQuit(); + threadGroup.interrupt_all(); + threadGroup.join_all(); delete pwalletMain; pwalletMain = NULL; delete pcoinsTip; -- cgit v1.2.3