Skip to content

Commit

Permalink
🎨 - Use Queue instead of Futures
Browse files Browse the repository at this point in the history
  • Loading branch information
rolandpeelen committed Jul 7, 2023
1 parent 4bd6290 commit 41c66eb
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub mod clean;
pub mod helpers;
pub mod logs;
pub mod package_tree;
pub mod queue;
pub mod watcher;
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod clean;
pub mod helpers;
pub mod logs;
pub mod package_tree;
pub mod queue;
pub mod watcher;

fn main() {
Expand Down
175 changes: 175 additions & 0 deletions src/queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::{
collections::VecDeque,
sync::{Condvar, Mutex},
};

/// A trait describing the general behaviour of a Queue
pub trait Queue<T> {
/// Creates a new, empty queue
fn new() -> Self;

/// Push a new item in the queue
fn push(&self, value: T);

/// Removes an item from the queue
fn pop(&self) -> T;

/// Returns the size of the queue
fn len(&self) -> usize;

/// checks if the queue is empty
fn is_empty(&self) -> bool;
}

/// A FIFO queue implemented using a VecDeque and a Mutex
#[derive(Debug)]
pub struct FifoQueue<T> {
/// The underlying data structure of the queue
data: Mutex<VecDeque<T>>,
cv: Condvar,
}

impl<T> Queue<T> for FifoQueue<T> {
/// Creates a new, empty queue
fn new() -> Self {
Self {
data: Mutex::new(VecDeque::new()),
cv: Condvar::new(),
}
}

/// Adds an element to the back of the queue
fn push(&self, value: T) {
let mut data = self.data.lock().unwrap();
data.push_back(value);
self.cv.notify_one();
}

/// Removes an element from the front of the queue
/// Returns None if the queue is empty
fn pop(&self) -> T {
let mut data = self.data.lock().unwrap();

while data.is_empty() {
data = self.cv.wait(data).unwrap();
}

data.pop_front().unwrap()
}

/// Returns the size of the queue
fn len(&self) -> usize {
let data = self.data.lock().unwrap();
data.len()
}

/// Checks if the queue is empty
fn is_empty(&self) -> bool {
let data = self.data.lock().unwrap();
data.is_empty()
}
}

#[cfg(test)]
mod test {
use crate::prelude::*;
use crate::FifoQueue;
use std::{sync::Arc, thread};

#[test]
fn test_basic_functionalities() {
let queue = FifoQueue::new();

// Test push and pop
queue.push(1);
queue.push(2);
queue.push(3);

assert_eq!(queue.pop(), 1);
assert_eq!(queue.pop(), 2);
assert_eq!(queue.pop(), 3);

// Test size and is_empty
assert_eq!(queue.len(), 0);
assert!(queue.is_empty());

queue.push(4);
queue.push(5);

assert_eq!(queue.len(), 2);
assert!(!queue.is_empty());
}

#[test]
fn test_queue_thread_safety() {
// createa a queue of numbers
let queue = Arc::new(FifoQueue::<i32>::new());

let q1 = queue.clone();
let t1 = thread::spawn(move || {
q1.push(1);
q1.push(2);
});

let q2 = queue.clone();
let t2 = thread::spawn(move || {
q2.push(3);
q2.push(4)
});

t1.join().unwrap();
t2.join().unwrap();

assert_eq!(queue.len(), 4);
}

#[test]
fn test_concurrent_pushes_and_pops() {
let queue = Arc::new(FifoQueue::new());

let queue1 = queue.clone();
let handle1 = thread::spawn(move || {
for i in 0..1000 {
queue1.push(i);
}
});

let queue2 = queue.clone();
let handle2 = thread::spawn(move || {
for _ in 0..1000 {
queue2.pop();
}
});

handle1.join().unwrap();
handle2.join().unwrap();

assert!(queue.is_empty());
}

#[test]
fn test_concurrent_mixed_operations() {
let queue = Arc::new(FifoQueue::new());

let queue1 = queue.clone();
let handle1 = thread::spawn(move || {
for i in 0..1000 {
queue1.push(i);
queue1.pop();
}
});

let queue2 = queue.clone();
let handle2 = thread::spawn(move || {
for i in 0..1000 {
queue2.push(i);
queue2.pop();
}
});

handle1.join().unwrap();
handle2.join().unwrap();

assert!(queue.is_empty());
}
}
71 changes: 39 additions & 32 deletions src/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,39 @@
use crate::build;
use crate::helpers;
use futures::{
channel::mpsc::{channel, Receiver},
SinkExt, StreamExt,
};
use crate::queue::FifoQueue;
use crate::queue::*;
use futures_timer::Delay;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{Config, Error, Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::sync::Arc;
use std::time::Duration;

fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
// set the buffer large enough so that we don't trigger unecessary rebuilds
let (mut tx, rx) = channel(100000);

fn async_watcher(q: Arc<FifoQueue<Result<Event, Error>>>) -> notify::Result<RecommendedWatcher> {
// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
tx.send(res).await.unwrap();
})
},
move |res| futures::executor::block_on(async { q.push(res) }),
Config::default(),
)?;

