diff --git a/infer/src/backend/InferAnalyze.ml b/infer/src/backend/InferAnalyze.ml index eb09f4eaf..c92af5e61 100644 --- a/infer/src/backend/InferAnalyze.ml +++ b/infer/src/backend/InferAnalyze.ml @@ -106,6 +106,5 @@ let main ~changed_files = (* Prepare tasks one cluster at a time while executing in parallel *) let tasks = Tasks.gen_of_list source_files_to_analyze in let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_source_file ~tasks in - let n_tasks = !n_source_files_to_analyze in - Tasks.Runner.run runner ~n_tasks ) ; + Tasks.Runner.run runner ) ; output_json_makefile_stats source_files_to_analyze diff --git a/infer/src/backend/Tasks.ml b/infer/src/backend/Tasks.ml index 732adc616..861305c98 100644 --- a/infer/src/backend/Tasks.ml +++ b/infer/src/backend/Tasks.ml @@ -56,16 +56,17 @@ module Runner = struct pool - let run runner ~n_tasks = + let run runner = (* Flush here all buffers to avoid passing unflushed data to forked processes, leading to duplication *) Pervasives.flush_all () ; (* Compact heap before forking *) Gc.compact () ; - ProcessPool.run runner n_tasks + ProcessPool.run runner end let gen_of_list (lst : 'a list) : 'a task_generator = let content = ref lst in + let n_tasks = List.length lst in let is_empty () = List.is_empty !content in let next _finished_item = match !content with @@ -75,4 +76,4 @@ let gen_of_list (lst : 'a list) : 'a task_generator = content := xs ; Some x in - {is_empty; next} + {n_tasks; is_empty; next} diff --git a/infer/src/backend/Tasks.mli b/infer/src/backend/Tasks.mli index 5de6fd3c4..4692e4a62 100644 --- a/infer/src/backend/Tasks.mli +++ b/infer/src/backend/Tasks.mli @@ -26,6 +26,6 @@ module Runner : sig val create : jobs:int -> f:'a doer -> tasks:'a task_generator -> 'a t (** Create a runner running [jobs] jobs in parallel *) - val run : 'a t -> n_tasks:int -> unit + val run : 'a t -> unit (** Start the given tasks with the runner and wait until completion *) end diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index 2cc10cffa..d6e8774f4 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -11,7 +11,7 @@ module L = Logging type child_info = {pid: Pid.t; down_pipe: Out_channel.t} -type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option} +type 'a task_generator = {n_tasks: int; is_empty: unit -> bool; next: 'a option -> 'a option} (** the state of the pool *) type 'a t = @@ -25,15 +25,13 @@ type 'a t = ; children_updates: Unix.File_descr.t (** all the children send updates up the same pipe to the pool *) ; task_bar: TaskBar.t - ; tasks: 'a task_generator (** generator for work remaining to be done *) - ; mutable idle_children: int - (** number of children currently ready for more work, but there are no tasks to send to - them *) - } + ; tasks: 'a task_generator (** generator for work remaining to be done *) } (** {2 Constants} *) -(** refresh rate of the task bar (worst case: it also refreshes on children updates) *) +(** refresh rate of the task bar (worst case: it also refreshes on children updates) + this is now mandatory to allow checking for new work packets, when none were + previously available *) let refresh_timeout = let frames_per_second = 12 in `After (Time_ns.Span.of_int_ms (1_000 / frames_per_second)) @@ -85,11 +83,11 @@ let rec really_read ?(pos = 0) ~len fd ~buf = elapsed *) let wait_for_updates pool buffer = let file_descr = pool.children_updates in - let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in (* Use select(2) so that we can both wait on the pipe of children updates and wait for a - timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time. *) + timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time, + as well as for checking for new work where none were previously available. *) let {Unix.Select_fds.read= read_fds} = - Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout () + Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout:refresh_timeout () in match read_fds with | _ :: _ :: _ -> @@ -142,6 +140,21 @@ let has_dead_child pool = , status ) ) +let idle_children pool = + Array.fold pool.pending_items ~init:0 ~f:(fun acc -> function Some _ -> acc | None -> 1 + acc) + + +let send_work_to_child pool slot = + match pool.tasks.next pool.pending_items.(slot) with + | None -> + TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; + pool.pending_items.(slot) <- None + | Some x -> + let {down_pipe} = pool.slots.(slot) in + pool.pending_items.(slot) <- Some x ; + marshal_to_pipe down_pipe (Do x) + + (** main dispatch function that responds to messages from worker processes and updates the taskbar periodically *) let process_updates pool buffer = @@ -157,18 +170,16 @@ let process_updates pool buffer = (* clean crash, give the child process a chance to cleanup *) Unix.wait (`Pid pid) |> ignore ; killall pool ~slot "see backtrace above" - | Some (Ready slot) -> ( + | Some (Ready slot) -> TaskBar.tasks_done_add pool.task_bar 1 ; - match pool.tasks.next pool.pending_items.(slot) with - | None -> - TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; - pool.idle_children <- pool.idle_children + 1 - | Some x -> - let {down_pipe} = pool.slots.(slot) in - pool.pending_items.(slot) <- Some x ; - marshal_to_pipe down_pipe (Do x) ) - | None -> - () + send_work_to_child pool slot + | None -> ( + (* no updates, so try to schedule more work if there is an idle worker *) + match Array.findi pool.pending_items ~f:(fun _idx item -> Option.is_none item) with + | None -> + () + | Some (idle_slot, _) -> + send_work_to_child pool idle_slot ) (** terminate all worker processes *) @@ -277,11 +288,11 @@ let create : let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in let children_updates = pipe_child_r in let pending_items : 'a option Array.t = Array.create ~len:jobs None in - {slots; children_updates; jobs; task_bar; tasks; pending_items; idle_children= 0} + {slots; children_updates; jobs; task_bar; tasks; pending_items} -let run pool n_tasks = - TaskBar.set_tasks_total pool.task_bar n_tasks ; +let run pool = + TaskBar.set_tasks_total pool.task_bar pool.tasks.n_tasks ; TaskBar.tasks_done_reset pool.task_bar ; (* Start with a negative number of completed tasks to account for the initial [Ready] messages. All the children start by sending [Ready], which is interpreted by the parent process @@ -291,14 +302,14 @@ let run pool n_tasks = (* allocate a buffer for reading children updates once for the whole run *) let buffer = Bytes.create buffer_size in (* wait for all children to run out of tasks *) - while not (pool.tasks.is_empty () && pool.idle_children >= pool.jobs) do + while not (pool.tasks.is_empty () && idle_children pool >= pool.jobs) do process_updates pool buffer ; TaskBar.refresh pool.task_bar done ; wait_all pool ; TaskBar.finish pool.task_bar -let run pool n_tasks = +let run pool = PerfEvent.(log (fun logger -> log_instant_event logger ~name:"start process pool" Global)) ; - run pool n_tasks ; + run pool ; PerfEvent.(log (fun logger -> log_instant_event logger ~name:"end process pool" Global)) diff --git a/infer/src/base/ProcessPool.mli b/infer/src/base/ProcessPool.mli index f52afa684..cc04f3c4b 100644 --- a/infer/src/base/ProcessPool.mli +++ b/infer/src/base/ProcessPool.mli @@ -26,15 +26,25 @@ open! IStd (** A ['a t] process pool accepts tasks of type ['a]. ['a] will be marshalled over a Unix pipe.*) type _ t -(** abstraction for generating jobs; [next finished_item] produces the next task, - and receives the task that was finished, or [None] if this is the first time a worker - begins work on a task *) -type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option} +(** abstraction for generating jobs *) +type 'a task_generator = + { n_tasks: int + (** total number of tasks -- only used for reporting, so imprecision is not a bug *) + ; is_empty: unit -> bool + (** when should the main loop of the task manager stop expecting new tasks *) + ; next: 'a option -> 'a option + (** [next (Some finished_item)] generates the next work item. + The worker requesting more work has just finished processing [finished_item]. + [None] is passed when the worker was previously idle. + + In particular, it is OK to for [next] to return [None] even when [is_empty] + is false. This corresponds to the case where there is more work to be done, + but it is not schedulable until some already scheduled work is finished. *) + } val create : jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> tasks:'a task_generator -> 'a t (** Create a new pool of processes running [jobs] jobs in parallel *) -val run : 'a t -> int -> unit -(** use the processes in the given process pool to run all the given tasks in parallel. - the int argument is used for counting only *) +val run : 'a t -> unit +(** use the processes in the given process pool to run all the given tasks in parallel. *) diff --git a/infer/src/base/TaskBar.ml b/infer/src/base/TaskBar.ml index 7e34bb452..091b6aa7c 100644 --- a/infer/src/base/TaskBar.ml +++ b/infer/src/base/TaskBar.ml @@ -192,6 +192,3 @@ let finish = function Out_channel.flush stderr | NonInteractive | Quiet -> () - - -let is_interactive = function MultiLine _ -> true | NonInteractive | Quiet -> false diff --git a/infer/src/base/TaskBar.mli b/infer/src/base/TaskBar.mli index b83cfda55..a7d3728e1 100644 --- a/infer/src/base/TaskBar.mli +++ b/infer/src/base/TaskBar.mli @@ -30,6 +30,3 @@ val tasks_done_add : t -> int -> unit val finish : t -> unit (** tear down the task bar and ready the terminal for more output *) - -val is_interactive : t -> bool -(** does the task bar expect periodic refresh? *) diff --git a/infer/src/integration/CaptureCompilationDatabase.ml b/infer/src/integration/CaptureCompilationDatabase.ml index a852bb42d..b1d132cba 100644 --- a/infer/src/integration/CaptureCompilationDatabase.ml +++ b/infer/src/integration/CaptureCompilationDatabase.ml @@ -57,8 +57,7 @@ let run_compilation_database compilation_database should_capture_file = let compilation_commands = List.map ~f:create_cmd compilation_data in let tasks = Tasks.gen_of_list compilation_commands in let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:invoke_cmd ~tasks in - let n_tasks = List.length compilation_commands in - Tasks.Runner.run runner ~n_tasks ; + Tasks.Runner.run runner ; L.progress "@." ; L.(debug Analysis Medium) "Ran %d jobs" number_of_jobs