[sched] refactor into a more sane structure

Summary:
- Convert `task_generator` into a module of `ProcessPool` and collect inside the two combinators which were in semi-random places.
- Make `SyntacticCallGraph` export a `task_generator` as opposed to a call-graph builder.
- Separate `target` type and put it in its own module to avoid dependency cycles.

Reviewed By: skcho

Differential Revision: D18425718

fbshipit-source-id: 7957edac8
master
Nikos Gorogiannis 5 years ago committed by Facebook Github Bot
parent c3a62b808a
commit be43364d05

@ -18,7 +18,7 @@ let clear_caches () =
Typ.Procname.SQLite.clear_cache ()
let analyze_target : TaskScheduler.target Tasks.doer =
let analyze_target : SchedulerTypes.target Tasks.doer =
let analyze_source_file exe_env source_file =
if Topl.is_active () then DB.Results_dir.init (Topl.sourcefile ()) ;
DB.Results_dir.init source_file ;
@ -116,7 +116,9 @@ let get_source_files_to_analyze ~changed_files =
let analyze source_files_to_analyze =
if Int.equal Config.jobs 1 then (
let target_files = List.rev_map source_files_to_analyze ~f:(fun sf -> TaskScheduler.File sf) in
let target_files =
List.rev_map source_files_to_analyze ~f:(fun sf -> SchedulerTypes.File sf)
in
Tasks.run_sequentially ~f:analyze_target target_files ;
BackendStats.get () )
else (

@ -0,0 +1,9 @@
(*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*)
open! IStd
type target = Procname of Typ.Procname.t | File of SourceFile.t

@ -42,3 +42,69 @@ let build_from_sources g sources =
"Built call graph in %a, from %d total procs, %d reachable defined procs and takes %d bytes@."
Mtime.Span.pp (Mtime_clock.count time0) n_captured (CallGraph.n_procs g)
(Obj.(reachable_words (repr g)) * (Sys.word_size / 8))
let count_procedures () =
let db = ResultsDatabase.get_database () in
let stmt = Sqlite3.prepare db "SELECT COUNT(rowid) FROM procedures" in
let count =
match SqliteUtils.result_single_column_option db ~log:"counting procedures" stmt with
| Some (Sqlite3.Data.INT i64) ->
Int64.to_int i64 |> Option.value ~default:Int.max_value
| _ ->
L.die InternalError "Got no result trying to count procedures"
in
L.debug Analysis Quiet "Found %d procedures in procedures table.@." count ;
count
let bottom_up sources : SchedulerTypes.target ProcessPool.TaskGenerator.t =
let open SchedulerTypes in
(* this will potentially grossly overapproximate the tasks *)
let remaining = ref (count_procedures ()) in
let remaining_tasks () = !remaining in
let syntactic_call_graph = CallGraph.create CallGraph.default_initial_capacity in
let initialized = ref false in
let pending : CallGraph.Node.t list ref = ref [] in
let scheduled = ref Typ.Procname.Set.empty in
let is_empty () =
let empty = !initialized && List.is_empty !pending && Typ.Procname.Set.is_empty !scheduled in
if empty then (
remaining := 0 ;
L.progress "Finished call graph scheduling, %d procs remaining (in cycles).@."
(CallGraph.n_procs syntactic_call_graph) ;
if Config.debug_level_analysis > 0 then CallGraph.to_dotty syntactic_call_graph "cycles.dot" ;
(* save some memory *)
CallGraph.reset syntactic_call_graph ) ;
empty
in
let rec next_aux () =
match !pending with
| [] ->
pending := CallGraph.get_unflagged_leaves syntactic_call_graph ;
if List.is_empty !pending then None else next_aux ()
| n :: ns when n.flag || not (CallGraph.mem syntactic_call_graph n.id) ->
pending := ns ;
next_aux ()
| n :: ns ->
pending := ns ;
scheduled := Typ.Procname.Set.add n.pname !scheduled ;
CallGraph.flag_reachable syntactic_call_graph n.pname ;
Some (Procname n.pname)
in
let finished = function
| File _ ->
assert false
| Procname pname ->
decr remaining ;
scheduled := Typ.Procname.Set.remove pname !scheduled ;
CallGraph.remove_reachable syntactic_call_graph pname
in
let next () =
(* do construction here, to avoid having the call graph into forked workers *)
if not !initialized then (
build_from_sources syntactic_call_graph sources ;
initialized := true ) ;
next_aux ()
in
{remaining_tasks; is_empty; finished; next}

@ -6,5 +6,9 @@
*)
open! IStd
val build_from_sources : CallGraph.t -> SourceFile.t list -> unit
(** build restriction of call graph to procedures reachable from provided sources *)
val bottom_up : SourceFile.t list -> SchedulerTypes.target ProcessPool.TaskGenerator.t
(** task generator that works by
- loading the syntactic call graph from the capture DB
- restricting it to the reachable procs from the modified files
- scheduling leaves only and removing them from the graph when analysed.
*)

