Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use thread_local for encoding packets #47

Merged
merged 5 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ jobs:

- name: Setup Rust toolchain and cache
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ env.RUST_NIGHTLY_TOOLCHAIN }}

- name: Run cargo test
run: cargo test --workspace --all-features
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ jemalloc-ctl = "0.5.4"
# jemalloc uses significantly less memory
#mimalloc = { version = "0.1.39" , default-features = false }

thread_local = { version = "1.1.8", features = ["nightly"] }


[lints.rust]
warnings = "deny"
Expand Down Expand Up @@ -123,6 +125,7 @@ cast_precision_loss = "allow" # consider denying
missing_errors_doc = "allow" # consider denying
wildcard_imports = "allow"
non_ascii_literal = "allow"
no_mangle_with_rust_abi = "allow"

perf = "deny"

Expand Down
6 changes: 0 additions & 6 deletions server/benches/many_zombies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ use divan::Bencher;
use server::{bounding_box::BoundingBox, FullEntityPose, Game, InitEntity, Targetable};
use valence_protocol::math::DVec3;

// #[global_allocator]
// static ALLOC: AllocProfiler = AllocProfiler::system();

#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

fn main() {
// Run registered benchmarks.
divan::main();
Expand Down
68 changes: 3 additions & 65 deletions server/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,6 @@ pub struct WriterComm {
enc: PacketEncoder,
}

/// Encodes a single packet into its own freshly allocated, immutable byte buffer.
///
/// A new default `PacketEncoder` is created for every call, the packet is appended to it,
/// and the resulting buffer is frozen into `bytes::Bytes`.
///
/// # Errors
/// Propagates the error returned by `PacketEncoder::append_packet`.
pub fn encode_packet<P>(pkt: &P) -> anyhow::Result<bytes::Bytes>
where
    P: valence_protocol::Packet + Encode,
{
    let mut encoder = PacketEncoder::default();
    encoder.append_packet(pkt)?;

    // Take the encoded buffer out of the encoder and freeze it.
    Ok(encoder.take().freeze())
}

type ReaderComm = flume::Receiver<PacketFrame>;

impl WriterComm {
Expand Down Expand Up @@ -394,60 +383,9 @@ impl Io {

monoio::spawn(async move {
while let Ok(bytes) = s2c_rx.recv_async().await {
// if macos
// if there are multiple elements in the channel, batch them.
// This is especially useful on macOS which does not support
// io_uring and has a high cost for each write (context switch for each syscall).
#[cfg(target_os = "macos")]
{
if s2c_rx.is_empty() {
if let Err(e) = io_write.send_packet(bytes).await {
error!("{e:?}");
break;
}
continue;
}

let mut byte_collect = bytes.to_vec();

// we are using drain so we do not go in infinite loop
for other_byte in s2c_rx.drain() {
let other_byte = other_byte.to_vec();
// todo: or extend slice
byte_collect.extend(other_byte);
}

let bytes = bytes::Bytes::from(byte_collect);

if let Err(e) = io_write.send_packet(bytes).await {
error!("{e:?}");
break;
}
continue;
}

// if linux
#[cfg(target_os = "linux")]
{
if let Err(e) = io_write.send_packet(bytes).await {
error!("{e:?}");
break;
}
continue;
}

// if windows panic
#[cfg(target_os = "windows")]
{
panic!("windows not supported");
continue;
}

// if other panic
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
{
panic!("unsupported os");
continue;
if let Err(e) = io_write.send_packet(bytes).await {
error!("{e:?}");
break;
}
}
});
Expand Down
24 changes: 16 additions & 8 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![allow(clippy::many_single_char_names)]
#![feature(thread_local)]

#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

extern crate core;
mod chunk;
mod singleton;

use std::{
cell::UnsafeCell,
Expand All @@ -16,13 +18,15 @@ use std::{
use anyhow::Context;
use evenio::prelude::*;
use jemalloc_ctl::{epoch, stats};
use ndarray::s;
use signal_hook::iterator::Signals;
use tracing::{info, instrument, warn};
use valence_protocol::math::DVec3;

use crate::{
bounding_box::BoundingBox,
io::{server, ClientConnection, Packets},
singleton::encoder::Encoder,
};

mod global;
Expand Down Expand Up @@ -97,8 +101,8 @@ struct KillAllEntities;
#[derive(Event, Copy, Clone)]
#[allow(dead_code)]
struct StatsEvent {
ms_per_tick_mean: f64,
ms_per_tick_std_dev: f64,
ms_per_tick_mean_1s: f64,
ms_per_tick_mean_5s: f64,
allocated: usize,
resident: usize,
}
Expand Down Expand Up @@ -173,6 +177,9 @@ impl Game {
let bounding_boxes = world.spawn();
world.insert(bounding_boxes, bounding_box::EntityBoundingBoxes::default());

let encoder = world.spawn();
world.insert(encoder, Encoder::default());

let mut game = Self {
world,
last_ticks: VecDeque::default(),
Expand Down Expand Up @@ -220,7 +227,7 @@ impl Game {
#[instrument(skip_all)]
pub fn tick(&mut self) {
const LAST_TICK_HISTORY_SIZE: usize = 100;
const MSPT_HISTORY_SIZE: usize = 20;
const MSPT_HISTORY_SIZE: usize = 100;

let now = Instant::now();
self.last_ticks.push_back(now);
Expand Down Expand Up @@ -257,10 +264,11 @@ impl Game {

if self.last_ms_per_tick.len() > MSPT_HISTORY_SIZE {
// efficient
let arr = ndarray::Array::from_iter(self.last_ms_per_tick.iter().copied());
let arr = ndarray::Array::from_iter(self.last_ms_per_tick.iter().copied().rev());

let std_dev = arr.std(0.0);
let mean = arr.mean().unwrap();
// mean over the last 1 second (20 ticks) and the last 5 seconds (100 ticks)
let mean_1_second = arr.slice(s![..20]).mean().unwrap();
let mean_5_seconds = arr.slice(s![..100]).mean().unwrap();

let allocated = stats::allocated::mib().unwrap();
let resident = stats::resident::mib().unwrap();
Expand All @@ -276,8 +284,8 @@ impl Game {
let resident = resident.read().unwrap();

self.world.send(StatsEvent {
ms_per_tick_mean: mean,
ms_per_tick_std_dev: std_dev,
ms_per_tick_mean_1s: mean_1_second,
ms_per_tick_mean_5s: mean_5_seconds,
allocated,
resident,
});
Expand Down
1 change: 1 addition & 0 deletions server/src/singleton.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod encoder;
60 changes: 60 additions & 0 deletions server/src/singleton/encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// https://stackoverflow.com/a/61681112/4889030
// https://matklad.github.io/2020/10/03/fast-thread-locals-in-rust.html
use std::cell::UnsafeCell;

use evenio::component::Component;
use thread_local::ThreadLocal;
use valence_protocol::{Encode, Packet, PacketEncoder};

/// Per-thread packet encoder, stored as a component.
///
/// Every thread that calls [`Encoder::append`] lazily gets its own [`PacketEncoder`], so
/// packets can be encoded from several threads through a shared `&Encoder` without locking.
/// [`Encoder::drain`] later collects what each thread has buffered.
#[derive(Default, Component)]
pub struct Encoder {
    /// One encoder per thread; `UnsafeCell` permits mutation through `&self` in `append`.
    local: ThreadLocal<UnsafeCell<PacketEncoder>>,
}

impl Encoder {
    /// Encodes `packet` and appends it to the calling thread's encoder.
    ///
    /// # Errors
    /// Propagates the error returned by [`PacketEncoder::append_packet`].
    pub fn append<P: Packet + Encode>(&self, packet: &P) -> anyhow::Result<()> {
        let cell = self.local.get_or_default();

        // SAFETY: `get_or_default` hands out the cell that belongs to the *current* thread, so
        // no other thread reaches this `PacketEncoder` through `&self`. The only other access
        // path is `drain`, which takes `&mut self` and therefore cannot overlap with this call.
        // The mutable reference never escapes this function. This relies on `append_packet` not
        // re-entering `append` on the same `Encoder` from the same thread (e.g. from inside a
        // packet's `Encode` implementation).
        let encoder = unsafe { &mut *cell.get() };
        encoder.append_packet(packet)
    }

    /// Yields the bytes taken from each thread's encoder, one buffer per thread.
    ///
    /// Threads whose encoder holds no data (nothing appended since the previous drain) are
    /// skipped, so callers never receive zero-length buffers to forward.
    pub fn drain(&mut self) -> impl Iterator<Item = bytes::Bytes> + '_ {
        self.local
            .iter_mut()
            .map(|cell| cell.get_mut().take().freeze())
            .filter(|bytes| !bytes.is_empty())
    }
}

#[cfg(test)]
mod tests {
use std::panic::{RefUnwindSafe, UnwindSafe};

use crate::singleton::encoder::Encoder;

// Compile-time check only; this function has an empty body and is not meant to be called.
// The `where` clause names the concrete `Encoder` type, so the test build is expected to
// fail to compile if `Encoder` ever stops implementing one of these auto traits
// (for example after a field type changes).
const fn _assert_auto_trait_impls()
where
Encoder: Send + Sync + UnwindSafe + RefUnwindSafe,
{
}
}
20 changes: 12 additions & 8 deletions server/src/system/entity_move_logic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use evenio::{
entity::EntityId,
event::Receiver,
fetch::Fetcher,
fetch::{Fetcher, Single},
query::{Not, Query, With},
rayon::prelude::*,
};
Expand All @@ -12,7 +12,7 @@ use valence_protocol::{
};

use crate::{
io::encode_packet, EntityReaction, FullEntityPose, Gametick, MinecraftEntity, Player,
singleton::encoder::Encoder, EntityReaction, FullEntityPose, Gametick, MinecraftEntity, Player,
RunningSpeed, Targetable,
};

Expand Down Expand Up @@ -46,6 +46,7 @@ pub fn entity_move_logic(
&Player, // 3
Not<&MinecraftEntity>, // not 1
)>,
encoder: Single<&mut Encoder>,
) {
use valence_protocol::packets::play;

Expand All @@ -56,6 +57,8 @@ pub fn entity_move_logic(

let target = target.position;

let encoder = encoder.0;

entities.par_iter_mut().for_each(|query| {
let EntityQuery {
id,
Expand Down Expand Up @@ -114,12 +117,13 @@ pub fn entity_move_logic(
};

// todo: remove unwrap
let pos = encode_packet(&pos).unwrap();
let look = encode_packet(&look).unwrap();
encoder.append(&pos).unwrap();
encoder.append(&look).unwrap();
});

player.iter().for_each(|(_, player, ..)| {
let _ = player.packets.writer.send_raw(pos.clone());
let _ = player.packets.writer.send_raw(look.clone());
for bytes in encoder.drain() {
player.par_iter().for_each(|(_, player, _)| {
let _ = player.packets.writer.send_raw(bytes.clone());
});
});
}
}
3 changes: 2 additions & 1 deletion server/src/system/init_entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub fn init_entity(
};

players.iter_mut().for_each(|player| {
player.packets.writer.send_packet(&pkt).unwrap();
// todo: handle error
let _ = player.packets.writer.send_packet(&pkt);
});
}
2 changes: 0 additions & 2 deletions server/src/system/reset_bounding_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ pub struct EntityQuery<'a> {
pose: &'a mut FullEntityPose,
}

// #[no_mangle]
// #[instrument(skip_all, name = "reset_bounding_boxes")]
pub fn reset_bounding_boxes(
_: Receiver<Gametick>,
entity_bounding_boxes: Single<&mut EntityBoundingBoxes>,
Expand Down
15 changes: 8 additions & 7 deletions server/src/system/tps_message.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
use evenio::prelude::*;
use tracing::instrument;

use crate::{bytes_to_mb, Player, StatsEvent};
use crate::{bytes_to_mb, FullEntityPose, Player, StatsEvent};

#[instrument(skip_all, name = "tps_message")]
pub fn tps_message(r: Receiver<StatsEvent>, mut players: Fetcher<&mut Player>) {
pub fn tps_message(r: Receiver<StatsEvent>, mut players: Fetcher<(&mut Player, &FullEntityPose)>) {
let StatsEvent {
ms_per_tick_mean,
ms_per_tick_std_dev,
ms_per_tick_mean_1s,
ms_per_tick_mean_5s,
resident,
..
} = r.event;

// let allocated = bytes_to_mb(*allocated);
let resident = bytes_to_mb(*resident);

// make sexy with stddev & mean symbol
let message = format!("µ={ms_per_tick_mean:.2}, σ={ms_per_tick_std_dev:.2}, {resident:.2}MiB");
players.iter_mut().for_each(|(player, _)| {
// compact status line: mean ms per tick over the last 1 s and 5 s, then resident memory
let message =
format!("µms {ms_per_tick_mean_1s:.2} {ms_per_tick_mean_5s:.2}, {resident:.2}MiB");

players.iter_mut().for_each(|player| {
// todo: handle error
let _ = player.packets.writer.send_chat_message(&message);
});
Expand Down