Skip to content

Commit

Permalink
Revert "Split normal sockets and p2p sockets handling"
Browse files Browse the repository at this point in the history
This reverts commit 08c3a38.
  • Loading branch information
RipleyTom authored and Megamouse committed Apr 27, 2024
1 parent fcba193 commit b38ce94
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 202 deletions.
20 changes: 0 additions & 20 deletions rpcs3/Emu/Cell/lv2/sys_net/lv2_socket.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "stdafx.h"
#include "lv2_socket.h"
#include "network_context.h"

LOG_CHANNEL(sys_net);

Expand Down Expand Up @@ -68,17 +67,6 @@ void lv2_socket::poll_queue(std::shared_ptr<ppu_thread> ppu, bs_t<lv2_socket::po
{
set_poll_event(event);
queue.emplace_back(std::move(ppu), poll_cb);

// Makes sure network_context thread is awaken
if (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM)
{
auto& nc = g_fxo->get<network_context>();
const u32 prev_value = nc.num_polls.fetch_add(1);
if (!prev_value)
{
nc.num_polls.notify_one();
}
}
}

s32 lv2_socket::clear_queue(ppu_thread* ppu)
Expand All @@ -93,14 +81,6 @@ s32 lv2_socket::clear_queue(ppu_thread* ppu)
{
it = queue.erase(it);
cleared++;

// Makes sure network_context thread can go back to sleep if there is no active polling
if (type == SYS_NET_SOCK_STREAM || type == SYS_NET_SOCK_DGRAM)
{
const u32 prev_value = g_fxo->get<network_context>().num_polls.fetch_sub(1);
ensure(prev_value);
}

continue;
}

Expand Down
10 changes: 6 additions & 4 deletions rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,16 @@ s32 lv2_socket_p2p::bind(const sys_net_sockaddr& addr)

socket_type real_socket{};

auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (!nc.list_p2p_ports.contains(p2p_port))
{
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port));
}

nc.create_p2p_port(p2p_port);
auto& pport = ::at32(nc.list_p2p_ports, p2p_port);
real_socket = pport.p2p_socket;

{
std::lock_guard lock(pport.bound_p2p_vports_mutex);

Expand Down Expand Up @@ -330,7 +332,7 @@ void lv2_socket_p2p::close()
return;
}

auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard lock(nc.list_p2p_ports_mutex);
ensure(nc.list_p2p_ports.contains(port));
Expand Down
18 changes: 10 additions & 8 deletions rpcs3/Emu/Cell/lv2/sys_net/lv2_socket_p2ps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ bool lv2_socket_p2ps::handle_listening(p2ps_encapsulated_tcp* tcp_header, [[mayb
const u64 key_connected = (reinterpret_cast<struct sockaddr_in*>(op_addr)->sin_addr.s_addr) | (static_cast<u64>(tcp_header->src_port) << 48) | (static_cast<u64>(tcp_header->dst_port) << 32);

{
auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
auto& pport = ::at32(nc.list_p2p_ports, port);
pport.bound_p2p_streams.emplace(key_connected, new_sock_id);
}
Expand Down Expand Up @@ -593,14 +593,16 @@ s32 lv2_socket_p2ps::bind(const sys_net_sockaddr& addr)

socket_type real_socket{};

auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (!nc.list_p2p_ports.contains(p2p_port))
{
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port));
}

nc.create_p2p_port(p2p_port);
auto& pport = ::at32(nc.list_p2p_ports, p2p_port);
real_socket = pport.p2p_socket;

{
// Ensures the socket & the bound list are updated at the same time to avoid races
std::lock_guard vport_lock(pport.bound_p2p_vports_mutex);
Expand Down Expand Up @@ -681,14 +683,14 @@ std::optional<s32> lv2_socket_p2ps::connect(const sys_net_sockaddr& addr)

socket_type real_socket{};

auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (!nc.list_p2p_ports.contains(port))
nc.list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(port), std::forward_as_tuple(port));

nc.create_p2p_port(port);
auto& pport = ::at32(nc.list_p2p_ports, port);
real_socket = pport.p2p_socket;

{
std::lock_guard lock(pport.bound_p2p_vports_mutex);
if (vport == 0)
Expand Down Expand Up @@ -855,7 +857,7 @@ void lv2_socket_p2ps::close()
return;
}

auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard lock(nc.list_p2p_ports_mutex);
ensure(nc.list_p2p_ports.contains(port));
Expand Down
128 changes: 49 additions & 79 deletions rpcs3/Emu/Cell/lv2/sys_net/network_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ LOG_CHANNEL(sys_net);
s32 send_packet_from_p2p_port(const std::vector<u8>& data, const sockaddr_in& addr)
{
s32 res{};
auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (nc.list_p2p_ports.contains(SCE_NP_PORT))
{
auto& def_port = ::at32(nc.list_p2p_ports, SCE_NP_PORT);
res = ::sendto(def_port.p2p_socket, reinterpret_cast<const char*>(data.data()), ::size32(data), 0, reinterpret_cast<const sockaddr*>(&addr), sizeof(sockaddr_in));
res = ::sendto(def_port.p2p_socket, reinterpret_cast<const char*>(data.data()), ::size32(data), 0, reinterpret_cast<const sockaddr*>(&addr), sizeof(sockaddr_in));

if (res == -1)
sys_net.error("Failed to send signaling packet: %s", get_last_error(false, false));
Expand All @@ -35,7 +35,7 @@ s32 send_packet_from_p2p_port(const std::vector<u8>& data, const sockaddr_in& ad
std::vector<std::vector<u8>> get_rpcn_msgs()
{
std::vector<std::vector<u8>> msgs;
auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (nc.list_p2p_ports.contains(SCE_NP_PORT))
Expand All @@ -59,7 +59,7 @@ std::vector<std::vector<u8>> get_rpcn_msgs()
std::vector<signaling_message> get_sign_msgs()
{
std::vector<signaling_message> msgs;
auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
{
std::lock_guard list_lock(nc.list_p2p_ports_mutex);
if (nc.list_p2p_ports.contains(SCE_NP_PORT))
Expand All @@ -85,15 +85,15 @@ namespace np
void init_np_handler_dependencies();
}

p2p_thread::p2p_thread()
network_thread::network_thread()
{
np::init_np_handler_dependencies();
}

void p2p_thread::bind_sce_np_port()
void network_thread::bind_sce_np_port()
{
std::lock_guard list_lock(list_p2p_ports_mutex);
create_p2p_port(SCE_NP_PORT);
list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(SCE_NP_PORT), std::forward_as_tuple(SCE_NP_PORT));
}

void network_thread::operator()()
Expand All @@ -109,14 +109,10 @@ void network_thread::operator()()
std::vector<bool> was_connecting(lv2_socket::id_count);
#endif

std::vector<::pollfd> p2p_fd(lv2_socket::id_count);

while (thread_ctrl::state() != thread_state::aborting)
{
if (!num_polls)
{
thread_ctrl::wait_on(num_polls, 0);
continue;
}

ensure(socklist.size() <= lv2_socket::id_count);

// Wait with 1ms timeout
Expand All @@ -126,6 +122,46 @@ void network_thread::operator()()
::poll(fds.data(), socklist.size(), 1);
#endif

// Check P2P sockets for incoming packets(timeout could probably be set at 0)
{
std::lock_guard lock(list_p2p_ports_mutex);
std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd));
auto num_p2p_sockets = 0;
for (const auto& p2p_port : list_p2p_ports)
{
p2p_fd[num_p2p_sockets].events = POLLIN;
p2p_fd[num_p2p_sockets].revents = 0;
p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket;
num_p2p_sockets++;
}

if (num_p2p_sockets)
{
#ifdef _WIN32
const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1);
#else
const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1);
#endif
if (ret_p2p > 0)
{
auto fd_index = 0;
for (auto& p2p_port : list_p2p_ports)
{
if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM)
{
while (p2p_port.second.recv_data())
;
}
fd_index++;
}
}
else if (ret_p2p < 0)
{
sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false));
}
}
}

std::lock_guard lock(s_nw_mutex);

for (usz i = 0; i < socklist.size(); i++)
Expand Down Expand Up @@ -180,69 +216,3 @@ void network_thread::operator()()
}
}
}

// Must be used under list_p2p_ports_mutex lock!
void p2p_thread::create_p2p_port(u16 p2p_port)
{
if (!list_p2p_ports.contains(p2p_port))
{
list_p2p_ports.emplace(std::piecewise_construct, std::forward_as_tuple(p2p_port), std::forward_as_tuple(p2p_port));
const u32 prev_value = num_p2p_ports.fetch_add(1);
if (!prev_value)
{
num_p2p_ports.notify_one();
}
}
}

