// Copyright (c) 2021 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

// NOTE(review): this copy of the file appears to have lost everything that
// was written between angle brackets: the #include directives below have no
// targets, and std::unique_ptr / std::function / std::promise / std::optional /
// std::make_unique appear without template arguments. Restore them from the
// canonical file before building; nothing here has been guessed.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace ipc {
namespace capnp {
namespace {
//! Log callback passed to the event loop constructor (see the m_loop.emplace
//! calls below). Writes the message to the BCLog::IPC debug log category and,
//! when `raise` is true, additionally throws an Exception built from the same
//! message.
void IpcLogFn(bool raise, std::string message)
{
    LogDebug(BCLog::IPC, "%s\n", message);
    if (raise) throw Exception(message);
}

//! Protocol implementation that owns a Context, an optional event loop
//! (m_loop) and a thread (m_loop_thread) on which startLoop() runs that loop.
class CapnpProtocol : public Protocol
{
public:
    //! If a loop currently exists, calls removeClient() on it while holding
    //! the loop's mutex, then joins the loop thread if it is joinable, and
    //! finally asserts that the loop has been destroyed.
    //!
    //! NOTE(review): presumably removeClient() balances the addClient() call
    //! made in startLoop() and allows loop() to return — confirm against the
    //! event loop implementation.
    //! NOTE(review): m_loop is tested here without synchronization while the
    //! loop thread calls m_loop.reset() after loop() returns — confirm this
    //! cannot overlap.
    ~CapnpProtocol() noexcept(true)
    {
        if (m_loop) {
            std::unique_lock lock(m_loop->m_mutex);
            m_loop->removeClient(lock);
        }
        if (m_loop_thread.joinable()) m_loop_thread.join();
        assert(!m_loop);
    };
    //! Makes sure the background loop thread is running, then returns the
    //! result of mp::ConnectStream() for the given file descriptor.
    std::unique_ptr connect(int fd, const char* exe_name) override
    {
        startLoop(exe_name);
        return mp::ConnectStream(*m_loop, fd);
    }
    //! Makes sure the background loop thread is running, puts `listen_fd`
    //! into listening mode with a backlog of 5, and hands it together with
    //! `init` to mp::ListenConnections().
    //! Throws std::system_error built from errno if ::listen() fails.
    void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override
    {
        startLoop(exe_name);
        if (::listen(listen_fd, /*backlog=*/5) != 0) {
            throw std::system_error(errno, std::system_category());
        }
        mp::ListenConnections(*m_loop, listen_fd, init);
    }
    //! Runs the event loop on the calling thread instead of a background
    //! thread. Requires that no loop exists yet (asserted). `ready_fn`, if
    //! non-empty, is invoked after the loop object has been constructed and
    //! before mp::ServeStream() is called. Does not return until loop()
    //! returns, after which the loop object is destroyed.
    void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function& ready_fn = {}) override
    {
        assert(!m_loop);
        // Record a thread name derived from exe_name in the mp thread context.
        mp::g_thread_context.thread_name = mp::ThreadName(exe_name);
        m_loop.emplace(exe_name, &IpcLogFn, &m_context);
        if (ready_fn) ready_fn();
        mp::ServeStream(*m_loop, fd, init);
        m_loop->loop();
        m_loop.reset();
    }
    //! Looks up `type` in mp::ProxyTypeRegister::types(), invokes the entry
    //! found there with `iface`, and appends `cleanup` to the `cleanup`
    //! container of the object that call returns.
    void addCleanup(std::type_index type, void* iface, std::function cleanup) override
    {
        mp::ProxyTypeRegister::types().at(type)(iface).cleanup.emplace_back(std::move(cleanup));
    }
    //! Returns a reference to the Context owned by this object.
    Context& context() override { return m_context; }
    //! Starts the background loop thread unless a loop already exists.
    //! Blocks until the new thread has constructed the loop and called
    //! addClient() on it, so m_loop can be dereferenced by the caller
    //! immediately after this returns.
    void startLoop(const char* exe_name)
    {
        if (m_loop) return;
        std::promise promise;
        // The lambda captures by reference. `exe_name` and `promise` are only
        // touched before/at set_value(), while this function is still blocked
        // in wait() below; after that the thread only uses m_loop.
        m_loop_thread = std::thread([&] {
            util::ThreadRename("capnp-loop");
            m_loop.emplace(exe_name, &IpcLogFn, &m_context);
            {
                // addClient() is called with the loop mutex held and before
                // the waiting caller is released.
                std::unique_lock lock(m_loop->m_mutex);
                m_loop->addClient(lock);
            }
            promise.set_value();
            m_loop->loop();
            // Destroy the loop on the loop thread once loop() has returned;
            // the destructor asserts on this after joining.
            m_loop.reset();
        });
        promise.get_future().wait();
    }
    //! Context passed by pointer to the event loop and exposed via context().
    Context m_context;
    //! Thread running the loop when started through startLoop().
    std::thread m_loop_thread;
    //! Event loop; empty whenever no loop is running.
    std::optional m_loop;
};
} // namespace

//! Factory returning a heap-allocated protocol object.
//! NOTE(review): the template arguments are missing here; presumably this
//! creates a CapnpProtocol — confirm when restoring them.
std::unique_ptr MakeCapnpProtocol()
{
    return std::make_unique();
}
} // namespace capnp
} // namespace ipc