From 955d4077aac621697246bcb20a854ba97e37c519 Mon Sep 17 00:00:00 2001 From: Russell Yanofsky Date: Thu, 23 Aug 2018 13:42:31 -0400 Subject: multiprocess: Add IPC connectAddress and listenAddress methods Allow listening on and connecting to unix sockets. --- src/ipc/capnp/protocol.cpp | 10 +++++ src/ipc/interfaces.cpp | 30 +++++++++++++++ src/ipc/process.cpp | 96 +++++++++++++++++++++++++++++++++++++++++++++- src/ipc/process.h | 10 +++++ src/ipc/protocol.h | 20 ++++++++++ 5 files changed, 165 insertions(+), 1 deletion(-) (limited to 'src/ipc') diff --git a/src/ipc/capnp/protocol.cpp b/src/ipc/capnp/protocol.cpp index 73276d6d90..9d18d62102 100644 --- a/src/ipc/capnp/protocol.cpp +++ b/src/ipc/capnp/protocol.cpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include namespace ipc { @@ -51,6 +53,14 @@ public: startLoop(exe_name); return mp::ConnectStream(*m_loop, fd); } + void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override + { + startLoop(exe_name); + if (::listen(listen_fd, /*backlog=*/5) != 0) { + throw std::system_error(errno, std::system_category()); + } + mp::ListenConnections(*m_loop, listen_fd, init); + } void serve(int fd, const char* exe_name, interfaces::Init& init) override { assert(!m_loop); diff --git a/src/ipc/interfaces.cpp b/src/ipc/interfaces.cpp index b409443f64..33555f05d4 100644 --- a/src/ipc/interfaces.cpp +++ b/src/ipc/interfaces.cpp @@ -2,6 +2,7 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. +#include #include #include #include @@ -56,6 +57,35 @@ public: exit_status = EXIT_SUCCESS; return true; } + std::unique_ptr connectAddress(std::string& address) override + { + if (address.empty() || address == "0") return nullptr; + int fd; + if (address == "auto") { + // Treat "auto" the same as "unix" except don't treat it an as error + // if the connection is not accepted. Just return null so the caller + // can work offline without a connection, or spawn a new + // bitcoin-node process and connect to it. + address = "unix"; + try { + fd = m_process->connect(gArgs.GetDataDirNet(), "bitcoin-node", address); + } catch (const std::system_error& e) { + // If connection type is auto and socket path isn't accepting connections, or doesn't exist, catch the error and return null; + if (e.code() == std::errc::connection_refused || e.code() == std::errc::no_such_file_or_directory) { + return nullptr; + } + throw; + } + } else { + fd = m_process->connect(gArgs.GetDataDirNet(), "bitcoin-node", address); + } + return m_protocol->connect(fd, m_exe_name); + } + void listenAddress(std::string& address) override + { + int fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address); + m_protocol->listen(fd, m_exe_name, m_init); + } void addCleanup(std::type_index type, void* iface, std::function cleanup) override { m_protocol->addCleanup(type, iface, std::move(cleanup)); diff --git a/src/ipc/process.cpp b/src/ipc/process.cpp index 9657dcd092..432c365d8f 100644 --- a/src/ipc/process.cpp +++ b/src/ipc/process.cpp @@ -4,22 +4,28 @@ #include #include +#include #include #include #include #include +#include #include #include +#include #include #include #include #include -#include +#include +#include #include #include #include +using util::RemovePrefixView; + namespace ipc { namespace { class ProcessImpl : public Process @@ -54,7 +60,95 @@ public: } return true; } + int connect(const fs::path& data_dir, + const std::string& dest_exe_name, + std::string& address) override; + int bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) override; }; + +static bool ParseAddress(std::string& address, + const fs::path& data_dir, + const std::string& dest_exe_name, + struct sockaddr_un& addr, + std::string& error) +{ + if (address.compare(0, 4, "unix") == 0 && (address.size() == 4 || address[4] == ':')) { + fs::path path; + if (address.size() <= 5) { + path = data_dir / fs::PathFromString(strprintf("%s.sock", RemovePrefixView(dest_exe_name, "bitcoin-"))); + } else { + path = data_dir / fs::PathFromString(address.substr(5)); + } + std::string path_str = fs::PathToString(path); + address = strprintf("unix:%s", path_str); + if (path_str.size() >= sizeof(addr.sun_path)) { + error = strprintf("Unix address path %s exceeded maximum socket path length", fs::quoted(fs::PathToString(path))); + return false; + } + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, path_str.c_str(), sizeof(addr.sun_path)-1); + return true; + } + + error = strprintf("Unrecognized address '%s'", address); + return false; +} + +int ProcessImpl::connect(const fs::path& data_dir, + const std::string& dest_exe_name, + std::string& address) +{ + struct sockaddr_un addr; + std::string error; + if (!ParseAddress(address, data_dir, dest_exe_name, addr, error)) { + throw std::invalid_argument(error); + } + + int fd; + if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == -1) { + throw std::system_error(errno, std::system_category()); + } + if (::connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0) { + return fd; + } + int connect_error = errno; + if (::close(fd) != 0) { + LogPrintf("Error closing file descriptor %i '%s': %s\n", fd, address, SysErrorString(errno)); + } + throw std::system_error(connect_error, std::system_category()); +} + +int ProcessImpl::bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) +{ + struct sockaddr_un addr; + std::string error; + if (!ParseAddress(address, data_dir, exe_name, addr, error)) { + throw std::invalid_argument(error); + } + + if (addr.sun_family == AF_UNIX) { + fs::path path = addr.sun_path; + if (path.has_parent_path()) fs::create_directories(path.parent_path()); + if (fs::symlink_status(path).type() == fs::file_type::socket) { + fs::remove(path); + } + } + + int fd; + if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == -1) { + throw std::system_error(errno, std::system_category()); + } + + if (::bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0) { + return fd; + } + int bind_error = errno; + if (::close(fd) != 0) { + LogPrintf("Error closing file descriptor %i: %s\n", fd, SysErrorString(errno)); + } + throw std::system_error(bind_error, std::system_category()); +} } // namespace std::unique_ptr MakeProcess() { return std::make_unique(); } diff --git a/src/ipc/process.h b/src/ipc/process.h index 40f2d2acf6..2ed8b73fab 100644 --- a/src/ipc/process.h +++ b/src/ipc/process.h @@ -34,6 +34,16 @@ public: //! process. If so, return true and a file descriptor for communicating //! with the parent process. virtual bool checkSpawned(int argc, char* argv[], int& fd) = 0; + + //! Canonicalize and connect to address, returning socket descriptor. + virtual int connect(const fs::path& data_dir, + const std::string& dest_exe_name, + std::string& address) = 0; + + //! Create listening socket, bind and canonicalize address, and return socket descriptor. + virtual int bind(const fs::path& data_dir, + const std::string& exe_name, + std::string& address) = 0; }; //! Constructor for Process interface. Implementation will vary depending on diff --git a/src/ipc/protocol.h b/src/ipc/protocol.h index 4cd892e411..1e355784ad 100644 --- a/src/ipc/protocol.h +++ b/src/ipc/protocol.h @@ -25,11 +25,31 @@ public: //! Return Init interface that forwards requests over given socket descriptor. //! Socket communication is handled on a background thread. + //! + //! @note It could be potentially useful in the future to add + //! std::function on_disconnect callback argument here. But there + //! isn't an immediate need, because the protocol implementation can clean + //! up its own state (calling ProxyServer destructors, etc) on disconnect, + //! and any client calls will just throw ipc::Exception errors after a + //! disconnect. virtual std::unique_ptr connect(int fd, const char* exe_name) = 0; + //! Listen for connections on provided socket descriptor, accept them, and + //! handle requests on accepted connections. This method doesn't block, and + //! performs I/O on a background thread. + virtual void listen(int listen_fd, const char* exe_name, interfaces::Init& init) = 0; + //! Handle requests on provided socket descriptor, forwarding them to the //! provided Init interface. Socket communication is handled on the //! current thread, and this call blocks until the socket is closed. + //! + //! @note: If this method is called, it needs be called before connect() or + //! listen() methods, because for ease of implementation it's inflexible and + //! always runs the event loop in the foreground thread. It can share its + //! event loop with the other methods but can't share an event loop that was + //! created by them. This isn't really a problem because serve() is only + //! called by spawned child processes that call it immediately to + //! communicate back with parent processes. virtual void serve(int fd, const char* exe_name, interfaces::Init& init) = 0; //! Add cleanup callback to interface that will run when the interface is -- cgit v1.2.3