Skip to content

Commit

Permalink
implement most of the Playing state, vehicle positions, vehicle data
Browse files Browse the repository at this point in the history
  • Loading branch information
lionkor committed Jan 22, 2024
1 parent 630d5f1 commit 8654beb
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 21 deletions.
43 changes: 25 additions & 18 deletions include/Network.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "State.h"
#include "Sync.h"
#include "Transport.h"
#include "Util.h"
#include <boost/asio.hpp>
#include <boost/asio/execution_context.hpp>
#include <boost/asio/thread_pool.hpp>
Expand Down Expand Up @@ -54,6 +55,9 @@ struct Client {
/// but better than simply using the ID like the old protocol.
const uint64_t udp_magic;

const ip::udp::endpoint& udp_endpoint() const { return m_udp_endpoint; }
void set_udp_endpoint(const ip::udp::endpoint& ep) { m_udp_endpoint = ep; }

private:
/// Call this when the client seems to have timed out. Will send a ping and set a flag.
/// Returns true if try-again, false if the connection was closed.
Expand Down Expand Up @@ -86,6 +90,8 @@ struct Client {
std::vector<uint8_t> m_header { bmp::Header::SERIALIZED_SIZE };
bmp::Packet m_packet {};

ip::udp::endpoint m_udp_endpoint;

class Network& m_network;
};

Expand All @@ -94,6 +100,14 @@ struct Vehicle {
Sync<ClientID> owner;
Sync<std::vector<uint8_t>> data;

Vehicle(std::span<uint8_t> raw_data)
: data(std::vector<uint8_t>(raw_data.begin(), raw_data.end())) {
reset_status(data.get());
}

/// Resets all status fields to zero and reads any statuses present in the data into the fields.
void reset_status(std::span<const uint8_t> status_data);

struct Status {
glm::vec3 rvel {};
glm::vec4 rot {};
Expand All @@ -102,23 +116,9 @@ struct Vehicle {
float time {};
};

Status get_status() {
std::unique_lock lock(m_mtx);
refresh_cache(lock);
return {
.rvel = m_rvel,
.rot = m_rot,
.vel = m_vel,
.pos = m_pos,
.time = m_time,
};
}
Status get_status();

void update_status(const std::vector<uint8_t>& raw_packet) {
std::unique_lock lock(m_mtx);
m_needs_refresh = true;
m_status_data = raw_packet;
}
void update_status(std::span<const uint8_t> raw_packet);

const std::vector<uint8_t>& get_raw_status() const { return m_status_data; }

Expand All @@ -132,7 +132,7 @@ struct Vehicle {
/// Parses the status_data on request sets needs_refresh = false.
void refresh_cache(std::unique_lock<std::recursive_mutex>& lock);

bool m_needs_refresh = true;
bool m_needs_refresh = false;
glm::vec3 m_rvel {};
glm::vec4 m_rot {};
glm::vec3 m_vel {};
Expand Down Expand Up @@ -178,6 +178,12 @@ class Network {

size_t vehicle_count() const;

/// Creates a Playing state packet from uncompressed data.
bmp::Packet make_playing_packet(bmp::Purpose purpose, ClientID from_id, VehicleID veh_id, const std::vector<uint8_t>& data);

/// Sends a <System> or <Server> chat message to all or only one client(s).
void send_system_chat_message(const std::string& msg, ClientID to = 0xffffffff);

/// To be called by accept() async handler once an accept() is completed.
void accept();

Expand All @@ -192,7 +198,7 @@ class Network {
/// Reads a packet from the given UDP socket, returning the client's endpoint as an out-argument.
bmp::Packet udp_read(ip::udp::endpoint& out_ep);
/// Sends a packet to the specified UDP endpoint via the UDP socket.
void udp_write(bmp::Packet& packet, const ip::udp::endpoint& to_ep);
void udp_write(bmp::Packet& packet, const ip::udp::endpoint& ep);

void udp_read_main();
void tcp_listen_main();
Expand Down Expand Up @@ -220,6 +226,7 @@ class Network {
Sync<std::unordered_map<ip::udp::endpoint, ClientID>> m_udp_endpoints {};

ClientID new_client_id();
VehicleID new_vehicle_id();

thread_pool m_threadpool { std::thread::hardware_concurrency() };
Sync<bool> m_shutdown { false };
Expand Down
239 changes: 236 additions & 3 deletions src/Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,12 @@ bmp::Packet Network::udp_read(ip::udp::endpoint& out_ep) {
return packet;
}

void Network::udp_write(bmp::Packet& packet, const ip::udp::endpoint& to_ep) {
void Network::udp_write(bmp::Packet& packet, const ip::udp::endpoint& ep) {
auto header = packet.finalize();
std::vector<uint8_t> data(header.size + bmp::Header::SERIALIZED_SIZE);
auto offset = header.serialize_to(data);
std::copy(packet.raw_data.begin(), packet.raw_data.end(), data.begin() + static_cast<long>(offset));
m_udp_socket.send_to(buffer(data), to_ep, {});
m_udp_socket.send_to(buffer(data), ep, {});
}

Network::Network()
Expand Down Expand Up @@ -484,6 +484,7 @@ void Network::udp_read_main() {
}
// not yet set! nice! set!
endpoints->emplace(ep, id);
client->set_udp_endpoint(ep);
// now transfer them to the next state
beammp_debugf("Client {} successfully connected via UDP", client->id);
// send state change and further stuff asynchronously - we dont really care here, just wanna
Expand Down Expand Up @@ -618,11 +619,132 @@ void Network::handle_packet(ClientID id, const bmp::Packet& packet) {
}

void Network::handle_playing(ClientID id, const bmp::Packet& packet, std::shared_ptr<Client>& client) {
switch (packet.purpose) {
auto decompressed = packet.get_readable_data();
uint16_t vid;
auto offset = bmp::deserialize(vid, decompressed);
// use this to operate on ;)
std::span<uint8_t> data(decompressed.begin() + long(offset), decompressed.end());

// kick?? if vehicle is not owned by this player
// TODO: What's the best course of action here?
if (vid != 0xffff && m_vehicles->at(vid)->owner != id) {
disconnect(id, fmt::format("Tried to change vehicle {}, but owner is {}", vid, id));
return;
}

switch (packet.purpose) {
case bmp::Purpose::VehicleSpawn: {
// TODO: Check vehicle spawn limit
auto vehicles = m_vehicles.synchronize();
// overwrite the vid since it's definitely invalid anyways
vid = new_vehicle_id();
auto vehicle = std::make_shared<Vehicle>(data);
vehicles->emplace(vid, vehicle);
beammp_debugf("Client {} spawned vehicle {}", id, vid);
break;
}
case bmp::Purpose::VehicleDelete: {
auto vehicles = m_vehicles.synchronize();
vehicles->erase(vid);
beammp_debugf("Client {} deleted vehicle {}", id, vid);
break;
}
case bmp::Purpose::VehicleReset: {
auto vehicles = m_vehicles.synchronize();
vehicles->at(vid)->reset_status(data);
beammp_debugf("Client {} reset vehicle {}", id, vid);
break;
}
case bmp::Purpose::VehicleEdited: {
auto vehicles = m_vehicles.synchronize();
auto veh_data = vehicles->at(vid)->data.synchronize();
veh_data->resize(data.size());
std::copy(data.begin(), data.end(), veh_data->begin());
beammp_debugf("Client {} edited vehicle {}", id, vid);
break;
}
case bmp::Purpose::VehicleCouplerChanged: {
break;
}
case bmp::Purpose::SpectatorSwitched: {
break;
}
case bmp::Purpose::ApplyInput: {
break;
}
case bmp::Purpose::ApplyElectrics: {
break;
}
case bmp::Purpose::ApplyNodes: {
break;
}
case bmp::Purpose::ApplyBreakgroups: {
break;
}
case bmp::Purpose::ApplyPowertrain: {
break;
}
case bmp::Purpose::ApplyPosition: {
auto vehicles = m_vehicles.synchronize();
vehicles->at(vid)->update_status(data);
break;
}
case bmp::Purpose::ChatMessage: {
std::string msg(data.begin(), data.end());
LogChatMessage(client->name.get(), int(client->id), msg);
break;
}
case bmp::Purpose::Event: {
break;
}
case bmp::Purpose::StateChangeLeaving: {
beammp_infof("Client {} leaving", id);
// TODO: Not implemented properly, change to state and leave cleanly instead.
disconnect(id, "Leaving");
break;
}
default:
beammp_errorf("Got 0x{:x} in state {}. This is not allowed. Disconnecting the client", uint16_t(packet.purpose), int(client->state.get()));
disconnect(id, "invalid purpose in current state");
break;
}
auto clients = playing_clients();
bmp::Packet broadcast {
.purpose = packet.purpose,
.flags = packet.flags,
.raw_data = {},
};
// 4 extra bytes for the pid header
broadcast.raw_data.resize(packet.raw_data.size() + 4);
// write pid
offset = bmp::serialize(uint32_t(id), broadcast.raw_data);
// write all raw data from the original - this is ok because we also copied the flags,
// so if this is compressed, we send it as compressed and don't try to compress it again.
std::copy(packet.raw_data.begin(), packet.raw_data.end(), broadcast.raw_data.begin() + long(offset));
// add playing header

// broadcast packets to clients based on their category
switch (bmp::category_of(packet.purpose)) {
case bmp::Category::Position:
for (auto& [this_id, this_client] : clients) {
udp_write(broadcast, this_client->udp_endpoint());
}
break;
case bmp::Category::None:
beammp_warnf("Category 'None' for packet purpose {} in state {} is unexpected", int(packet.purpose), int(client->state.get()));
break;
case bmp::Category::Nodes:
case bmp::Category::Vehicle:
case bmp::Category::Input:
case bmp::Category::Electrics:
case bmp::Category::Powertrain:
case bmp::Category::Chat:
case bmp::Category::Event:
default:
for (auto& [this_id, this_client] : clients) {
this_client->tcp_write(broadcast);
}
break;
}
}

Expand Down Expand Up @@ -1022,3 +1144,114 @@ ClientID Network::new_client_id() {
*id += 1;
return new_id;
}

VehicleID Network::new_vehicle_id() {
static Sync<VehicleID> s_id { 0 };
auto id = s_id.synchronize();
VehicleID new_id = *id;
*id += 1;
return new_id;
}

void Network::send_system_chat_message(const std::string& msg, ClientID to) {
auto packet = make_playing_packet(bmp::Purpose::ChatMessage, 0xffffffff, 0xffff, std::vector<uint8_t>(msg.begin(), msg.end()));
auto clients = playing_clients();
if (to != 0xffffffff) {
for (auto& [this_id, this_client] : clients) {
this_client->tcp_write(packet);
}
LogChatMessage("<System>", -1, msg);
} else {
for (auto& [this_id, this_client] : clients) {
if (this_id == to) {
this_client->tcp_write(packet);
break;
}
}
LogChatMessage(fmt::format("<System to {}>", to), -1, msg);
}
}

bmp::Packet Network::make_playing_packet(bmp::Purpose purpose, ClientID from_id, VehicleID veh_id, const std::vector<uint8_t>& data) {
bmp::Packet packet {
.purpose = purpose,
.raw_data = {},
};
packet.raw_data.resize(data.size() + 6);
auto offset = bmp::serialize(uint32_t(from_id), packet.raw_data);
offset += bmp::serialize(uint16_t(veh_id), std::span<uint8_t>(packet.raw_data.begin() + long(offset), packet.raw_data.end()));
std::copy(data.begin(), data.end(), packet.raw_data.begin() + long(offset));
return packet;
}
void Vehicle::reset_status(std::span<const uint8_t> status_data) {
auto json = nlohmann::json::parse(status_data.data());
if (json["rvel"].is_array()) {
auto array = json["rvel"].get<std::vector<float>>();
m_rvel = {
array.at(0),
array.at(1),
array.at(2)
};
} else {
m_rvel = {};
}

if (json["rot"].is_array()) {
auto array = json["rot"].get<std::vector<float>>();
m_rot = {
array.at(0),
array.at(1),
array.at(2),
array.at(3),
};
} else {
m_rot = {};
}

if (json["vel"].is_array()) {
auto array = json["vel"].get<std::vector<float>>();
m_vel = {
array.at(0),
array.at(1),
array.at(2)
};
} else {
m_vel = {};
}

if (json["pos"].is_array()) {
auto array = json["pos"].get<std::vector<float>>();
m_pos = {
array.at(0),
array.at(1),
array.at(2)
};
} else {
m_pos = {};
}

if (json["tim"].is_number()) {
m_time = json["tim"].get<float>();
} else {
m_time = {};
}
}

Vehicle::Status Vehicle::get_status() {
std::unique_lock lock(m_mtx);
refresh_cache(lock);
return {
.rvel = m_rvel,
.rot = m_rot,
.vel = m_vel,
.pos = m_pos,
.time = m_time,
};
}

void Vehicle::update_status(std::span<const uint8_t> raw_packet) {
std::unique_lock lock(m_mtx);
m_needs_refresh = true;
m_status_data.resize(raw_packet.size());
std::copy(raw_packet.begin(), raw_packet.end(), m_status_data.begin());
}

0 comments on commit 8654beb

Please sign in to comment.