ThreadLock. Dining philosophers problem. Fix thread::spawn.

master
WangRunji 7 years ago
parent 25dde04795
commit 31bc92aec6

@ -90,6 +90,9 @@ pub extern "C" fn rust_main(multiboot_information_address: usize) -> ! {
unsafe{ arch::interrupt::enable(); } unsafe{ arch::interrupt::enable(); }
// thread::test::unpack();
sync::philosopher::philosopher();
// 直接进入用户态暂不可用:内核代码用户不可访问 // 直接进入用户态暂不可用:内核代码用户不可访问
// unsafe{ // unsafe{
// use arch::syscall; // use arch::syscall;

@ -1,4 +1,4 @@
//! Mutex (Spin, Spin-NoInterrupt, Yield) //! Mutex (Spin, SpinNoIrq, Thread)
//! //!
//! Modified from spin::mutex. //! Modified from spin::mutex.
@ -6,29 +6,28 @@ use core::sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering};
use core::cell::UnsafeCell; use core::cell::UnsafeCell;
use core::ops::{Deref, DerefMut}; use core::ops::{Deref, DerefMut};
use core::fmt; use core::fmt;
use core::marker::PhantomData;
use arch::interrupt; use arch::interrupt;
pub type SpinLock<T> = Mutex<T, Spin>; pub type SpinLock<T> = Mutex<T, Spin>;
pub type SpinNoIrqLock<T> = Mutex<T, SpinNoIrq>; pub type SpinNoIrqLock<T> = Mutex<T, SpinNoIrq>;
pub type YieldLock<T> = Mutex<T, Yield>; pub type ThreadLock<T> = Mutex<T, Thread>;
/// Spin & no-interrupt lock
pub struct Mutex<T: ?Sized, S: MutexSupport> pub struct Mutex<T: ?Sized, S: MutexSupport>
{ {
lock: AtomicBool, lock: AtomicBool,
support: PhantomData<S>, support: S,
data: UnsafeCell<T>, data: UnsafeCell<T>,
} }
/// A guard to which the protected data can be accessed /// A guard to which the protected data can be accessed
/// ///
/// When the guard falls out of scope it will release the lock. /// When the guard falls out of scope it will release the lock.
pub struct MutexGuard<'a, T: ?Sized + 'a, S: MutexSupport> pub struct MutexGuard<'a, T: ?Sized + 'a, S: MutexSupport + 'a>
{ {
lock: &'a AtomicBool, lock: &'a AtomicBool,
data: &'a mut T, data: &'a mut T,
support: S, support: &'a S,
support_guard: S::GuardData,
} }
// Same unsafe impls as `std::sync::Mutex` // Same unsafe impls as `std::sync::Mutex`
@ -54,11 +53,11 @@ impl<T, S: MutexSupport> Mutex<T, S>
/// drop(lock); /// drop(lock);
/// } /// }
/// ``` /// ```
pub const fn new(user_data: T) -> Mutex<T, S> { pub fn new(user_data: T) -> Mutex<T, S> {
Mutex { Mutex {
lock: ATOMIC_BOOL_INIT, lock: ATOMIC_BOOL_INIT,
data: UnsafeCell::new(user_data), data: UnsafeCell::new(user_data),
support: PhantomData, support: S::new(),
} }
} }
@ -77,7 +76,7 @@ impl<T: ?Sized, S: MutexSupport> Mutex<T, S>
while self.lock.compare_and_swap(false, true, Ordering::Acquire) != false { while self.lock.compare_and_swap(false, true, Ordering::Acquire) != false {
// Wait until the lock looks unlocked before retrying // Wait until the lock looks unlocked before retrying
while self.lock.load(Ordering::Relaxed) { while self.lock.load(Ordering::Relaxed) {
S::cpu_relax(); self.support.cpu_relax();
} }
} }
} }
@ -99,12 +98,13 @@ impl<T: ?Sized, S: MutexSupport> Mutex<T, S>
/// ``` /// ```
pub fn lock(&self) -> MutexGuard<T, S> pub fn lock(&self) -> MutexGuard<T, S>
{ {
let support = S::before_lock(); let support_guard = S::before_lock();
self.obtain_lock(); self.obtain_lock();
MutexGuard { MutexGuard {
lock: &self.lock, lock: &self.lock,
data: unsafe { &mut *self.data.get() }, data: unsafe { &mut *self.data.get() },
support, support: &self.support,
support_guard,
} }
} }
@ -122,26 +122,26 @@ impl<T: ?Sized, S: MutexSupport> Mutex<T, S>
/// Tries to lock the mutex. If it is already locked, it will return None. Otherwise it returns /// Tries to lock the mutex. If it is already locked, it will return None. Otherwise it returns
/// a guard within Some. /// a guard within Some.
pub fn try_lock(&self) -> Option<MutexGuard<T, S>> { pub fn try_lock(&self) -> Option<MutexGuard<T, S>> {
let support = S::before_lock(); let support_guard = S::before_lock();
if self.lock.compare_and_swap(false, true, Ordering::Acquire) == false { if self.lock.compare_and_swap(false, true, Ordering::Acquire) == false {
Some(MutexGuard { Some(MutexGuard {
lock: &self.lock, lock: &self.lock,
data: unsafe { &mut *self.data.get() }, data: unsafe { &mut *self.data.get() },
support, support: &self.support,
support_guard,
}) })
} else { } else {
support.after_unlock();
None None
} }
} }
} }
impl<T: ?Sized + fmt::Debug, S: MutexSupport> fmt::Debug for Mutex<T, S> impl<T: ?Sized + fmt::Debug, S: MutexSupport + fmt::Debug> fmt::Debug for Mutex<T, S>
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.try_lock() { match self.try_lock() {
Some(guard) => write!(f, "Mutex<{:?}> {{ data: {:?} }}", self.support, &*guard), Some(guard) => write!(f, "Mutex {{ data: {:?}, support: {:?} }}", &*guard, self.support),
None => write!(f, "Mutex<{:?}> {{ <locked> }}", self.support), None => write!(f, "Mutex {{ <locked>, support: {:?} }}", self.support),
} }
} }
} }
@ -174,58 +174,157 @@ impl<'a, T: ?Sized, S: MutexSupport> Drop for MutexGuard<'a, T, S>
/// Low-level support for mutex /// Low-level support for mutex
pub trait MutexSupport { pub trait MutexSupport {
type GuardData;
fn new() -> Self;
/// Called when failing to acquire the lock /// Called when failing to acquire the lock
fn cpu_relax(); fn cpu_relax(&self);
/// Called before lock() & try_lock() /// Called before lock() & try_lock()
fn before_lock() -> Self; fn before_lock() -> Self::GuardData;
/// Called when MutexGuard dropping & try_lock() failed /// Called when MutexGuard dropping
fn after_unlock(&self); fn after_unlock(&self);
} }
/// Spin lock /// Spin lock
#[derive(Debug)]
pub struct Spin; pub struct Spin;
impl MutexSupport for Spin { impl MutexSupport for Spin {
fn cpu_relax() { type GuardData = ();
fn new() -> Self { Spin }
fn cpu_relax(&self) {
unsafe { asm!("pause" :::: "volatile"); } unsafe { asm!("pause" :::: "volatile"); }
} }
fn before_lock() -> Self { fn before_lock() -> Self::GuardData {}
Spin
}
fn after_unlock(&self) {} fn after_unlock(&self) {}
} }
/// Spin & no-interrupt lock /// Spin & no-interrupt lock
pub struct SpinNoIrq { #[derive(Debug)]
flags: usize, pub struct SpinNoIrq;
/// Contains RFLAGS before disable interrupt, will auto restore it when dropping
pub struct FlagsGuard(usize);
impl Drop for FlagsGuard {
fn drop(&mut self) {
unsafe { interrupt::restore(self.0) };
}
} }
impl MutexSupport for SpinNoIrq { impl MutexSupport for SpinNoIrq {
fn cpu_relax() { type GuardData = FlagsGuard;
fn new() -> Self {
SpinNoIrq
}
fn cpu_relax(&self) {
unsafe { asm!("pause" :::: "volatile"); } unsafe { asm!("pause" :::: "volatile"); }
} }
fn before_lock() -> Self { fn before_lock() -> Self::GuardData {
SpinNoIrq { FlagsGuard(unsafe { interrupt::disable_and_store() })
flags: unsafe { interrupt::disable_and_store() },
} }
fn after_unlock(&self) {}
} }
use thread;
use alloc::VecDeque;
/// With thread support
pub struct Thread {
wait_queue: SpinLock<VecDeque<thread::Thread>>,
}
impl MutexSupport for Thread {
type GuardData = ();
fn new() -> Self {
Thread { wait_queue: SpinLock::new(VecDeque::new()) }
}
fn cpu_relax(&self) {
self.wait_queue.lock().push_back(thread::current());
thread::park();
}
fn before_lock() -> Self::GuardData {}
fn after_unlock(&self) { fn after_unlock(&self) {
unsafe { interrupt::restore(self.flags) }; if let Some(t) = self.wait_queue.lock().pop_front() {
t.unpark();
}
} }
} }
/// With thread support
pub struct Yield;
impl MutexSupport for Yield { pub mod philosopher {
fn cpu_relax() {
use thread; use thread;
thread::yield_now(); use core::time::Duration;
use alloc::{arc::Arc, Vec};
use super::ThreadLock as Mutex;
struct Philosopher {
name: &'static str,
left: usize,
right: usize,
} }
fn before_lock() -> Self {
unimplemented!() impl Philosopher {
fn new(name: &'static str, left: usize, right: usize) -> Philosopher {
Philosopher {
name,
left,
right,
} }
fn after_unlock(&self) { }
unimplemented!()
fn eat(&self, table: &Table) {
let _left = table.forks[self.left].lock();
let _right = table.forks[self.right].lock();
println!("{} is eating.", self.name);
thread::sleep(Duration::from_secs(1));
}
fn think(&self) {
println!("{} is thinking.", self.name);
thread::sleep(Duration::from_secs(1));
}
}
struct Table {
forks: Vec<Mutex<()>>,
}
pub fn philosopher() {
let table = Arc::new(Table {
forks: vec![
Mutex::new(()),
Mutex::new(()),
Mutex::new(()),
Mutex::new(()),
Mutex::new(()),
]
});
let philosophers = vec![
Philosopher::new("1", 0, 1),
Philosopher::new("2", 1, 2),
Philosopher::new("3", 2, 3),
Philosopher::new("4", 3, 4),
Philosopher::new("5", 0, 4),
];
let handles: Vec<_> = philosophers.into_iter().map(|p| {
let table = table.clone();
thread::spawn(move || {
for i in 0..5 {
p.think();
p.eat(&table);
println!("{} iter {} end.", p.name, i);
}
})
}).collect();
for h in handles {
h.join().unwrap();
}
println!("philosophers dining end");
} }
} }