void p2p_thread::operator()()
{
std::vector<::pollfd> p2p_fd(lv2_socket::id_count);

while (thread_ctrl::state() != thread_state::aborting)
{
if (!num_p2p_ports)
{
thread_ctrl::wait_on(num_p2p_ports, 0);
continue;
}

// Check P2P sockets for incoming packets
auto num_p2p_sockets = 0;
{
std::lock_guard lock(list_p2p_ports_mutex);
std::memset(p2p_fd.data(), 0, p2p_fd.size() * sizeof(::pollfd));
for (const auto& p2p_port : list_p2p_ports)
{
p2p_fd[num_p2p_sockets].events = POLLIN;
p2p_fd[num_p2p_sockets].revents = 0;
p2p_fd[num_p2p_sockets].fd = p2p_port.second.p2p_socket;
num_p2p_sockets++;
}
}

#ifdef _WIN32
const auto ret_p2p = WSAPoll(p2p_fd.data(), num_p2p_sockets, 1);
#else
const auto ret_p2p = ::poll(p2p_fd.data(), num_p2p_sockets, 1);
#endif
if (ret_p2p > 0)
{
std::lock_guard lock(list_p2p_ports_mutex);
auto fd_index = 0;
for (auto& p2p_port : list_p2p_ports)
{
if ((p2p_fd[fd_index].revents & POLLIN) == POLLIN || (p2p_fd[fd_index].revents & POLLRDNORM) == POLLRDNORM)
{
while (p2p_port.second.recv_data())
;
}
fd_index++;
}
}
else if (ret_p2p < 0)
{
sys_net.error("[P2P] Error poll on master P2P socket: %d", get_last_error(false));
}
}
}
19 changes: 3 additions & 16 deletions rpcs3/Emu/Cell/lv2/sys_net/network_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,15 @@ struct network_thread
{
std::vector<ppu_thread*> s_to_awake;
shared_mutex s_nw_mutex;
atomic_t<u32> num_polls = 0;

static constexpr auto thread_name = "Network Thread";

void operator()();
};

struct p2p_thread
{
shared_mutex list_p2p_ports_mutex;
std::map<u16, nt_p2p_port> list_p2p_ports;
atomic_t<u32> num_p2p_ports = 0;

static constexpr auto thread_name = "Network P2P Thread";
std::map<u16, nt_p2p_port> list_p2p_ports{};

p2p_thread();

void create_p2p_port(u16 p2p_port);
static constexpr auto thread_name = "Network Thread";

network_thread();
void bind_sce_np_port();
void operator()();
};

using network_context = named_thread<network_thread>;
using p2p_context = named_thread<p2p_thread>;
14 changes: 2 additions & 12 deletions rpcs3/Emu/NP/np_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,11 @@ namespace np

void np_handler::init_np_handler_dependencies()
{
if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init<p2p_context>() && !m_inited_np_handler_dependencies)
if (is_psn_active && g_cfg.net.psn_status == np_psn_status::psn_rpcn && g_fxo->is_init<network_context>() && !m_inited_np_handler_dependencies)
{
m_inited_np_handler_dependencies = true;

auto& nc = g_fxo->get<p2p_context>();
auto& nc = g_fxo->get<network_context>();
nc.bind_sce_np_port();

std::lock_guard lock(mutex_rpcn);
Expand Down Expand Up @@ -817,16 +817,6 @@ namespace np
string_to_online_name(rpcn->get_online_name(), online_name);
string_to_avatar_url(rpcn->get_avatar_url(), avatar_url);
public_ip_addr = rpcn->get_addr_sig();

if (!public_ip_addr)
{
rsx::overlays::queue_message(rpcn::rpcn_state_to_localized_string_id(rpcn::rpcn_state::failure_other));
rpcn_log.error("Failed to get a reply from RPCN signaling!");
is_psn_active = false;
rpcn->terminate_connection();
return;
}

local_ip_addr = std::bit_cast<u32, be_t<u32>>(rpcn->get_addr_local());

break;
Expand Down
Loading

0 comments on commit b38ce94

Please sign in to comment.