[tasks] relax task generator interface contract

Summary:
A more dynamic scheduling scheme will potentially run into the situation where no new work packets can be scheduled, but more work will be possible to schedule in the future, perhaps when some dependent work packet finishes being analysed.

The current implementation cannot handle that situation, as it assumes that once a worker goes idle, it stays idle.

The changes here address this in two parts:
- the `select` call is always given a finite timeout. With an infinite timeout, we would not be able to poll the task generator for more work when none was previously available.
- when the `select` call times out without updates, check whether there is an idle child and, if so, whether the task generator has more work right now.

See also ProcessPool.mli for comments.

Reviewed By: mbouaziz

Differential Revision: D15197749

fbshipit-source-id: babe5da8e
master
Nikos Gorogiannis 6 years ago committed by Facebook Github Bot
parent b47e2d13f3
commit 5a18ad5c69

@ -106,6 +106,5 @@ let main ~changed_files =
(* Prepare tasks one cluster at a time while executing in parallel *) (* Prepare tasks one cluster at a time while executing in parallel *)
let tasks = Tasks.gen_of_list source_files_to_analyze in let tasks = Tasks.gen_of_list source_files_to_analyze in
let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_source_file ~tasks in let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_source_file ~tasks in
let n_tasks = !n_source_files_to_analyze in Tasks.Runner.run runner ) ;
Tasks.Runner.run runner ~n_tasks ) ;
output_json_makefile_stats source_files_to_analyze output_json_makefile_stats source_files_to_analyze