@ -133,7 +133,8 @@ fn sys_exit(error_code: usize) -> i32 {
} }
fn sys_sleep(time: usize) -> i32 { fn sys_sleep(time: usize) -> i32 {
thread::sleep(time); use core::time::Duration;
thread::sleep(Duration::from_millis(time as u64 * 10));
0 0
} }

@ -6,6 +6,7 @@
use process::*; use process::*;
use core::marker::PhantomData; use core::marker::PhantomData;
use core::ptr; use core::ptr;
use core::time::Duration;
use alloc::boxed::Box; use alloc::boxed::Box;
/// Gets a handle to the thread that invokes it. /// Gets a handle to the thread that invokes it.
@ -16,13 +17,16 @@ pub fn current() -> Thread {
} }
/// Puts the current thread to sleep for the specified amount of time. /// Puts the current thread to sleep for the specified amount of time.
pub fn sleep(time: usize) { pub fn sleep(dur: Duration) {
// TODO: use core::time::Duration info!("sleep: {:?}", dur);
info!("sleep: {} ticks", time);
let mut processor = PROCESSOR.try().unwrap().lock(); let mut processor = PROCESSOR.try().unwrap().lock();
let pid = processor.current_pid(); let pid = processor.current_pid();
processor.sleep(pid, time); processor.sleep(pid, dur_to_ticks(dur));
processor.schedule(); processor.schedule();
fn dur_to_ticks(dur: Duration) -> usize {
return dur.as_secs() as usize * 100 + dur.subsec_nanos() as usize / 10_000_000;
}
} }
/// Spawns a new thread, returning a JoinHandle for it. /// Spawns a new thread, returning a JoinHandle for it.
@ -33,7 +37,8 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
{ {
info!("spawn:"); info!("spawn:");
use process; use process;
let pid = process::add_kernel_process(kernel_thread_entry::<F, T>, &f as *const _ as usize); let f = Box::leak(Box::new(f));
let pid = process::add_kernel_process(kernel_thread_entry::<F, T>, f as *mut _ as usize);
return JoinHandle { return JoinHandle {
thread: Thread { pid }, thread: Thread { pid },
mark: PhantomData, mark: PhantomData,
@ -44,11 +49,12 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
F: Send + 'static + FnOnce() -> T, F: Send + 'static + FnOnce() -> T,
T: Send + 'static, T: Send + 'static,
{ {
let f = unsafe { ptr::read(f as *mut F) }; let f = unsafe { Box::from_raw(f as *mut F) };
let ret = Box::new(f()); let ret = Box::new(f());
let mut processor = PROCESSOR.try().unwrap().lock(); let mut processor = PROCESSOR.try().unwrap().lock();
let pid = processor.current_pid(); let pid = processor.current_pid();
processor.exit(pid, Box::into_raw(ret) as usize); processor.exit(pid, Box::into_raw(ret) as usize);
processor.schedule();
unreachable!() unreachable!()
} }
} }
@ -115,6 +121,7 @@ impl<T> JoinHandle<T> {
pub mod test { pub mod test {
use thread; use thread;
use core::time::Duration;
pub fn unpack() { pub fn unpack() {
let parked_thread = thread::spawn(|| { let parked_thread = thread::spawn(|| {
@ -125,7 +132,7 @@ pub mod test {
}); });
// Let some time pass for the thread to be spawned. // Let some time pass for the thread to be spawned.
thread::sleep(200); thread::sleep(Duration::from_secs(2));
println!("Unpark the thread"); println!("Unpark the thread");
parked_thread.thread().unpark(); parked_thread.thread().unpark();

Loading…
Cancel
Save