From 1760d765813b71cca6150208fc426fd309cfca3b Mon Sep 17 00:00:00 2001 From: WangRunji Date: Tue, 5 Jun 2018 01:32:21 +0800 Subject: [PATCH] Basic sync::mpsc. FIXME: deadlock. --- src/lib.rs | 6 +- src/sync/condvar.rs | 3 +- src/sync/mod.rs | 7 ++- src/sync/mpsc.rs | 142 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 src/sync/mpsc.rs diff --git a/src/lib.rs b/src/lib.rs index 5a4bdff..9d3dce5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ #![feature(unboxed_closures)] #![feature(naked_functions)] #![feature(asm)] +#![feature(optin_builtin_traits)] #![no_std] @@ -95,8 +96,9 @@ pub extern "C" fn rust_main(multiboot_information_address: usize) -> ! { unsafe{ arch::interrupt::enable(); } // thread::test::unpack(); - sync::test::philosopher_using_mutex(); - sync::test::philosopher_using_monitor(); +// sync::test::philosopher_using_mutex(); +// sync::test::philosopher_using_monitor(); + sync::mpsc::test::test_all(); // 直接进入用户态暂不可用:内核代码用户不可访问 // unsafe{ diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index 713385f..67dfe8c 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -2,13 +2,14 @@ use alloc::VecDeque; use super::*; use thread; +#[derive(Default)] pub struct Condvar { wait_queue: SpinNoIrqLock>, } impl Condvar { pub fn new() -> Self { - Condvar { wait_queue: SpinNoIrqLock::new(VecDeque::new()) } + Condvar::default() } pub fn _wait(&self) { self.wait_queue.lock().push_back(thread::current()); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index f9a22b4..91cfea1 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,7 +1,10 @@ +pub use self::condvar::*; +pub use self::mutex::*; +pub use self::semaphore::*; + mod mutex; mod condvar; mod semaphore; +pub mod mpsc; pub mod test; -pub use self::mutex::*; -pub use self::condvar::*; \ No newline at end of file diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs new file mode 100644 index 0000000..9c55b0f --- /dev/null +++ b/src/sync/mpsc.rs @@ -0,0 +1,142 @@ +use alloc::{arc::Arc, arc::Weak, VecDeque}; +use super::Condvar; +use super::ThreadLock as Mutex; + +struct Channel { + deque: Mutex>, + pushed: Condvar, +} + +impl Default for Channel { + fn default() -> Self { + Channel { + deque: Mutex::<_>::default(), + pushed: Condvar::default(), + } + } +} + +/// The receiving half of Rust's channel (or sync_channel) type. +/// This half can only be owned by one thread. +/// +/// Messages sent to the channel can be retrieved using recv. +pub struct Receiver { + inner: Arc>, +} + +unsafe impl Send for Receiver {} + +impl ! Sync for Receiver {} + +#[derive(Debug)] +pub struct RecvError; + +impl Receiver { + /// Attempts to wait for a value on this receiver, + /// returning an error if the corresponding channel has hung up. + pub fn recv(&self) -> Result { + let mut deque = self.inner.deque.lock(); + while deque.is_empty() { + deque = self.inner.pushed.wait(deque); + } + Ok(deque.pop_front().unwrap()) + } +} + +/// The sending-half of Rust's asynchronous channel type. +/// This half can only be owned by one thread, but it can be cloned to send to other threads. +/// +/// Messages can be sent through this channel with send. +#[derive(Clone)] +pub struct Sender { + inner: Weak>, +} + +unsafe impl Send for Sender {} + +impl ! Sync for Sender {} + +#[derive(Debug)] +pub struct SendError(pub T); + +impl Sender { + /// Attempts to send a value on this channel, + /// returning it back if it could not be sent. + pub fn send(&self, t: T) -> Result<(), SendError> { + match self.inner.upgrade() { + None => Err(SendError(t)), + Some(inner) => { + let mut deque = inner.deque.lock(); + deque.push_back(t); + Ok(()) + } + } + } +} + +/// Creates a new asynchronous channel, returning the sender/receiver halves. +pub fn channel() -> (Sender, Receiver) { + let channel = Arc::new(Channel::::default()); + let sender = Sender { inner: Arc::downgrade(&channel) }; + let receiver = Receiver { inner: channel }; + (sender, receiver) +} + +use alloc::boxed::Box; +use super::*; +use thread; + +pub mod test { + //! Copied from std::mpsc::test + + fn smoke() { + let (tx, rx) = channel::(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + fn drop_full() { + let (tx, _rx) = channel::>(); + tx.send(Box::new(1)).unwrap(); + } + + fn drop_full_shared() { + let (tx, _rx) = channel::>(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(Box::new(1)).unwrap(); + } + + fn smoke_shared() { + let (tx, rx) = channel::(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + fn smoke_threads() { + let (tx, rx) = channel::(); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + } + + fn smoke_port_gone() { + let (tx, rx) = channel::(); + drop(rx); + assert!(tx.send(1).is_err()); + } + + pub fn test_all() { + smoke(); + drop_full(); + drop_full_shared(); + smoke_shared(); +// smoke_threads(); // FIXME: deadlock + smoke_port_gone(); + println!("mpsc test end"); + } +} \ No newline at end of file