From 42b02453a0b30ed07be81e8b651a7495f438d797 Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Wed, 6 Mar 2019 20:24:55 +0800 Subject: [PATCH] Implement basic version of sys_poll supporting stdin and socket, fix some deadlocks and introduce some others --- crate/thread/src/thread_pool.rs | 1 + kernel/src/drivers/mod.rs | 8 ++--- kernel/src/fs/stdio.rs | 5 ++- kernel/src/process/structs.rs | 12 +++++-- kernel/src/sync/condvar.rs | 16 +++++++-- kernel/src/syscall/fs.rs | 62 +++++++++++++++++++++++++++++++-- kernel/src/syscall/mod.rs | 6 +++- kernel/src/syscall/net.rs | 53 ++++++++++++++++++++-------- 8 files changed, 136 insertions(+), 27 deletions(-) diff --git a/crate/thread/src/thread_pool.rs b/crate/thread/src/thread_pool.rs index 3d464a7..60c6bdc 100644 --- a/crate/thread/src/thread_pool.rs +++ b/crate/thread/src/thread_pool.rs @@ -138,6 +138,7 @@ impl ThreadPool { (Status::Ready, _) => panic!("can not remove a process from ready queue"), (Status::Exited(_), _) => panic!("can not set status for a exited process"), (Status::Sleeping, Status::Exited(_)) => self.timer.lock().stop(Event::Wakeup(tid)), + (Status::Running(_), Status::Ready) => {} // to stop a thread, use stop() intead (_, Status::Ready) => self.scheduler.push(tid), _ => {} } diff --git a/kernel/src/drivers/mod.rs b/kernel/src/drivers/mod.rs index a30f0ca..683b0ec 100644 --- a/kernel/src/drivers/mod.rs +++ b/kernel/src/drivers/mod.rs @@ -5,7 +5,7 @@ use lazy_static::lazy_static; use smoltcp::wire::{EthernetAddress, Ipv4Address}; use smoltcp::socket::SocketSet; -use crate::sync::{SpinNoIrqLock, Condvar, MutexGuard, SpinNoIrq}; +use crate::sync::{ThreadLock, SpinLock, Condvar, MutexGuard, SpinNoIrq}; mod device_tree; pub mod bus; @@ -49,14 +49,14 @@ pub trait NetDriver : Send { lazy_static! { - pub static ref DRIVERS: SpinNoIrqLock>> = SpinNoIrqLock::new(Vec::new()); + pub static ref DRIVERS: SpinLock>> = SpinLock::new(Vec::new()); } lazy_static! { - pub static ref NET_DRIVERS: SpinNoIrqLock>> = SpinNoIrqLock::new(Vec::new()); + pub static ref NET_DRIVERS: ThreadLock>> = ThreadLock::new(Vec::new()); } -lazy_static! { +lazy_static!{ pub static ref SOCKET_ACTIVITY: Condvar = Condvar::new(); } diff --git a/kernel/src/fs/stdio.rs b/kernel/src/fs/stdio.rs index 184243a..bbce87b 100644 --- a/kernel/src/fs/stdio.rs +++ b/kernel/src/fs/stdio.rs @@ -11,7 +11,7 @@ use crate::sync::SpinNoIrqLock as Mutex; #[derive(Default)] pub struct Stdin { buf: Mutex>, - pushed: Condvar, + pub pushed: Condvar, } impl Stdin { @@ -36,6 +36,9 @@ impl Stdin { } } } + pub fn can_read(&self) -> bool { + self.buf.lock().len() > 0 + } } #[derive(Default)] diff --git a/kernel/src/process/structs.rs b/kernel/src/process/structs.rs index 94ca586..2be23f6 100644 --- a/kernel/src/process/structs.rs +++ b/kernel/src/process/structs.rs @@ -23,7 +23,7 @@ pub struct Thread { pub proc: Arc>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum SocketType { Raw, Tcp(Option), // save local endpoint for bind() @@ -31,7 +31,7 @@ pub enum SocketType { Icmp } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SocketWrapper { pub handle: SocketHandle, pub socket_type: SocketType, @@ -187,6 +187,14 @@ impl Thread { info!("temporary copy data!"); let kstack = KernelStack::new(); + let iface = &mut *(NET_DRIVERS.lock()[0]); + let mut sockets = iface.sockets(); + for (_fd, file) in files.iter() { + if let FileLike::Socket(wrapper) = file { + sockets.retain(wrapper.handle); + } + } + Box::new(Thread { context: unsafe { Context::new_fork(tf, kstack.top(), memory_set.token()) }, diff --git a/kernel/src/sync/condvar.rs b/kernel/src/sync/condvar.rs index 036bd99..d4d8cde 100644 --- a/kernel/src/sync/condvar.rs +++ b/kernel/src/sync/condvar.rs @@ -1,20 +1,31 @@ use alloc::collections::VecDeque; use super::*; use crate::thread; +use alloc::sync::Arc; #[derive(Default)] pub struct Condvar { - wait_queue: SpinNoIrqLock>, + wait_queue: SpinNoIrqLock>>, } impl Condvar { pub fn new() -> Self { Condvar::default() } + pub fn _wait(&self) { - self.wait_queue.lock().push_back(thread::current()); + self.wait_queue.lock().push_back(Arc::new(thread::current())); thread::park(); } + + pub fn wait_any(condvars: &[&Condvar]) { + let token = Arc::new(thread::current()); + for condvar in condvars { + condvar.wait_queue.lock().push_back(token.clone()); + } + thread::park(); + } + pub fn wait<'a, T, S>(&self, guard: MutexGuard<'a, T, S>) -> MutexGuard<'a, T, S> where S: MutexSupport { @@ -23,6 +34,7 @@ impl Condvar { self._wait(); mutex.lock() } + pub fn notify_one(&self) { if let Some(t) = self.wait_queue.lock().pop_front() { t.unpark(); diff --git a/kernel/src/syscall/fs.rs b/kernel/src/syscall/fs.rs index a4afd75..6c866a0 100644 --- a/kernel/src/syscall/fs.rs +++ b/kernel/src/syscall/fs.rs @@ -1,9 +1,12 @@ //! Syscalls for file system use rcore_fs::vfs::Timespec; +use smoltcp::socket::*; use crate::fs::*; use crate::memory::MemorySet; +use crate::sync::Condvar; +use crate::drivers::{NET_DRIVERS, SOCKET_ACTIVITY}; use super::*; use super::net::*; @@ -49,15 +52,68 @@ pub struct PollFd { revents: u16 } +const POLLIN: u16 = 0x0001; +const POLPRI: u16 = 0x0002; +const POLLOUT: u16 = 0x0004; +const POLLERR: u16 = 0x0008; +const POLLHUP: u16 = 0x0010; +const POLLNVAL: u16 = 0x0020; + pub fn sys_poll(ufds: *mut PollFd, nfds: usize, timeout_msecs: usize) -> SysResult { info!("poll: ufds: {:?}, nfds: {}, timeout_msecs: {:#x}", ufds, nfds, timeout_msecs); let mut proc = process(); proc.memory_set.check_mut_array(ufds, nfds)?; + let slice = unsafe { slice::from_raw_parts_mut(ufds, nfds) }; + for i in 0..nfds { + if proc.files.get(&(slice[i].fd as usize)).is_none() { + return Err(SysError::EINVAL); + } + } + drop(proc); + + + let begin_time_ms = unsafe {crate::trap::TICK / crate::consts::USEC_PER_TICK / 1000}; + loop { + let mut proc = process(); + for i in 0..nfds { + match proc.files.get(&(slice[i].fd as usize)) { + Some(FileLike::File(_)) => { + // assume it is stdin for now + if (slice[i].events & POLLIN) != 0 && STDIN.can_read() { + slice[i].revents = POLLIN; + return Ok(0); + } + }, + Some(FileLike::Socket(wrapper)) => { + if let SocketType::Tcp(_) = wrapper.socket_type { + let iface = &mut *(NET_DRIVERS.lock()[0]); + let mut sockets = iface.sockets(); + let mut socket = sockets.get::(wrapper.handle); + + if !socket.is_open() { + slice[i].revents = POLLHUP; + return Ok(0); + } else if socket.can_recv() && (slice[i].events & POLLIN) != 0 { + slice[i].revents = POLLIN; + return Ok(0); + } else if socket.can_send() && (slice[i].events & POLLOUT) != 0 { + slice[i].revents = POLLIN; + return Ok(0); + } + } else { + unimplemented!() + } + } + None => { + slice[i].revents = POLLERR; + return Ok(0); + } + } + } + Condvar::wait_any(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)]); + } - // emulate it for now - use core::time::Duration; - thread::sleep(Duration::from_millis(timeout_msecs as u64)); Ok(nfds as isize) } diff --git a/kernel/src/syscall/mod.rs b/kernel/src/syscall/mod.rs index 219a2fd..b5cf478 100644 --- a/kernel/src/syscall/mod.rs +++ b/kernel/src/syscall/mod.rs @@ -13,6 +13,7 @@ use crate::fs::FileHandle; use crate::process::*; use crate::thread; use crate::util; +use crate::arch::cpu; use self::fs::*; use self::mem::*; @@ -30,6 +31,9 @@ mod net; /// System call dispatcher pub fn syscall(id: usize, args: [usize; 6], tf: &mut TrapFrame) -> isize { + let pid = cpu::id(); + let tid = processor().tid(); + debug!("{}:{} syscall id {} begin", pid, tid, id); let ret = match id { // file 000 => sys_read(args[0], args[1] as *mut u8, args[2]), @@ -163,7 +167,7 @@ pub fn syscall(id: usize, args: [usize; 6], tf: &mut TrapFrame) -> isize { crate::trap::error(tf); } }; - debug!("syscall id {} ret with {:?}", id, ret); + debug!("{}:{} syscall id {} ret with {:?}", pid, tid, id, ret); match ret { Ok(code) => code, Err(err) => -(err as isize), diff --git a/kernel/src/syscall/net.rs b/kernel/src/syscall/net.rs index 7453027..dd4cacf 100644 --- a/kernel/src/syscall/net.rs +++ b/kernel/src/syscall/net.rs @@ -182,7 +182,6 @@ pub fn sys_connect(fd: usize, addr: *const u8, addrlen: usize) -> SysResult { let mut proc = process(); proc.memory_set.check_ptr(addr)?; - let iface = &mut *(NET_DRIVERS.lock()[0]); let mut dest = None; let mut port = 0; @@ -197,6 +196,8 @@ pub fn sys_connect(fd: usize, addr: *const u8, addrlen: usize) -> SysResult { let wrapper = proc.get_socket(fd)?; if let SocketType::Tcp(_) = wrapper.socket_type { + let mut drivers = NET_DRIVERS.lock(); + let iface = &mut *(drivers[0]); let mut sockets = iface.sockets(); let mut socket = sockets.get::(wrapper.handle); @@ -207,19 +208,28 @@ pub fn sys_connect(fd: usize, addr: *const u8, addrlen: usize) -> SysResult { // avoid deadlock drop(socket); drop(sockets); + drop(iface); + drop(drivers); // wait for connection result loop { + let mut drivers = NET_DRIVERS.lock(); + let iface = &mut *(drivers[0]); iface.poll(); let mut sockets = iface.sockets(); let mut socket = sockets.get::(wrapper.handle); if socket.state() == TcpState::SynSent { // still connecting - SOCKET_ACTIVITY._wait() + drop(socket); + drop(sockets); + drop(iface); + drop(drivers); + debug!("poll for connection wait"); + SOCKET_ACTIVITY._wait(); } else if socket.state() == TcpState::Established { break Ok(0); - } else if socket.state() == TcpState::Closed { + } else { break Err(SysError::ECONNREFUSED); } } @@ -515,18 +525,17 @@ pub fn sys_recvfrom( pub fn sys_close_socket(proc: &mut Process, fd: usize, handle: SocketHandle) -> SysResult { let iface = &mut *(NET_DRIVERS.lock()[0]); - let mut socket = iface.sockets().remove(handle); - match socket { - Socket::Tcp(ref mut tcp_socket) => { - tcp_socket.close(); - } - _ => {} - } - + let mut sockets = iface.sockets(); + sockets.release(handle); + sockets.prune(); Ok(0) } pub fn sys_bind(fd: usize, addr: *const u8, len: usize) -> SysResult { + info!( + "sys_bind: fd: {} addr: {:?} len: {}", + fd, addr, len + ); let mut proc = process(); proc.memory_set.check_array(addr, len)?; @@ -545,7 +554,7 @@ pub fn sys_bind(fd: usize, addr: *const u8, len: usize) -> SysResult { } let iface = &mut *(NET_DRIVERS.lock()[0]); - let wrapper = proc.get_socket_mut(fd)?; + let wrapper = &mut proc.get_socket_mut(fd)?; if let SocketType::Tcp(_) = wrapper.socket_type { wrapper.socket_type = SocketType::Tcp(Some(IpEndpoint::new(host.unwrap(), port))); Ok(0) @@ -555,6 +564,10 @@ pub fn sys_bind(fd: usize, addr: *const u8, len: usize) -> SysResult { } pub fn sys_listen(fd: usize, backlog: usize) -> SysResult { + info!( + "sys_listen: fd: {} backlog: {}", + fd, backlog + ); // smoltcp tcp sockets do not support backlog // open multiple sockets for each connection let mut proc = process(); @@ -575,6 +588,10 @@ pub fn sys_listen(fd: usize, backlog: usize) -> SysResult { } pub fn sys_accept(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult { + info!( + "sys_accept: fd: {} addr: {:?} addr_len: {:?}", + fd, addr, addr_len + ); // smoltcp tcp sockets do not support backlog // open multiple sockets for each connection let mut proc = process(); @@ -584,16 +601,17 @@ pub fn sys_accept(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult { let max_addr_len = unsafe { *addr_len } as usize; if max_addr_len < size_of::() { + debug!("length too short {}", max_addr_len); return Err(SysError::EINVAL); } proc.memory_set.check_mut_array(addr, max_addr_len)?; } - let iface = &mut *(NET_DRIVERS.lock()[0]); let wrapper = proc.get_socket_mut(fd)?; if let SocketType::Tcp(Some(endpoint)) = wrapper.socket_type { loop { + let iface = &mut *(NET_DRIVERS.lock()[0]); let mut sockets = iface.sockets(); let mut socket = sockets.get::(wrapper.handle); @@ -617,7 +635,7 @@ pub fn sys_accept(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult { fd, FileLike::Socket(SocketWrapper { handle: tcp_handle, - socket_type: SocketType::Tcp(None), + socket_type: SocketType::Tcp(Some(endpoint)), }), ) .unwrap(); @@ -634,14 +652,21 @@ pub fn sys_accept(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult { // avoid deadlock drop(socket); drop(sockets); + drop(iface); SOCKET_ACTIVITY._wait() } } else { + debug!("bad socket type {:?}", wrapper); Err(SysError::EINVAL) } } pub fn sys_getsockname(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult { + info!( + "sys_getsockname: fd: {} addr: {:?} addr_len: {:?}", + fd, addr, addr_len + ); + // smoltcp tcp sockets do not support backlog // open multiple sockets for each connection let mut proc = process();