Implement basic version of sys_poll supporting stdin and socket, fix some deadlocks and introduce some others

master
Jiajie Chen 6 years ago
parent 2683314c38
commit 42b02453a0

@ -138,6 +138,7 @@ impl ThreadPool {
(Status::Ready, _) => panic!("can not remove a process from ready queue"),
(Status::Exited(_), _) => panic!("can not set status for a exited process"),
(Status::Sleeping, Status::Exited(_)) => self.timer.lock().stop(Event::Wakeup(tid)),
(Status::Running(_), Status::Ready) => {} // to stop a thread, use stop() intead
(_, Status::Ready) => self.scheduler.push(tid),
_ => {}
}

@ -5,7 +5,7 @@ use lazy_static::lazy_static;
use smoltcp::wire::{EthernetAddress, Ipv4Address};
use smoltcp::socket::SocketSet;
use crate::sync::{SpinNoIrqLock, Condvar, MutexGuard, SpinNoIrq};
use crate::sync::{ThreadLock, SpinLock, Condvar, MutexGuard, SpinNoIrq};
mod device_tree;
pub mod bus;
@ -49,14 +49,14 @@ pub trait NetDriver : Send {
lazy_static! {
pub static ref DRIVERS: SpinNoIrqLock<Vec<Box<Driver>>> = SpinNoIrqLock::new(Vec::new());
pub static ref DRIVERS: SpinLock<Vec<Box<Driver>>> = SpinLock::new(Vec::new());
}
lazy_static! {
pub static ref NET_DRIVERS: SpinNoIrqLock<Vec<Box<NetDriver>>> = SpinNoIrqLock::new(Vec::new());
pub static ref NET_DRIVERS: ThreadLock<Vec<Box<NetDriver>>> = ThreadLock::new(Vec::new());
}
lazy_static! {
lazy_static!{
pub static ref SOCKET_ACTIVITY: Condvar = Condvar::new();
}

@ -11,7 +11,7 @@ use crate::sync::SpinNoIrqLock as Mutex;
#[derive(Default)]
pub struct Stdin {
buf: Mutex<VecDeque<char>>,
pushed: Condvar,
pub pushed: Condvar,
}
impl Stdin {
@ -36,6 +36,9 @@ impl Stdin {
}
}
}
pub fn can_read(&self) -> bool {
self.buf.lock().len() > 0
}
}
#[derive(Default)]

