diff --git a/src/net.cpp b/src/net.cpp index 3ac35b432efb80..45ae9b2050f119 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -100,10 +100,6 @@ enum BindFlags { BF_DONT_ADVERTISE = (1U << 1), }; -// The set of sockets cannot be modified while waiting -// The sleep time needs to be small to avoid new sockets stalling -static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50; - const std::string NET_MESSAGE_TYPE_OTHER = "*other*"; static const uint64_t RANDOMIZER_ID_NETGROUP = 0x6c0edd8036ef4036ULL; // SHA256("netgroup")[0:8] @@ -432,10 +428,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo } // Connect - std::unique_ptr sock; Proxy proxy; assert(!proxy.IsValid()); - std::unique_ptr i2p_transient_session; std::optional node_id; CService me; @@ -453,9 +447,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo /*is_important=*/conn_type == ConnectionType::MANUAL, proxy, proxyConnectionFailed, - me, - sock, - i2p_transient_session); + me); if (!proxyConnectionFailed) { // If a connection to the node was attempted, and failure (if any) is not caused by a problem connecting to @@ -472,9 +464,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo /*is_important=*/conn_type == ConnectionType::MANUAL, proxy, dummy, - me, - sock, - i2p_transient_session); + me); } // Check any other resolved address (if any) if we fail to connect if (!node_id.has_value()) { @@ -487,7 +477,6 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo const uint64_t nonce{GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(node_id.value()).Finalize()}; CNode* pnode = new CNode(node_id.value(), - std::move(sock), target_addr, CalculateKeyedNetGroup(target_addr), nonce, @@ -497,7 +486,6 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo /*inbound_onion=*/false, CNodeOptions{ .permission_flags = permission_flags, - .i2p_sam_session = std::move(i2p_transient_session), .recv_flood_size = nReceiveFloodSize, .use_v2transport = use_v2transport, }); @@ -512,17 +500,6 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo return nullptr; } -void CNode::CloseSocketDisconnect() -{ - fDisconnect = true; - LOCK(m_sock_mutex); - if (m_sock) { - LogDebug(BCLog::NET, "disconnecting peer=%d\n", id); - m_sock.reset(); - } - m_i2p_sam_session.reset(); -} - void CConnman::AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr, const std::vector& ranges) const { for (const auto& subnet : ranges) { if (subnet.m_subnet.Match(addr)) { @@ -1532,7 +1509,7 @@ Transport::Info V2Transport::GetInfo() const noexcept return info; } -std::pair CConnman::SocketSendData(CNode& node) +std::pair CConnman::SendMessagesAsBytes(CNode& node) { AssertLockNotHeld(m_total_bytes_sent_mutex); @@ -1560,45 +1537,29 @@ std::pair CConnman::SocketSendData(CNode& node) if (expected_more.has_value()) Assume(!data.empty() == *expected_more); expected_more = more; data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent - int nBytes = 0; - if (!data.empty()) { - LOCK(node.m_sock_mutex); - // There is no socket in case we've already disconnected, or in test cases without - // real connections. In these cases, we bail out immediately and just leave things - // in the send queue and transport. - if (!node.m_sock) { - break; - } - int flags = MSG_NOSIGNAL | MSG_DONTWAIT; -#ifdef MSG_MORE - if (more) { - flags |= MSG_MORE; - } -#endif - nBytes = node.m_sock->Send(reinterpret_cast(data.data()), data.size(), flags); - } - if (nBytes > 0) { + + std::string errmsg; + + const ssize_t sent{SendBytes(node.GetId(), data, more, errmsg)}; + + if (sent > 0) { node.m_last_send = GetTime(); - node.nSendBytes += nBytes; + node.nSendBytes += sent; // Notify transport that bytes have been processed. - node.m_transport->MarkBytesSent(nBytes); + node.m_transport->MarkBytesSent(sent); // Update statistics per message type. if (!msg_type.empty()) { // don't report v2 handshake bytes for now - node.AccountForSentBytes(msg_type, nBytes); + node.AccountForSentBytes(msg_type, sent); } - nSentSize += nBytes; - if ((size_t)nBytes != data.size()) { + nSentSize += sent; + if (static_cast(sent) != data.size()) { // could not send full message; stop sending more break; } } else { - if (nBytes < 0) { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { - LogDebug(BCLog::NET, "socket send error for peer=%d: %s\n", node.GetId(), NetworkErrorString(nErr)); - node.CloseSocketDisconnect(); - } + if (sent < 0) { + LogDebug(BCLog::NET, "socket send error for peer=%d: %s\n", node.GetId(), errmsg); + MarkAsDisconnectAndCloseConnection(node); } break; } @@ -1680,8 +1641,7 @@ bool CConnman::AttemptToEvictConnection() return false; } -void CConnman::EventNewConnectionAccepted(NodeId node_id, - std::unique_ptr&& sock, +bool CConnman::EventNewConnectionAccepted(NodeId node_id, const CService& addr_bind_, const CService& addr_) { @@ -1707,7 +1667,7 @@ void CConnman::EventNewConnectionAccepted(NodeId node_id, if (!fNetworkActive) { LogDebug(BCLog::NET, "connection from %s dropped: not accepting new connections\n", addr.ToStringAddrPort()); - return; + return false; } // Don't accept connections from banned peers. @@ -1715,7 +1675,7 @@ void CConnman::EventNewConnectionAccepted(NodeId node_id, if (!NetPermissions::HasFlag(permission_flags, NetPermissionFlags::NoBan) && banned) { LogDebug(BCLog::NET, "connection from %s dropped (banned)\n", addr.ToStringAddrPort()); - return; + return false; } // Only accept connections from discouraged peers if our inbound slots aren't (almost) full. @@ -1723,7 +1683,7 @@ void CConnman::EventNewConnectionAccepted(NodeId node_id, if (!NetPermissions::HasFlag(permission_flags, NetPermissionFlags::NoBan) && nInbound + 1 >= m_max_inbound && discouraged) { LogDebug(BCLog::NET, "connection from %s dropped (discouraged)\n", addr.ToStringAddrPort()); - return; + return false; } if (nInbound >= m_max_inbound) @@ -1731,7 +1691,7 @@ void CConnman::EventNewConnectionAccepted(NodeId node_id, if (!AttemptToEvictConnection()) { // No connection to evict, disconnect the new connection LogDebug(BCLog::NET, "failed to find an eviction candidate - connection dropped (full)\n"); - return; + return false; } } @@ -1744,7 +1704,6 @@ void CConnman::EventNewConnectionAccepted(NodeId node_id, const bool use_v2transport(local_services & NODE_P2P_V2); CNode* pnode = new CNode(node_id, - std::move(sock), CAddress{addr, NODE_NONE}, CalculateKeyedNetGroup(addr), nonce, @@ -1768,6 +1727,8 @@ void CConnman::EventNewConnectionAccepted(NodeId node_id, // We received a new connection, harvest entropy from the time (and our peer count) RandAddEvent(static_cast(node_id)); + + return true; } bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type, bool use_v2transport = false) @@ -1809,6 +1770,14 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ return true; } +void CConnman::MarkAsDisconnectAndCloseConnection(CNode& node) +{ + node.fDisconnect = true; + if (CloseConnection(node.GetId())) { + LogDebug(BCLog::NET, "disconnecting peer=%d\n", node.GetId()); + } +} + void CConnman::DisconnectNodes() { AssertLockNotHeld(m_nodes_mutex); @@ -1859,8 +1828,7 @@ void CConnman::DisconnectNodes() // release outbound grant (if any) pnode->grantOutbound.Release(); - // close socket and cleanup - pnode->CloseSocketDisconnect(); + MarkAsDisconnectAndCloseConnection(*pnode); // update connection count by network if (pnode->IsManualOrFullOutboundConn()) --m_network_conn_counts[pnode->addr.GetNetwork()]; @@ -1956,7 +1924,7 @@ void CConnman::EventReadyToSend(NodeId node_id, bool& cancel_recv) return; } - const auto [bytes_sent, data_left] = WITH_LOCK(node->cs_vSend, return SocketSendData(*node);); + const auto [bytes_sent, data_left] = WITH_LOCK(node->cs_vSend, return SendMessagesAsBytes(*node);); // If both receiving and (non-optimistic) sending were possible, we first attempt // sending. If that succeeds, but does not fully drain the send queue, do not @@ -1980,7 +1948,7 @@ void CConnman::EventGotData(NodeId node_id, const uint8_t* data, size_t n) bool notify = false; if (!node->ReceiveMsgBytes({data, n}, notify)) { - node->CloseSocketDisconnect(); + MarkAsDisconnectAndCloseConnection(*node); } RecordBytesRecv(n); if (notify) { @@ -2001,7 +1969,7 @@ void CConnman::EventGotEOF(NodeId node_id) if (!node->fDisconnect) { LogDebug(BCLog::NET, "socket closed for peer=%d\n", node_id); } - node->CloseSocketDisconnect(); + MarkAsDisconnectAndCloseConnection(*node); } void CConnman::EventGotPermanentReadError(NodeId node_id, const std::string& errmsg) @@ -2016,7 +1984,7 @@ void CConnman::EventGotPermanentReadError(NodeId node_id, const std::string& err if (!node->fDisconnect) { LogDebug(BCLog::NET, "socket recv error for peer=%d: %s\n", node_id, errmsg); } - node->CloseSocketDisconnect(); + MarkAsDisconnectAndCloseConnection(*node); } bool CConnman::ShouldTryToSend(NodeId node_id) const @@ -2068,164 +2036,6 @@ void CConnman::EventIOLoopCompletedForAllPeers() DisconnectNodes(); NotifyNumConnectionsChanged(); } - -Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) -{ - AssertLockNotHeld(m_nodes_mutex); - - Sock::EventsPerSock events_per_sock; - - for (const auto& sock : m_listen) { - events_per_sock.emplace(sock, Sock::Events{Sock::RECV}); - } - - for (CNode* pnode : nodes) { - const bool select_recv{ShouldTryToRecv(pnode->GetId())}; - const bool select_send{ShouldTryToSend(pnode->GetId())}; - if (!select_recv && !select_send) continue; - - LOCK(pnode->m_sock_mutex); - if (pnode->m_sock) { - Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0); - events_per_sock.emplace(pnode->m_sock, Sock::Events{event}); - } - } - - return events_per_sock; -} - -void CConnman::SocketHandler() -{ - AssertLockNotHeld(m_total_bytes_sent_mutex); - - Sock::EventsPerSock events_per_sock; - - { - const NodesSnapshot snap{*this, /*shuffle=*/false}; - - const auto timeout = std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS); - - // Check for the readiness of the already connected sockets and the - // listening sockets in one call ("readiness" as in poll(2) or - // select(2)). If none are ready, wait for a short while and return - // empty sets. - events_per_sock = GenerateWaitSockets(snap.Nodes()); - if (events_per_sock.empty() || !events_per_sock.begin()->first->WaitMany(timeout, events_per_sock)) { - interruptNet.sleep_for(timeout); - } - - // Service (send/receive) each of the already connected nodes. - SocketHandlerConnected(snap.Nodes(), events_per_sock); - } - - // Accept new connections from listening sockets. - SocketHandlerListening(events_per_sock); -} - -void CConnman::SocketHandlerConnected(const std::vector& nodes, - const Sock::EventsPerSock& events_per_sock) -{ - AssertLockNotHeld(m_nodes_mutex); - AssertLockNotHeld(m_total_bytes_sent_mutex); - - for (CNode* pnode : nodes) { - if (interruptNet) - return; - - // - // Receive - // - bool recvSet = false; - bool sendSet = false; - bool errorSet = false; - { - LOCK(pnode->m_sock_mutex); - if (!pnode->m_sock) { - continue; - } - const auto it = events_per_sock.find(pnode->m_sock); - if (it != events_per_sock.end()) { - recvSet = it->second.occurred & Sock::RECV; - sendSet = it->second.occurred & Sock::SEND; - errorSet = it->second.occurred & Sock::ERR; - } - } - - if (sendSet) { - bool cancel_recv; - - EventReadyToSend(pnode->GetId(), cancel_recv); - - if (cancel_recv) { - recvSet = false; - } - } - - if (recvSet || errorSet) - { - // typical socket buffer is 8K-64K - uint8_t pchBuf[0x10000]; - int nBytes = 0; - { - LOCK(pnode->m_sock_mutex); - if (!pnode->m_sock) { - continue; - } - nBytes = pnode->m_sock->Recv(pchBuf, sizeof(pchBuf), MSG_DONTWAIT); - } - if (nBytes > 0) - { - EventGotData(pnode->GetId(), pchBuf, nBytes); - } - else if (nBytes == 0) - { - EventGotEOF(pnode->GetId()); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - EventGotPermanentReadError(pnode->GetId(), NetworkErrorString(nErr)); - } - } - } - - EventIOLoopCompletedForNode(pnode->GetId()); - } -} - -void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) -{ - for (const auto& sock : m_listen) { - if (interruptNet) { - return; - } - const auto it = events_per_sock.find(sock); - if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) { - CService addr_accepted; - - auto sock_accepted{AcceptConnection(*sock, addr_accepted)}; - - if (sock_accepted) { - NewSockAccepted(std::move(sock_accepted), GetBindAddress(*sock), addr_accepted); - } - } - } -} - -void CConnman::ThreadSocketHandler() -{ - AssertLockNotHeld(m_nodes_mutex); - AssertLockNotHeld(m_total_bytes_sent_mutex); - - while (!interruptNet) - { - EventIOLoopCompletedForAllPeers(); - SocketHandler(); - } -} void CConnman::WakeMessageHandler() { @@ -3269,9 +3079,6 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) fMsgProcWake = false; } - // Send and receive from sockets, accept connections - threadSocketHandler = std::thread(&util::TraceThread, "net", [this] { ThreadSocketHandler(); }); - SockMan::Options sockman_options; Proxy i2p_sam; @@ -3369,8 +3176,6 @@ void CConnman::StopThreads() threadOpenAddedConnections.join(); if (threadDNSAddressSeed.joinable()) threadDNSAddressSeed.join(); - if (threadSocketHandler.joinable()) - threadSocketHandler.join(); } void CConnman::StopNodes() @@ -3393,7 +3198,7 @@ void CConnman::StopNodes() decltype(m_nodes) nodes; WITH_LOCK(m_nodes_mutex, nodes.swap(m_nodes)); for (auto& [id, pnode] : nodes) { - pnode->CloseSocketDisconnect(); + MarkAsDisconnectAndCloseConnection(*pnode); DeleteNode(pnode); } @@ -3711,7 +3516,6 @@ static std::unique_ptr MakeTransport(NodeId id, bool use_v2transport, } CNode::CNode(NodeId idIn, - std::shared_ptr sock, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, @@ -3722,7 +3526,6 @@ CNode::CNode(NodeId idIn, CNodeOptions&& node_opts) : m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)}, m_permission_flags{node_opts.permission_flags}, - m_sock{sock}, m_connected{GetTime()}, addr{addrIn}, addrBind{addrBindIn}, @@ -3734,8 +3537,7 @@ CNode::CNode(NodeId idIn, m_conn_type{conn_type_in}, id{idIn}, nLocalHostNonce{nLocalHostNonceIn}, - m_recv_flood_size{node_opts.recv_flood_size}, - m_i2p_sam_session{std::move(node_opts.i2p_sam_session)} + m_recv_flood_size{node_opts.recv_flood_size} { if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND); @@ -3821,13 +3623,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) // If there was nothing to send before, and there is now (predicted by the "more" value // returned by the GetBytesToSend call above), attempt "optimistic write": - // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually + // because the poll/select loop may pause for a while before actually // doing a send, try sending from the calling thread if the queue was empty before. // With a V1Transport, more will always be true here, because adding a message always // results in sendable bytes there, but with V2Transport this is not the case (it may // still be in the handshake). if (queue_was_empty && more) { - SocketSendData(*pnode); + SendMessagesAsBytes(*pnode); } } } diff --git a/src/net.h b/src/net.h index 34c2a478d5bf20..30ce7c12ba7ff8 100644 --- a/src/net.h +++ b/src/net.h @@ -658,7 +658,6 @@ class V2Transport final : public Transport struct CNodeOptions { NetPermissionFlags permission_flags = NetPermissionFlags::None; - std::unique_ptr i2p_sam_session = nullptr; bool prefer_evict = false; size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000}; bool use_v2transport = false; @@ -674,16 +673,6 @@ class CNode const NetPermissionFlags m_permission_flags; - /** - * Socket used for communication with the node. - * May not own a Sock object (after `CloseSocketDisconnect()` or during tests). - * `shared_ptr` (instead of `unique_ptr`) is used to avoid premature close of - * the underlying file descriptor by one thread while another thread is - * poll(2)-ing it for activity. - * @see https://github.com/bitcoin/bitcoin/issues/21744 for details. - */ - std::shared_ptr m_sock GUARDED_BY(m_sock_mutex); - /** Sum of GetMemoryUsage of all vSendMsg entries. */ size_t m_send_memusage GUARDED_BY(cs_vSend){0}; /** Total number of bytes sent on the wire to this peer. */ @@ -875,7 +864,6 @@ class CNode std::atomic m_min_ping_time{std::chrono::microseconds::max()}; CNode(NodeId id, - std::shared_ptr sock, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, @@ -937,8 +925,6 @@ class CNode nRefCount--; } - void CloseSocketDisconnect() EXCLUSIVE_LOCKS_REQUIRED(!m_sock_mutex); - void CopyStats(CNodeStats& stats) EXCLUSIVE_LOCKS_REQUIRED(!m_subver_mutex, !m_addr_local_mutex, !cs_vSend, !cs_vRecv); std::string ConnectionTypeAsString() const { return ::ConnectionTypeAsString(m_conn_type); } @@ -967,18 +953,6 @@ class CNode mapMsgTypeSize mapSendBytesPerMsgType GUARDED_BY(cs_vSend); mapMsgTypeSize mapRecvBytesPerMsgType GUARDED_BY(cs_vRecv); - - /** - * If an I2P session is created per connection (for outbound transient I2P - * connections) then it is stored here so that it can be destroyed when the - * socket is closed. I2P sessions involve a data/transport socket (in `m_sock`) - * and a control socket (in `m_i2p_sam_session`). For transient sessions, once - * the data socket is closed, the control socket is not going to be used anymore - * and is just taking up resources. So better close it as soon as `m_sock` is - * closed. - * Otherwise this unique_ptr is empty. - */ - std::unique_ptr m_i2p_sam_session GUARDED_BY(m_sock_mutex); }; /** @@ -1274,15 +1248,21 @@ class CConnman : private SockMan /** * Create a `CNode` object and add it to the `m_nodes` member. * @param[in] node_id Id of the newly accepted connection. - * @param[in] sock Connected socket to communicate with the peer. * @param[in] me The address and port at our side of the connection. * @param[in] them The address and port at the peer's side of the connection. + * @retval true on success + * @retval false on failure, meaning that the associated socket and node_id should be discarded */ - virtual void EventNewConnectionAccepted(NodeId node_id, - std::unique_ptr&& sock, + virtual bool EventNewConnectionAccepted(NodeId node_id, const CService& me, const CService& them) override; + /** + * Mark a node as disconnected and close its connection with the peer. + * @param[in] node Node to disconnect. + */ + void MarkAsDisconnectAndCloseConnection(CNode& node); + void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_nodes_mutex); void NotifyNumConnectionsChanged(); /** Return true if the peer is inactive and should be disconnected. */ @@ -1311,37 +1291,7 @@ class CConnman : private SockMan virtual void EventIOLoopCompletedForAllPeers() override EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex, !m_reconnections_mutex); - - /** - * Generate a collection of sockets to check for IO readiness. - * @param[in] nodes Select from these nodes' sockets. - * @return sockets to check for readiness - */ - Sock::EventsPerSock GenerateWaitSockets(Span nodes) - EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex); - - /** - * Check connected and listening sockets for IO readiness and process them accordingly. - */ - void SocketHandler() - EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex, !m_total_bytes_sent_mutex, !mutexMsgProc); - - /** - * Do the read/write for connected sockets that are ready for IO. - * @param[in] nodes Nodes to process. The socket of each node is checked against `what`. - * @param[in] events_per_sock Sockets that are ready for IO. - */ - void SocketHandlerConnected(const std::vector& nodes, - const Sock::EventsPerSock& events_per_sock) - EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_mutex, !m_total_bytes_sent_mutex, !mutexMsgProc); - - /** - * Accept incoming connections, one from each read-ready listening socket. - * @param[in] events_per_sock Sockets that are ready for IO. - */ - void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock); - void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc, !m_nodes_mutex, !m_reconnections_mutex); void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex); uint64_t CalculateKeyedNetGroup(const CNetAddr& ad) const; @@ -1363,8 +1313,8 @@ class CConnman : private SockMan void DeleteNode(CNode* pnode); /** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */ - std::pair SocketSendData(CNode& node) - EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend, !m_total_bytes_sent_mutex); + std::pair SendMessagesAsBytes(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend) + EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); void DumpAddresses(); @@ -1539,7 +1489,6 @@ class CConnman : private SockMan std::atomic flagInterruptMsgProc{false}; std::thread threadDNSAddressSeed; - std::thread threadSocketHandler; std::thread threadOpenAddedConnections; std::thread threadOpenConnections; std::thread threadMessageHandler; diff --git a/src/sockman.cpp b/src/sockman.cpp index 644a27187f7ca7..e0dd3de501ee57 100644 --- a/src/sockman.cpp +++ b/src/sockman.cpp @@ -10,7 +10,11 @@ #include #include -CService GetBindAddress(const Sock& sock) +// The set of sockets cannot be modified while waiting +// The sleep time needs to be small to avoid new sockets stalling +static constexpr auto SELECT_TIMEOUT{50ms}; + +static CService GetBindAddress(const Sock& sock) { CService addr_bind; struct sockaddr_storage sockaddr_bind; @@ -104,6 +108,8 @@ bool SockMan::BindAndStartListening(const CService& to, bilingual_str& errmsg) void SockMan::StartSocketsThreads(const Options& options) { + m_thread_socket_handler = std::thread(&util::TraceThread, "net", [this] { ThreadSocketHandler(); }); + if (options.i2p.has_value()) { m_i2p_sam_session = std::make_unique( options.i2p->private_key_file, options.i2p->sam_proxy, &interruptNet); @@ -118,6 +124,10 @@ void SockMan::JoinSocketsThreads() if (m_thread_i2p_accept.joinable()) { m_thread_i2p_accept.join(); } + + if (m_thread_socket_handler.joinable()) { + m_thread_socket_handler.join(); + } } std::optional @@ -125,12 +135,14 @@ SockMan::ConnectAndMakeNodeId(const std::variant& t bool is_important, const Proxy& proxy, bool& proxy_failed, - CService& me, - std::unique_ptr& sock, - std::unique_ptr& i2p_transient_session) + CService& me) { + AssertLockNotHeld(m_connected_mutex); AssertLockNotHeld(m_unused_i2p_sessions_mutex); + std::unique_ptr sock; + std::unique_ptr i2p_transient_session; + Assume(!me.IsValid()); if (std::holds_alternative(to)) { @@ -194,6 +206,12 @@ SockMan::ConnectAndMakeNodeId(const std::variant& t const NodeId node_id{GetNewNodeId()}; + { + LOCK(m_connected_mutex); + m_connected.emplace(node_id, std::make_shared(std::move(sock), + std::move(i2p_transient_session))); + } + return node_id; } @@ -224,6 +242,8 @@ std::unique_ptr SockMan::AcceptConnection(const Sock& listen_sock, CServic void SockMan::NewSockAccepted(std::unique_ptr&& sock, const CService& me, const CService& them) { + AssertLockNotHeld(m_connected_mutex); + if (!sock->IsSelectable()) { LogPrintf("connection from %s dropped: non-selectable socket\n", them.ToStringAddrPort()); return; @@ -239,7 +259,14 @@ void SockMan::NewSockAccepted(std::unique_ptr&& sock, const CService& me, const NodeId node_id{GetNewNodeId()}; - EventNewConnectionAccepted(node_id, std::move(sock), me, them); + { + LOCK(m_connected_mutex); + m_connected.emplace(node_id, std::make_shared(std::move(sock))); + } + + if (!EventNewConnectionAccepted(node_id, me, them)) { + CloseConnection(node_id); + } } NodeId SockMan::GetNewNodeId() @@ -247,6 +274,60 @@ NodeId SockMan::GetNewNodeId() return m_next_node_id.fetch_add(1, std::memory_order_relaxed); } +bool SockMan::CloseConnection(NodeId node_id) +{ + LOCK(m_connected_mutex); + return m_connected.erase(node_id) > 0; +} + +ssize_t SockMan::SendBytes(NodeId node_id, + Span data, + bool will_send_more, + std::string& errmsg) const +{ + AssertLockNotHeld(m_connected_mutex); + + if (data.empty()) { + return 0; + } + + std::shared_ptr node_sockets; + + { + LOCK(m_connected_mutex); + auto it{m_connected.find(node_id)}; + if (it == m_connected.end()) { + // There is no socket in case we've already disconnected, or in test cases without + // real connections. In these cases, we bail out immediately and just leave things + // in the caller's send queue. + return 0; + } + node_sockets = it->second; + } + + int flags{MSG_NOSIGNAL | MSG_DONTWAIT}; +#ifdef MSG_MORE + if (will_send_more) { + flags |= MSG_MORE; + } +#endif + + const ssize_t sent{WITH_LOCK( + node_sockets->mutex, + return node_sockets->sock->Send(reinterpret_cast(data.data()), data.size(), flags);)}; + + if (sent >= 0) { + return sent; + } + + const int err{WSAGetLastError()}; + if (err == WSAEWOULDBLOCK || err == WSAEMSGSIZE || err == WSAEINTR || err == WSAEINPROGRESS) { + return 0; + } + errmsg = NetworkErrorString(err); + return -1; +} + void SockMan::CloseSockets() { m_listen.clear(); @@ -262,8 +343,16 @@ void SockMan::EventIOLoopCompletedForAllPeers() {} void SockMan::EventI2PListen(const CService&, bool) {} +void SockMan::TestOnlyAddExistentNode(NodeId node_id, std::unique_ptr&& sock) +{ + LOCK(m_connected_mutex); + m_connected.emplace(node_id, std::make_shared(std::move(sock))); +} + void SockMan::ThreadI2PAccept() { + AssertLockNotHeld(m_connected_mutex); + static constexpr auto err_wait_begin = 1s; static constexpr auto err_wait_cap = 5min; auto err_wait = err_wait_begin; @@ -297,3 +386,136 @@ void SockMan::ThreadI2PAccept() err_wait = err_wait_begin; } } + +void SockMan::ThreadSocketHandler() +{ + AssertLockNotHeld(m_connected_mutex); + + while (!interruptNet) { + EventIOLoopCompletedForAllPeers(); + + Sock::EventsPerSock events_per_sock; + + // Check for the readiness of the already connected sockets and the + // listening sockets in one call ("readiness" as in poll(2) or + // select(2)). If none are ready, wait for a short while and return + // empty sets. + events_per_sock = GenerateWaitSockets(); + if (events_per_sock.empty() || + !events_per_sock.begin()->first->WaitMany(SELECT_TIMEOUT, events_per_sock)) { + + interruptNet.sleep_for(SELECT_TIMEOUT); + } + + // Service (send/receive) each of the already connected sockets. + SocketHandlerConnected(events_per_sock); + + // Accept new connections from listening sockets. + SocketHandlerListening(events_per_sock); + } +} + +Sock::EventsPerSock SockMan::GenerateWaitSockets() +{ + AssertLockNotHeld(m_connected_mutex); + + Sock::EventsPerSock events_per_sock; + + for (const auto& sock : m_listen) { + events_per_sock.emplace(sock, Sock::Events{Sock::RECV}); + } + + auto connected_snapshot{WITH_LOCK(m_connected_mutex, return m_connected;)}; + + for (const auto& [node_id, node_sockets] : connected_snapshot) { + const bool select_recv{ShouldTryToRecv(node_id)}; + const bool select_send{ShouldTryToSend(node_id)}; + if (!select_recv && !select_send) continue; + + Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0); + events_per_sock.emplace(node_sockets->sock, Sock::Events{event}); + } + + return events_per_sock; +} + +void SockMan::SocketHandlerConnected(const Sock::EventsPerSock& events_per_sock) +{ + AssertLockNotHeld(m_connected_mutex); + + auto connected_snapshot{WITH_LOCK(m_connected_mutex, return m_connected;)}; + + for (const auto& [node_id, node_sockets] : connected_snapshot) { + if (interruptNet) { + return; + } + + bool recv_ready{false}; + bool send_ready{false}; + bool err_ready{false}; + + const auto it{events_per_sock.find(node_sockets->sock)}; + if (it != events_per_sock.end()) { + recv_ready = it->second.occurred & Sock::RECV; + send_ready = it->second.occurred & Sock::SEND; + err_ready = it->second.occurred & Sock::ERR; + } + + if (send_ready) { + bool cancel_recv; + + EventReadyToSend(node_id, cancel_recv); + + if (cancel_recv) { + recv_ready = false; + } + } + + if (recv_ready || err_ready) { + uint8_t buf[0x10000]; // typical socket buffer is 8K-64K + + const ssize_t nrecv{WITH_LOCK( + node_sockets->mutex, + return node_sockets->sock->Recv(buf, sizeof(buf), MSG_DONTWAIT);)}; + + switch (nrecv) { + case -1: { + const int err = WSAGetLastError(); + if (err != WSAEWOULDBLOCK && err != WSAEMSGSIZE && err != WSAEINTR && err != WSAEINPROGRESS) { + EventGotPermanentReadError(node_id, NetworkErrorString(err)); + } + break; + } + case 0: + EventGotEOF(node_id); + break; + default: + EventGotData(node_id, buf, nrecv); + break; + } + } + + EventIOLoopCompletedForNode(node_id); + } +} + +void SockMan::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) +{ + AssertLockNotHeld(m_connected_mutex); + + for (const auto& sock : m_listen) { + if (interruptNet) { + return; + } + const auto it = events_per_sock.find(sock); + if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) { + CService addr_accepted; + + auto sock_accepted{AcceptConnection(*sock, addr_accepted)}; + + if (sock_accepted) { + NewSockAccepted(std::move(sock_accepted), GetBindAddress(*sock), addr_accepted); + } + } + } +} diff --git a/src/sockman.h b/src/sockman.h index bd63008a545da1..41ef612115b752 100644 --- a/src/sockman.h +++ b/src/sockman.h @@ -21,8 +21,6 @@ typedef int64_t NodeId; -CService GetBindAddress(const Sock& sock); - /** * A socket manager class which handles socket operations. * To use this class, inherit from it and implement the pure virtual methods. @@ -31,6 +29,8 @@ CService GetBindAddress(const Sock& sock); * - starting of necessary threads to process socket operations * - accepting incoming connections * - making outbound connections + * - closing connections + * - waiting for IO readiness on sockets and doing send/recv accordingly */ class SockMan { @@ -97,18 +97,14 @@ class SockMan * proxy, then it will be set to true. * @param[out] me If the connection was successful then this is set to the address on the * local side of the socket. - * @param[out] sock Connected socket, if the operation is successful. - * @param[out] i2p_transient_session I2P session, if the operation is successful. * @return Newly generated node id, or std::nullopt if the operation fails. */ std::optional ConnectAndMakeNodeId(const std::variant& to, bool is_important, const Proxy& proxy, bool& proxy_failed, - CService& me, - std::unique_ptr& sock, - std::unique_ptr& i2p_transient_session) - EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); + CService& me) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex, !m_unused_i2p_sessions_mutex); /** * Accept a connection. @@ -125,13 +121,38 @@ class SockMan * @param[in] me Address at our end of the connection. * @param[in] them Address of the new peer. */ - void NewSockAccepted(std::unique_ptr&& sock, const CService& me, const CService& them); + void NewSockAccepted(std::unique_ptr&& sock, const CService& me, const CService& them) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); /** * Generate an id for a newly created node. */ NodeId GetNewNodeId(); + /** + * Disconnect a given peer by closing its socket and release resources occupied by it. + * @return Whether the peer existed and its socket was closed by this call. + */ + bool CloseConnection(NodeId node_id) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Try to send some data to the given node. + * @param[in] node_id Identifier of the node to send to. + * @param[in] data The data to send, it might happen that only a prefix of this is sent. + * @param[in] will_send_more Used as an optimization if the caller knows that they will + * be sending more data soon after this call. + * @param[out] errmsg If <0 is returned then this will contain a human readable message + * explaining the error. + * @retval >=0 The number of bytes actually sent. + * @retval <0 A permanent error has occurred. + */ + ssize_t SendBytes(NodeId node_id, + Span data, + bool will_send_more, + std::string& errmsg) const + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + /** * Close all sockets. */ @@ -144,12 +165,13 @@ class SockMan /** * Be notified when a new connection has been accepted. * @param[in] node_id Id of the newly accepted connection. - * @param[in] sock Connected socket to communicate with the peer. * @param[in] me The address and port at our side of the connection. * @param[in] them The address and port at the peer's side of the connection. + * @retval true The new connection was accepted at the higher level. + * @retval false The connection was refused at the higher level, so the + * associated socket and node_id should be discarded by `SockMan`. */ - virtual void EventNewConnectionAccepted(NodeId node_id, - std::unique_ptr&& sock, + virtual bool EventNewConnectionAccepted(NodeId node_id, const CService& me, const CService& them) = 0; @@ -254,6 +276,15 @@ class SockMan */ std::vector> m_listen; +protected: + + /** + * During some tests mocked sockets are created outside of `SockMan`, make it + * possible to add those so that send/recv can be exercised. + */ + void TestOnlyAddExistentNode(NodeId node_id, std::unique_ptr&& sock) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + private: /** @@ -266,13 +297,46 @@ class SockMan * Accept incoming I2P connections in a loop and call * `EventNewConnectionAccepted()` for each new connection. */ - void ThreadI2PAccept(); + void ThreadI2PAccept() + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Check connected and listening sockets for IO readiness and process them accordingly. + */ + void ThreadSocketHandler() + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Generate a collection of sockets to check for IO readiness. + * @return Sockets to check for readiness. + */ + Sock::EventsPerSock GenerateWaitSockets() + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Do the read/write for connected sockets that are ready for IO. + * @param[in] events_per_sock Sockets that are ready for IO. + */ + void SocketHandlerConnected(const Sock::EventsPerSock& events_per_sock) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); + + /** + * Accept incoming connections, one from each read-ready listening socket. + * @param[in] events_per_sock Sockets that are ready for IO. + */ + void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) + EXCLUSIVE_LOCKS_REQUIRED(!m_connected_mutex); /** * The id to assign to the next created node. Used to generate ids of nodes. */ std::atomic m_next_node_id{0}; + /** + * Thread that sends to and receives from sockets and accepts connections. + */ + std::thread m_thread_socket_handler; + /** * Thread that accepts incoming I2P connections in a loop, can be stopped via `interruptNet`. */ @@ -291,6 +355,55 @@ class SockMan * a host fails, then the created session is put to this pool for reuse. */ std::queue> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex); + + /** + * The sockets used by a connected node - a data socket and an optional I2P session. + */ + struct NodeSockets { + explicit NodeSockets(std::unique_ptr&& s) + : sock{std::move(s)} + { + } + + explicit NodeSockets(std::shared_ptr&& s, std::unique_ptr&& sess) + : sock{std::move(s)}, + i2p_transient_session{std::move(sess)} + { + } + + /** + * Mutex that serializes the Send() and Recv() calls on `sock`. + */ + Mutex mutex; + + /** + * Underlying socket. + * `shared_ptr` (instead of `unique_ptr`) is used to avoid premature close of the + * underlying file descriptor by one thread while another thread is poll(2)-ing + * it for activity. + * @see https://github.com/bitcoin/bitcoin/issues/21744 for details. + */ + std::shared_ptr sock; + + /** + * When transient I2P sessions are used, then each node has its own session, otherwise + * all nodes use the session from `m_i2p_sam_session` and share the same I2P address. + * I2P sessions involve a data/transport socket (in `sock`) and a control socket + * (in `i2p_transient_session`). For transient sessions, once the data socket `sock` is + * closed, the control socket is not going to be used anymore and would be just taking + * resources. Storing it here makes its deletion together with `sock` automatic. + */ + std::unique_ptr i2p_transient_session; + }; + + mutable Mutex m_connected_mutex; + + /** + * Sockets for connected peers. + * The `shared_ptr` makes it possible to create a snapshot of this by simply copying + * it (under `m_connected_mutex`). + */ + std::unordered_map> m_connected GUARDED_BY(m_connected_mutex); }; #endif // BITCOIN_SOCKMAN_H diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index 9ee7e9c9fe24ba..d2e62c7395cd85 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -55,7 +55,6 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) CAddress addr1(ip(0xa0b0c001), NODE_NONE); NodeId id{0}; CNode dummyNode1{id++, - /*sock=*/nullptr, addr1, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -121,7 +120,6 @@ void AddRandomOutboundPeer(NodeId& id, std::vector& vNodes, PeerManager& } vNodes.emplace_back(new CNode{id++, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -320,7 +318,6 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) banman->ClearBanned(); NodeId id{0}; nodes[0] = new CNode{id++, - /*sock=*/nullptr, addr[0], /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -340,7 +337,6 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) BOOST_CHECK(!banman->IsDiscouraged(other_addr)); // Different address, not discouraged nodes[1] = new CNode{id++, - /*sock=*/nullptr, addr[1], /*nKeyedNetGroupIn=*/1, /*nLocalHostNonceIn=*/1, @@ -370,7 +366,6 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) // Make sure non-IP peers are discouraged and disconnected properly. nodes[2] = new CNode{id++, - /*sock=*/nullptr, addr[2], /*nKeyedNetGroupIn=*/1, /*nLocalHostNonceIn=*/1, @@ -412,7 +407,6 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) CAddress addr(ip(0xa0b0c001), NODE_NONE); NodeId id{0}; CNode dummyNode{id++, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/4, /*nLocalHostNonceIn=*/4, diff --git a/src/test/fuzz/connman.cpp b/src/test/fuzz/connman.cpp index beefc9d82edbe7..f58cd109b0f70b 100644 --- a/src/test/fuzz/connman.cpp +++ b/src/test/fuzz/connman.cpp @@ -46,12 +46,13 @@ FUZZ_TARGET(connman, .init = initialize_connman) CNetAddr random_netaddr; CNode random_node = ConsumeNode(fuzzed_data_provider); + connman.AddTestNode(random_node, std::make_unique(fuzzed_data_provider)); CSubNet random_subnet; std::string random_string; LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 100) { CNode& p2p_node{*ConsumeNodeAsUniquePtr(fuzzed_data_provider).release()}; - connman.AddTestNode(p2p_node); + connman.AddTestNode(p2p_node, std::make_unique(fuzzed_data_provider)); } LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) { diff --git a/src/test/fuzz/net.cpp b/src/test/fuzz/net.cpp index 4cec96274e7a2b..42b5c7180bb39e 100644 --- a/src/test/fuzz/net.cpp +++ b/src/test/fuzz/net.cpp @@ -41,9 +41,6 @@ FUZZ_TARGET(net, .init = initialize_net) LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) { CallOneOf( fuzzed_data_provider, - [&] { - node.CloseSocketDisconnect(); - }, [&] { CNodeStats stats; node.CopyStats(stats); diff --git a/src/test/fuzz/p2p_handshake.cpp b/src/test/fuzz/p2p_handshake.cpp index 6c1ed11d456a25..d8271da2f65dc8 100644 --- a/src/test/fuzz/p2p_handshake.cpp +++ b/src/test/fuzz/p2p_handshake.cpp @@ -64,7 +64,7 @@ FUZZ_TARGET(p2p_handshake, .init = ::initialize) const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3); for (int i = 0; i < num_peers_to_add; ++i) { peers.push_back(ConsumeNodeAsUniquePtr(fuzzed_data_provider, i).release()); - connman.AddTestNode(*peers.back()); + connman.AddTestNode(*peers.back(), std::make_unique(fuzzed_data_provider)); peerman->InitializeNode( *peers.back(), static_cast(fuzzed_data_provider.ConsumeIntegral())); diff --git a/src/test/fuzz/p2p_headers_presync.cpp b/src/test/fuzz/p2p_headers_presync.cpp index e22087303a579c..f9d4512758f96f 100644 --- a/src/test/fuzz/p2p_headers_presync.cpp +++ b/src/test/fuzz/p2p_headers_presync.cpp @@ -55,7 +55,7 @@ void HeadersSyncSetup::ResetAndInitialize() for (auto conn_type : conn_types) { CAddress addr{}; - m_connections.push_back(new CNode(id++, nullptr, addr, 0, 0, addr, "", conn_type, false)); + m_connections.push_back(new CNode(id++, addr, 0, 0, addr, "", conn_type, false)); CNode& p2p_node = *m_connections.back(); connman.Handshake( diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index 6373eac1c32403..e77a26a5b96a5f 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -67,7 +67,7 @@ FUZZ_TARGET(process_message, .init = initialize_process_message) } CNode& p2p_node = *ConsumeNodeAsUniquePtr(fuzzed_data_provider).release(); - connman.AddTestNode(p2p_node); + connman.AddTestNode(p2p_node, std::make_unique(fuzzed_data_provider)); FillNode(fuzzed_data_provider, connman, p2p_node); const auto mock_time = ConsumeTime(fuzzed_data_provider); diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 62f38967a391df..4de9acb5cf111b 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -59,7 +59,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages) FillNode(fuzzed_data_provider, connman, p2p_node); - connman.AddTestNode(p2p_node); + connman.AddTestNode(p2p_node, std::make_unique(fuzzed_data_provider)); } LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 30) diff --git a/src/test/fuzz/util/net.h b/src/test/fuzz/util/net.h index 1a5902329e26a3..caa884b49b1ee2 100644 --- a/src/test/fuzz/util/net.h +++ b/src/test/fuzz/util/net.h @@ -109,7 +109,7 @@ template auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional& node_id_in = std::nullopt) noexcept { const NodeId node_id = node_id_in.value_or(fuzzed_data_provider.ConsumeIntegralInRange(0, std::numeric_limits::max())); - const auto sock = std::make_shared(fuzzed_data_provider); + //const auto sock = std::make_shared(fuzzed_data_provider); const CAddress address = ConsumeAddress(fuzzed_data_provider); const uint64_t keyed_net_group = fuzzed_data_provider.ConsumeIntegral(); const uint64_t local_host_nonce = fuzzed_data_provider.ConsumeIntegral(); @@ -120,7 +120,6 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional(node_id, - sock, address, keyed_net_group, local_host_nonce, @@ -131,7 +130,6 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional& nodes, PeerManager& peerman, Connm const bool inbound_onion{onion_peer && conn_type == ConnectionType::INBOUND}; nodes.emplace_back(new CNode{++id, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp index 4c72d03ab18da9..fd6dfb6358303d 100644 --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -60,7 +60,6 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test) std::string pszDest; std::unique_ptr pnode1 = std::make_unique(id++, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -78,7 +77,6 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test) BOOST_CHECK_EQUAL(pnode1->ConnectedThroughNetwork(), Network::NET_IPV4); std::unique_ptr pnode2 = std::make_unique(id++, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/1, /*nLocalHostNonceIn=*/1, @@ -96,7 +94,6 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test) BOOST_CHECK_EQUAL(pnode2->ConnectedThroughNetwork(), Network::NET_IPV4); std::unique_ptr pnode3 = std::make_unique(id++, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -114,7 +111,6 @@ BOOST_AUTO_TEST_CASE(cnode_simple_test) BOOST_CHECK_EQUAL(pnode3->ConnectedThroughNetwork(), Network::NET_IPV4); std::unique_ptr pnode4 = std::make_unique(id++, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/1, /*nLocalHostNonceIn=*/1, @@ -613,7 +609,6 @@ BOOST_AUTO_TEST_CASE(ipv4_peer_with_ipv6_addrMe_test) ipv4AddrPeer.s_addr = 0xa0b0c001; CAddress addr = CAddress(CService(ipv4AddrPeer, 7777), NODE_NETWORK); std::unique_ptr pnode = std::make_unique(/*id=*/0, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -667,7 +662,6 @@ BOOST_AUTO_TEST_CASE(get_local_addr_for_peer_port) in_addr peer_out_in_addr; peer_out_in_addr.s_addr = htonl(0x01020304); CNode peer_out{/*id=*/0, - /*sock=*/nullptr, /*addrIn=*/CAddress{CService{peer_out_in_addr, 8333}, NODE_NETWORK}, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -688,7 +682,6 @@ BOOST_AUTO_TEST_CASE(get_local_addr_for_peer_port) in_addr peer_in_in_addr; peer_in_in_addr.s_addr = htonl(0x05060708); CNode peer_in{/*id=*/0, - /*sock=*/nullptr, /*addrIn=*/CAddress{CService{peer_in_in_addr, 8333}, NODE_NETWORK}, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -825,7 +818,6 @@ BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message) in_addr peer_in_addr; peer_in_addr.s_addr = htonl(0x01020304); CNode peer{/*id=*/0, - /*sock=*/nullptr, /*addrIn=*/CAddress{CService{peer_in_addr, 8333}, NODE_NETWORK}, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, @@ -900,7 +892,6 @@ BOOST_AUTO_TEST_CASE(advertise_local_address) { auto CreatePeer = [](const CAddress& addr) { return std::make_unique(/*id=*/0, - /*sock=*/nullptr, addr, /*nKeyedNetGroupIn=*/0, /*nLocalHostNonceIn=*/0, diff --git a/src/test/util/net.h b/src/test/util/net.h index f58e75b3c0722c..e8e96138cc9d86 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -50,6 +50,12 @@ struct ConnmanTestMsg : public CConnman { return m_nodes; } + void AddTestNode(CNode& node, std::unique_ptr&& sock) + { + TestOnlyAddExistentNode(node.GetId(), std::move(sock)); + AddTestNode(node); + } + void AddTestNode(CNode& node) { LOCK(m_nodes_mutex);