Make ProcessPool workers each use one pipe to send updates

Summary: Make the `ProcessPool` use one pipe per worker for worker-to-master communication.

Reviewed By: ngorogiannis

Differential Revision: D20158845

fbshipit-source-id: dc15607f8
master
Fernando Gasperi Jabalera 5 years ago committed by Facebook Github Bot
parent 17c565c2aa
commit a154c8c328

@ -69,8 +69,8 @@ type ('work, 'final, 'result) t =
(** array of child processes with their pids and channels we can use to send work down to
each child *)
; children_states: 'work child_state Array.t (** array tracking the state of each worker *)
; children_updates: Unix.File_descr.t
(** all the children send updates up the same pipe to the pool *)
; children_updates: Unix.File_descr.t list
(** each child has it's own pipe to send updates to the pool *)
; task_bar: TaskBar.t
; tasks: ('work, 'result) TaskGenerator.t (** generator for work remaining to be done *)
; file_lock: Utils.file_lock (** file lock for sending worker messages *) }
@ -138,19 +138,16 @@ let rec really_read ?(pos = 0) ~len fd ~buf =
timeout. If there is none left, return the list. *)
let wait_for_updates pool buffer =
let rec aux acc ~timeout =
let file_descr = pool.children_updates in
(* Use select(2) so that we can both wait on the pipe of children updates and wait for a
timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time,
as well as for checking for new work where none were previously available. *)
let {Unix.Select_fds.read= read_fds} =
Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout ()
Unix.select ~read:pool.children_updates ~write:[] ~except:[] ~timeout ()
in
match read_fds with
| _ :: _ :: _ ->
assert false
| [] ->
(* no updates, break loop *) acc
| [_file_descr] ->
| file_descr :: _ ->
(* Read one OCaml value at a time. This is done by first reading the header of the marshalled
value (fixed size), then get the total size of the data from that header, then request a
read of the full OCaml value.
@ -164,9 +161,9 @@ let wait_for_updates pool buffer =
as much as possible eagerly. This can empty the pipe without us having a way to tell that
there is more to read anymore since the [select] call will return that there is nothing to
read. *)
really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ;
really_read file_descr ~buf:buffer ~len:Marshal.header_size ;
let data_size = Marshal.data_size buffer 0 in
really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ;
really_read file_descr ~buf:buffer ~pos:Marshal.header_size ~len:data_size ;
aux (Marshal.from_bytes buffer 0 :: acc) ~timeout:`Immediately
in
aux [] ~timeout:refresh_timeout |> List.rev
@ -283,12 +280,12 @@ type 'a final_worker_message = Finished of int * 'a option | FinalCrash of int
let collect_results (pool : (_, 'final, _) t) =
let failed = ref false in
let updates_in = Unix.in_channel_of_descr pool.children_updates in
(* use [Array.init] just to collect n messages, the order in the array will not be the same as the
slots of the workers but that's ok *)
Array.init pool.jobs ~f:(fun i ->
if !failed then None
else
let updates_in = List.nth_exn pool.children_updates i |> Unix.in_channel_of_descr in
match (Marshal.from_channel updates_in : 'final final_worker_message) with
| exception (End_of_file | Failure _) ->
failed := true ;
@ -373,8 +370,8 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilog
let to_child_r, to_child_w = Unix.pipe () in
match Unix.fork () with
| `In_the_child ->
let[@warning "-26"] updates_r = Unix.close updates_r in
let[@warning "-26"] to_child_w = Unix.close to_child_w in
Unix.close updates_r ;
Unix.close to_child_w ;
(* Pin to a core. [setcore] does the modulo <number of cores> for us. *)
Setcore.setcore slot ;
ProcessPoolState.in_child := true ;
@ -418,10 +415,13 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilog
Epilogues.run () ;
Stdlib.exit 0
| `In_the_parent pid ->
let[@warning "-26"] to_child_r = Unix.close to_child_r in
Unix.close to_child_r ;
Unix.close updates_w ;
{pid; down_pipe= Unix.out_channel_of_descr to_child_w}
let rec create_pipes n = if Int.equal n 0 then [] else Unix.pipe () :: create_pipes (n - 1)
let create :
jobs:int
-> child_prelude:(unit -> unit)
@ -432,17 +432,14 @@ let create :
fun ~jobs ~child_prelude ~f ~child_epilogue ~tasks ->
let file_lock = Utils.create_file_lock () in
let task_bar = TaskBar.create ~jobs in
(* Pipe to communicate from children to parent. Only one pipe is needed: the messages sent by
children include the identifier of the child sending the message (its [slot]). This way there
is only one pipe to wait on for updates. *)
let ((pipe_child_r, pipe_child_w) as status_pipe) = Unix.pipe () in
let children_pipes = create_pipes jobs in
let slots =
Array.init jobs ~f:(fun slot ->
fork_child ~file_lock ~child_prelude ~slot status_pipe ~f ~epilogue:child_epilogue )
let child_pipe = List.nth_exn children_pipes slot in
fork_child ~file_lock ~child_prelude ~slot child_pipe ~f ~epilogue:child_epilogue )
in
(* we have forked the child processes and are now in the parent *)
let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in
let children_updates = pipe_child_r in
let children_updates = List.map children_pipes ~f:(fun (pipe_child_r, _) -> pipe_child_r) in
let children_states = Array.create ~len:jobs Initializing in
{slots; children_updates; jobs; task_bar; tasks= tasks (); children_states; file_lock}

Loading…
Cancel
Save