@ -72,8 +72,7 @@ type ('work, 'final, 'result) t =
; children_updates : Unix . File_descr . t list
(* * each child has it's own pipe to send updates to the pool *)
; task_bar : TaskBar . t
; tasks : ( ' work , ' result ) TaskGenerator . t (* * generator for work remaining to be done *)
; file_lock : Utils . file_lock (* * file lock for sending worker messages *) }
; tasks : ( ' work , ' result ) TaskGenerator . t (* * generator for work remaining to be done *) }
(* * {2 Constants} *)
@ -113,14 +112,12 @@ type 'a boss_message =
(* * convenience function to send data down pipes without forgetting to flush *)
let marshal_to_pipe ? file_lock fd x =
let marshal_to_pipe fd x =
PerfEvent . log ( fun logger ->
PerfEvent . log_begin_event logger ~ categories : [ " sys " ] ~ name : " send to pipe " () ) ;
Option . iter file_lock ~ f : ( fun { Utils . lock } -> lock () ) ;
Marshal . to_channel fd x [] ;
(* Channel flush should be inside the critical section. *)
Out_channel . flush fd ;
Option . iter file_lock ~ f : ( fun { Utils . unlock } -> unlock () ) ;
PerfEvent . ( log ( fun logger -> log_end_event logger () ) )
@ -374,7 +371,7 @@ let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilo
The child inherits [ updates_w ] to send updates up to the parent , and a new pipe is set up for
the parent to send instructions down to the child . * )
let fork_child ~ file_lock ~ child_prologue ~ slot ( updates_r , updates_w ) ~ f ~ epilogue =
let fork_child ~ child_prologue ~ slot ( updates_r , updates_w ) ~ f ~ epilogue =
let to_child_r , to_child_w = Unix . pipe () in
match Unix . fork () with
| ` In_the_child ->
@ -386,11 +383,9 @@ let fork_child ~file_lock ~child_prologue ~slot (updates_r, updates_w) ~f ~epilo
ProcessPoolState . reset_pid () ;
child_prologue () ;
let updates_oc = Unix . out_channel_of_descr updates_w in
let send_to_parent ( message : ' b worker_message ) =
marshal_to_pipe ~ file_lock updates_oc message
in
let send_to_parent ( message : ' b worker_message ) = marshal_to_pipe updates_oc message in
let send_final ( final_message : ' a final_worker_message ) =
marshal_to_pipe ~ file_lock updates_oc final_message
marshal_to_pipe updates_oc final_message
in
(* Function to send updates up the pipe to the parent instead of directly to the task
bar . This is because only the parent knows about all the children , hence it's in charge of
@ -438,13 +433,12 @@ let create :
-> tasks : ( unit -> ( ' work , ' result ) TaskGenerator . t )
-> ( ' work , ' final , ' result ) t =
fun ~ jobs ~ child_prologue ~ f ~ child_epilogue ~ tasks ->
let file_lock = Utils . create_file_lock () in
let task_bar = TaskBar . create ~ jobs in
let children_pipes = create_pipes jobs in
let slots =
Array . init jobs ~ f : ( fun slot ->
let child_pipe = List . nth_exn children_pipes slot in
fork_child ~ file_lock ~ child_prologue ~ slot child_pipe ~ f ~ epilogue : child_epilogue )
fork_child ~ child_prologue ~ slot child_pipe ~ f ~ epilogue : child_epilogue )
in
ProcessPoolState . has_running_children := true ;
Epilogues . register ~ description : " Wait children processes exit " ~ f : ( fun () ->
@ -455,24 +449,23 @@ let create :
(* we have forked the child processes and are now in the parent *)
let children_updates = List . map children_pipes ~ f : ( fun ( pipe_child_r , _ ) -> pipe_child_r ) in
let children_states = Array . create ~ len : jobs Initializing in
{ slots ; children_updates ; jobs ; task_bar ; tasks = tasks () ; children_states ; file_lock }
{ slots ; children_updates ; jobs ; task_bar ; tasks = tasks () ; children_states }
let run pool =
Utils . with_file_lock ~ file_lock : pool . file_lock ~ f : ( fun () ->
let total_tasks = pool . tasks . remaining_tasks () in
TaskBar . set_tasks_total pool . task_bar total_tasks ;
TaskBar . tasks_done_reset pool . task_bar ;
(* allocate a buffer for reading children updates once for the whole run *)
let buffer = Bytes . create buffer_size in
(* wait for all children to run out of tasks *)
while not ( pool . tasks . is_empty () && all_children_idle pool ) do
process_updates pool buffer ;
TaskBar . refresh pool . task_bar
done ;
let results = wait_all pool in
TaskBar . finish pool . task_bar ;
results )
let total_tasks = pool . tasks . remaining_tasks () in
TaskBar . set_tasks_total pool . task_bar total_tasks ;
TaskBar . tasks_done_reset pool . task_bar ;
(* allocate a buffer for reading children updates once for the whole run *)
let buffer = Bytes . create buffer_size in
(* wait for all children to run out of tasks *)
while not ( pool . tasks . is_empty () && all_children_idle pool ) do
process_updates pool buffer ;
TaskBar . refresh pool . task_bar
done ;
let results = wait_all pool in
TaskBar . finish pool . task_bar ;
results
let run pool =