Ok((watcher, rx))
Ok(watcher)
}

async fn async_watch(path: &str, filter: &Option<regex::Regex>) -> notify::Result<()> {
let (mut watcher, rx) = async_watcher()?;
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;
let mut ready_chunks = rx.ready_chunks(100000);

async fn async_watch(
q: Arc<FifoQueue<Result<Event, Error>>>,
path: &str,
filter: &Option<regex::Regex>,
) -> notify::Result<()> {
loop {
let events = ready_chunks.next().await.unwrap();
let needs_compile = events.iter().any(|event| match event {
Ok(event) => event.paths.iter().any(|path| {
let mut events: Vec<Event> = vec![];
while !q.is_empty() {
match q.pop() {
Ok(event) => events.push(event),
Err(_) => (),
}
}

let needs_compile = events.iter().any(|event| {
event.paths.iter().any(|path| {
let path_buf = path.to_path_buf();
let name = path_buf
.file_name()
Expand All @@ -55,25 +54,33 @@ async fn async_watch(path: &str, filter: &Option<regex::Regex>) -> notify::Resul

_ => false,
}
}),
Err(_) => false,
})
});

if needs_compile {
// we wait for a bit before starting the compile as a debouncer
let delay = Duration::from_millis(200);
Delay::new(delay).await;
// we drain the channel to avoid triggering multiple compiles
let _ = ready_chunks.next().await;
// Flush any remaining events that came in before
while !q.is_empty() {
let _ = q.pop();
}

let _ = build::build(filter, path);
}
}
}

pub fn start(filter: &Option<regex::Regex>, folder: &str) {
futures::executor::block_on(async {
if let Err(e) = async_watch(folder, filter).await {
let queue = Arc::new(FifoQueue::<Result<Event, Error>>::new());
let producer = queue.clone();
let consumer = queue.clone();

let mut watcher = async_watcher(producer).expect("Could not create watcher");
watcher
.watch(folder.as_ref(), RecursiveMode::Recursive)
.expect("Could not start watcher");

if let Err(e) = async_watch(consumer, folder, filter).await {
println!("error: {:?}", e)
}
});
})
}
8 changes: 8 additions & 0 deletions testrepo/packages/main/src/Main.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ import * as Dep01 from "@testrepo/dep01/src/Dep01.mjs";

console.log("01");

console.log("01");

console.log("01");

console.log("01");

console.log("01");

Dep01.log(undefined);

var $$Array;
Expand Down
4 changes: 4 additions & 0 deletions testrepo/packages/main/src/Main.res
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
Js.log("01")
Js.log("01")
Js.log("01")
Js.log("01")
Js.log("01")
Dep01.log()

module Array = Belt.Array
Expand Down

0 comments on commit 41c66eb

Please sign in to comment.