@ -5,95 +5,13 @@
* LICENSE file in the root directory of this source tree.
*)
open! IStd
module L = Logging
type target = Procname of Typ.Procname.t | File of SourceFile.t
type 'a task_generator = 'a Tasks.task_generator
let chain (gen1 : 'a task_generator) (gen2 : 'a task_generator) : 'a task_generator =
let remaining_tasks () = gen1.remaining_tasks () + gen2.remaining_tasks () in
let gen1_returned_empty = ref false in
let gen1_is_empty () =
gen1_returned_empty := !gen1_returned_empty || gen1.is_empty () ;
!gen1_returned_empty
in
let is_empty () = gen1_is_empty () && gen2.is_empty () in
let finished x = if gen1_is_empty () then gen2.finished x else gen1.finished x in
let next x = if gen1_is_empty () then gen2.next x else gen1.next x in
{remaining_tasks; is_empty; finished; next}
let count_procedures () =
let db = ResultsDatabase.get_database () in
let stmt = Sqlite3.prepare db "SELECT COUNT(rowid) FROM procedures" in
let count =
match SqliteUtils.result_single_column_option db ~log:"counting procedures" stmt with
| Some (Sqlite3.Data.INT i64) ->
Int64.to_int i64 |> Option.value ~default:Int.max_value
| _ ->
L.die InternalError "Got no result trying to count procedures"
in
L.debug Analysis Quiet "Found %d procedures in procedures table.@." count ;
count
let bottom_up sources : target task_generator =
(* this will potentially grossly overapproximate the tasks *)
let remaining = ref (count_procedures ()) in
let remaining_tasks () = !remaining in
let syntactic_call_graph = CallGraph.create CallGraph.default_initial_capacity in
let initialized = ref false in
let pending : CallGraph.Node.t list ref = ref [] in
let scheduled = ref Typ.Procname.Set.empty in
let is_empty () =
let empty = !initialized && List.is_empty !pending && Typ.Procname.Set.is_empty !scheduled in
if empty then (
remaining := 0 ;
L.progress "Finished call graph scheduling, %d procs remaining (in cycles).@."
(CallGraph.n_procs syntactic_call_graph) ;
if Config.debug_level_analysis > 0 then CallGraph.to_dotty syntactic_call_graph "cycles.dot" ;
(* save some memory *)
CallGraph.reset syntactic_call_graph ) ;
empty
in
let rec next_aux () =
match !pending with
| [] ->
pending := CallGraph.get_unflagged_leaves syntactic_call_graph ;
if List.is_empty !pending then None else next_aux ()
| n :: ns when n.flag || not (CallGraph.mem syntactic_call_graph n.id) ->
pending := ns ;
next_aux ()
| n :: ns ->
pending := ns ;
scheduled := Typ.Procname.Set.add n.pname !scheduled ;
CallGraph.flag_reachable syntactic_call_graph n.pname ;
Some (Procname n.pname)
in
let finished = function
| File _ ->
assert false
| Procname pname ->
decr remaining ;
scheduled := Typ.Procname.Set.remove pname !scheduled ;
CallGraph.remove_reachable syntactic_call_graph pname
in
let next () =
(* do construction here, to avoid having the call graph into forked workers *)
if not !initialized then (
SyntacticCallGraph.build_from_sources syntactic_call_graph sources ;
initialized := true ) ;
next_aux ()
in
{remaining_tasks; is_empty; finished; next}
let of_sources sources =
let open SchedulerTypes in
let gen =
List.rev_map sources ~f:(fun sf -> File sf)
|> List.permute ~random_state:(Random.State.make (Array.create ~len:1 0))
|> Tasks.gen_of_list
|> ProcessPool.TaskGenerator.of_list
in
let next x =
let res = gen.next x in
@ -103,5 +21,6 @@ let of_sources sources =
let schedule sources =
if Config.call_graph_schedule then chain (bottom_up sources) (of_sources sources)
if Config.call_graph_schedule then
ProcessPool.TaskGenerator.chain (SyntacticCallGraph.bottom_up sources) (of_sources sources)
else of_sources sources

