diff --git a/infer/src/backend/InferAnalyze.ml b/infer/src/backend/InferAnalyze.ml index e45a8fa66..3441b0634 100644 --- a/infer/src/backend/InferAnalyze.ml +++ b/infer/src/backend/InferAnalyze.ml @@ -16,10 +16,9 @@ let create_exe_env_tasks source_file exe_env : Tasks.t = Summary.clear_cache () ; Typ.Procname.SQLite.clear_cache () ; Random.self_init () ; - Tasks.create - [ (fun () -> - Callbacks.iterate_callbacks exe_env ; - if Config.write_html then Printer.write_all_html_files source_file ) ] + [ (fun () -> + Callbacks.iterate_callbacks exe_env ; + if Config.write_html then Printer.write_all_html_files source_file ) ] (** Create tasks to analyze a cluster *) diff --git a/infer/src/backend/Tasks.ml b/infer/src/backend/Tasks.ml index 6928bf008..d0e408a90 100644 --- a/infer/src/backend/Tasks.ml +++ b/infer/src/backend/Tasks.ml @@ -8,32 +8,18 @@ open! IStd module L = Logging -type closure = unit -> unit +type task = unit -> unit -type t = {closures: closure list; continuations: closure Queue.t} +type t = task list -let create ?(continuation= None) closures = - let continuations = - match continuation with None -> Queue.create () | Some closure -> Queue.singleton closure - in - {closures; continuations} - - -(* Aggregate closures into groups of the given size *) +(** Aggregate closures into groups of the given size *) let aggregate ~size t = let group_to_closure group () = List.iter ~f:(fun closure -> closure ()) group in - let group_size = if size > 0 then size else List.length t.closures / Config.jobs in - if group_size > 1 then - let groups = List.groupi ~break:(fun n _ _ -> Int.equal (n mod group_size) 0) t.closures in - let closures = List.map ~f:group_to_closure groups in - {t with closures} - else t + let group_size = if size > 0 then size else List.length t / Config.jobs in + if group_size > 1 then List.chunks_of t ~length:group_size |> List.map ~f:group_to_closure else t -let run t = - List.iter ~f:(fun f -> f ()) t.closures ; - Queue.iter ~f:(fun closure -> closure ()) t.continuations - +let run t = List.iter ~f:(fun f -> f ()) t let fork_protect ~f x = EventLogger.prepare () ; @@ -43,21 +29,18 @@ let fork_protect ~f x = module Runner = struct - type runner = {pool: ProcessPool.t; all_continuations: closure Queue.t} + type tasks = t + + type t = {pool: ProcessPool.t} - let create ~jobs = {pool= ProcessPool.create ~jobs; all_continuations= Queue.create ()} + let create ~jobs = {pool= ProcessPool.create ~jobs} let start runner ~tasks = let pool = runner.pool in - Queue.enqueue_all runner.all_continuations (Queue.to_list tasks.continuations) ; (* Flush here all buffers to avoid passing unflushed data to forked processes, leading to duplication *) Pervasives.flush_all () ; - List.iter - ~f:(fun x -> ProcessPool.start_child ~f:(fun f -> fork_protect ~f ()) ~pool x) - tasks.closures + List.iter ~f:(fun x -> ProcessPool.start_child ~f:(fun f -> fork_protect ~f ()) ~pool x) tasks - let complete runner = - ProcessPool.wait_all runner.pool ; - Queue.iter ~f:(fun f -> f ()) runner.all_continuations + let complete runner = ProcessPool.wait_all runner.pool end diff --git a/infer/src/backend/Tasks.mli b/infer/src/backend/Tasks.mli index 7cf68c511..4a3c1ca37 100644 --- a/infer/src/backend/Tasks.mli +++ b/infer/src/backend/Tasks.mli @@ -7,37 +7,33 @@ open! IStd -(** A sequence of tasks that can be executed in parallel, - with a continuation to be executed at the end *) -type t +(** Each task executes a closure *) +type task = unit -> unit -(** Each task/continuation executes a closure *) -type closure = unit -> unit - -(* Aggregate closures into groups of the given size *) +(** A sequence of tasks that can be executed in parallel *) +type t = task list val aggregate : size:int -> t -> t - -val create : ?continuation:closure option -> closure list -> t -(** Create tasks with a list of closures to be executed in parallel, - and an optional continuation to be executed afterwards *) +(** Aggregate closures into groups of the given size *) val run : t -> unit -(** Run the closures and continuation *) +(** Run the closures *) val fork_protect : f:('a -> 'b) -> 'a -> 'b (** does the bookkeeping necessary to safely execute an infer function [f] after a call to fork(2) *) +(** A runner accepts new tasks repeatedly for parallel execution *) module Runner : sig - (** A runner accepts new tasks repeatedly for parallel execution *) - type runner + type tasks = t + + type t - val create : jobs:int -> runner - (** Create a runner *) + val create : jobs:int -> t + (** Create a runner running [jobs] jobs in parallel *) - val start : runner -> tasks:t -> unit + val start : t -> tasks:tasks -> unit (** Start the given tasks with the runner *) - val complete : runner -> unit - (** Complete all the outstanding tasks *) + val complete : t -> unit + (** Wait until all the outstanding tasks are completed *) end