@ -9,11 +9,13 @@ open! IStd
module F = Format
module F = Format
module L = Logging
module L = Logging
type child_info = { pid : Pid . t ; down_pipe : Out_channel . t }
(* * the state of the pool *)
(* * the state of the pool *)
type ' a t =
type ' a t =
{ jobs : int
{ jobs : int
(* * number of jobs running in parallel, i.e. number of children we are responsible for *)
(* * number of jobs running in parallel, i.e. number of children we are responsible for *)
; slots : ( Pid . t * Out_channel . t ) Array . t
; slots : child_info Array . t
(* * array of child processes with their pids and channels we can use to send work down to
(* * array of child processes with their pids and channels we can use to send work down to
each child * )
each child * )
; children_updates : Unix . File_descr . t
; children_updates : Unix . File_descr . t
@ -50,6 +52,7 @@ type worker_message =
(* * [ ( i, t, status ) ]: starting a task from slot [i], at start time [t], with description
(* * [ ( i, t, status ) ]: starting a task from slot [i], at start time [t], with description
[ status ] . Watch out that [ status ] must not be too close in length to [ buffer_size ] . * )
[ status ] . Watch out that [ status ] must not be too close in length to [ buffer_size ] . * )
| Ready of int (* * finished the given task, ready to receive messages *)
| Ready of int (* * finished the given task, ready to receive messages *)
| Crash of int (* * there was an error and the child is no longer receiving messages *)
(* * messages from the parent process down to worker processes *)
(* * messages from the parent process down to worker processes *)
type ' a boss_message =
type ' a boss_message =
@ -68,21 +71,43 @@ let rec really_read ?(pos= 0) ~len fd ~buf =
really_read ~ pos : ( pos + read ) ~ len : ( len - read ) fd ~ buf
really_read ~ pos : ( pos + read ) ~ len : ( len - read ) fd ~ buf
(* * main dispatch function that responds to messages from worker processes and updates the taskbar
(* * return [true] if the [file_descr] is ready for reading after at most [timeout] has
periodically * )
elapsed * )
let process_updates pool buffer =
let wait_for_updates file_descr timeout =
let timeout = if TaskBar . is_interactive pool . task_bar then refresh_timeout else ` Never in
(* Use select ( 2 ) so that we can both wait on the pipe of children updates and wait for a
(* Use select ( 2 ) so that we can both wait on the pipe of children updates and wait for a
timeout . The timeout is for giving a chance to the taskbar of refreshing from time to time . * )
timeout . The timeout is for giving a chance to the taskbar of refreshing from time to time . * )
let { Unix . Select_fds . read = read_fds } =
let { Unix . Select_fds . read = read_fds } =
Unix . select ~ read : [ pool. children_updates ] ~ write : [] ~ except : [] ~ timeout ()
Unix . select ~ read : [ file_descr ] ~ write : [] ~ except : [] ~ timeout ()
match read_fds with
match read_fds with _ :: _ :: _ -> assert false | [] -> false | [ _ file_descr ] -> true
| _ :: _ :: _ ->
assert false
| [] ->
let killall pool ~ slot status =
Array . iter pool . slots ~ f : ( fun { pid } ->
| [ _ updates_fd ] ->
match Signal . send Signal . term ( ` Pid pid ) with ` Ok | ` No_such_process -> () ) ;
Array . iter pool . slots ~ f : ( fun { pid } ->
try Unix . wait ( ` Pid pid ) | > ignore with Unix . Unix_error ( ECHILD , _ , _ ) ->
(* some children may have died already, it's fine *) () ) ;
L . die InternalError " Subprocess %d: %s " slot status
let has_dead_child pool =
Unix . wait_nohang ` Any
| > Option . map ~ f : ( fun ( dead_pid , status ) ->
( Array . find_mapi_exn pool . slots ~ f : ( fun slot { pid } ->
if Pid . equal pid dead_pid then Some slot else None )
, status ) )
(* * main dispatch function that responds to messages from worker processes and updates the taskbar
periodically * )
let process_updates pool buffer =
(* abort everything if some child has died unexpectedly *)
has_dead_child pool
| > Option . iter ~ f : ( fun ( slot , status ) ->
killall pool ~ slot ( Unix . Exit_or_signal . to_string_hum status ) ) ;
let timeout = if TaskBar . is_interactive pool . task_bar then refresh_timeout else ` Never in
if wait_for_updates pool . children_updates timeout then (
(* Read one OCaml value at a time. This is done by first reading the header of the marshalled
(* Read one OCaml value at a time. This is done by first reading the header of the marshalled
value ( fixed size ) , then get the total size of the data from that header , then request a
value ( fixed size ) , then get the total size of the data from that header , then request a
read of the full OCaml value .
read of the full OCaml value .
@ -99,9 +124,15 @@ let process_updates pool buffer =
really_read pool . children_updates ~ buf : buffer ~ len : Marshal . header_size ;
really_read pool . children_updates ~ buf : buffer ~ len : Marshal . header_size ;
let data_size = Marshal . data_size buffer 0 in
let data_size = Marshal . data_size buffer 0 in
really_read pool . children_updates ~ buf : buffer ~ pos : Marshal . header_size ~ len : data_size ;
really_read pool . children_updates ~ buf : buffer ~ pos : Marshal . header_size ~ len : data_size ;
match Marshal . from_bytes buffer 0 with
let update = Marshal . from_bytes buffer 0 in
match update with
| UpdateStatus ( slot , t , status ) ->
| UpdateStatus ( slot , t , status ) ->
TaskBar . update_status pool . task_bar ~ slot t status
TaskBar . update_status pool . task_bar ~ slot t status
| Crash slot ->
let { pid } = ( pool . slots ) . ( slot ) in
(* clean crash, give the child process a chance to cleanup *)
Unix . wait ( ` Pid pid ) | > ignore ;
killall pool ~ slot " see backtrace above "
| Ready slot ->
| Ready slot ->
TaskBar . tasks_done_add pool . task_bar 1 ;
TaskBar . tasks_done_add pool . task_bar 1 ;
match pool . tasks with
match pool . tasks with
@ -110,29 +141,30 @@ let process_updates pool buffer =
pool . idle_children <- pool . idle_children + 1
pool . idle_children <- pool . idle_children + 1
| x :: tasks ->
| x :: tasks ->
pool . tasks <- tasks ;
pool . tasks <- tasks ;
let pipe = snd ( pool . slots ) . ( slot ) in
let { down_ pipe} = ( pool . slots ) . ( slot ) in
marshal_to_pipe pipe ( Do x )
marshal_to_pipe down_ pipe ( Do x ) )
(* * terminate all worker processes *)
(* * terminate all worker processes *)
let wait_all pool =
let wait_all pool =
(* tell all workers to go home *)
(* tell each alive worker to go home and wait ( 2 ) them, one by one; the order doesn't matter since
Array . iter pool . slots ~ f : ( fun ( _ , pipe ) -> marshal_to_pipe pipe GoHome ; Out_channel . close pipe ) ;
we want to wait for all of them eventually anyway . * )
(* wait ( 2 ) all the pids one by one; the order doesn't matter since we want to wait for all of them
eventually anyway . * )
let errors =
let errors =
Array . fold ~ init : [] pool . slots ~ f : ( fun errors ( pid , _ ) ->
Array . foldi ~ init : [] pool . slots ~ f : ( fun slot errors { pid ; down_pipe } ->
marshal_to_pipe down_pipe GoHome ;
Out_channel . close down_pipe ;
match Unix . wait ( ` Pid pid ) with
match Unix . wait ( ` Pid pid ) with
| _ pid , Ok () ->
| _ pid , Ok () ->
| _ pid , ( Error _ as status ) ->
| _ pid , ( Error _ as status ) ->
(* Collect all children errors and die only at the end to avoid creating zombies. *)
(* Collect all children errors and die only at the end to avoid creating zombies. *)
status :: errors )
( slot , status ) :: errors )
if not ( List . is_empty errors ) then
if not ( List . is_empty errors ) then
let log_or_die = if Config . keep_going then L . internal_error else L . die InternalError in
let log_or_die = if Config . keep_going then L . internal_error else L . die InternalError in
let pp_error f status =
let pp_error f ( slot , status ) =
F . fprintf f " Error in infer subprocess: %s@. " ( Unix . Exit_or_signal . to_string_hum status )
F . fprintf f " Error in infer subprocess %d: %s@. " slot
( Unix . Exit_or_signal . to_string_hum status )
log_or_die " @[<v>%a@]%! " ( Pp . seq ~ print_env : Pp . text_break ~ sep : " " pp_error ) errors
log_or_die " @[<v>%a@]%! " ( Pp . seq ~ print_env : Pp . text_break ~ sep : " " pp_error ) errors
@ -141,12 +173,20 @@ let wait_all pool =
let rec child_loop ~ slot send_to_parent receive_from_parent ~ f =
let rec child_loop ~ slot send_to_parent receive_from_parent ~ f =
send_to_parent ( Ready slot ) ;
send_to_parent ( Ready slot ) ;
match receive_from_parent () with
match receive_from_parent () with
| Do stuff ->
(* TODO: error handling *)
f stuff ;
child_loop ~ slot send_to_parent receive_from_parent ~ f
| GoHome ->
| GoHome ->
| Do stuff ->
( try f stuff with e ->
IExn . reraise_if e ~ f : ( fun () ->
if Config . keep_going then (
L . internal_error " Error in subprocess %d: %a@. " slot Exn . pp e ;
(* do not raise and continue accepting jobs *)
false )
else (
(* crash hard, but first let the master know that we have crashed *)
send_to_parent ( Crash slot ) ;
true ) ) ) ;
child_loop ~ slot send_to_parent receive_from_parent ~ f
(* * Fork a new child and start it so that it is ready for work.
(* * Fork a new child and start it so that it is ready for work.
@ -186,7 +226,7 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
Pervasives . exit 0
Pervasives . exit 0
| ` In_the_parent pid ->
| ` In_the_parent pid ->
let [ @ warning " -26 " ] to_child_r = Unix . close to_child_r in
let [ @ warning " -26 " ] to_child_r = Unix . close to_child_r in
(pid , Unix . out_channel_of_descr to_child_w )
{pid ; down_pipe = Unix . out_channel_of_descr to_child_w }
let create : jobs : int -> child_prelude : ( unit -> unit ) -> TaskBar . t -> f : ( ' a -> unit ) -> ' a t =
let create : jobs : int -> child_prelude : ( unit -> unit ) -> TaskBar . t -> f : ( ' a -> unit ) -> ' a t =
@ -214,7 +254,7 @@ let run pool tasks =
(* allocate a buffer for reading children updates once for the whole run *)
(* allocate a buffer for reading children updates once for the whole run *)
let buffer = Bytes . create buffer_size in
let buffer = Bytes . create buffer_size in
(* wait for all children to run out of tasks *)
(* wait for all children to run out of tasks *)
while pool. idle_children < pool . jobs do
while not ( List . is_empty pool . tasks && pool . idle_children > = pool . jobs ) do
process_updates pool buffer ; TaskBar . refresh pool . task_bar
process_updates pool buffer ; TaskBar . refresh pool . task_bar
done ;
done ;
wait_all pool
wait_all pool