From a154c8c328ff96b1d174fc11822a1340fbecc554 Mon Sep 17 00:00:00 2001 From: Fernando Gasperi Jabalera Date: Mon, 2 Mar 2020 09:05:31 -0800 Subject: [PATCH] Make ProcessPool workers each use one pipe to send updates Summary: Make the `ProcessPool` use one pipe per worker for worker-to-master communication. Reviewed By: ngorogiannis Differential Revision: D20158845 fbshipit-source-id: dc15607f8 --- infer/src/base/ProcessPool.ml | 37 ++++++++++++++++------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index 6e37eb232..cdf11690b 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -69,8 +69,8 @@ type ('work, 'final, 'result) t = (** array of child processes with their pids and channels we can use to send work down to each child *) ; children_states: 'work child_state Array.t (** array tracking the state of each worker *) - ; children_updates: Unix.File_descr.t - (** all the children send updates up the same pipe to the pool *) + ; children_updates: Unix.File_descr.t list + (** each child has its own pipe to send updates to the pool *) ; task_bar: TaskBar.t ; tasks: ('work, 'result) TaskGenerator.t (** generator for work remaining to be done *) ; file_lock: Utils.file_lock (** file lock for sending worker messages *) } @@ -138,19 +138,16 @@ let rec really_read ?(pos = 0) ~len fd ~buf = timeout. If there is none left, return the list. *) let wait_for_updates pool buffer = let rec aux acc ~timeout = - let file_descr = pool.children_updates in (* Use select(2) so that we can both wait on the pipe of children updates and wait for a timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time, as well as for checking for new work where none were previously available. 
*) let {Unix.Select_fds.read= read_fds} = - Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout () + Unix.select ~read:pool.children_updates ~write:[] ~except:[] ~timeout () in match read_fds with - | _ :: _ :: _ -> - assert false | [] -> (* no updates, break loop *) acc - | [_file_descr] -> + | file_descr :: _ -> (* Read one OCaml value at a time. This is done by first reading the header of the marshalled value (fixed size), then get the total size of the data from that header, then request a read of the full OCaml value. @@ -164,9 +161,9 @@ let wait_for_updates pool buffer = as much as possible eagerly. This can empty the pipe without us having a way to tell that there is more to read anymore since the [select] call will return that there is nothing to read. *) - really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ; + really_read file_descr ~buf:buffer ~len:Marshal.header_size ; let data_size = Marshal.data_size buffer 0 in - really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; + really_read file_descr ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; aux (Marshal.from_bytes buffer 0 :: acc) ~timeout:`Immediately in aux [] ~timeout:refresh_timeout |> List.rev @@ -283,12 +280,12 @@ type 'a final_worker_message = Finished of int * 'a option | FinalCrash of int let collect_results (pool : (_, 'final, _) t) = let failed = ref false in - let updates_in = Unix.in_channel_of_descr pool.children_updates in (* use [Array.init] just to collect n messages, the order in the array will not be the same as the slots of the workers but that's ok *) Array.init pool.jobs ~f:(fun i -> if !failed then None else + let updates_in = List.nth_exn pool.children_updates i |> Unix.in_channel_of_descr in match (Marshal.from_channel updates_in : 'final final_worker_message) with | exception (End_of_file | Failure _) -> failed := true ; @@ -373,8 +370,8 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) 
~f ~epilog let to_child_r, to_child_w = Unix.pipe () in match Unix.fork () with | `In_the_child -> - let[@warning "-26"] updates_r = Unix.close updates_r in - let[@warning "-26"] to_child_w = Unix.close to_child_w in + Unix.close updates_r ; + Unix.close to_child_w ; (* Pin to a core. [setcore] does the modulo for us. *) Setcore.setcore slot ; ProcessPoolState.in_child := true ; @@ -418,10 +415,13 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilog Epilogues.run () ; Stdlib.exit 0 | `In_the_parent pid -> - let[@warning "-26"] to_child_r = Unix.close to_child_r in + Unix.close to_child_r ; + Unix.close updates_w ; {pid; down_pipe= Unix.out_channel_of_descr to_child_w} +let rec create_pipes n = if Int.equal n 0 then [] else Unix.pipe () :: create_pipes (n - 1) + let create : jobs:int -> child_prelude:(unit -> unit) @@ -432,17 +432,14 @@ let create : fun ~jobs ~child_prelude ~f ~child_epilogue ~tasks -> let file_lock = Utils.create_file_lock () in let task_bar = TaskBar.create ~jobs in - (* Pipe to communicate from children to parent. Only one pipe is needed: the messages sent by - children include the identifier of the child sending the message (its [slot]). This way there - is only one pipe to wait on for updates. 
*) - let ((pipe_child_r, pipe_child_w) as status_pipe) = Unix.pipe () in + let children_pipes = create_pipes jobs in let slots = Array.init jobs ~f:(fun slot -> - fork_child ~file_lock ~child_prelude ~slot status_pipe ~f ~epilogue:child_epilogue ) + let child_pipe = List.nth_exn children_pipes slot in + fork_child ~file_lock ~child_prelude ~slot child_pipe ~f ~epilogue:child_epilogue ) in (* we have forked the child processes and are now in the parent *) - let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in - let children_updates = pipe_child_r in + let children_updates = List.map children_pipes ~f:(fun (pipe_child_r, _) -> pipe_child_r) in let children_states = Array.create ~len:jobs Initializing in {slots; children_updates; jobs; task_bar; tasks= tasks (); children_states; file_lock}