diff --git a/crate/thread/Cargo.toml b/crate/thread/Cargo.toml index a228acf..20a00a0 100644 --- a/crate/thread/Cargo.toml +++ b/crate/thread/Cargo.toml @@ -7,4 +7,5 @@ edition = "2018" [dependencies] log = "0.4" -spin = "0.4" \ No newline at end of file +spin = "0.5" +deque = { git = "https://github.com/wangrunji0408/deque.git", branch = "no_std" } \ No newline at end of file diff --git a/crate/thread/src/scheduler/mod.rs b/crate/thread/src/scheduler/mod.rs index 8d6367f..9974f51 100644 --- a/crate/thread/src/scheduler/mod.rs +++ b/crate/thread/src/scheduler/mod.rs @@ -5,18 +5,20 @@ use spin::Mutex; pub use self::rr::RRScheduler; pub use self::stride::StrideScheduler; +pub use self::work_stealing::WorkStealingScheduler; mod rr; mod stride; +mod work_stealing; type Pid = usize; /// The scheduler for a ThreadPool -pub trait Scheduler: Sync + 'static { +pub trait Scheduler: 'static { /// Push a thread to the back of ready queue. fn push(&self, pid: Pid); /// Select a thread to run, pop it from the queue. - fn pop(&self) -> Option; + fn pop(&self, cpu_id: usize) -> Option; /// Got a tick from CPU. /// Return true if need reschedule. fn tick(&self, current_pid: Pid) -> bool; diff --git a/crate/thread/src/scheduler/rr.rs b/crate/thread/src/scheduler/rr.rs index 5ff7589..b404191 100644 --- a/crate/thread/src/scheduler/rr.rs +++ b/crate/thread/src/scheduler/rr.rs @@ -21,7 +21,7 @@ impl Scheduler for RRScheduler { fn push(&self, pid: usize) { self.inner.lock().push(pid); } - fn pop(&self) -> Option { + fn pop(&self, _cpu_id: usize) -> Option { self.inner.lock().pop() } fn tick(&self, current_pid: usize) -> bool { diff --git a/crate/thread/src/scheduler/stride.rs b/crate/thread/src/scheduler/stride.rs index c16fcef..262d64d 100644 --- a/crate/thread/src/scheduler/stride.rs +++ b/crate/thread/src/scheduler/stride.rs @@ -38,7 +38,7 @@ impl Scheduler for StrideScheduler { fn push(&self, pid: usize) { self.inner.lock().push(pid); } - fn pop(&self) -> Option { + fn pop(&self, _cpu_id: usize) -> Option { self.inner.lock().pop() } fn tick(&self, current_pid: usize) -> bool { diff --git a/crate/thread/src/scheduler/work_stealing.rs b/crate/thread/src/scheduler/work_stealing.rs new file mode 100644 index 0000000..d085633 --- /dev/null +++ b/crate/thread/src/scheduler/work_stealing.rs @@ -0,0 +1,56 @@ +use super::*; +use deque::{self, Stealer, Worker, Stolen}; + +pub struct WorkStealingScheduler { + /// The ready queue of each processors + workers: Vec>, + /// Stealers to all processors' queue + stealers: Vec>, +} + +impl WorkStealingScheduler { + pub fn new(core_num: usize) -> Self { + let (workers, stealers) = (0..core_num).map(|_| deque::new()).unzip(); + WorkStealingScheduler { workers, stealers } + } +} + +impl Scheduler for WorkStealingScheduler { + fn push(&self, pid: usize) { + // TODO: push to random queue? + // now just push to cpu0 + self.workers[0].push(pid); + trace!("work-stealing: cpu0 push thread {}", pid); + } + + fn pop(&self, cpu_id: usize) -> Option { + if let Some(pid) = self.workers[cpu_id].pop() { + trace!("work-stealing: cpu{} pop thread {}", cpu_id, pid); + return Some(pid); + } + let n = self.workers.len(); + for i in 1..n { + let mut other_id = cpu_id + i; + if other_id >= n { + other_id -= n; + } + loop { + match self.stealers[other_id].steal() { + Stolen::Abort => {} // retry + Stolen::Empty => break, + Stolen::Data(pid) => { + trace!("work-stealing: cpu{} steal thread {} from cpu{}", cpu_id, pid, other_id); + return Some(pid); + } + } + } + } + None + } + + fn tick(&self, _current_pid: usize) -> bool { + true + } + + fn set_priority(&self, _pid: usize, _priority: u8) {} +} diff --git a/crate/thread/src/thread_pool.rs b/crate/thread/src/thread_pool.rs index 6b05b58..2f2bacd 100644 --- a/crate/thread/src/thread_pool.rs +++ b/crate/thread/src/thread_pool.rs @@ -103,7 +103,7 @@ impl ThreadPool { /// The manager first mark it `Running`, /// then take out and return its Context. pub(crate) fn run(&self, cpu_id: usize) -> Option<(Tid, Box)> { - self.scheduler.pop() + self.scheduler.pop(cpu_id) .map(|tid| { let mut proc_lock = self.threads[tid].lock(); let mut proc = proc_lock.as_mut().expect("process not exist");