@ -9,16 +9,14 @@ open! IStd
module F = Format
module L = Logging
(** Exception a task function may raise while running in a worker. The only handler visible in this
    file is the one in [child_loop] that matches on it: there the task is recorded as not completed
    ([false]) without logging an error and without crashing the worker.

    NOTE(review): the name suggests that the procedure the task wanted to work on is locked by
    another worker -- confirm against the raise sites, none of which are visible here. *)
exception ProcnameAlreadyLocked
module TaskGenerator = struct
(** A source of work items for the pool, given as a record of closures. The first type parameter is
    the type of work items; in the two-parameter form the second one is the type of the result a
    worker reports back for an item. *)
type ' a t =
type ( ' a , ' b ) t =
(* number of tasks remaining; [process_updates] forwards this value to the task bar *)
{ remaining_tasks : unit -> int
(* NOTE(review): presumably true once [next] has nothing left to hand out -- confirm *)
; is_empty : unit -> bool
(* called by [process_updates] with the work item a worker was processing when that worker sends
   a [Ready] message; the labelled argument carries what the worker reported for the item *)
; finished : completed: bool -> ' a -> unit
; finished : result: ' b option -> ' a -> unit
(* NOTE(review): presumably returns the next work item to hand to a worker, or [None] when there
   is none available -- confirm against the implementations *)
; next : unit -> ' a option }
(** [chain gen1 gen2] combines two generators into a single one:
    - [remaining_tasks] is the sum of the remaining tasks of [gen1] and [gen2];
    - [is_empty] holds when [gen1_is_empty ()] holds and [gen2] is empty;
    - [finished] and [next] are forwarded to [gen2] when [gen1_is_empty ()] holds, and to [gen1]
      otherwise.

    NOTE(review): most of [gen1_is_empty] is not visible here; it ends by reading the
    [gen1_returned_empty] flag, so it looks like emptiness of [gen1] is latched once observed --
    confirm.

    NOTE(review): [finished] is routed by the current value of [gen1_is_empty ()], not by which
    generator issued the work item. Verify that an item handed out by [gen1] that finishes after
    [gen1] has become empty is accounted for as intended. *)
let chain ( gen1 : ' a t ) ( gen2 : ' a t ) : ' a t =
let chain ( gen1 : ( ' a , ' b ) t ) ( gen2 : ( ' a , ' b ) t ) : ( ' a , ' b ) t =
let remaining_tasks () = gen1 . remaining_tasks () + gen2 . remaining_tasks () in
let gen1_returned_empty = ref false in
let gen1_is_empty () =
@ -26,20 +24,19 @@ module TaskGenerator = struct
! gen1_returned_empty
in
let is_empty () = gen1_is_empty () && gen2 . is_empty () in
let finished ~ completed work_item =
if gen1_is_empty () then gen2 . finished ~ completed work_item
else gen1 . finished ~ completed work_item
let finished ~ result work_item =
if gen1_is_empty () then gen2 . finished ~ result work_item else gen1 . finished ~ result work_item
in
let next x = if gen1_is_empty () then gen2 . next x else gen1 . next x in
{ remaining_tasks ; is_empty ; finished ; next }
let of_list ( lst : ' a list ) : ' a t =
let of_list ( lst : ' a list ) : ( ' a , _ ) t =
let content = ref lst in
let length = ref ( List . length lst ) in
let remaining_tasks () = ! length in
let is_empty () = List . is_empty ! content in
let finished ~ completed : _ _ work_item = decr length in
let finished ~ result : _ _ work_item = decr length in
let next () =
match ! content with
| [] ->
@ -65,7 +62,7 @@ type child_info = {pid: Pid.t; down_pipe: Out_channel.t}
(** State of one child as tracked by the pool. [Processing w] carries the work item [w] the child
    was given: when a [Ready] message arrives for a child in that state, [process_updates] passes
    [w] to the [finished] callback of the task generator. A [Ready] message from a child that is
    [Initializing] triggers no callback, and one from an [Idle] child is an internal error. *)
type ' a child_state = Initializing | Idle | Processing of ' a
(** The state of the pool. The type parameters are the type of work items, the type returned by the
    child epilogue and, in the three-parameter form, the type of per-task results handed to the
    task generator. *)
type ( ' work , ' final ) t =
type ( ' work , ' final , ' result ) t =
{ jobs : int
(** number of jobs running in parallel, i.e. number of children we are responsible for *)
(* [slots] holds one [child_info] (pid and down pipe) per entry; NOTE(review): presumably indexed
   by slot number, like the per-child states -- confirm *)
; slots : child_info Array . t
@ -75,7 +72,7 @@ type ('work, 'final) t =
; children_updates : Unix . File_descr . t
(** all the children send updates up the same pipe to the pool *)
; task_bar : TaskBar . t
; tasks : ' work TaskGenerator . t (** generator for work remaining to be done *)
; tasks : ( ' work , ' result ) TaskGenerator . t (** generator for work remaining to be done *)
; file_lock : Utils . file_lock (** file lock for sending worker messages *) }
(* * {2 Constants} *)
@ -99,11 +96,11 @@ let buffer_size = 65_535
the pipe will crash in the parent process. This is a limitation of the way we read from the pipe
for now. To lift it, it should be possible to extend the buffer to the required length if we
notice that we are trying to read more than [buffer_size] for example. *)
type worker_message =
type ' result worker_message =
| UpdateStatus of int * Mtime . t * string
(* * [ ( i, t, status ) ]: starting a task from slot [i], at start time [t], with description
[ status ] . Watch out that [ status ] must not be too close in length to [ buffer_size ] . * )
| Ready of { worker : int ; completed: bool }
| Ready of { worker : int ; result: ' result }
(* * Sent after finishing initializing or after finishing a given task. When received by
master , this moves the worker state from [ Initializing ] or [ Processing _ ] to [ Idle ] . * )
| Crash of int (* * there was an error and the child is no longer receiving messages *)
@ -265,12 +262,12 @@ let process_updates pool buffer =
(* clean crash, give the child process a chance to cleanup *)
Unix . wait ( ` Pid pid ) | > ignore ;
killall pool ~ slot " see backtrace above "
| Ready { worker = slot ; completed } ->
| Ready { worker = slot ; result } ->
( match pool . children_states . ( slot ) with
| Initializing ->
()
| Processing work ->
pool . tasks . finished ~ completed work
pool . tasks . finished ~ result work
| Idle ->
L . die InternalError " Received a Ready message from an idle worker@. " ) ;
TaskBar . set_remaining_tasks pool . task_bar ( pool . tasks . remaining_tasks () ) ;
@ -284,7 +281,7 @@ let process_updates pool buffer =
(** Message type accepted by the [send_final] function of a child; the type parameter is the type
    of the payload carried by [Finished].

    NOTE(review): the [int] in both constructors is presumably the slot number of the sending
    child, and the optional payload of [Finished] presumably the value produced by the child
    epilogue -- confirm against [child_loop] and [collect_results]. *)
type ' a final_worker_message = Finished of int * ' a option | FinalCrash of int
let collect_results ( pool : ( _ , ' final ) t ) =
let collect_results ( pool : ( _ , ' final , _ ) t ) =
let failed = ref false in
let updates_in = Unix . in_channel_of_descr pool . children_updates in
(* use [Array.init] just to collect n messages, the order in the array will not be the same as the
@ -332,9 +329,8 @@ let wait_all pool =
(** Worker loop: wait for tasks and run [f] on them until we are told to go home.

    Each iteration first sends a [Ready] message for [slot] to the parent, carrying the outcome of
    the previous iteration ([prev_completed] in the boolean form of the signature, [prev_result] in
    the option form), then calls [receive_from_parent ()] and dispatches on the order received:
    - [GoHome]: runs [epilogue ()]; the rest of that branch is not visible here.
    - [Do stuff]: runs [f stuff]. In the boolean form a successful run yields [true] and
      [ProcnameAlreadyLocked] yields [false]; in the option form the value returned by [f] is used
      directly. Any other exception goes through [IExn.reraise_if]: when [Config.keep_going] is set
      the error is logged and the callback answers [false], otherwise [Crash slot] is sent to the
      parent and the callback answers [true]. If control comes back from [reraise_if], the outcome
      is [true] in the boolean form and [None] in the option form. The loop then recurses with that
      outcome as the previous one.

    NOTE(review): presumably [IExn.reraise_if] re-raises exactly when its callback answers [true]
    -- confirm against [IExn]. *)
let rec child_loop ~ slot send_to_parent send_final receive_from_parent ~ f ~ epilogue ~ prev_completed
=
send_to_parent ( Ready { worker = slot ; completed = prev_completed } ) ;
let rec child_loop ~ slot send_to_parent send_final receive_from_parent ~ f ~ epilogue ~ prev_result =
send_to_parent ( Ready { worker = slot ; result = prev_result } ) ;
match receive_from_parent () with
| GoHome -> (
match epilogue () with
@ -352,23 +348,21 @@ let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilo
true ) ) )
| Do stuff ->
let result =
try f stuff ; true with
| ProcnameAlreadyLocked ->
false
| e ->
IExn . reraise_if e ~ f : ( fun () ->
if Config . keep_going then (
L . internal_error " Error in subprocess %d: %a@. " slot Exn . pp e ;
(* do not raise and continue accepting jobs *)
false )
else (
(* crash hard, but first let the master know that we have crashed *)
send_to_parent ( Crash slot ) ;
true ) ) ;
true
try f stuff
with e ->
IExn . reraise_if e ~ f : ( fun () ->
if Config . keep_going then (
L . internal_error " Error in subprocess %d: %a@. " slot Exn . pp e ;
(* do not raise and continue accepting jobs *)
false )
else (
(* crash hard, but first let the master know that we have crashed *)
send_to_parent ( Crash slot ) ;
true ) ) ;
None
in
child_loop ~ slot send_to_parent send_final receive_from_parent ~ f ~ epilogue
~ prev_ completed : result
~ prev_ result : result
(* * Fork a new child and start it so that it is ready for work.
@ -387,7 +381,7 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilog
ProcessPoolState . reset_pid () ;
child_prelude () ;
let updates_oc = Unix . out_channel_of_descr updates_w in
let send_to_parent ( message : worker_message ) =
let send_to_parent ( message : ' b worker_message ) =
marshal_to_pipe ~ file_lock updates_oc message
in
let send_final ( final_message : ' a final_worker_message ) =
@ -414,8 +408,7 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilog
PerfEvent . ( log ( fun logger -> log_end_event logger () ) ) ;
x
in
child_loop ~ slot send_to_parent send_final receive_from_parent ~ f ~ epilogue
~ prev_completed : true ;
child_loop ~ slot send_to_parent send_final receive_from_parent ~ f ~ epilogue ~ prev_result : None ;
Out_channel . close updates_oc ;
In_channel . close orders_ic ;
Epilogues . run () ;
@ -428,10 +421,10 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilog
let create :
jobs : int
-> child_prelude : ( unit -> unit )
-> f : ( ' work -> unit )
-> f : ( ' work -> ' result option )
-> child_epilogue : ( unit -> ' final )
-> tasks : ( unit -> ' work TaskGenerator . t )
-> ( ' work , ' final ) t =
-> tasks : ( unit -> ( ' work , ' result ) TaskGenerator . t )
-> ( ' work , ' final , ' result ) t =
fun ~ jobs ~ child_prelude ~ f ~ child_epilogue ~ tasks ->
let file_lock = Utils . create_file_lock () in
let task_bar = TaskBar . create ~ jobs in