[processpool] collect results from children

Modify the scheduler to collect results from children at the end of the
parallel execution. Use this to collect backend stats and log their
aggregated sum.

@ -60,6 +60,15 @@ let copy from ~into =
into.summary_has_model_queries <- summary_has_model_queries
let merge stats1 stats2 =
{ summary_file_try_load= stats1.summary_file_try_load + stats2.summary_file_try_load
; summary_read_from_disk= stats1.summary_read_from_disk + stats2.summary_read_from_disk
; summary_cache_hits= stats1.summary_cache_hits + stats2.summary_cache_hits
; summary_cache_misses= stats1.summary_cache_misses + stats2.summary_cache_misses
; summary_has_model_queries= stats1.summary_has_model_queries + stats2.summary_has_model_queries
let initial =
{ summary_file_try_load= 0
; summary_read_from_disk= 0

@ -10,6 +10,8 @@ open! IStd
type t
val initial : t
val incr_summary_file_try_load : unit -> unit
(** a query to the filesystem attempting to load a summary file *)
@ -30,3 +32,6 @@ val get : unit -> t
(** get the stats so far *)
val pp : Format.formatter -> t -> unit
val merge : t -> t -> t
(** return a new value that adds up the stats in both arguments *)

@ -123,6 +123,19 @@ let main ~changed_files =
L.environment_info "Parallel jobs: %d@." Config.jobs ;
let tasks = TaskScheduler.schedule source_files_to_analyze in
(* Prepare tasks one cluster at a time while executing in parallel *)
let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_target ~tasks in
Tasks.Runner.run runner ) ;
let runner =
Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_target ~child_epilogue:BackendStats.get
let all_stats = Tasks.Runner.run runner in
let stats =
Array.fold all_stats ~init:BackendStats.initial ~f:(fun collated_stats stats_opt ->
match stats_opt with
| None ->
| Some stats ->
L.debug Analysis Quiet "gotstats:@\n%a@." BackendStats.pp stats ;
BackendStats.merge stats collated_stats )
L.debug Analysis Quiet "collected stats:@\n%a@." BackendStats.pp stats ) ;
output_json_makefile_stats source_files_to_analyze

@ -22,21 +22,18 @@ let fork_protect ~f x =
(* get different streams of random numbers in each fork, in particular to lessen contention in
`Filename.mk_temp` *)
Random.self_init () ;
~f:(fun () -> L.debug Analysis Quiet "%a@." BackendStats.pp (BackendStats.get ()))
~description:"dumping summaries stats" ;
f x
module Runner = struct
type 'a t = 'a ProcessPool.t
type ('work, 'final) t = ('work, 'final) ProcessPool.t
let create ~jobs ~f ~tasks =
let create ~jobs ~f ~child_epilogue ~tasks =
log (fun logger -> log_begin_event logger ~categories:["sys"] ~name:"fork prepare" ())) ;
ResultsDatabase.db_close () ;
let pool =
ProcessPool.create ~jobs ~f ~tasks
ProcessPool.create ~jobs ~f ~child_epilogue ~tasks
((* hack: run post-fork bookkeeping stuff by passing a dummy function to [fork_protect] *)
fork_protect ~f:(fun () -> ()))

