From 5a18ad5c691b7d32e8a5bf7f45d6667bec4a4ab2 Mon Sep 17 00:00:00 2001
From: Nikos Gorogiannis
Date: Tue, 7 May 2019 09:00:47 -0700
Subject: [PATCH] [tasks] relax task generator interface contract

Summary:
A more dynamic scheduling scheme can run into the situation where no new work
packets can be scheduled right now, but more work will become schedulable in the
future, for example when some work packet that others depend on finishes being
analysed. The current implementation rules this out, as it expects that once a
worker goes idle, it stays idle.

The changes here address this in two parts:
- the `select` call is always given a finite timeout. With an infinite timeout we
  would never get a chance to poll the task generator for work that has become
  available since it last had none to give.
- when the `select` call times out without updates, check whether there is an
  idle child and, if so, whether the task generator has more work available
  right now.

See also ProcessPool.mli for comments.

Reviewed By: mbouaziz

Differential Revision: D15197749

fbshipit-source-id: babe5da8e
---
 infer/src/backend/InferAnalyze.ml             |  3 +-
 infer/src/backend/Tasks.ml                    |  7 +-
 infer/src/backend/Tasks.mli                   |  2 +-
 infer/src/base/ProcessPool.ml                 | 65 +++++++++++--------
 infer/src/base/ProcessPool.mli                | 24 +++++--
 infer/src/base/TaskBar.ml                     |  3 -
 infer/src/base/TaskBar.mli                    |  3 -
 .../integration/CaptureCompilationDatabase.ml |  3 +-
 8 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/infer/src/backend/InferAnalyze.ml b/infer/src/backend/InferAnalyze.ml
index eb09f4eaf..c92af5e61 100644
--- a/infer/src/backend/InferAnalyze.ml
+++ b/infer/src/backend/InferAnalyze.ml
@@ -106,6 +106,5 @@ let main ~changed_files =
     (* Prepare tasks one cluster at a time while executing in parallel *)
     let tasks = Tasks.gen_of_list source_files_to_analyze in
     let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_source_file ~tasks in
-    let n_tasks = !n_source_files_to_analyze in
-    Tasks.Runner.run runner ~n_tasks ) ;
+    Tasks.Runner.run runner ) ;
   output_json_makefile_stats source_files_to_analyze
diff --git a/infer/src/backend/Tasks.ml b/infer/src/backend/Tasks.ml
index 732adc616..861305c98 100644
--- a/infer/src/backend/Tasks.ml
+++ b/infer/src/backend/Tasks.ml
@@ -56,16 +56,17 @@ module Runner = struct
     pool
 
 
-  let run runner ~n_tasks =
+  let run runner =
    (* Flush here all buffers to avoid passing unflushed data to forked processes, leading to duplication *)
    Pervasives.flush_all () ;
    (* Compact heap before forking *)
    Gc.compact () ;
-    ProcessPool.run runner n_tasks
+    ProcessPool.run runner
 end
 
 let gen_of_list (lst : 'a list) : 'a task_generator =
   let content = ref lst in
+  let n_tasks = List.length lst in
   let is_empty () = List.is_empty !content in
   let next _finished_item =
     match !content with
@@ -75,4 +76,4 @@ let gen_of_list (lst : 'a list) : 'a task_generator =
         content := xs ;
         Some x
   in
-  {is_empty; next}
+  {n_tasks; is_empty; next}
diff --git a/infer/src/backend/Tasks.mli b/infer/src/backend/Tasks.mli
index 5de6fd3c4..4692e4a62 100644
--- a/infer/src/backend/Tasks.mli
+++ b/infer/src/backend/Tasks.mli
@@ -26,6 +26,6 @@ module Runner : sig
   val create : jobs:int -> f:'a doer -> tasks:'a task_generator -> 'a t
   (** Create a runner running [jobs] jobs in parallel *)
 
-  val run : 'a t -> n_tasks:int -> unit
+  val run : 'a t -> unit
   (** Start the given tasks with the runner and wait until completion *)
 end
diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml
index 2cc10cffa..d6e8774f4 100644
--- a/infer/src/base/ProcessPool.ml
+++ b/infer/src/base/ProcessPool.ml
@@ -11,7 +11,7 @@ module L = Logging
 
 type child_info = {pid: Pid.t; down_pipe: Out_channel.t}
 
-type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option}
+type 'a task_generator = {n_tasks: int; is_empty: unit -> bool; next: 'a option -> 'a option}
 
 (** the state of the pool *)
 type 'a t =
@@ -25,15 +25,13 @@ type 'a t =
   ; children_updates: Unix.File_descr.t
         (** all the children send updates up the same pipe to the pool *)
   ; task_bar: TaskBar.t
-  ; tasks: 'a task_generator  (** generator for work remaining to be done *)
-  ; mutable idle_children: int
-        (** number of children currently ready for more work, but there are no tasks to send to
-            them *)
-  }
+  ; tasks: 'a task_generator  (** generator for work remaining to be done *) }
 
 (** {2 Constants} *)
 
-(** refresh rate of the task bar (worst case: it also refreshes on children updates) *)
+(** refresh rate of the task bar (worst case: it also refreshes on children updates); this is
+    now mandatory, to allow checking for new work packets when none were previously
+    available *)
 let refresh_timeout =
   let frames_per_second = 12 in
   `After (Time_ns.Span.of_int_ms (1_000 / frames_per_second))
@@ -85,11 +83,11 @@ let rec really_read ?(pos = 0) ~len fd ~buf =
     elapsed *)
 let wait_for_updates pool buffer =
   let file_descr = pool.children_updates in
-  let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in
   (* Use select(2) so that we can both wait on the pipe of children updates and wait for a
-     timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time. *)
+     timeout. The timeout gives the taskbar a chance to refresh from time to time, and also lets
+     us check for new work when none was previously available. *)
   let {Unix.Select_fds.read= read_fds} =
-    Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout ()
+    Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout:refresh_timeout ()
   in
   match read_fds with
   | _ :: _ :: _ ->
@@ -142,6 +140,21 @@ let has_dead_child pool =
         , status ) )
 
 
+let idle_children pool =
+  Array.fold pool.pending_items ~init:0 ~f:(fun acc -> function Some _ -> acc | None -> 1 + acc)
+
+
+let send_work_to_child pool slot =
+  match pool.tasks.next pool.pending_items.(slot) with
+  | None ->
+      TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ;
+      pool.pending_items.(slot) <- None
+  | Some x ->
+      let {down_pipe} = pool.slots.(slot) in
+      pool.pending_items.(slot) <- Some x ;
+      marshal_to_pipe down_pipe (Do x)
+
+
 (** main dispatch function that responds to messages from worker processes and updates the taskbar
     periodically *)
 let process_updates pool buffer =
@@ -157,18 +170,16 @@ let process_updates pool buffer =
         (* clean crash, give the child process a chance to cleanup *)
         Unix.wait (`Pid pid) |> ignore ;
         killall pool ~slot "see backtrace above"
-    | Some (Ready slot) -> (
+    | Some (Ready slot) ->
         TaskBar.tasks_done_add pool.task_bar 1 ;
-        match pool.tasks.next pool.pending_items.(slot) with
-        | None ->
-            TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ;
-            pool.idle_children <- pool.idle_children + 1
-        | Some x ->
-            let {down_pipe} = pool.slots.(slot) in
-            pool.pending_items.(slot) <- Some x ;
-            marshal_to_pipe down_pipe (Do x) )
-    | None ->
-        ()
+        send_work_to_child pool slot
+    | None -> (
+      (* no updates, so try to schedule more work if there is an idle worker *)
+      match Array.findi pool.pending_items ~f:(fun _idx item -> Option.is_none item) with
+      | None ->
+          ()
+      | Some (idle_slot, _) ->
+          send_work_to_child pool idle_slot )
 
 
 (** terminate all worker processes *)
@@ -277,11 +288,11 @@ let create :
   let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in
   let children_updates = pipe_child_r in
   let pending_items : 'a option Array.t = Array.create ~len:jobs None in
-  {slots; children_updates; jobs; task_bar; tasks; pending_items; idle_children= 0}
+  {slots; children_updates; jobs; task_bar; tasks; pending_items}
 
 
-let run pool n_tasks =
-  TaskBar.set_tasks_total pool.task_bar n_tasks ;
+let run pool =
+  TaskBar.set_tasks_total pool.task_bar pool.tasks.n_tasks ;
   TaskBar.tasks_done_reset pool.task_bar ;
   (* Start with a negative number of completed tasks to account for the initial [Ready] messages.
      All the children start by sending [Ready], which is interpreted by the parent process
@@ -291,14 +302,14 @@ let run pool n_tasks =
   (* allocate a buffer for reading children updates once for the whole run *)
   let buffer = Bytes.create buffer_size in
   (* wait for all children to run out of tasks *)
-  while not (pool.tasks.is_empty () && pool.idle_children >= pool.jobs) do
+  while not (pool.tasks.is_empty () && idle_children pool >= pool.jobs) do
     process_updates pool buffer ; TaskBar.refresh pool.task_bar
   done ;
   wait_all pool ;
   TaskBar.finish pool.task_bar
 
 
-let run pool n_tasks =
+let run pool =
   PerfEvent.(log (fun logger -> log_instant_event logger ~name:"start process pool" Global)) ;
-  run pool n_tasks ;
+  run pool ;
   PerfEvent.(log (fun logger -> log_instant_event logger ~name:"end process pool" Global))
diff --git a/infer/src/base/ProcessPool.mli b/infer/src/base/ProcessPool.mli
index f52afa684..cc04f3c4b 100644
--- a/infer/src/base/ProcessPool.mli
+++ b/infer/src/base/ProcessPool.mli
@@ -26,15 +26,25 @@ open! IStd
 (** A ['a t] process pool accepts tasks of type ['a]. ['a] will be marshalled over a Unix pipe.*)
 type _ t
 
-(** abstraction for generating jobs; [next finished_item] produces the next task,
-    and receives the task that was finished, or [None] if this is the first time a worker
-    begins work on a task *)
-type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option}
+(** abstraction for generating jobs *)
+type 'a task_generator =
+  { n_tasks: int
+        (** total number of tasks -- only used for reporting, so imprecision is not a bug *)
+  ; is_empty: unit -> bool
+        (** whether the main loop of the task manager should stop expecting new tasks *)
+  ; next: 'a option -> 'a option
+        (** [next (Some finished_item)] generates the next work item. The worker requesting more
+            work has just finished processing [finished_item]. [None] is passed when the worker
+            was previously idle.
+
+            In particular, it is OK for [next] to return [None] even when [is_empty] is false.
+            This corresponds to the case where there is more work to be done, but it is not
+            schedulable until some already scheduled work is finished. *)
+  }
 
 val create :
   jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> tasks:'a task_generator -> 'a t
 (** Create a new pool of processes running [jobs] jobs in parallel *)
 
-val run : 'a t -> int -> unit
-(** use the processes in the given process pool to run all the given tasks in parallel.
-    the int argument is used for counting only *)
+val run : 'a t -> unit
+(** use the processes in the given process pool to run all the given tasks in parallel. *)
diff --git a/infer/src/base/TaskBar.ml b/infer/src/base/TaskBar.ml
index 7e34bb452..091b6aa7c 100644
--- a/infer/src/base/TaskBar.ml
+++ b/infer/src/base/TaskBar.ml
@@ -192,6 +192,3 @@ let finish = function
       Out_channel.flush stderr
   | NonInteractive | Quiet ->
       ()
-
-
-let is_interactive = function MultiLine _ -> true | NonInteractive | Quiet -> false
diff --git a/infer/src/base/TaskBar.mli b/infer/src/base/TaskBar.mli
index b83cfda55..a7d3728e1 100644
--- a/infer/src/base/TaskBar.mli
+++ b/infer/src/base/TaskBar.mli
@@ -30,6 +30,3 @@ val tasks_done_add : t -> int -> unit
 
 val finish : t -> unit
 (** tear down the task bar and ready the terminal for more output *)
-
-val is_interactive : t -> bool
-(** does the task bar expect periodic refresh? *)
diff --git a/infer/src/integration/CaptureCompilationDatabase.ml b/infer/src/integration/CaptureCompilationDatabase.ml
index a852bb42d..b1d132cba 100644
--- a/infer/src/integration/CaptureCompilationDatabase.ml
+++ b/infer/src/integration/CaptureCompilationDatabase.ml
@@ -57,8 +57,7 @@ let run_compilation_database compilation_database should_capture_file =
   let compilation_commands = List.map ~f:create_cmd compilation_data in
   let tasks = Tasks.gen_of_list compilation_commands in
   let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:invoke_cmd ~tasks in
-  let n_tasks = List.length compilation_commands in
-  Tasks.Runner.run runner ~n_tasks ;
+  Tasks.Runner.run runner ;
   L.progress "@." ;
   L.(debug Analysis Medium) "Ran %d jobs" number_of_jobs
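
Illustration, not part of the patch: the relaxed contract above lets [next] return [None] while [is_empty ()] is still false. The sketch below is a self-contained OCaml example of such a generator, written against the plain standard library rather than infer's IStd; the record type mirrors [task_generator] from ProcessPool.mli, while [gen_with_deps], [deps] and the string items are hypothetical names invented for this example only.

(* Standalone sketch of a generator that exercises the relaxed contract:
   [next] may return [None] while [is_empty ()] is still false, because some
   items are blocked on dependencies that have not finished yet. *)

type 'a task_generator =
  {n_tasks: int; is_empty: unit -> bool; next: 'a option -> 'a option}

(* [gen_with_deps items ~deps] builds a generator where [deps x] lists the
   items that must finish before [x] may be scheduled (hypothetical helper). *)
let gen_with_deps (items : string list) ~(deps : string -> string list) : string task_generator =
  let pending = ref items in
  let finished = Hashtbl.create 16 in
  let schedulable x = List.for_all (fun d -> Hashtbl.mem finished d) (deps x) in
  let is_empty () = !pending = [] in
  let next finished_item =
    (* record the item the requesting worker just completed, if any *)
    (match finished_item with Some x -> Hashtbl.replace finished x () | None -> ()) ;
    match List.partition schedulable !pending with
    | [], _ ->
        (* nothing schedulable right now, but [is_empty ()] may still be false:
           the pool has to poll again once a dependency finishes *)
        None
    | x :: rest, blocked ->
        pending := rest @ blocked ;
        Some x
  in
  {n_tasks= List.length items; is_empty; next}

(* toy usage: "b" cannot be scheduled until "a" has been processed *)
let () =
  let g = gen_with_deps ["a"; "b"] ~deps:(function "b" -> ["a"] | _ -> []) in
  assert (g.next None = Some "a") ;
  assert (g.next None = None) ;
  (* more work exists even though [next] just returned [None] *)
  assert (not (g.is_empty ())) ;
  assert (g.next (Some "a") = Some "b") ;
  assert (g.is_empty ())

With a generator like this, a worker can be sent to idle and later handed new work once a dependency completes, which is why [wait_for_updates] now always passes a finite timeout to select(2) and why [process_updates] re-checks for an idle child when that timeout fires.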