[tasks] move to a task generator interface

Summary:
Before moving to any kind of non-trivial scheduling, we need to change the Tasks interface.
In particular, it is too restrictive to require that the tasks to be scheduled are provided as a list before execution starts: dynamic scheduling, where new work is produced while workers are already running, cannot be expressed that way, and a fixed list forces all scheduling decisions to be made up front.

The solution here is to move to a `Sequence`-like interface with one difference:
- The function returning the next task takes a task option argument: the task that was just finished (if any) by the worker asking for new work. This will be useful for things like task dependencies (for instance, once a procedure has been analysed it can be marked as such). A sketch of the resulting interface is given below.
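
For reference, the generator interface introduced below (see the ProcessPool.mli and Tasks.ml hunks) boils down to the following OCaml sketch; the list-backed instance simply pops the list and ignores the finished task:

  (* the record the pool consumes; mirrors the definition added to ProcessPool.mli *)
  type 'a task_generator =
    { is_empty: unit -> bool  (* is there any work left to hand out? *)
    ; next: 'a option -> 'a option
          (* [next finished] returns the next task; [finished] is the task the calling
             worker just completed, or [None] on that worker's first request *) }

  (* list-backed generator, as added to Tasks.ml: ignores the finished task *)
  let gen_of_list (lst : 'a list) : 'a task_generator =
    let content = ref lst in
    { is_empty= (fun () -> !content = [])
    ; next=
        (fun _finished ->
          match !content with [] -> None | x :: xs -> content := xs ; Some x ) }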

Reviewed By: mbouaziz

Differential Revision: D15181613

fbshipit-source-id: 21f3ba825
master
Nikos Gorogiannis committed by Facebook Github Bot
parent df438016f2
commit 65824ed7a9

@@ -104,6 +104,8 @@ let main ~changed_files =
       in
       L.environment_info "Parallel jobs: %d@." Config.jobs ;
       (* Prepare tasks one cluster at a time while executing in parallel *)
-      let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_source_file in
-      Tasks.Runner.run runner ~tasks:source_files_to_analyze ) ;
+      let tasks = Tasks.gen_of_list source_files_to_analyze in
+      let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_source_file ~tasks in
+      let n_tasks = !n_source_files_to_analyze in
+      Tasks.Runner.run runner ~n_tasks ) ;
   output_json_makefile_stats source_files_to_analyze

@@ -10,6 +10,8 @@ module L = Logging
 type 'a doer = 'a -> unit
 
+type 'a task_generator = 'a ProcessPool.task_generator
+
 let run_sequentially ~(f : 'a doer) (tasks : 'a list) : unit =
   let task_bar = TaskBar.create ~jobs:1 in
   (ProcessPoolState.update_status :=
@@ -39,12 +41,12 @@ let fork_protect ~f x =
 module Runner = struct
   type 'a t = 'a ProcessPool.t
 
-  let create ~jobs ~f =
+  let create ~jobs ~f ~tasks =
     PerfEvent.(
       log (fun logger -> log_begin_event logger ~categories:["sys"] ~name:"fork prepare" ())) ;
     ResultsDatabase.db_close () ;
     let pool =
-      ProcessPool.create ~jobs ~f
+      ProcessPool.create ~jobs ~f ~tasks
         ~child_prelude:
           ((* hack: run post-fork bookkeeping stuff by passing a dummy function to [fork_protect] *)
           fork_protect ~f:(fun () -> ()))
@@ -54,10 +56,23 @@ module Runner = struct
     pool
 
-  let run runner ~tasks =
+  let run runner ~n_tasks =
     (* Flush here all buffers to avoid passing unflushed data to forked processes, leading to duplication *)
     Pervasives.flush_all () ;
     (* Compact heap before forking *)
     Gc.compact () ;
-    ProcessPool.run runner tasks
+    ProcessPool.run runner n_tasks
 end
+
+let gen_of_list (lst : 'a list) : 'a task_generator =
+  let content = ref lst in
+  let is_empty () = List.is_empty !content in
+  let next _finished_item =
+    match !content with
+    | [] ->
+        None
+    | x :: xs ->
+        content := xs ;
+        Some x
+  in
+  {is_empty; next}

@@ -9,6 +9,10 @@ open! IStd
 type 'a doer = 'a -> unit
 
+type 'a task_generator = 'a ProcessPool.task_generator
+
+val gen_of_list : 'a list -> 'a task_generator
+
 val run_sequentially : f:'a doer -> 'a list -> unit
 (** Run the tasks sequentially *)
@@ -19,9 +23,9 @@ val fork_protect : f:('a -> 'b) -> 'a -> 'b
 module Runner : sig
   type 'a t
 
-  val create : jobs:int -> f:'a doer -> 'a t
+  val create : jobs:int -> f:'a doer -> tasks:'a task_generator -> 'a t
   (** Create a runner running [jobs] jobs in parallel *)
 
-  val run : 'a t -> tasks:'a list -> unit
+  val run : 'a t -> n_tasks:int -> unit
   (** Start the given tasks with the runner and wait until completion *)
 end

@@ -11,6 +11,8 @@ module L = Logging
 type child_info = {pid: Pid.t; down_pipe: Out_channel.t}
 
+type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option}
+
 (** the state of the pool *)
 type 'a t =
   { jobs: int
@@ -18,10 +20,12 @@ type 'a t =
   ; slots: child_info Array.t
         (** array of child processes with their pids and channels we can use to send work down to
             each child *)
+  ; pending_items: 'a option Array.t
+        (** array keeping the tasks sent to children; used to feed the generator when a child
+            finishes *)
   ; children_updates: Unix.File_descr.t
         (** all the children send updates up the same pipe to the pool *)
   ; task_bar: TaskBar.t
-  ; mutable tasks: 'a list  (** work remaining to be done *)
+  ; tasks: 'a task_generator  (** generator for work remaining to be done *)
   ; mutable idle_children: int
         (** number of children currently ready for more work, but there are no tasks to send to
             them *)
@@ -155,13 +159,13 @@ let process_updates pool buffer =
       killall pool ~slot "see backtrace above"
   | Some (Ready slot) -> (
       TaskBar.tasks_done_add pool.task_bar 1 ;
-      match pool.tasks with
-      | [] ->
+      match pool.tasks.next pool.pending_items.(slot) with
+      | None ->
           TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ;
           pool.idle_children <- pool.idle_children + 1
-      | x :: tasks ->
-          pool.tasks <- tasks ;
+      | Some x ->
           let {down_pipe} = pool.slots.(slot) in
+          pool.pending_items.(slot) <- Some x ;
           marshal_to_pipe down_pipe (Do x) )
   | None ->
       ()
@@ -260,8 +264,9 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
   {pid; down_pipe= Unix.out_channel_of_descr to_child_w}
 
-let create : jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> 'a t =
- fun ~jobs ~child_prelude ~f ->
+let create :
+    jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> tasks:'a task_generator -> 'a t =
+ fun ~jobs ~child_prelude ~f ~tasks ->
   let task_bar = TaskBar.create ~jobs in
   (* Pipe to communicate from children to parent. Only one pipe is needed: the messages sent by
      children include the identifier of the child sending the message (its [slot]). This way there
@@ -271,12 +276,12 @@ let create : jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> 'a t
   (* we have forked the child processes and are now in the parent *)
   let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in
   let children_updates = pipe_child_r in
-  {slots; children_updates; jobs; task_bar; tasks= []; idle_children= 0}
+  let pending_items : 'a option Array.t = Array.create ~len:jobs None in
+  {slots; children_updates; jobs; task_bar; tasks; pending_items; idle_children= 0}
 
-let run pool tasks =
-  pool.tasks <- tasks ;
-  TaskBar.set_tasks_total pool.task_bar (List.length tasks) ;
+let run pool n_tasks =
+  TaskBar.set_tasks_total pool.task_bar n_tasks ;
   TaskBar.tasks_done_reset pool.task_bar ;
   (* Start with a negative number of completed tasks to account for the initial [Ready]
      messages. All the children start by sending [Ready], which is interpreted by the parent process
@@ -286,14 +291,14 @@ let run pool tasks =
   (* allocate a buffer for reading children updates once for the whole run *)
   let buffer = Bytes.create buffer_size in
   (* wait for all children to run out of tasks *)
-  while not (List.is_empty pool.tasks && pool.idle_children >= pool.jobs) do
+  while not (pool.tasks.is_empty () && pool.idle_children >= pool.jobs) do
     process_updates pool buffer ; TaskBar.refresh pool.task_bar
   done ;
   wait_all pool ;
   TaskBar.finish pool.task_bar
 
-let run pool tasks =
+let run pool n_tasks =
   PerfEvent.(log (fun logger -> log_instant_event logger ~name:"start process pool" Global)) ;
-  run pool tasks ;
+  run pool n_tasks ;
   PerfEvent.(log (fun logger -> log_instant_event logger ~name:"end process pool" Global))

@@ -26,8 +26,15 @@ open! IStd
 (** A ['a t] process pool accepts tasks of type ['a]. ['a] will be marshalled over a Unix pipe.*)
 type _ t
 
-val create : jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> 'a t
+(** abstraction for generating jobs; [next finished_item] produces the next task, and receives the
+    task that was just finished by the calling worker, or [None] if this is the first time that
+    worker asks for a task *)
+type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option}
+
+val create :
+  jobs:int -> child_prelude:(unit -> unit) -> f:('a -> unit) -> tasks:'a task_generator -> 'a t
 (** Create a new pool of processes running [jobs] jobs in parallel *)
 
-val run : 'a t -> 'a list -> unit
-(** use the processes in the given process pool to run all the given tasks in parallel *)
+val run : 'a t -> int -> unit
+(** use the processes in the given process pool to run all the given tasks in parallel; the [int]
+    argument is used only for the progress count *)

@@ -55,8 +55,10 @@ let run_compilation_database compilation_database should_capture_file =
     "Starting %s %d files@\n%!" Config.clang_frontend_action_string number_of_jobs ;
   L.progress "Starting %s %d files@\n%!" Config.clang_frontend_action_string number_of_jobs ;
   let compilation_commands = List.map ~f:create_cmd compilation_data in
-  let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:invoke_cmd in
-  Tasks.Runner.run runner ~tasks:compilation_commands ;
+  let tasks = Tasks.gen_of_list compilation_commands in
+  let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:invoke_cmd ~tasks in
+  let n_tasks = List.length compilation_commands in
+  Tasks.Runner.run runner ~n_tasks ;
   L.progress "@." ;
   L.(debug Analysis Medium) "Ran %d jobs" number_of_jobs
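
Note on the [next finished_item] argument: a generator that uses it for task dependencies could look roughly like the sketch below. This is purely illustrative and not part of this diff; the call-graph representation, the [ready] test and the [gen_of_call_graph] name are made up, and it uses the plain OCaml stdlib rather than the project's libraries.

  (* same record shape as ProcessPool.task_generator above *)
  type 'a task_generator = {is_empty: unit -> bool; next: 'a option -> 'a option}

  (* hypothetical: hand out a procedure only once all of its callees are done *)
  let gen_of_call_graph (callees : (string * string list) list) : string task_generator =
    let finished_set = Hashtbl.create 16 in
    let pending = ref (List.map fst callees) in
    (* a procedure is ready when all of its callees have been analysed *)
    let ready proc = List.for_all (Hashtbl.mem finished_set) (List.assoc proc callees) in
    let is_empty () = !pending = [] in
    let next finished =
      (* record the task the worker just completed, if any *)
      (match finished with Some proc -> Hashtbl.replace finished_set proc () | None -> ()) ;
      match List.partition ready !pending with
      | [], _ ->
          None (* nothing is unblocked right now, or nothing is left *)
      | proc :: rest_ready, blocked ->
          pending := rest_ready @ blocked ;
          Some proc
    in
    {is_empty; next}

A real dependency-aware generator would also need a way to hand work back to children that went idle while everything was blocked (the pool only sends tasks in response to [Ready] messages), so the sketch only illustrates the shape of the interface.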