@ -23,7 +23,7 @@ pub struct Thread {
pub proc: Arc<Mutex<Process>>,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum SocketType {
Raw,
Tcp(Option<IpEndpoint>), // save local endpoint for bind()
@ -31,7 +31,7 @@ pub enum SocketType {
Icmp
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct SocketWrapper {
pub handle: SocketHandle,
pub socket_type: SocketType,
@ -187,6 +187,14 @@ impl Thread {
info!("temporary copy data!");
let kstack = KernelStack::new();
let iface = &mut *(NET_DRIVERS.lock()[0]);
let mut sockets = iface.sockets();
for (_fd, file) in files.iter() {
if let FileLike::Socket(wrapper) = file {
sockets.retain(wrapper.handle);
}
}
Box::new(Thread {
context: unsafe { Context::new_fork(tf, kstack.top(), memory_set.token()) },

@ -1,20 +1,31 @@
use alloc::collections::VecDeque;
use super::*;
use crate::thread;
use alloc::sync::Arc;
#[derive(Default)]
pub struct Condvar {
wait_queue: SpinNoIrqLock<VecDeque<thread::Thread>>,
wait_queue: SpinNoIrqLock<VecDeque<Arc<thread::Thread>>>,
}
impl Condvar {
pub fn new() -> Self {
Condvar::default()
}
pub fn _wait(&self) {
self.wait_queue.lock().push_back(thread::current());
self.wait_queue.lock().push_back(Arc::new(thread::current()));
thread::park();
}
pub fn wait_any(condvars: &[&Condvar]) {
let token = Arc::new(thread::current());
for condvar in condvars {
condvar.wait_queue.lock().push_back(token.clone());
}
thread::park();
}
pub fn wait<'a, T, S>(&self, guard: MutexGuard<'a, T, S>) -> MutexGuard<'a, T, S>
where S: MutexSupport
{
@ -23,6 +34,7 @@ impl Condvar {
self._wait();
mutex.lock()
}
pub fn notify_one(&self) {
if let Some(t) = self.wait_queue.lock().pop_front() {
t.unpark();

@ -1,9 +1,12 @@
//! Syscalls for file system
use rcore_fs::vfs::Timespec;
use smoltcp::socket::*;
use crate::fs::*;
use crate::memory::MemorySet;
use crate::sync::Condvar;
use crate::drivers::{NET_DRIVERS, SOCKET_ACTIVITY};
use super::*;
use super::net::*;
@ -49,15 +52,68 @@ pub struct PollFd {
revents: u16
}
const POLLIN: u16 = 0x0001;
const POLPRI: u16 = 0x0002;
const POLLOUT: u16 = 0x0004;
const POLLERR: u16 = 0x0008;
const POLLHUP: u16 = 0x0010;
const POLLNVAL: u16 = 0x0020;
pub fn sys_poll(ufds: *mut PollFd, nfds: usize, timeout_msecs: usize) -> SysResult {
info!("poll: ufds: {:?}, nfds: {}, timeout_msecs: {:#x}", ufds, nfds, timeout_msecs);
let mut proc = process();
proc.memory_set.check_mut_array(ufds, nfds)?;
let slice = unsafe { slice::from_raw_parts_mut(ufds, nfds) };
for i in 0..nfds {
if proc.files.get(&(slice[i].fd as usize)).is_none() {
return Err(SysError::EINVAL);
}
}
drop(proc);
let begin_time_ms = unsafe {crate::trap::TICK / crate::consts::USEC_PER_TICK / 1000};
loop {
let mut proc = process();
for i in 0..nfds {
match proc.files.get(&(slice[i].fd as usize)) {
Some(FileLike::File(_)) => {
// assume it is stdin for now
if (slice[i].events & POLLIN) != 0 && STDIN.can_read() {
slice[i].revents = POLLIN;
return Ok(0);
}
},
Some(FileLike::Socket(wrapper)) => {
if let SocketType::Tcp(_) = wrapper.socket_type {
let iface = &mut *(NET_DRIVERS.lock()[0]);
let mut sockets = iface.sockets();
let mut socket = sockets.get::<TcpSocket>(wrapper.handle);
if !socket.is_open() {
slice[i].revents = POLLHUP;
return Ok(0);
} else if socket.can_recv() && (slice[i].events & POLLIN) != 0 {
slice[i].revents = POLLIN;
return Ok(0);
} else if socket.can_send() && (slice[i].events & POLLOUT) != 0 {
slice[i].revents = POLLIN;
return Ok(0);
}
} else {
unimplemented!()
}
}
None => {
slice[i].revents = POLLERR;
return Ok(0);
}
}
}
Condvar::wait_any(&[&STDIN.pushed, &(*SOCKET_ACTIVITY)]);
}
// emulate it for now
use core::time::Duration;
thread::sleep(Duration::from_millis(timeout_msecs as u64));
Ok(nfds as isize)
}

@ -13,6 +13,7 @@ use crate::fs::FileHandle;
use crate::process::*;
use crate::thread;
use crate::util;
use crate::arch::cpu;
use self::fs::*;
use self::mem::*;
@ -30,6 +31,9 @@ mod net;
/// System call dispatcher
pub fn syscall(id: usize, args: [usize; 6], tf: &mut TrapFrame) -> isize {
let pid = cpu::id();
let tid = processor().tid();
debug!("{}:{} syscall id {} begin", pid, tid, id);
let ret = match id {
// file
000 => sys_read(args[0], args[1] as *mut u8, args[2]),
@ -163,7 +167,7 @@ pub fn syscall(id: usize, args: [usize; 6], tf: &mut TrapFrame) -> isize {
crate::trap::error(tf);
}
};
debug!("syscall id {} ret with {:?}", id, ret);
debug!("{}:{} syscall id {} ret with {:?}", pid, tid, id, ret);
match ret {
Ok(code) => code,
Err(err) => -(err as isize),

@ -182,7 +182,6 @@ pub fn sys_connect(fd: usize, addr: *const u8, addrlen: usize) -> SysResult {
let mut proc = process();
proc.memory_set.check_ptr(addr)?;
let iface = &mut *(NET_DRIVERS.lock()[0]);
let mut dest = None;
let mut port = 0;
@ -197,6 +196,8 @@ pub fn sys_connect(fd: usize, addr: *const u8, addrlen: usize) -> SysResult {
let wrapper = proc.get_socket(fd)?;
if let SocketType::Tcp(_) = wrapper.socket_type {
let mut drivers = NET_DRIVERS.lock();
let iface = &mut *(drivers[0]);
let mut sockets = iface.sockets();
let mut socket = sockets.get::<TcpSocket>(wrapper.handle);
@ -207,19 +208,28 @@ pub fn sys_connect(fd: usize, addr: *const u8, addrlen: usize) -> SysResult {
// avoid deadlock
drop(socket);
drop(sockets);
drop(iface);
drop(drivers);
// wait for connection result
loop {
let mut drivers = NET_DRIVERS.lock();
let iface = &mut *(drivers[0]);
iface.poll();
let mut sockets = iface.sockets();
let mut socket = sockets.get::<TcpSocket>(wrapper.handle);
if socket.state() == TcpState::SynSent {
// still connecting
SOCKET_ACTIVITY._wait()
drop(socket);
drop(sockets);
drop(iface);
drop(drivers);
debug!("poll for connection wait");
SOCKET_ACTIVITY._wait();
} else if socket.state() == TcpState::Established {
break Ok(0);
} else if socket.state() == TcpState::Closed {
} else {
break Err(SysError::ECONNREFUSED);
}
}
@ -515,18 +525,17 @@ pub fn sys_recvfrom(
pub fn sys_close_socket(proc: &mut Process, fd: usize, handle: SocketHandle) -> SysResult {
let iface = &mut *(NET_DRIVERS.lock()[0]);
let mut socket = iface.sockets().remove(handle);
match socket {
Socket::Tcp(ref mut tcp_socket) => {
tcp_socket.close();
}
_ => {}
}
let mut sockets = iface.sockets();
sockets.release(handle);
sockets.prune();
Ok(0)
}
pub fn sys_bind(fd: usize, addr: *const u8, len: usize) -> SysResult {
info!(
"sys_bind: fd: {} addr: {:?} len: {}",
fd, addr, len
);
let mut proc = process();
proc.memory_set.check_array(addr, len)?;
@ -545,7 +554,7 @@ pub fn sys_bind(fd: usize, addr: *const u8, len: usize) -> SysResult {
}
let iface = &mut *(NET_DRIVERS.lock()[0]);
let wrapper = proc.get_socket_mut(fd)?;
let wrapper = &mut proc.get_socket_mut(fd)?;
if let SocketType::Tcp(_) = wrapper.socket_type {
wrapper.socket_type = SocketType::Tcp(Some(IpEndpoint::new(host.unwrap(), port)));
Ok(0)
@ -555,6 +564,10 @@ pub fn sys_bind(fd: usize, addr: *const u8, len: usize) -> SysResult {
}
pub fn sys_listen(fd: usize, backlog: usize) -> SysResult {
info!(
"sys_listen: fd: {} backlog: {}",
fd, backlog
);
// smoltcp tcp sockets do not support backlog
// open multiple sockets for each connection
let mut proc = process();
@ -575,6 +588,10 @@ pub fn sys_listen(fd: usize, backlog: usize) -> SysResult {
}
pub fn sys_accept(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult {
info!(
"sys_accept: fd: {} addr: {:?} addr_len: {:?}",
fd, addr, addr_len
);
// smoltcp tcp sockets do not support backlog
// open multiple sockets for each connection
let mut proc = process();
@ -584,16 +601,17 @@ pub fn sys_accept(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult {
let max_addr_len = unsafe { *addr_len } as usize;
if max_addr_len < size_of::<SockaddrIn>() {
debug!("length too short {}", max_addr_len);
return Err(SysError::EINVAL);
}
proc.memory_set.check_mut_array(addr, max_addr_len)?;
}
let iface = &mut *(NET_DRIVERS.lock()[0]);
let wrapper = proc.get_socket_mut(fd)?;
if let SocketType::Tcp(Some(endpoint)) = wrapper.socket_type {
loop {
let iface = &mut *(NET_DRIVERS.lock()[0]);
let mut sockets = iface.sockets();
let mut socket = sockets.get::<TcpSocket>(wrapper.handle);
@ -617,7 +635,7 @@ pub fn sys_accept(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult {
fd,
FileLike::Socket(SocketWrapper {
handle: tcp_handle,
socket_type: SocketType::Tcp(None),
socket_type: SocketType::Tcp(Some(endpoint)),
}),
)
.unwrap();
@ -634,14 +652,21 @@ pub fn sys_accept(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult {
// avoid deadlock
drop(socket);
drop(sockets);
drop(iface);
SOCKET_ACTIVITY._wait()
}
} else {
debug!("bad socket type {:?}", wrapper);
Err(SysError::EINVAL)
}
}
pub fn sys_getsockname(fd: usize, addr: *mut u8, addr_len: *mut u32) -> SysResult {
info!(
"sys_getsockname: fd: {} addr: {:?} addr_len: {:?}",
fd, addr, addr_len
);
// smoltcp tcp sockets do not support backlog
// open multiple sockets for each connection
let mut proc = process();

Loading…
Cancel
Save