Fix condvar race(maybe)

master
Jiajie Chen 6 years ago
parent f10421bf83
commit d1d7fe44a7

2
kernel/Cargo.lock generated

@ -417,7 +417,7 @@ dependencies = [
[[package]] [[package]]
name = "rcore-thread" name = "rcore-thread"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/rcore-os/rcore-thread#77e8e0778154734ee59525e043caab6339e14d75" source = "git+https://github.com/rcore-os/rcore-thread#56021ab440fab8c7b819fed6a42649fb8bbaec07"
dependencies = [ dependencies = [
"deque 0.3.2 (git+https://github.com/rcore-os/deque.git?branch=no_std)", "deque 0.3.2 (git+https://github.com/rcore-os/deque.git?branch=no_std)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",

@ -61,9 +61,6 @@ impl Cpu {
let mut queue = self.ipi_handler_queue.lock(); let mut queue = self.ipi_handler_queue.lock();
queue.push(item); queue.push(item);
} }
pub fn current() -> &'static mut Cpu {
unsafe { CPUS[super::cpu::id()].as_mut().unwrap() }
}
pub fn handle_ipi(&self) { pub fn handle_ipi(&self) {
let mut queue = self.ipi_handler_queue.lock(); let mut queue = self.ipi_handler_queue.lock();
let handlers = core::mem::replace(queue.as_mut(), vec![]); let handlers = core::mem::replace(queue.as_mut(), vec![]);

@ -119,7 +119,7 @@ impl phy::TxToken for RouterTxToken {
{ {
let mut buffer = vec![0; len]; let mut buffer = vec![0; len];
let res = f(&mut buffer); let res = f(&mut buffer);
debug!("out buf {}", len); debug!("out buf {} data {:x?} port {}", len, &buffer[..20], (self.0).1);
unsafe { unsafe {
AXI_STREAM_FIFO_TDR.write_volatile(2); AXI_STREAM_FIFO_TDR.write_volatile(2);
@ -157,7 +157,7 @@ impl Driver for RouterInterface {
for i in 1..rdfo { for i in 1..rdfo {
buffer.push(AXI_STREAM_FIFO_RDFD.read_volatile() as u8); buffer.push(AXI_STREAM_FIFO_RDFD.read_volatile() as u8);
} }
debug!("got packet of length {} port {}", rdfo, port); debug!("got packet of length {} port {} data {:x?}", rdfo, port, &buffer[..20]);
driver.buffer[port as usize].push(buffer); driver.buffer[port as usize].push(buffer);
} }
drop(driver); drop(driver);

@ -1,4 +1,6 @@
use super::*; use super::*;
use crate::arch::cpu;
use crate::process::processor;
use crate::thread; use crate::thread;
use alloc::collections::VecDeque; use alloc::collections::VecDeque;
use alloc::sync::Arc; use alloc::sync::Arc;
@ -26,26 +28,44 @@ impl Condvar {
}); });
} }
#[deprecated(note = "this may leads to lost wakeup problem. please use `wait` instead.")] fn add_to_wait_queue(&self) -> MutexGuard<VecDeque<Arc<thread::Thread>>, SpinNoIrq> {
pub fn wait_any(condvars: &[&Condvar]) { let mut lock = self.wait_queue.lock();
let token = Arc::new(thread::current()); lock.push_back(Arc::new(thread::current()));
// Avoid racing in the same way as the function above return lock;
let mut locks = Vec::new(); }
locks.reserve(condvars.len());
pub fn wait_event<T>(condvar: &Condvar, mut condition: impl FnMut() -> Option<T>) -> T {
Self::wait_events(&[condvar], condition)
}
pub fn wait_events<T>(condvars: &[&Condvar], mut condition: impl FnMut() -> Option<T>) -> T {
let thread = thread::current();
let tid = thread.id();
let token = Arc::new(thread);
for condvar in condvars { for condvar in condvars {
let mut lock = condvar.wait_queue.lock(); let mut lock = condvar.wait_queue.lock();
lock.push_back(token.clone()); lock.push_back(token.clone());
locks.push(lock);
} }
thread::park_action(move || { let mut locks = Vec::with_capacity(condvars.len());
drop(locks); loop {
}); for condvar in condvars {
} let mut lock = condvar.wait_queue.lock();
locks.push(lock);
}
processor().manager().sleep(tid, 0);
locks.clear();
fn add_to_wait_queue(&self) -> MutexGuard<VecDeque<Arc<thread::Thread>>, SpinNoIrq> { if let Some(res) = condition() {
let mut lock = self.wait_queue.lock(); let _ = FlagsGuard::no_irq_region();
lock.push_back(Arc::new(thread::current())); processor().manager().cancel_sleeping(tid);
return lock; for condvar in condvars {
let mut lock = condvar.wait_queue.lock();
lock.retain(|t| !Arc::ptr_eq(t, &token));
}
return res;
}
processor().yield_now();
}
} }
/// Park current thread and wait for this condvar to be notified. /// Park current thread and wait for this condvar to be notified.
@ -54,21 +74,28 @@ impl Condvar {
S: MutexSupport, S: MutexSupport,
{ {
let mutex = guard.mutex; let mutex = guard.mutex;
let lock = self.add_to_wait_queue(); let token = Arc::new(thread::current());
let mut lock = self.wait_queue.lock();
lock.push_back(token.clone());
thread::park_action(move || { thread::park_action(move || {
drop(lock); drop(lock);
drop(guard); drop(guard);
}); });
mutex.lock() let ret = mutex.lock();
let mut lock = self.wait_queue.lock();
lock.retain(|t| !Arc::ptr_eq(&t, &token));
ret
} }
pub fn notify_one(&self) { pub fn notify_one(&self) {
if let Some(t) = self.wait_queue.lock().pop_front() { if let Some(t) = self.wait_queue.lock().front() {
t.unpark(); t.unpark();
} }
} }
pub fn notify_all(&self) { pub fn notify_all(&self) {
while let Some(t) = self.wait_queue.lock().pop_front() { let queue = self.wait_queue.lock();
for t in queue.iter() {
t.unpark(); t.unpark();
} }
} }
@ -76,14 +103,15 @@ impl Condvar {
/// Return the number of waiters that were woken up. /// Return the number of waiters that were woken up.
pub fn notify_n(&self, n: usize) -> usize { pub fn notify_n(&self, n: usize) -> usize {
let mut count = 0; let mut count = 0;
while count < n { let queue = self.wait_queue.lock();
if let Some(t) = self.wait_queue.lock().pop_front() { for t in queue.iter() {
t.unpark(); if (count >= n) {
count += 1;
} else {
break; break;
} }
t.unpark();
count += 1;
} }
count count
} }
} }

@ -105,14 +105,17 @@ impl Syscall<'_> {
drop(proc); drop(proc);
let begin_time_ms = crate::trap::uptime_msec(); let begin_time_ms = crate::trap::uptime_msec();
loop { Condvar::wait_events(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)], move || {
use PollEvents as PE; use PollEvents as PE;
let proc = self.process(); let proc = self.process();
let mut events = 0; let mut events = 0;
for poll in polls.iter_mut() { for poll in polls.iter_mut() {
poll.revents = PE::empty(); poll.revents = PE::empty();
if let Some(file_like) = proc.files.get(&(poll.fd as usize)) { if let Some(file_like) = proc.files.get(&(poll.fd as usize)) {
let status = file_like.poll()?; let status = match file_like.poll() {
Ok(ret) => ret,
Err(err) => return Some(Err(err)),
};
if status.error { if status.error {
poll.revents |= PE::HUP; poll.revents |= PE::HUP;
events += 1; events += 1;
@ -133,19 +136,15 @@ impl Syscall<'_> {
drop(proc); drop(proc);
if events > 0 { if events > 0 {
return Ok(events); return Some(Ok(events));
} }
let current_time_ms = crate::trap::uptime_msec(); let current_time_ms = crate::trap::uptime_msec();
if timeout_msecs < (1 << 31) && current_time_ms - begin_time_ms > timeout_msecs { if timeout_msecs < (1 << 31) && current_time_ms - begin_time_ms > timeout_msecs {
return Ok(0); return Some(Ok(0));
} }
return None;
// NOTE: To run rustc, uncomment yield_now and comment Condvar. })
// Waking up from pipe is unimplemented now.
// thread::yield_now();
Condvar::wait_any(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)]);
}
} }
pub fn sys_select( pub fn sys_select(
@ -180,7 +179,7 @@ impl Syscall<'_> {
drop(proc); drop(proc);
let begin_time_ms = crate::trap::uptime_msec(); let begin_time_ms = crate::trap::uptime_msec();
loop { Condvar::wait_events(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)], move || {
let proc = self.process(); let proc = self.process();
let mut events = 0; let mut events = 0;
for (&fd, file_like) in proc.files.iter() { for (&fd, file_like) in proc.files.iter() {
@ -190,7 +189,10 @@ impl Syscall<'_> {
if !err_fds.contains(fd) && !read_fds.contains(fd) && !write_fds.contains(fd) { if !err_fds.contains(fd) && !read_fds.contains(fd) && !write_fds.contains(fd) {
continue; continue;
} }
let status = file_like.poll()?; let status = match file_like.poll() {
Ok(ret) => ret,
Err(err) => return Some(Err(err)),
};
if status.error && err_fds.contains(fd) { if status.error && err_fds.contains(fd) {
err_fds.set(fd); err_fds.set(fd);
events += 1; events += 1;
@ -207,23 +209,23 @@ impl Syscall<'_> {
drop(proc); drop(proc);
if events > 0 { if events > 0 {
return Ok(events); return Some(Ok(events));
} }
if timeout_msecs == 0 { if timeout_msecs == 0 {
// no timeout, return now; // no timeout, return now;
return Ok(0); return Some(Ok(0));
} }
let current_time_ms = crate::trap::uptime_msec(); let current_time_ms = crate::trap::uptime_msec();
// infinity check // infinity check
if timeout_msecs < (1 << 31) && current_time_ms - begin_time_ms > timeout_msecs as usize if timeout_msecs < (1 << 31) && current_time_ms - begin_time_ms > timeout_msecs as usize
{ {
return Ok(0); return Some(Ok(0));
} }
Condvar::wait_any(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)]); return None;
} })
} }
pub fn sys_readv(&mut self, fd: usize, iov_ptr: *const IoVec, iov_count: usize) -> SysResult { pub fn sys_readv(&mut self, fd: usize, iov_ptr: *const IoVec, iov_count: usize) -> SysResult {

@ -549,12 +549,7 @@ pub fn spin_and_wait<T>(condvars: &[&Condvar], mut action: impl FnMut() -> Optio
return result; return result;
} }
} }
loop { Condvar::wait_events(&condvars, action)
if let Some(result) = action() {
return result;
}
Condvar::wait_any(&condvars);
}
} }
pub fn check_and_clone_cstr(user: *const u8) -> Result<String, SysError> { pub fn check_and_clone_cstr(user: *const u8) -> Result<String, SysError> {

Loading…
Cancel
Save