@ -6,6 +6,4 @@
*)
open! IStd
type target = Procname of Typ.Procname.t | File of SourceFile.t
val schedule : SourceFile.t list -> target Tasks.task_generator
val schedule : SourceFile.t list -> SchedulerTypes.target ProcessPool.TaskGenerator.t

@ -9,8 +9,6 @@ open! IStd
type 'a doer = 'a -> unit
type 'a task_generator = 'a ProcessPool.task_generator
let fork_protect ~f x = BackendStats.reset () ; ForkUtils.protect ~f x
module Runner = struct
@ -39,25 +37,8 @@ module Runner = struct
ProcessPool.run runner
end
let gen_of_list (lst : 'a list) : 'a task_generator =
let content = ref lst in
let length = ref (List.length lst) in
let remaining_tasks () = !length in
let is_empty () = List.is_empty !content in
let finished _finished_item = decr length in
let next () =
match !content with
| [] ->
None
| x :: xs ->
content := xs ;
Some x
in
{remaining_tasks; is_empty; finished; next}
let run_sequentially ~(f : 'a doer) (tasks : 'a list) : unit =
let task_generator = gen_of_list tasks in
let task_generator = ProcessPool.TaskGenerator.of_list tasks in
let task_bar = TaskBar.create ~jobs:1 in
(ProcessPoolState.update_status :=
fun t status ->

@ -9,10 +9,6 @@ open! IStd
type 'a doer = 'a -> unit
type 'a task_generator = 'a ProcessPool.task_generator
val gen_of_list : 'a list -> 'a task_generator
val run_sequentially : f:'a doer -> 'a list -> unit
(** Run the tasks sequentially *)
@ -27,7 +23,7 @@ module Runner : sig
jobs:int
-> f:'work doer
-> child_epilogue:(unit -> 'final)
-> tasks:'work task_generator
-> tasks:'work ProcessPool.TaskGenerator.t
-> ('work, 'final) t
(** Create a runner running [jobs] jobs in parallel *)

@ -9,6 +9,43 @@ open! IStd
module F = Format
module L = Logging
module TaskGenerator = struct
type 'a t =
{ remaining_tasks: unit -> int
; is_empty: unit -> bool
; finished: 'a -> unit
; next: unit -> 'a option }
let chain (gen1 : 'a t) (gen2 : 'a t) : 'a t =
let remaining_tasks () = gen1.remaining_tasks () + gen2.remaining_tasks () in
let gen1_returned_empty = ref false in
let gen1_is_empty () =
gen1_returned_empty := !gen1_returned_empty || gen1.is_empty () ;
!gen1_returned_empty
in
let is_empty () = gen1_is_empty () && gen2.is_empty () in
let finished x = if gen1_is_empty () then gen2.finished x else gen1.finished x in
let next x = if gen1_is_empty () then gen2.next x else gen1.next x in
{remaining_tasks; is_empty; finished; next}
let of_list (lst : 'a list) : 'a t =
let content = ref lst in
let length = ref (List.length lst) in
let remaining_tasks () = !length in
let is_empty () = List.is_empty !content in
let finished _finished_item = decr length in
let next () =
match !content with
| [] ->
None
| x :: xs ->
content := xs ;
Some x
in
{remaining_tasks; is_empty; finished; next}
end
let log_or_die fmt = if Config.keep_going then L.internal_error fmt else L.die InternalError fmt
type child_info = {pid: Pid.t; down_pipe: Out_channel.t}
@ -21,12 +58,6 @@ type child_info = {pid: Pid.t; down_pipe: Out_channel.t}
*)
type 'a child_state = Initializing | Idle | Processing of 'a
type 'a task_generator =
{ remaining_tasks: unit -> int
; is_empty: unit -> bool
; finished: 'a -> unit
; next: unit -> 'a option }
(** the state of the pool *)
type ('work, 'final) t =
{ jobs: int
@ -38,7 +69,7 @@ type ('work, 'final) t =
; children_updates: Unix.File_descr.t
(** all the children send updates up the same pipe to the pool *)
; task_bar: TaskBar.t
; tasks: 'work task_generator (** generator for work remaining to be done *)
; tasks: 'work TaskGenerator.t (** generator for work remaining to be done *)
; file_lock: Utils.file_lock (** file lock for sending worker messages *) }
(** {2 Constants} *)
@ -383,7 +414,7 @@ let create :
-> child_prelude:(unit -> unit)
-> f:('work -> unit)
-> child_epilogue:(unit -> 'final)
-> tasks:'work task_generator
-> tasks:'work TaskGenerator.t
-> ('work, 'final) t =
fun ~jobs ~child_prelude ~f ~child_epilogue ~tasks ->
let file_lock = Utils.create_file_lock () in

@ -7,6 +7,30 @@
open! IStd
module TaskGenerator : sig
(** abstraction for generating jobs *)
type 'a t =
{ remaining_tasks: unit -> int
(** number of tasks remaining to complete -- only used for reporting, so imprecision is not a bug *)
; is_empty: unit -> bool
(** when should the main loop of the task manager stop expecting new tasks *)
; finished: 'a -> unit
(** Process pool calls [finished x] when a worker finishes item [x]. This is only called
if [next ()] has previously returned [Some x] and [x] was sent to a worker. *)
; next: unit -> 'a option
(** [next ()] generates the next work item. If [is_empty ()] is true then [next ()]
must return [None]. However, it is OK to for [next ()] to return [None] when [is_empty]
is false. This corresponds to the case where there is more work to be done,
but it is not schedulable until some already scheduled work is finished. *)
}
val chain : 'a t -> 'a t -> 'a t
(** chain two generators in order *)
val of_list : 'a list -> 'a t
(** schedule tasks out of a concrete list *)
end
(** Pool of parallel workers that can both receive tasks from the master process and start doing
tasks on their own. Unix pipes are used for communication, all while refreshing a task bar
periodically.
@ -27,28 +51,12 @@ open! IStd
results of type ['final]. ['work] and ['final] will be marshalled over a Unix pipe.*)
type (_, _) t
(** abstraction for generating jobs *)
type 'a task_generator =
{ remaining_tasks: unit -> int
(** number of tasks remaining to complete -- only used for reporting, so imprecision is not a bug *)
; is_empty: unit -> bool
(** when should the main loop of the task manager stop expecting new tasks *)
; finished: 'a -> unit
(** Process pool calls [finished x] when a worker finishes item [x]. This is only called
if [next ()] has previously returned [Some x] and [x] was sent to a worker. *)
; next: unit -> 'a option
(** [next ()] generates the next work item. If [is_empty ()] is true then [next ()]
must return [None]. However, it is OK to for [next ()] to return [None] when [is_empty]
is false. This corresponds to the case where there is more work to be done,
but it is not schedulable until some already scheduled work is finished. *)
}
val create :
jobs:int
-> child_prelude:(unit -> unit)
-> f:('work -> unit)
-> child_epilogue:(unit -> 'final)
-> tasks:'work task_generator
-> tasks:'work TaskGenerator.t
-> ('work, 'final) t
(** Create a new pool of processes running [jobs] jobs in parallel *)

@ -55,7 +55,7 @@ let run_compilation_database compilation_database should_capture_file =
"Starting %s %d files@\n%!" Config.clang_frontend_action_string number_of_jobs ;
L.progress "Starting %s %d files@\n%!" Config.clang_frontend_action_string number_of_jobs ;
let compilation_commands = List.map ~f:create_cmd compilation_data in
let tasks = Tasks.gen_of_list compilation_commands in
let tasks = ProcessPool.TaskGenerator.of_list compilation_commands in
(* no stats to record so [child_epilogue] does nothing and we ignore the return
{!Tasks.Runner.run} *)
let runner =

Loading…
Cancel
Save