From d1d7fe44a779760f17d80d05edaeb29281dad461 Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Thu, 23 May 2019 23:21:02 +0800 Subject: [PATCH] Fix condvar race(maybe) --- kernel/Cargo.lock | 2 +- kernel/src/arch/x86_64/gdt.rs | 3 -- kernel/src/drivers/net/router.rs | 4 +- kernel/src/sync/condvar.rs | 76 ++++++++++++++++++++++---------- kernel/src/syscall/fs.rs | 36 ++++++++------- kernel/src/syscall/mod.rs | 7 +-- 6 files changed, 75 insertions(+), 53 deletions(-) diff --git a/kernel/Cargo.lock b/kernel/Cargo.lock index 27d76c7..8d7b1e9 100644 --- a/kernel/Cargo.lock +++ b/kernel/Cargo.lock @@ -417,7 +417,7 @@ dependencies = [ [[package]] name = "rcore-thread" version = "0.1.0" -source = "git+https://github.com/rcore-os/rcore-thread#77e8e0778154734ee59525e043caab6339e14d75" +source = "git+https://github.com/rcore-os/rcore-thread#56021ab440fab8c7b819fed6a42649fb8bbaec07" dependencies = [ "deque 0.3.2 (git+https://github.com/rcore-os/deque.git?branch=no_std)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/kernel/src/arch/x86_64/gdt.rs b/kernel/src/arch/x86_64/gdt.rs index 0ae266e..9db86dc 100644 --- a/kernel/src/arch/x86_64/gdt.rs +++ b/kernel/src/arch/x86_64/gdt.rs @@ -61,9 +61,6 @@ impl Cpu { let mut queue = self.ipi_handler_queue.lock(); queue.push(item); } - pub fn current() -> &'static mut Cpu { - unsafe { CPUS[super::cpu::id()].as_mut().unwrap() } - } pub fn handle_ipi(&self) { let mut queue = self.ipi_handler_queue.lock(); let handlers = core::mem::replace(queue.as_mut(), vec![]); diff --git a/kernel/src/drivers/net/router.rs b/kernel/src/drivers/net/router.rs index 2b2c6cb..23a7625 100644 --- a/kernel/src/drivers/net/router.rs +++ b/kernel/src/drivers/net/router.rs @@ -119,7 +119,7 @@ impl phy::TxToken for RouterTxToken { { let mut buffer = vec![0; len]; let res = f(&mut buffer); - debug!("out buf {}", len); + debug!("out buf {} data {:x?} port {}", len, &buffer[..20], (self.0).1); unsafe { AXI_STREAM_FIFO_TDR.write_volatile(2); @@ -157,7 +157,7 @@ impl Driver for RouterInterface { for i in 1..rdfo { buffer.push(AXI_STREAM_FIFO_RDFD.read_volatile() as u8); } - debug!("got packet of length {} port {}", rdfo, port); + debug!("got packet of length {} port {} data {:x?}", rdfo, port, &buffer[..20]); driver.buffer[port as usize].push(buffer); } drop(driver); diff --git a/kernel/src/sync/condvar.rs b/kernel/src/sync/condvar.rs index 61db073..0f19b03 100644 --- a/kernel/src/sync/condvar.rs +++ b/kernel/src/sync/condvar.rs @@ -1,4 +1,6 @@ use super::*; +use crate::arch::cpu; +use crate::process::processor; use crate::thread; use alloc::collections::VecDeque; use alloc::sync::Arc; @@ -26,26 +28,44 @@ impl Condvar { }); } - #[deprecated(note = "this may leads to lost wakeup problem. please use `wait` instead.")] - pub fn wait_any(condvars: &[&Condvar]) { - let token = Arc::new(thread::current()); - // Avoid racing in the same way as the function above - let mut locks = Vec::new(); - locks.reserve(condvars.len()); + fn add_to_wait_queue(&self) -> MutexGuard>, SpinNoIrq> { + let mut lock = self.wait_queue.lock(); + lock.push_back(Arc::new(thread::current())); + return lock; + } + + pub fn wait_event(condvar: &Condvar, mut condition: impl FnMut() -> Option) -> T { + Self::wait_events(&[condvar], condition) + } + + pub fn wait_events(condvars: &[&Condvar], mut condition: impl FnMut() -> Option) -> T { + let thread = thread::current(); + let tid = thread.id(); + let token = Arc::new(thread); for condvar in condvars { let mut lock = condvar.wait_queue.lock(); lock.push_back(token.clone()); - locks.push(lock); } - thread::park_action(move || { - drop(locks); - }); - } + let mut locks = Vec::with_capacity(condvars.len()); + loop { + for condvar in condvars { + let mut lock = condvar.wait_queue.lock(); + locks.push(lock); + } + processor().manager().sleep(tid, 0); + locks.clear(); - fn add_to_wait_queue(&self) -> MutexGuard>, SpinNoIrq> { - let mut lock = self.wait_queue.lock(); - lock.push_back(Arc::new(thread::current())); - return lock; + if let Some(res) = condition() { + let _ = FlagsGuard::no_irq_region(); + processor().manager().cancel_sleeping(tid); + for condvar in condvars { + let mut lock = condvar.wait_queue.lock(); + lock.retain(|t| !Arc::ptr_eq(t, &token)); + } + return res; + } + processor().yield_now(); + } } /// Park current thread and wait for this condvar to be notified. @@ -54,21 +74,28 @@ impl Condvar { S: MutexSupport, { let mutex = guard.mutex; - let lock = self.add_to_wait_queue(); + let token = Arc::new(thread::current()); + let mut lock = self.wait_queue.lock(); + lock.push_back(token.clone()); + thread::park_action(move || { drop(lock); drop(guard); }); - mutex.lock() + let ret = mutex.lock(); + let mut lock = self.wait_queue.lock(); + lock.retain(|t| !Arc::ptr_eq(&t, &token)); + ret } pub fn notify_one(&self) { - if let Some(t) = self.wait_queue.lock().pop_front() { + if let Some(t) = self.wait_queue.lock().front() { t.unpark(); } } pub fn notify_all(&self) { - while let Some(t) = self.wait_queue.lock().pop_front() { + let queue = self.wait_queue.lock(); + for t in queue.iter() { t.unpark(); } } @@ -76,14 +103,15 @@ impl Condvar { /// Return the number of waiters that were woken up. pub fn notify_n(&self, n: usize) -> usize { let mut count = 0; - while count < n { - if let Some(t) = self.wait_queue.lock().pop_front() { - t.unpark(); - count += 1; - } else { + let queue = self.wait_queue.lock(); + for t in queue.iter() { + if (count >= n) { break; } + t.unpark(); + count += 1; } + count } } diff --git a/kernel/src/syscall/fs.rs b/kernel/src/syscall/fs.rs index 90768c2..04a54b6 100644 --- a/kernel/src/syscall/fs.rs +++ b/kernel/src/syscall/fs.rs @@ -105,14 +105,17 @@ impl Syscall<'_> { drop(proc); let begin_time_ms = crate::trap::uptime_msec(); - loop { + Condvar::wait_events(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)], move || { use PollEvents as PE; let proc = self.process(); let mut events = 0; for poll in polls.iter_mut() { poll.revents = PE::empty(); if let Some(file_like) = proc.files.get(&(poll.fd as usize)) { - let status = file_like.poll()?; + let status = match file_like.poll() { + Ok(ret) => ret, + Err(err) => return Some(Err(err)), + }; if status.error { poll.revents |= PE::HUP; events += 1; @@ -133,19 +136,15 @@ impl Syscall<'_> { drop(proc); if events > 0 { - return Ok(events); + return Some(Ok(events)); } let current_time_ms = crate::trap::uptime_msec(); if timeout_msecs < (1 << 31) && current_time_ms - begin_time_ms > timeout_msecs { - return Ok(0); + return Some(Ok(0)); } - - // NOTE: To run rustc, uncomment yield_now and comment Condvar. - // Waking up from pipe is unimplemented now. - // thread::yield_now(); - Condvar::wait_any(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)]); - } + return None; + }) } pub fn sys_select( @@ -180,7 +179,7 @@ impl Syscall<'_> { drop(proc); let begin_time_ms = crate::trap::uptime_msec(); - loop { + Condvar::wait_events(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)], move || { let proc = self.process(); let mut events = 0; for (&fd, file_like) in proc.files.iter() { @@ -190,7 +189,10 @@ impl Syscall<'_> { if !err_fds.contains(fd) && !read_fds.contains(fd) && !write_fds.contains(fd) { continue; } - let status = file_like.poll()?; + let status = match file_like.poll() { + Ok(ret) => ret, + Err(err) => return Some(Err(err)), + }; if status.error && err_fds.contains(fd) { err_fds.set(fd); events += 1; @@ -207,23 +209,23 @@ impl Syscall<'_> { drop(proc); if events > 0 { - return Ok(events); + return Some(Ok(events)); } if timeout_msecs == 0 { // no timeout, return now; - return Ok(0); + return Some(Ok(0)); } let current_time_ms = crate::trap::uptime_msec(); // infinity check if timeout_msecs < (1 << 31) && current_time_ms - begin_time_ms > timeout_msecs as usize { - return Ok(0); + return Some(Ok(0)); } - Condvar::wait_any(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)]); - } + return None; + }) } pub fn sys_readv(&mut self, fd: usize, iov_ptr: *const IoVec, iov_count: usize) -> SysResult { diff --git a/kernel/src/syscall/mod.rs b/kernel/src/syscall/mod.rs index cc4086d..5b7def8 100644 --- a/kernel/src/syscall/mod.rs +++ b/kernel/src/syscall/mod.rs @@ -549,12 +549,7 @@ pub fn spin_and_wait(condvars: &[&Condvar], mut action: impl FnMut() -> Optio return result; } } - loop { - if let Some(result) = action() { - return result; - } - Condvar::wait_any(&condvars); - } + Condvar::wait_events(&condvars, action) } pub fn check_and_clone_cstr(user: *const u8) -> Result {