@ -60,7 +60,13 @@ type 'a boss_message =
| GoHome (* * all tasks done, prepare for teardown *)
| GoHome (* * all tasks done, prepare for teardown *)
(* * convenience function to send data down pipes without forgetting to flush *)
(* * convenience function to send data down pipes without forgetting to flush *)
let marshal_to_pipe fd x = Marshal . to_channel fd x [] ; Out_channel . flush fd
let marshal_to_pipe fd x =
PerfEvent . log ( fun logger ->
PerfEvent . log_begin_event logger ~ categories : [ " sys " ] ~ name : " send to pipe " () ) ;
Marshal . to_channel fd x [] ;
Out_channel . flush fd ;
PerfEvent . ( log ( fun logger -> log_end_event logger () ) )
(* * like [Unix.read] but reads until [len] bytes have been read *)
(* * like [Unix.read] but reads until [len] bytes have been read *)
let rec really_read ? ( pos = 0 ) ~ len fd ~ buf =
let rec really_read ? ( pos = 0 ) ~ len fd ~ buf =
@ -73,13 +79,45 @@ let rec really_read ?(pos = 0) ~len fd ~buf =
(* * return [true] if the [file_descr] is ready for reading after at most [timeout] has
(* * return [true] if the [file_descr] is ready for reading after at most [timeout] has
elapsed * )
elapsed * )
let wait_for_updates file_descr timeout =
let wait_for_updates pool buffer =
let file_descr = pool . children_updates in
let timeout = if TaskBar . is_interactive pool . task_bar then refresh_timeout else ` Never in
(* Use select ( 2 ) so that we can both wait on the pipe of children updates and wait for a
(* Use select ( 2 ) so that we can both wait on the pipe of children updates and wait for a
timeout . The timeout is for giving a chance to the taskbar of refreshing from time to time . * )
timeout . The timeout is for giving a chance to the taskbar of refreshing from time to time . * )
let { Unix . Select_fds . read = read_fds } =
let { Unix . Select_fds . read = read_fds } =
Unix . select ~ read : [ file_descr ] ~ write : [] ~ except : [] ~ timeout ()
Unix . select ~ read : [ file_descr ] ~ write : [] ~ except : [] ~ timeout ()
in
in
match read_fds with _ :: _ :: _ -> assert false | [] -> false | [ _ file_descr ] -> true
match read_fds with
| _ :: _ :: _ ->
assert false
| [] ->
(* not ready *) None
| [ _ file_descr ] ->
(* Read one OCaml value at a time. This is done by first reading the header of the marshalled
value ( fixed size ) , then get the total size of the data from that header , then request a
read of the full OCaml value .
This way the buffer is used for only one OCaml value at a time . This is simpler ( values do
not overlap across the end of a read and the beginning of another ) and means we do not need
a large buffer as long as messages are never bigger than the buffer .
This works somewhat like [ Marshal . from_channel ] but uses the file descriptor directly
instead of an [ in_channel ] . Do * not * read from the pipe via an [ in_channel ] as they read
as much as possible eagerly . This can empty the pipe without us having a way to tell that
there is more to read anymore since the [ select ] call will return that there is nothing to
read . * )
really_read pool . children_updates ~ buf : buffer ~ len : Marshal . header_size ;
let data_size = Marshal . data_size buffer 0 in
really_read pool . children_updates ~ buf : buffer ~ pos : Marshal . header_size ~ len : data_size ;
Some ( Marshal . from_bytes buffer 0 )
let wait_for_updates pool buffer =
PerfEvent . log ( fun logger ->
PerfEvent . log_begin_event logger ~ categories : [ " sys " ] ~ name : " wait for event " () ) ;
let update = wait_for_updates pool buffer in
PerfEvent . ( log ( fun logger -> log_end_event logger () ) ) ;
update
let killall pool ~ slot status =
let killall pool ~ slot status =
@ -106,34 +144,15 @@ let process_updates pool buffer =
has_dead_child pool
has_dead_child pool
| > Option . iter ~ f : ( fun ( slot , status ) ->
| > Option . iter ~ f : ( fun ( slot , status ) ->
killall pool ~ slot ( Unix . Exit_or_signal . to_string_hum status ) ) ;
killall pool ~ slot ( Unix . Exit_or_signal . to_string_hum status ) ) ;
let timeout = if TaskBar . is_interactive pool . task_bar then refresh_timeout else ` Never in
match wait_for_updates pool buffer with
if wait_for_updates pool . children_updates timeout then (
| Some ( UpdateStatus ( slot , t , status ) ) ->
(* Read one OCaml value at a time. This is done by first reading the header of the marshalled
value ( fixed size ) , then get the total size of the data from that header , then request a
read of the full OCaml value .
This way the buffer is used for only one OCaml value at a time . This is simpler ( values do
not overlap across the end of a read and the beginning of another ) and means we do not need
a large buffer as long as messages are never bigger than the buffer .
This works somewhat like [ Marshal . from_channel ] but uses the file descriptor directly
instead of an [ in_channel ] . Do * not * read from the pipe via an [ in_channel ] as they read
as much as possible eagerly . This can empty the pipe without us having a way to tell that
there is more to read anymore since the [ select ] call will return that there is nothing to
read . * )
really_read pool . children_updates ~ buf : buffer ~ len : Marshal . header_size ;
let data_size = Marshal . data_size buffer 0 in
really_read pool . children_updates ~ buf : buffer ~ pos : Marshal . header_size ~ len : data_size ;
let update = Marshal . from_bytes buffer 0 in
match update with
| UpdateStatus ( slot , t , status ) ->
TaskBar . update_status pool . task_bar ~ slot t status
TaskBar . update_status pool . task_bar ~ slot t status
| Crash slot ->
| Some ( Crash slot ) ->
let { pid } = pool . slots . ( slot ) in
let { pid } = pool . slots . ( slot ) in
(* clean crash, give the child process a chance to cleanup *)
(* clean crash, give the child process a chance to cleanup *)
Unix . wait ( ` Pid pid ) | > ignore ;
Unix . wait ( ` Pid pid ) | > ignore ;
killall pool ~ slot " see backtrace above "
killall pool ~ slot " see backtrace above "
| Ready slot -> (
| Some ( Ready slot ) -> (
TaskBar . tasks_done_add pool . task_bar 1 ;
TaskBar . tasks_done_add pool . task_bar 1 ;
match pool . tasks with
match pool . tasks with
| [] ->
| [] ->
@ -142,7 +161,9 @@ let process_updates pool buffer =
| x :: tasks ->
| x :: tasks ->
pool . tasks <- tasks ;
pool . tasks <- tasks ;
let { down_pipe } = pool . slots . ( slot ) in
let { down_pipe } = pool . slots . ( slot ) in
marshal_to_pipe down_pipe ( Do x ) ) )
marshal_to_pipe down_pipe ( Do x ) )
| None ->
()
(* * terminate all worker processes *)
(* * terminate all worker processes *)
@ -220,7 +241,13 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
in
in
ProcessPoolState . update_status := update_status ;
ProcessPoolState . update_status := update_status ;
let orders_ic = Unix . in_channel_of_descr to_child_r in
let orders_ic = Unix . in_channel_of_descr to_child_r in
let receive_from_parent () = Marshal . from_channel orders_ic in
let receive_from_parent () =
PerfEvent . log ( fun logger ->
PerfEvent . log_begin_event logger ~ categories : [ " sys " ] ~ name : " receive from pipe " () ) ;
let x = Marshal . from_channel orders_ic in
PerfEvent . ( log ( fun logger -> log_end_event logger () ) ) ;
x
in
child_loop ~ slot send_to_parent receive_from_parent ~ f ;
child_loop ~ slot send_to_parent receive_from_parent ~ f ;
Out_channel . close updates_oc ;
Out_channel . close updates_oc ;
In_channel . close orders_ic ;
In_channel . close orders_ic ;