@ -38,7 +38,8 @@ type ('work, 'final) t =
; children_updates : Unix . File_descr . t
(* * all the children send updates up the same pipe to the pool *)
; task_bar : TaskBar . t
; tasks : ' work task_generator (* * generator for work remaining to be done *) }
; tasks : ' work task_generator (* * generator for work remaining to be done *)
; file_lock : Utils . file_lock (* * file lock for sending worker messages *) }
(* * {2 Constants} *)
@ -78,10 +79,13 @@ type 'a boss_message =
| GoHome (* * all tasks done, prepare for teardown *)
(* * convenience function to send data down pipes without forgetting to flush *)
let marshal_to_pipe fd x =
let marshal_to_pipe ? file_lock fd x =
PerfEvent . log ( fun logger ->
PerfEvent . log_begin_event logger ~ categories : [ " sys " ] ~ name : " send to pipe " () ) ;
Option . iter file_lock ~ f : ( fun { Utils . lock } -> lock () ) ;
Marshal . to_channel fd x [] ;
Option . iter file_lock ~ f : ( fun { Utils . unlock } -> unlock () ) ;
Out_channel . flush fd ;
PerfEvent . ( log ( fun logger -> log_end_event logger () ) )
@ -324,7 +328,7 @@ let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilo
The child inherits [ updates_w ] to send updates up to the parent , and a new pipe is set up for
the parent to send instructions down to the child . * )
let fork_child ~ child_prelude ~ slot ( updates_r , updates_w ) ~ f ~ epilogue =
let fork_child ~ file_lock ~ child_prelude ~ slot ( updates_r , updates_w ) ~ f ~ epilogue =
let to_child_r , to_child_w = Unix . pipe () in
match Unix . fork () with
| ` In_the_child ->
@ -336,9 +340,11 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f ~epilogue =
ProcessPoolState . reset_pid () ;
child_prelude () ;
let updates_oc = Unix . out_channel_of_descr updates_w in
let send_to_parent ( message : worker_message ) = marshal_to_pipe updates_oc message in
let send_to_parent ( message : worker_message ) =
marshal_to_pipe ~ file_lock updates_oc message
in
let send_final ( final_message : ' a final_worker_message ) =
marshal_to_pipe updates_oc final_message
marshal_to_pipe ~ file_lock updates_oc final_message
in
(* Function to send updates up the pipe to the parent instead of directly to the task
bar . This is because only the parent knows about all the children , hence it's in charge of
@ -379,6 +385,7 @@ let create :
-> tasks : ' work task_generator
-> ( ' work , ' final ) t =
fun ~ jobs ~ child_prelude ~ f ~ child_epilogue ~ tasks ->
let file_lock = Utils . create_file_lock () in
let task_bar = TaskBar . create ~ jobs in
(* Pipe to communicate from children to parent. Only one pipe is needed: the messages sent by
children include the identifier of the child sending the message ( its [ slot ] ) . This way there
@ -386,27 +393,28 @@ let create :
let ( ( pipe_child_r , pipe_child_w ) as status_pipe ) = Unix . pipe () in
let slots =
Array . init jobs ~ f : ( fun slot ->
fork_child ~ child_prelude ~ slot status_pipe ~ f ~ epilogue : child_epilogue )
fork_child ~ file_lock ~ child_prelude ~ slot status_pipe ~ f ~ epilogue : child_epilogue )
in
(* we have forked the child processes and are now in the parent *)
let [ @ warning " -26 " ] pipe_child_w = Unix . close pipe_child_w in
let children_updates = pipe_child_r in
let children_states = Array . create ~ len : jobs Initializing in
{ slots ; children_updates ; jobs ; task_bar ; tasks ; children_states }
{ slots ; children_updates ; jobs ; task_bar ; tasks ; children_states ; file_lock }
let run pool =
let total_tasks = pool . tasks . remaining_tasks () in
TaskBar . set_tasks_total pool . task_bar total_tasks ;
TaskBar . tasks_done_reset pool . task_bar ;
(* allocate a buffer for reading children updates once for the whole run *)
let buffer = Bytes . create buffer_size in
(* wait for all children to run out of tasks *)
while not ( pool . tasks . is_empty () && all_children_idle pool ) do
process_updates pool buffer ; TaskBar . refresh pool . task_bar
done ;
let results = wait_all pool in
TaskBar . finish pool . task_bar ; results
Utils . with_file_lock ~ file_lock : pool . file_lock ~ f : ( fun () ->
let total_tasks = pool . tasks . remaining_tasks () in
TaskBar . set_tasks_total pool . task_bar total_tasks ;
TaskBar . tasks_done_reset pool . task_bar ;
(* allocate a buffer for reading children updates once for the whole run *)
let buffer = Bytes . create buffer_size in
(* wait for all children to run out of tasks *)
while not ( pool . tasks . is_empty () && all_children_idle pool ) do
process_updates pool buffer ; TaskBar . refresh pool . task_bar
done ;
let results = wait_all pool in
TaskBar . finish pool . task_bar ; results )
let run pool =