@ -21,11 +21,16 @@ val fork_protect : f:('a -> 'b) -> 'a -> 'b
(** A runner accepts new tasks repeatedly for parallel execution *)
module Runner : sig
type 'a t
val create : jobs:int -> f:'a doer -> tasks:'a task_generator -> 'a t
type ('work, 'final) t
val create :
-> f:'work doer
-> child_epilogue:(unit -> 'final)
-> tasks:'work task_generator
-> ('work, 'final) t
(** Create a runner running [jobs] jobs in parallel *)
val run : 'a t -> unit
val run : (_, 'final) t -> 'final option Array.t
(** Start the given tasks with the runner and wait until completion *)

@ -9,6 +9,8 @@ open! IStd
module F = Format
module L = Logging
let log_or_die fmt = if Config.keep_going then L.internal_error fmt else L.die InternalError fmt
type child_info = {pid: Pid.t; down_pipe: Out_channel.t}
(** The master's abstraction of state for workers.
@ -26,17 +28,17 @@ type 'a task_generator =
; next: unit -> 'a option }
(** the state of the pool *)
type 'a t =
type ('work, 'final) t =
{ jobs: int
(** number of jobs running in parallel, i.e. number of children we are responsible for *)
; slots: child_info Array.t
(** array of child processes with their pids and channels we can use to send work down to
each child *)
; children_states: 'a child_state Array.t (** array tracking the state of each worker *)
; children_states: 'work child_state Array.t (** array tracking the state of each worker *)
; children_updates: Unix.File_descr.t
(** all the children send updates up the same pipe to the pool *)
; task_bar: TaskBar.t
; tasks: 'a task_generator (** generator for work remaining to be done *) }
; tasks: 'work task_generator (** generator for work remaining to be done *) }
(** {2 Constants} *)
@ -232,6 +234,8 @@ let process_updates pool buffer =
| UpdateStatus (slot, t, status) ->
TaskBar.update_status pool.task_bar ~slot t status
| Crash slot ->
(* NOTE: the workers only send this message if {!Config.keep_going} is not [true] so if
we receive it we know we should fail hard *)
let {pid} = pool.slots.(slot) in
(* clean crash, give the child process a chance to cleanup *)
Unix.wait (`Pid pid) |> ignore ;
@ -254,14 +258,39 @@ let process_updates pool buffer =
type 'a final_worker_message = Finished of int * 'a option | FinalCrash of int
let collect_results (pool : (_, 'final) t) =
let failed = ref false in
let updates_in = Unix.in_channel_of_descr pool.children_updates in
(* use [Array.init] just to collect n messages, the order in the array will not be the same as the
slots of the workers but that's ok *)
Array.init pool.jobs ~f:(fun i ->
if !failed then None
match (Marshal.from_channel updates_in : 'final final_worker_message) with
| exception (End_of_file | Failure _) ->
failed := true ;
log_or_die "@[<v>error reading %dth final values from children@]%!" i ;
| FinalCrash slot ->
(* NOTE: the workers only send this message if {!Config.keep_going} is not [true] so if
we receive it we know we should fail hard *)
killall pool ~slot "see backtrace above"
| Finished (_slot, data) ->
data )
(** terminate all worker processes *)
let wait_all pool =
(* tell each alive worker to go home and wait(2) them, one by one; the order doesn't matter since
we want to wait for all of them eventually anyway. *)
(* tell each alive worker to go home *)
Array.iter pool.slots ~f:(fun {down_pipe} ->
marshal_to_pipe down_pipe GoHome ; Out_channel.close down_pipe ) ;
let results = collect_results pool in
(* wait(2) workers one by one; the order doesn't matter since we want to wait for all of them
eventually anyway. *)
let errors =
Array.foldi ~init:[] pool.slots ~f:(fun slot errors {pid; down_pipe} ->
marshal_to_pipe down_pipe GoHome ;
Out_channel.close down_pipe ;
Array.foldi ~init:[] pool.slots ~f:(fun slot errors {pid} ->
match Unix.wait (`Pid pid) with
| _pid, Ok () ->
@ -269,21 +298,33 @@ let wait_all pool =
(* Collect all children errors and die only at the end to avoid creating zombies. *)
(slot, status) :: errors )
if not (List.is_empty errors) then
let log_or_die = if Config.keep_going then L.internal_error else L.die InternalError in
( if not (List.is_empty errors) then
let pp_error f (slot, status) =
F.fprintf f "Error in infer subprocess %d: %s@." slot
(Unix.Exit_or_signal.to_string_hum status)
log_or_die "@[<v>%a@]%!" (Pp.seq ~print_env:Pp.text_break ~sep:"" pp_error) errors
log_or_die "@[<v>%a@]%!" (Pp.seq ~print_env:Pp.text_break ~sep:"" pp_error) errors ) ;
(** worker loop: wait for tasks and run [f] on them until we are told to go home *)
let rec child_loop ~slot send_to_parent receive_from_parent ~f =
let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue =
send_to_parent (Ready slot) ;
match receive_from_parent () with
| GoHome ->
| GoHome -> (
match epilogue () with
| data ->
send_final (Finished (slot, Some data))
| exception e ->
IExn.reraise_if e ~f:(fun () ->
if Config.keep_going then (
L.internal_error "Error running epilogue in subprocess %d: %a@." slot Exn.pp e ;
send_final (Finished (slot, None)) ;
false )
else (
(* crash hard, but first let the master know that we have crashed *)
send_final (FinalCrash slot) ;
true ) ) )
| Do stuff ->
( try f stuff
with e ->
@ -296,14 +337,14 @@ let rec child_loop ~slot send_to_parent receive_from_parent ~f =
(* crash hard, but first let the master know that we have crashed *)
send_to_parent (Crash slot) ;
true ) ) ) ;
child_loop ~slot send_to_parent receive_from_parent ~f
child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue
(** Fork a new child and start it so that it is ready for work.
The child inherits [updates_w] to send updates up to the parent, and a new pipe is set up for
the parent to send instructions down to the child. *)
let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f ~epilogue =
let to_child_r, to_child_w = Unix.pipe () in
match Unix.fork () with
| `In_the_child ->
@ -316,6 +357,9 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
child_prelude () ;
let updates_oc = Unix.out_channel_of_descr updates_w in
let send_to_parent (message : worker_message) = marshal_to_pipe updates_oc message in
let send_final (final_message : 'a final_worker_message) =
marshal_to_pipe updates_oc final_message
(* Function to send updates up the pipe to the parent instead of directly to the task
bar. This is because only the parent knows about all the children, hence it's in charge of
actually updating the task bar. *)
@ -337,7 +381,7 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
PerfEvent.(log (fun logger -> log_end_event logger ())) ;
child_loop ~slot send_to_parent receive_from_parent ~f ;
child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue ;
Out_channel.close updates_oc ;
In_channel.close orders_ic ;
Epilogues.run () ;
@ -348,14 +392,22 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
let create :
jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> tasks:'a task_generator -> 'a t =
fun ~jobs ~child_prelude ~f ~tasks ->
-> child_prelude:(unit -> unit)
-> f:('work -> unit)
-> child_epilogue:(unit -> 'final)
-> tasks:'work task_generator
-> ('work, 'final) t =
fun ~jobs ~child_prelude ~f ~child_epilogue ~tasks ->
let task_bar = TaskBar.create ~jobs in
(* Pipe to communicate from children to parent. Only one pipe is needed: the messages sent by
children include the identifier of the child sending the message (its [slot]). This way there
is only one pipe to wait on for updates. *)
let ((pipe_child_r, pipe_child_w) as status_pipe) = Unix.pipe () in
let slots = Array.init jobs ~f:(fun slot -> fork_child ~child_prelude ~slot status_pipe ~f) in
let slots =
Array.init jobs ~f:(fun slot ->
fork_child ~child_prelude ~slot status_pipe ~f ~epilogue:child_epilogue )
(* we have forked the child processes and are now in the parent *)
let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in
let children_updates = pipe_child_r in
@ -373,11 +425,12 @@ let run pool =
while not (pool.tasks.is_empty () && all_children_idle pool) do
process_updates pool buffer ; TaskBar.refresh pool.task_bar
done ;
wait_all pool ;
TaskBar.finish pool.task_bar
let results = wait_all pool in
TaskBar.finish pool.task_bar ; results
let run pool =
PerfEvent.(log (fun logger -> log_instant_event logger ~name:"start process pool" Global)) ;
run pool ;
PerfEvent.(log (fun logger -> log_instant_event logger ~name:"end process pool" Global))
let results = run pool in
PerfEvent.(log (fun logger -> log_instant_event logger ~name:"end process pool" Global)) ;

@ -23,8 +23,9 @@ open! IStd
See also {!module-ProcessPoolState}. *)
(** A ['a t] process pool accepts tasks of type ['a]. ['a] will be marshalled over a Unix pipe.*)
type _ t
(** A [('work, 'final) t] process pool accepts tasks of type ['work] and produces an array of
results of type ['final]. ['work] and ['final] will be marshalled over a Unix pipe.*)
type (_, _) t
(** abstraction for generating jobs *)
type 'a task_generator =
@ -43,8 +44,14 @@ type 'a task_generator =
val create :
jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> tasks:'a task_generator -> 'a t
-> child_prelude:(unit -> unit)
-> f:('work -> unit)
-> child_epilogue:(unit -> 'final)
-> tasks:'work task_generator
-> ('work, 'final) t
(** Create a new pool of processes running [jobs] jobs in parallel *)
val run : 'a t -> unit
(** use the processes in the given process pool to run all the given tasks in parallel. *)
val run : (_, 'final) t -> 'final option Array.t
(** use the processes in the given process pool to run all the given tasks in parallel and return
the results of the epilogues *)

@ -56,8 +56,12 @@ let run_compilation_database compilation_database should_capture_file =
L.progress "Starting %s %d files@\n%!" Config.clang_frontend_action_string number_of_jobs ;
let compilation_commands = List.map ~f:create_cmd compilation_data in
let tasks = Tasks.gen_of_list compilation_commands in
let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:invoke_cmd ~tasks in
Tasks.Runner.run runner ;
(* no stats to record so [child_epilogue] does nothing and we ignore the return
{!Tasks.Runner.run} *)
let runner =
Tasks.Runner.create ~jobs:Config.jobs ~f:invoke_cmd ~child_epilogue:(fun () -> ()) ~tasks
Tasks.Runner.run runner |> ignore ;
L.progress "@." ;
L.(debug Analysis Medium) "Ran %d jobs" number_of_jobs