@ -56,16 +56,17 @@ module Runner = struct
pool pool
let run runner ~n_tasks = let run runner =
(* Flush here all buffers to avoid passing unflushed data to forked processes, leading to duplication *) (* Flush here all buffers to avoid passing unflushed data to forked processes, leading to duplication *)
Pervasives.flush_all () ; Pervasives.flush_all () ;
(* Compact heap before forking *) (* Compact heap before forking *)
Gc.compact () ; Gc.compact () ;
ProcessPool.run runner n_tasks ProcessPool.run runner
end end
let gen_of_list (lst : 'a list) : 'a task_generator = let gen_of_list (lst : 'a list) : 'a task_generator =
let content = ref lst in let content = ref lst in
let n_tasks = List.length lst in
let is_empty () = List.is_empty !content in let is_empty () = List.is_empty !content in
let next _finished_item = let next _finished_item =
match !content with match !content with
@ -75,4 +76,4 @@ let gen_of_list (lst : 'a list) : 'a task_generator =
content := xs ; content := xs ;
Some x Some x
in in
{is_empty; next} {n_tasks; is_empty; next}

@ -26,6 +26,6 @@ module Runner : sig
val create : jobs:int -> f:'a doer -> tasks:'a task_generator -> 'a t val create : jobs:int -> f:'a doer -> tasks:'a task_generator -> 'a t
(** Create a runner running [jobs] jobs in parallel *) (** Create a runner running [jobs] jobs in parallel *)
val run : 'a t -> n_tasks:int -> unit val run : 'a t -> unit
(** Start the given tasks with the runner and wait until completion *) (** Start the given tasks with the runner and wait until completion *)
end end

@ -11,7 +11,7 @@ module L = Logging
type child_info = {pid: Pid.t; down_pipe: Out_channel.t} type child_info = {pid: Pid.t; down_pipe: Out_channel.t}
type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option} type 'a task_generator = {n_tasks: int; is_empty: unit -> bool; next: 'a option -> 'a option}
(** the state of the pool *) (** the state of the pool *)
type 'a t = type 'a t =
@ -25,15 +25,13 @@ type 'a t =
; children_updates: Unix.File_descr.t ; children_updates: Unix.File_descr.t
(** all the children send updates up the same pipe to the pool *) (** all the children send updates up the same pipe to the pool *)
; task_bar: TaskBar.t ; task_bar: TaskBar.t
; tasks: 'a task_generator (** generator for work remaining to be done *) ; tasks: 'a task_generator (** generator for work remaining to be done *) }
; mutable idle_children: int
(** number of children currently ready for more work, but there are no tasks to send to
them *)
}
(** {2 Constants} *) (** {2 Constants} *)
(** refresh rate of the task bar (worst case: it also refreshes on children updates) *) (** refresh rate of the task bar (worst case: it also refreshes on children updates)
this is now mandatory to allow checking for new work packets, when none were
previously available *)
let refresh_timeout = let refresh_timeout =
let frames_per_second = 12 in let frames_per_second = 12 in
`After (Time_ns.Span.of_int_ms (1_000 / frames_per_second)) `After (Time_ns.Span.of_int_ms (1_000 / frames_per_second))
@ -85,11 +83,11 @@ let rec really_read ?(pos = 0) ~len fd ~buf =
elapsed *) elapsed *)
let wait_for_updates pool buffer = let wait_for_updates pool buffer =
let file_descr = pool.children_updates in let file_descr = pool.children_updates in
let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in
(* Use select(2) so that we can both wait on the pipe of children updates and wait for a (* Use select(2) so that we can both wait on the pipe of children updates and wait for a
timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time. *) timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time,
as well as for checking for new work where none were previously available. *)
let {Unix.Select_fds.read= read_fds} = let {Unix.Select_fds.read= read_fds} =
Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout () Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout:refresh_timeout ()
in in
match read_fds with match read_fds with
| _ :: _ :: _ -> | _ :: _ :: _ ->
@ -142,6 +140,21 @@ let has_dead_child pool =
, status ) ) , status ) )
let idle_children pool =
Array.fold pool.pending_items ~init:0 ~f:(fun acc -> function Some _ -> acc | None -> 1 + acc)
let send_work_to_child pool slot =
match pool.tasks.next pool.pending_items.(slot) with
| None ->
TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ;
pool.pending_items.(slot) <- None
| Some x ->
let {down_pipe} = pool.slots.(slot) in
pool.pending_items.(slot) <- Some x ;
marshal_to_pipe down_pipe (Do x)
(** main dispatch function that responds to messages from worker processes and updates the taskbar (** main dispatch function that responds to messages from worker processes and updates the taskbar
periodically *) periodically *)
let process_updates pool buffer = let process_updates pool buffer =
@ -157,18 +170,16 @@ let process_updates pool buffer =
(* clean crash, give the child process a chance to cleanup *) (* clean crash, give the child process a chance to cleanup *)
Unix.wait (`Pid pid) |> ignore ; Unix.wait (`Pid pid) |> ignore ;
killall pool ~slot "see backtrace above" killall pool ~slot "see backtrace above"
| Some (Ready slot) -> ( | Some (Ready slot) ->
TaskBar.tasks_done_add pool.task_bar 1 ; TaskBar.tasks_done_add pool.task_bar 1 ;
match pool.tasks.next pool.pending_items.(slot) with send_work_to_child pool slot
| None -> | None -> (
TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; (* no updates, so try to schedule more work if there is an idle worker *)
pool.idle_children <- pool.idle_children + 1 match Array.findi pool.pending_items ~f:(fun _idx item -> Option.is_none item) with
| Some x ->
let {down_pipe} = pool.slots.(slot) in
pool.pending_items.(slot) <- Some x ;
marshal_to_pipe down_pipe (Do x) )
| None -> | None ->
() ()
| Some (idle_slot, _) ->
send_work_to_child pool idle_slot )
(** terminate all worker processes *) (** terminate all worker processes *)
@ -277,11 +288,11 @@ let create :
let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in
let children_updates = pipe_child_r in let children_updates = pipe_child_r in
let pending_items : 'a option Array.t = Array.create ~len:jobs None in let pending_items : 'a option Array.t = Array.create ~len:jobs None in
{slots; children_updates; jobs; task_bar; tasks; pending_items; idle_children= 0} {slots; children_updates; jobs; task_bar; tasks; pending_items}
let run pool n_tasks = let run pool =
TaskBar.set_tasks_total pool.task_bar n_tasks ; TaskBar.set_tasks_total pool.task_bar pool.tasks.n_tasks ;
TaskBar.tasks_done_reset pool.task_bar ; TaskBar.tasks_done_reset pool.task_bar ;
(* Start with a negative number of completed tasks to account for the initial [Ready] (* Start with a negative number of completed tasks to account for the initial [Ready]
messages. All the children start by sending [Ready], which is interpreted by the parent process messages. All the children start by sending [Ready], which is interpreted by the parent process
@ -291,14 +302,14 @@ let run pool n_tasks =
(* allocate a buffer for reading children updates once for the whole run *) (* allocate a buffer for reading children updates once for the whole run *)
let buffer = Bytes.create buffer_size in let buffer = Bytes.create buffer_size in
(* wait for all children to run out of tasks *) (* wait for all children to run out of tasks *)
while not (pool.tasks.is_empty () && pool.idle_children >= pool.jobs) do while not (pool.tasks.is_empty () && idle_children pool >= pool.jobs) do
process_updates pool buffer ; TaskBar.refresh pool.task_bar process_updates pool buffer ; TaskBar.refresh pool.task_bar
done ; done ;
wait_all pool ; wait_all pool ;
TaskBar.finish pool.task_bar TaskBar.finish pool.task_bar
let run pool n_tasks = let run pool =
PerfEvent.(log (fun logger -> log_instant_event logger ~name:"start process pool" Global)) ; PerfEvent.(log (fun logger -> log_instant_event logger ~name:"start process pool" Global)) ;
run pool n_tasks ; run pool ;
PerfEvent.(log (fun logger -> log_instant_event logger ~name:"end process pool" Global)) PerfEvent.(log (fun logger -> log_instant_event logger ~name:"end process pool" Global))

@ -26,15 +26,25 @@ open! IStd
(** A ['a t] process pool accepts tasks of type ['a]. ['a] will be marshalled over a Unix pipe.*) (** A ['a t] process pool accepts tasks of type ['a]. ['a] will be marshalled over a Unix pipe.*)
type _ t type _ t
(** abstraction for generating jobs; [next finished_item] produces the next task, (** abstraction for generating jobs *)
and receives the task that was finished, or [None] if this is the first time a worker type 'a task_generator =
begins work on a task *) { n_tasks: int
type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option} (** total number of tasks -- only used for reporting, so imprecision is not a bug *)
; is_empty: unit -> bool
(** when should the main loop of the task manager stop expecting new tasks *)
; next: 'a option -> 'a option
(** [next (Some finished_item)] generates the next work item.
The worker requesting more work has just finished processing [finished_item].
[None] is passed when the worker was previously idle.
In particular, it is OK for [next] to return [None] even when [is_empty]
is false. This corresponds to the case where there is more work to be done,
but it is not schedulable until some already scheduled work is finished. *)
}
val create : val create :
jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> tasks:'a task_generator -> 'a t jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> tasks:'a task_generator -> 'a t
(** Create a new pool of processes running [jobs] jobs in parallel *) (** Create a new pool of processes running [jobs] jobs in parallel *)
val run : 'a t -> int -> unit val run : 'a t -> unit
(** use the processes in the given process pool to run all the given tasks in parallel. (** use the processes in the given process pool to run all the given tasks in parallel. *)
the int argument is used for counting only *)

@ -192,6 +192,3 @@ let finish = function
Out_channel.flush stderr Out_channel.flush stderr
| NonInteractive | Quiet -> | NonInteractive | Quiet ->
() ()
let is_interactive = function MultiLine _ -> true | NonInteractive | Quiet -> false

@ -30,6 +30,3 @@ val tasks_done_add : t -> int -> unit
val finish : t -> unit val finish : t -> unit
(** tear down the task bar and ready the terminal for more output *) (** tear down the task bar and ready the terminal for more output *)
val is_interactive : t -> bool
(** does the task bar expect periodic refresh? *)

@ -57,8 +57,7 @@ let run_compilation_database compilation_database should_capture_file =
let compilation_commands = List.map ~f:create_cmd compilation_data in let compilation_commands = List.map ~f:create_cmd compilation_data in
let tasks = Tasks.gen_of_list compilation_commands in let tasks = Tasks.gen_of_list compilation_commands in
let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:invoke_cmd ~tasks in let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:invoke_cmd ~tasks in
let n_tasks = List.length compilation_commands in Tasks.Runner.run runner ;
Tasks.Runner.run runner ~n_tasks ;
L.progress "@." ; L.progress "@." ;
L.(debug Analysis Medium) "Ran %d jobs" number_of_jobs L.(debug Analysis Medium) "Ran %d jobs" number_of_jobs

Loading…
Cancel
Save