[AL] Use Parmap to schedule parallel processes on compilation-database-based analyses

Summary: Parmap delivers a better scheduling, which works as a pipeline, as opposed to what existed before, which schedules processes in batches.

Reviewed By: jvillard

Differential Revision: D5678661

fbshipit-source-id: a632c71
master
Martino Luca 7 years ago committed by Facebook Github Bot
parent 5569ee751a
commit 35ad7dd8bf

@ -153,6 +153,8 @@ let lint_dotty_dir_name = "lint_dotty"
let lint_issues_dir_name = "lint_issues" let lint_issues_dir_name = "lint_issues"
let linters_failed_sentinel_filename = "linters_failed_sentinel"
(** letters used in the analysis output *) (** letters used in the analysis output *)
let log_analysis_file = "F" let log_analysis_file = "F"

@ -147,6 +147,8 @@ val lint_dotty_dir_name : string
val lint_issues_dir_name : string val lint_issues_dir_name : string
val linters_failed_sentinel_filename : string
val load_average : float option val load_average : float option
val log_analysis_crash : string val log_analysis_crash : string

@ -34,73 +34,6 @@ let create_process_and_wait ~prog ~args =
(String.concat ~sep:" " (prog :: args)) (String.concat ~sep:" " (prog :: args))
(Unix.Exit_or_signal.to_string_hum status) (Unix.Exit_or_signal.to_string_hum status)
(** Given a process id and a function that describes the command that the process id
represents, prints a message explaining the command and its status, if in debug or stats mode.
It also prints a dot to show progress of jobs being finished. *)
let print_status ~fail_on_failed_job f pid status =
L.(debug Analysis Medium)
"%a%s@."
(fun fmt pid -> F.pp_print_string fmt (f pid))
pid (Unix.Exit_or_signal.to_string_hum status) ;
L.progress ".%!" ;
match status with
| Error err when fail_on_failed_job
-> L.exit (match err with `Exit_non_zero i -> i | `Signal _ -> 1)
| _
-> ()
let start_current_jobs_count () = ref 0
let waited_for_jobs = ref 0
module PidMap = Caml.Map.Make (Pid)
(** [wait_for_son pid_child f jobs_count] wait for pid_child
and all the other children and update the current jobs count.
Use f to print the job status *)
let rec wait_for_child ~fail_on_failed_job f current_jobs_count jobs_map =
let pid, status = Unix.wait `Any in
Pervasives.decr current_jobs_count ;
Pervasives.incr waited_for_jobs ;
print_status ~fail_on_failed_job f pid status ;
jobs_map := PidMap.remove pid !jobs_map ;
if not (PidMap.is_empty !jobs_map) then
wait_for_child ~fail_on_failed_job f current_jobs_count jobs_map
let pid_to_program jobsMap pid =
try PidMap.find pid jobsMap
with Not_found -> ""
(** [run_jobs_in_parallel jobs_stack gen_prog prog_to_string] runs the jobs in the given stack, by
spawning the jobs in batches of n, where n is [Config.jobs]. It then waits for all those jobs
and starts a new batch and so on. [gen_prog] should return a tuple [(dir_opt, command, args,
env)] where [dir_opt] is an optional directory to chdir to before executing [command] with
[args] in [env]. [prog_to_string] is used for printing information about the job's status. *)
let run_jobs_in_parallel ?(fail_on_failed_job= false) jobs_stack gen_prog prog_to_string =
let run_job () =
let jobs_map = ref PidMap.empty in
let current_jobs_count = start_current_jobs_count () in
while not (Stack.is_empty jobs_stack) do
let job_prog = Stack.pop_exn jobs_stack in
let dir_opt, prog, args, env = gen_prog job_prog in
Pervasives.incr current_jobs_count ;
match Unix.fork () with
| `In_the_child
-> Option.iter dir_opt ~f:Unix.chdir ;
Unix.exec ~prog ~argv:(prog :: args) ~env ~use_path:false |> Unix.handle_unix_error
|> never_returns
| `In_the_parent pid_child
-> jobs_map := PidMap.add pid_child (prog_to_string job_prog) !jobs_map ;
if Int.equal (Stack.length jobs_stack) 0 || !current_jobs_count >= Config.jobs then
wait_for_child ~fail_on_failed_job
(pid_to_program !jobs_map)
current_jobs_count jobs_map
done
in
run_job () ;
L.progress ".@." ;
L.(debug Analysis Medium) "Waited for %d jobs" !waited_for_jobs
let pipeline ~producer_prog ~producer_args ~consumer_prog ~consumer_args = let pipeline ~producer_prog ~producer_args ~consumer_prog ~consumer_args =
let pipe_in, pipe_out = Unix.pipe () in let pipe_in, pipe_out = Unix.pipe () in
match Unix.fork () with match Unix.fork () with

@ -18,15 +18,6 @@ val print_error_and_exit : ?exit_code:int -> ('a, Format.formatter, unit, 'b) fo
(** Prints an error message to a log file, prints a message saying that the error can be (** Prints an error message to a log file, prints a message saying that the error can be
found in that file, and exist, with default code 1 or a given code. *) found in that file, and exist, with default code 1 or a given code. *)
val run_jobs_in_parallel :
?fail_on_failed_job:bool -> 'a Stack.t -> ('a -> string option * string * string list * Unix.env)
-> ('a -> string) -> unit
(** [run_jobs_in_parallel jobs_stack gen_prog prog_to_string] runs the jobs in the given stack, by
spawning the jobs in batches of n, where n is [Config.jobs]. It then waits for all those jobs
and starts a new batch and so on. [gen_prog] should return a tuple [(dir_opt, command, args,
env)] where [dir_opt] is an optional directory to chdir to before executing [command] with
[args] in [env]. [prog_to_string] is used for printing information about the job's status. *)
val pipeline : val pipeline :
producer_prog:string -> producer_args:string list -> consumer_prog:string producer_prog:string -> producer_args:string list -> consumer_prog:string
-> consumer_args:string list -> Unix.Exit_or_signal.t * Unix.Exit_or_signal.t -> consumer_args:string list -> Unix.Exit_or_signal.t * Unix.Exit_or_signal.t

@ -12,57 +12,73 @@ module F = Format
module CLOpt = CommandLineOption module CLOpt = CommandLineOption
module L = Logging module L = Logging
let capture_text = type cmd = {cwd: string; prog: string; args: string}
if Config.equal_analyzer Config.analyzer Config.Linters then "linting" else "translating"
let create_files_stack compilation_database should_capture_file =
let stack = Stack.create () in
let add_to_stack file _ = if should_capture_file file then Stack.push stack file in
CompilationDatabase.iter compilation_database add_to_stack ; stack
let swap_command cmd =
let plusplus = "++" in
let clang = "clang" in
let clangplusplus = "clang++" in
if String.is_suffix ~suffix:plusplus cmd then Config.wrappers_dir ^/ clangplusplus
else Config.wrappers_dir ^/ clang
let run_compilation_file compilation_database file = let create_cmd (compilation_data: CompilationDatabase.compilation_data) =
try let swap_command cmd =
let compilation_data = CompilationDatabase.find compilation_database file in if String.is_suffix ~suffix:"++" cmd then Config.wrappers_dir ^/ "clang++"
let wrapper_cmd = swap_command compilation_data.command in else Config.wrappers_dir ^/ "clang"
in
let arg_file = let arg_file =
ClangQuotes.mk_arg_file "cdb_clang_args_" ClangQuotes.EscapedNoQuotes [compilation_data.args] ClangQuotes.mk_arg_file "cdb_clang_args_" ClangQuotes.EscapedNoQuotes [compilation_data.args]
in in
let args = [("@" ^ arg_file)] in {cwd= compilation_data.dir; prog= swap_command compilation_data.command; args= arg_file}
let env =
`Extend (* A sentinel is a file which indicates that a failure occurred in another infer process.
[ ( CLOpt.args_env_var Because infer processes run in parallel but do not share any memory, we use the
, String.concat ~sep:(String.of_char CLOpt.env_var_sep) filesystem to signal failures across processes. *)
(Option.to_list (Sys.getenv CLOpt.args_env_var) @ ["--fcp-syntax-only"]) ) ] let sentinel_exists sentinel_opt =
let file_exists sentinel = PVariant.( = ) (Sys.file_exists sentinel) `Yes in
Option.value_map ~default:false sentinel_opt ~f:file_exists
let invoke_cmd ~fail_sentinel cmd =
if sentinel_exists fail_sentinel then L.progress "E%!"
else (
Unix.chdir cmd.cwd ;
let pid =
Unix.fork_exec ~prog:cmd.prog ~argv:[cmd.prog; ("@" ^ cmd.args); "-fsyntax-only"]
~use_path:false ()
in in
(Some compilation_data.dir, wrapper_cmd, args, env) let create_sentinel_if_needed () =
with Not_found -> let create_empty_file fname = Utils.with_file_out ~f:(fun _ -> ()) fname in
Process.print_error_and_exit "Failed to find compilation data for %a@\n%!" SourceFile.pp file Option.iter fail_sentinel ~f:create_empty_file
in
match Unix.waitpid pid with
| Ok ()
-> L.progress ".%!"
| Error _
-> L.progress "!%!" ; create_sentinel_if_needed () )
let run_compilation_database compilation_database should_capture_file = let run_compilation_database compilation_database should_capture_file =
let number_of_files = CompilationDatabase.get_size compilation_database in let compilation_data =
L.(debug Capture Quiet) "Starting %s %d files@\n%!" capture_text number_of_files ; CompilationDatabase.filter_compilation_data compilation_database ~f:should_capture_file
L.progress "Starting %s %d files@\n%!" capture_text number_of_files ; in
let jobs_stack = create_files_stack compilation_database should_capture_file in let number_of_jobs = List.length compilation_data in
let capture_text_upper = String.capitalize capture_text in let capture_text =
let job_to_string file = Format.asprintf "%s %a" capture_text_upper SourceFile.pp file in if Config.equal_analyzer Config.analyzer Config.Linters then "linting" else "translating"
let fail_on_failed_job = in
if Config.linters_ignore_clang_failures then false L.(debug Capture Quiet) "Starting %s %d files@\n%!" capture_text number_of_jobs ;
L.progress "Starting %s %d files@\n%!" capture_text number_of_jobs ;
let sequence = Parmap.L (List.map ~f:create_cmd compilation_data) in
let fail_sentinel_fname = Config.results_dir ^/ Config.linters_failed_sentinel_filename in
let fail_sentinel =
if Config.linters_ignore_clang_failures then None
else else
match Config.buck_compilation_database with match Config.buck_compilation_database with
| Some NoDeps | Some NoDeps when Config.clang_frontend_do_lint
-> Config.clang_frontend_do_lint -> Some fail_sentinel_fname
| _ | Some NoDeps | Some Deps _ | None
-> false -> None
in in
Process.run_jobs_in_parallel ~fail_on_failed_job jobs_stack Utils.rmtree fail_sentinel_fname ;
(run_compilation_file compilation_database) job_to_string let chunksize = min (List.length compilation_data / Config.jobs + 1) 10 in
Parmap.pariter ~ncores:Config.jobs ~chunksize (invoke_cmd ~fail_sentinel) sequence ;
L.progress "@." ;
L.(debug Analysis Medium) "Ran %d jobs" number_of_jobs ;
if sentinel_exists fail_sentinel then (
L.progress
"Failure detected, capture did not finish successfully. Use `--linters-ignore-clang-failures` to ignore compilation errors. Terminating@." ;
L.exit 1 )
(** Computes the compilation database files. *) (** Computes the compilation database files. *)
let get_compilation_database_files_buck ~prog ~args = let get_compilation_database_files_buck ~prog ~args =

@ -20,6 +20,9 @@ let get_size database = SourceFile.Map.cardinal !database
let iter database f = SourceFile.Map.iter f !database let iter database f = SourceFile.Map.iter f !database
let filter_compilation_data database ~f =
SourceFile.Map.filter (fun s _ -> f s) !database |> SourceFile.Map.bindings |> List.map ~f:snd
let find database key = SourceFile.Map.find key !database let find database key = SourceFile.Map.find key !database
let parse_command_and_arguments command_and_arguments = let parse_command_and_arguments command_and_arguments =

@ -19,6 +19,8 @@ val get_size : t -> int
val iter : t -> (SourceFile.t -> compilation_data -> unit) -> unit val iter : t -> (SourceFile.t -> compilation_data -> unit) -> unit
val filter_compilation_data : t -> f:(SourceFile.t -> bool) -> compilation_data list
val find : t -> SourceFile.t -> compilation_data val find : t -> SourceFile.t -> compilation_data
val decode_json_file : t -> [< `Escaped of string | `Raw of string] -> unit val decode_json_file : t -> [< `Escaped of string | `Raw of string] -> unit

Loading…
Cancel
Save