Skip to content

Commit

Permalink
[runtime] Enhancement: Adaptable timer res
Browse files Browse the repository at this point in the history
  • Loading branch information
anandbonde committed Jul 14, 2024
1 parent c037af7 commit 582652e
Showing 1 changed file with 66 additions and 15 deletions.
81 changes: 66 additions & 15 deletions src/rust/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub use queue::{
QType,
};
pub use scheduler::TaskId;
use x86::time::rdtscp;

#[cfg(feature = "libdpdk")]
pub use dpdk_rs as libdpdk;
Expand Down Expand Up @@ -82,16 +83,21 @@ use ::std::{
SystemTime,
},
};
use std::pin::Pin;
use std::{
cmp::{
max,
min,
},
pin::Pin,
};

//======================================================================================================================
// Constants
//======================================================================================================================

// TODO: Make this more accurate using rdtsc.
// FIXME: https://github.com/microsoft/demikernel/issues/1226
const TIMER_RESOLUTION: usize = 64;
const TIMER_FINER_RESOLUTION: usize = 2;
const MIN_TIMER_RESOLUTION: usize = 64;
const MAX_TIMER_RESOLUTION: usize = 64_000;
const FINE_TIMER_RESOLUTION: usize = 2;

//======================================================================================================================
// Structures
Expand All @@ -109,6 +115,12 @@ pub struct DemiRuntime {
network_table: NetworkQueueTable,
/// Number of iterations that we have polled since advancing the clock.
ts_iters: usize,
/// Last tsc counter value.
last_tsc: u64,
/// List time instant.
last_time: Instant,
/// Timer resolution.
timer_resolution: usize,
/// Tasks that have been completed and removed from the
completed_tasks: HashMap<QToken, (QDesc, OperationResult)>,
}
Expand Down Expand Up @@ -146,6 +158,12 @@ impl SharedDemiRuntime {
ephemeral_ports: EphemeralPorts::default(),
network_table: NetworkQueueTable::default(),
ts_iters: 0,
last_tsc: unsafe {
let (rdtscp, _) = rdtscp();
rdtscp
},
last_time: Instant::now(),
timer_resolution: MIN_TIMER_RESOLUTION,
completed_tasks: HashMap::<QToken, (QDesc, OperationResult)>::new(),
}))
}
Expand Down Expand Up @@ -212,11 +230,12 @@ impl SharedDemiRuntime {
return Err(Fail::new(libc::EINVAL, &cause));
}

// 2. None of the tasks have already completed, so start a timer and move the clock.
self.advance_clock_to_now();
// None of the tasks have already completed.

loop {
if let Some(boxed_task) = self.scheduler.get_next_completed_task(TIMER_RESOLUTION) {
self.advance_clock_to_now();
let max_iterations = self.timer_resolution;
if let Some(boxed_task) = self.scheduler.get_next_completed_task(max_iterations) {
// Perform bookkeeping for the completed and removed task.
trace!("Removing coroutine: {:?}", boxed_task.get_name());
let completed_qt: QToken = boxed_task.get_id().into();
Expand All @@ -240,9 +259,6 @@ impl SharedDemiRuntime {
return Err(Fail::new(libc::ETIMEDOUT, "wait timed out"));
}
}

// Advance the clock and continue running tasks.
self.advance_clock_to_now();
}
}

Expand Down Expand Up @@ -359,8 +375,8 @@ impl SharedDemiRuntime {
/// the clock.
fn run_next(&mut self, timeout: Duration) -> Option<(QToken, QDesc, OperationResult)> {
let iterations: usize = match timeout {
timeout if timeout.as_secs() > 0 => TIMER_RESOLUTION,
_ => TIMER_FINER_RESOLUTION,
timeout if timeout.as_secs() > 0 => self.timer_resolution,
_ => FINE_TIMER_RESOLUTION,
};
if let Some(boxed_task) = self.scheduler.get_next_completed_task(iterations) {
// Perform bookkeeping for the completed and removed task.
Expand Down Expand Up @@ -478,15 +494,16 @@ impl SharedDemiRuntime {

/// Moves time forward deterministically.
pub fn advance_clock(&mut self, now: Instant) {
timer::global_advance_clock(now)
timer::global_advance_clock(now);
self.adjust_time_resolution(now);
}

/// Moves time forward to the current real time.
fn advance_clock_to_now(&mut self) {
if self.ts_iters == 0 {
self.advance_clock(Instant::now());
}
self.ts_iters = (self.ts_iters + 1) % TIMER_RESOLUTION;
self.ts_iters = (self.ts_iters + 1) % self.timer_resolution;
}

/// Gets the current time according to our internal timer.
Expand Down Expand Up @@ -535,6 +552,34 @@ impl SharedDemiRuntime {
trace!("Check address in use: {:?}", local);
self.network_table.addr_in_use(local)
}

fn adjust_time_resolution(&mut self, now: Instant) {
let curr_tsc: u64 = unsafe {
let (tsc, _): (u64, u32) = rdtscp();
tsc
};
let cycles: usize = (curr_tsc - self.last_tsc) as usize;

let time_in_seconds = (now - self.last_time).as_secs();
if time_in_seconds == 0 {
return;
}

let cycles_per_second: usize = cycles / time_in_seconds as usize;
if cycles_per_second == 0 {
return;
}

let cycles_per_quanta: usize = cycles_per_second / MIN_TIMER_RESOLUTION;
if cycles_per_quanta == 0 {
return;
}

self.timer_resolution = max(MIN_TIMER_RESOLUTION, min(MAX_TIMER_RESOLUTION, cycles_per_quanta));
trace!("Adjusted timer resolution to: {:?}", self.timer_resolution);
self.last_tsc = curr_tsc;
self.last_time = now;
}
}

impl<T> SharedObject<T> {
Expand Down Expand Up @@ -597,6 +642,12 @@ impl Default for SharedDemiRuntime {
ephemeral_ports: EphemeralPorts::default(),
network_table: NetworkQueueTable::default(),
ts_iters: 0,
last_tsc: unsafe {
let (tsc, _) = rdtscp();
tsc
},
last_time: Instant::now(),
timer_resolution: MIN_TIMER_RESOLUTION,
completed_tasks: HashMap::<QToken, (QDesc, OperationResult)>::new(),
}))
}
Expand Down

0 comments on commit 582652e

Please sign in to comment.