Skip to content

Commit

Permalink
Fixed a race condition and access violation on multithreaded journals
Browse files Browse the repository at this point in the history
  • Loading branch information
john-sharratt authored and theduke committed Jul 16, 2024
1 parent 78e1f82 commit ae54779
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 2 deletions.
2 changes: 1 addition & 1 deletion lib/wasix/src/journal/effector/memory_and_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl JournalEffector {
)?;

// Break the region down into chunks that align with the resolution
let data = &memory.data_unchecked()[start..end];
let data = &memory.data_unchecked();
let mut offset = region.start;
while offset < region.end {
let next = region.end.min(offset + MEMORY_REGION_RESOLUTION);
Expand Down
5 changes: 5 additions & 0 deletions lib/wasix/src/journal/effector/save_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ impl JournalEffector {
) -> anyhow::Result<()> {
let env = ctx.data();
if !env.should_journal() {
tracing::trace!(
"skipping journal event save (enable={}, replaying={})",
env.enable_journal,
env.replaying_journal
);
return Ok(());
}

Expand Down
2 changes: 1 addition & 1 deletion lib/wasix/src/state/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ pub struct WasiEnv {
/// time that it will pause the CPU)
pub enable_exponential_cpu_backoff: Option<Duration>,

/// Flag that indicatees if the environment is currently replaying the journal
/// Flag that indicates if the environment is currently replaying the journal
/// (and hence it should not record new events)
pub replaying_journal: bool,

Expand Down
3 changes: 3 additions & 0 deletions lib/wasix/src/state/func_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,20 +313,23 @@ impl WasiFunctionEnv {
// prevent the initialization function from running
let restore_journals = self.data(&store).runtime.journals().clone();
if !restore_journals.is_empty() {
tracing::trace!("replaying journal=true");
self.data_mut(&mut store).replaying_journal = true;

for journal in restore_journals {
let ctx = self.env.clone().into_mut(&mut store);
let rewind = match restore_snapshot(ctx, journal, true) {
Ok(r) => r,
Err(err) => {
tracing::trace!("replaying journal=false (err={:?})", err);
self.data_mut(&mut store).replaying_journal = false;
return Err(err);
}
};
rewind_state = rewind.map(|rewind| (rewind, RewindResultType::RewindRestart));
}

tracing::trace!("replaying journal=false");
self.data_mut(&mut store).replaying_journal = false;
}

Expand Down
7 changes: 7 additions & 0 deletions lib/wasix/src/syscalls/journal/restore_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ pub unsafe fn restore_snapshot(
.map_err(anyhow_err_to_runtime_err)?;
}

// Once we get to this point we are no longer replaying the journal
// and need to clear this flag, the reason is that restoring the
// background threads may immediately process requests while this
// flag is still set which would be bad
tracing::trace!("replaying journal=false");
runner.ctx.data_mut().replaying_journal = false;

// Spawn all the threads
let thread_count = runner.spawn_threads.len();
tracing::trace!(thread_count, "restoring threads");
Expand Down

0 comments on commit ae54779

Please sign in to comment.