[taskbar] it is born

Summary:
Replace the previous outputting of "." and "F" with an actual progress bar and
a multiline display of what procedure each process is currently busy analysing.
Observe:

```lang=text
Found 19 source files to analyze in /home/jul/code/openssl-1.1.0d/infer-out
 7/19 [######################......................................] 36%
⊢ [ 1.14s] crypto/mem.c: CRYPTO_malloc
⊢ [ 1.68s] crypto/o_time.c: julian_adj
⊢ [ 0.50s] crypto/mem.c: CRYPTO_zalloc
⊢ [ 1.80s] crypto/o_str.c: OPENSSL_strlcpy
```

This works by setting up a worker pool (as before) that waits to receive jobs
(not as before: we used to fork for each new job). Unix pipes are used for
communication.

The new worker pool can be used to experiment with other concurrency models,
such as reviving per-procedure-parallelism, or making sure each procedure is
analysed only once.

Perf tests indicate that this version is no slower than the previous one,
either on laptops or devserver: about 3% worse user time but ~40% better system time.
This new version forks <jobs> processes whereas the previous version would
fork `O(number of source files)` times.

`infer -j 1` shows a progress bar without per-procedure timing info (because
refreshing the elapsed time would need a second process to do that).

Reviewed By: mbouaziz

Differential Revision: D8517507

fbshipit-source-id: c8ca104
master
Jules Villard 7 years ago committed by Facebook Github Bot
parent 0914fee2cc
commit 4a1379ebc5

@ -162,7 +162,7 @@ endif
clusters:=base clang java IR
ml_src_files:=$(shell find . -not -path "./*stubs*" -regex '\./[a-zA-Z].*\.ml\(i\)*')
inc_flags:=$(foreach dir,$(shell find . -type d),-I $(dir))
inc_flags:=$(foreach dir,$(shell find . -not -path './_build*' -type d),-I $(dir))
root_flags:=$(foreach root,$(roots),-r $(root))
cluster_flags:=$(foreach cluster,$(clusters),-c $(cluster))

@ -11,24 +11,13 @@ open! IStd
module L = Logging
(** Create tasks to analyze an execution environment *)
let create_exe_env_tasks source_file exe_env : Tasks.t =
L.progressbar_file () ;
Summary.clear_cache () ;
Typ.Procname.SQLite.clear_cache () ;
Random.self_init () ;
[ (fun () ->
Callbacks.analyze_file exe_env source_file ;
if Config.write_html then Printer.write_all_html_files source_file ) ]
(** Create tasks to analyze a cluster *)
let create_source_file_tasks (source_file: SourceFile.t) : Tasks.t =
let analyze_source_file : SourceFile.t Tasks.doer =
fun source_file ->
let exe_env = Exe_env.mk () in
L.(debug Analysis Medium) "@\nProcessing '%a'@." SourceFile.pp source_file ;
create_exe_env_tasks source_file exe_env
Callbacks.analyze_file exe_env source_file ;
if Config.write_html then Printer.write_all_html_files source_file
let analyze_source_file source_file = Tasks.run (create_source_file_tasks source_file)
let output_json_makefile_stats clusters =
let num_files = List.length clusters in
@ -43,23 +32,6 @@ let output_json_makefile_stats clusters =
Yojson.Basic.pretty_to_channel f file_stats
let print_legend () =
L.progress "Starting analysis...@\n" ;
L.progress "@\n" ;
L.progress "legend:@." ;
L.progress " \"%s\" analyzing a file@\n" Config.log_analysis_file ;
L.progress " \"%s\" analyzing a procedure@\n" Config.log_analysis_procedure ;
if Config.debug_mode then (
L.progress " \"%s\" analyzer crashed@\n" Config.log_analysis_crash ;
L.progress " \"%s\" timeout: procedure analysis took too much time@\n"
Config.log_analysis_wallclock_timeout ;
L.progress " \"%s\" timeout: procedure analysis took too many symbolic execution steps@\n"
Config.log_analysis_symops_timeout ;
L.progress " \"%s\" timeout: procedure analysis took too many recursive iterations@\n"
Config.log_analysis_recursion_timeout ) ;
L.progress "@\n@?"
let source_file_should_be_analyzed ~changed_files source_file =
(* whether [fname] is one of the [changed_files] *)
let is_changed_file = Option.map changed_files ~f:(SourceFile.Set.mem source_file) in
@ -106,19 +78,16 @@ let main ~changed_files =
else "" )
(if Int.equal n_source_files 1 then "" else "s")
Config.results_dir ;
print_legend () ;
(* empty all caches to minimize the process heap to have less work to do when forking *)
Summary.clear_cache () ;
Typ.Procname.SQLite.clear_cache () ;
Random.self_init () ;
if Int.equal Config.jobs 1 then (
List.iter ~f:analyze_source_file source_files_to_analyze ;
Tasks.run_sequentially ~f:analyze_source_file source_files_to_analyze ;
L.progress "@\nAnalysis finished in %as@." Pp.elapsed_time () )
else (
L.environment_info "Parallel jobs: %d@." Config.jobs ;
(* Prepare tasks one cluster at a time while executing in parallel *)
let runner = Tasks.Runner.create ~jobs:Config.jobs in
let analyze source_file =
let tasks = create_source_file_tasks source_file in
let aggregate_tasks = Tasks.aggregate ~size:Config.procedures_per_process tasks in
Tasks.Runner.start runner ~tasks:aggregate_tasks
in
List.iter ~f:analyze source_files_to_analyze ;
Tasks.Runner.complete runner ) ;
let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_source_file in
Tasks.Runner.run runner ~tasks:source_files_to_analyze ) ;
output_json_makefile_stats source_files_to_analyze

@ -8,18 +8,23 @@
open! IStd
module L = Logging
type task = unit -> unit
type 'a doer = 'a -> unit
(** [run_sequentially ~f tasks] runs [f] on each element of [tasks], in order and in the current
    process, while driving a single-slot task bar (a dummy one when [Config.show_progress_bar] is
    false). The task bar is refreshed on every status update and after every completed task, then
    torn down once all tasks are done. *)
let run_sequentially ~(f: 'a doer) (tasks: 'a list) : unit =
let task_bar =
if Config.show_progress_bar then TaskBar.create_multiline ~jobs:1 else TaskBar.create_dummy ()
in
(* Route status updates straight to slot 0 of the local task bar. This overwrites the global
   [ProcessPoolState.update_status] hook; it is not restored afterwards. *)
(ProcessPoolState.update_status :=
fun t status ->
TaskBar.update_status task_bar ~slot:0 t status ;
TaskBar.refresh task_bar) ;
TaskBar.set_tasks_total task_bar (List.length tasks) ;
TaskBar.tasks_done_reset task_bar ;
List.iter
~f:(fun task -> f task ; TaskBar.tasks_done_add task_bar 1 ; TaskBar.refresh task_bar)
tasks ;
TaskBar.finish task_bar
type t = task list
(** Aggregate closures into groups of the given size *)
let aggregate ~size t =
let group_to_closure group () = List.iter ~f:(fun closure -> closure ()) group in
let group_size = if size > 0 then size else List.length t / Config.jobs in
if group_size > 1 then List.chunks_of t ~length:group_size |> List.map ~f:group_to_closure else t
let run t = List.iter ~f:(fun f -> f ()) t
let fork_protect ~f x =
EventLogger.prepare () ;
@ -29,18 +34,26 @@ let fork_protect ~f x =
module Runner = struct
type tasks = t
type t = {pool: ProcessPool.t}
let create ~jobs = {pool= ProcessPool.create ~jobs}
let start runner ~tasks =
let pool = runner.pool in
type 'a t = {pool: 'a ProcessPool.t; task_bar: TaskBar.t}
let create ~jobs ~f =
let task_bar =
if Config.show_progress_bar then TaskBar.create_multiline ~jobs else TaskBar.create_dummy ()
in
{ pool=
ProcessPool.create ~jobs
~child_prelude:
((* hack: run post-fork bookkeeping stuff by passing a dummy function to [fork_protect] *)
fork_protect ~f:(fun () -> () ))
task_bar ~f
; task_bar }
let run runner ~tasks =
(* Flush here all buffers to avoid passing unflushed data to forked processes, leading to duplication *)
Pervasives.flush_all () ;
List.iter ~f:(fun x -> ProcessPool.start_child ~f:(fun f -> fork_protect ~f ()) ~pool x) tasks
let complete runner = ProcessPool.wait_all runner.pool
(* Compact heap before forking *)
Gc.compact () ;
ProcessPool.run runner.pool tasks ;
TaskBar.finish runner.task_bar
end

@ -7,33 +7,21 @@
open! IStd
(** Each task executes a closure *)
type task = unit -> unit
type 'a doer = 'a -> unit
(** A sequence of tasks that can be executed in parallel *)
type t = task list
val aggregate : size:int -> t -> t
(** Aggregate closures into groups of the given size *)
val run : t -> unit
(** Run the closures *)
val run_sequentially : f:'a doer -> 'a list -> unit
(** Run the tasks sequentially *)
val fork_protect : f:('a -> 'b) -> 'a -> 'b
(** does the bookkeeping necessary to safely execute an infer function [f] after a call to fork(2) *)
(** A runner accepts new tasks repeatedly for parallel execution *)
module Runner : sig
type tasks = t
type 'a t
type t
val create : jobs:int -> t
val create : jobs:int -> f:'a doer -> 'a t
(** Create a runner running [jobs] jobs in parallel *)
val start : t -> tasks:tasks -> unit
(** Start the given tasks with the runner *)
val complete : t -> unit
(** Wait until all the outstanding tasks are completed *)
val run : 'a t -> tasks:'a list -> unit
(** Start the given tasks with the runner and wait until completion *)
end

@ -28,6 +28,10 @@ let unset_callbacks () = callbacks_ref := None
let nesting = ref 0
(* Remember what the last status sent was so that we can update the status correctly when entering
and exiting nested ondemand analyses. In particular we need to remember the original time.*)
let current_taskbar_status : (Mtime.t * string) option ref = ref None
let is_active, add_active, remove_active =
let currently_analyzed = ref Typ.Procname.Set.empty in
let is_active proc_name = Typ.Procname.Set.mem proc_name !currently_analyzed
@ -79,6 +83,8 @@ type global_state =
; footprint_mode: bool
; html_formatter: F.formatter
; name_generator: Ident.NameGenerator.t
; proc_analysis_time: (Mtime.Span.t * string) option
(** the time elapsed doing [status] so far *)
; symexec_state: State.t }
let save_global_state () =
@ -90,6 +96,9 @@ let save_global_state () =
; footprint_mode= !Config.footprint
; html_formatter= !Printer.curr_html_formatter
; name_generator= Ident.NameGenerator.get_current ()
; proc_analysis_time=
Option.map !current_taskbar_status ~f:(fun (t0, status) ->
(Mtime.span t0 (Mtime_clock.now ()), status) )
; symexec_state= State.save_state () }
@ -101,6 +110,14 @@ let restore_global_state st =
Printer.curr_html_formatter := st.html_formatter ;
Ident.NameGenerator.set_current st.name_generator ;
State.restore_state st.symexec_state ;
current_taskbar_status :=
Option.map st.proc_analysis_time ~f:(fun (suspended_span, status) ->
(* forget about the time spent doing a nested analysis and resend the status of the outer
analysis with the updated "original" start time *)
let new_t0 = Mtime.sub_span (Mtime_clock.now ()) suspended_span in
let new_t0 = Option.value_exn new_t0 in
!ProcessPoolState.update_status new_t0 status ;
(new_t0, status) ) ;
Timeout.resume_previous_timeout ()
@ -113,7 +130,6 @@ let run_proc_analysis analyze_proc ~caller_pdesc callee_pdesc =
"Elapsed analysis time: %a: %a@\n" Typ.Procname.pp callee_pname Mtime.Span.pp
(Mtime_clock.count start_time)
in
L.progressbar_procedure () ;
if Config.trace_ondemand then
L.progress "[%d] run_proc_analysis %a -> %a@." !nesting (Pp.option Typ.Procname.pp)
(Option.map caller_pdesc ~f:Procdesc.get_proc_name)
@ -170,7 +186,17 @@ let run_proc_analysis analyze_proc ~caller_pdesc callee_pdesc =
let analyze_proc ?caller_pdesc callee_pdesc =
let callbacks = Option.value_exn !callbacks_ref in
Some (run_proc_analysis callbacks.analyze_ondemand ~caller_pdesc callee_pdesc)
(* wrap [callbacks.analyze_ondemand] to update the status bar *)
let analyze_proc summary pdesc =
let proc_name = Procdesc.get_proc_name callee_pdesc in
let source_file = (Procdesc.get_attributes callee_pdesc).ProcAttributes.translation_unit in
let t0 = Mtime_clock.now () in
let status = F.asprintf "%a: %a" SourceFile.pp source_file Typ.Procname.pp proc_name in
current_taskbar_status := Some (t0, status) ;
!ProcessPoolState.update_status t0 status ;
callbacks.analyze_ondemand summary pdesc
in
Some (run_proc_analysis analyze_proc ~caller_pdesc callee_pdesc)
let analyze_proc_desc ~caller_pdesc callee_pdesc =

@ -205,19 +205,6 @@ let lint_issues_dir_name = "lint_issues"
let linters_failed_sentinel_filename = "linters_failed_sentinel"
(** letters used in the analysis output *)
let log_analysis_file = "F"
let log_analysis_procedure = "."
let log_analysis_wallclock_timeout = "T"
let log_analysis_symops_timeout = "S"
let log_analysis_recursion_timeout = "R"
let log_analysis_crash = "C"
let manual_buck_compilation_db = "BUCK COMPILATION DATABASE OPTIONS"
let manual_buck_flavors = "BUCK FLAVORS OPTIONS"
@ -456,13 +443,7 @@ let linters_def_default_file = linters_def_dir ^/ "linters.al"
let wrappers_dir = lib_dir ^/ "wrappers"
let ncpu =
try
Utils.with_process_in "getconf _NPROCESSORS_ONLN 2>/dev/null" (fun chan ->
Scanf.bscanf (Scanf.Scanning.from_channel chan) "%d" (fun n -> n) )
|> fst
with _ -> 1
let ncpu = Setcore.numcores ()
let os_type = match Sys.os_type with "Win32" -> Win32 | "Cygwin" -> Cygwin | _ -> Unix
@ -2755,7 +2736,7 @@ and procedures_filter = !procedures_filter
and procedures_name = !procedures_name
and procedures_per_process = !procedures_per_process
and[@warning "-32"] procedures_per_process = !procedures_per_process
and procedures_source_file = !procedures_source_file

@ -126,18 +126,6 @@ val linters_failed_sentinel_filename : string
val load_average : float option
val log_analysis_crash : string
val log_analysis_file : string
val log_analysis_procedure : string
val log_analysis_recursion_timeout : string
val log_analysis_symops_timeout : string
val log_analysis_wallclock_timeout : string
val max_widens : int
val meet_level : int
@ -540,8 +528,6 @@ val print_using_diff : bool
val printf_args : bool
val procedures_per_process : int
val procs_csv : string option
val project_root : string

@ -201,29 +201,6 @@ let phase fmt = log ~to_console:false phase_file_fmts fmt
let progress fmt = log ~to_console:(not Config.quiet) progress_file_fmts fmt
let progress_bar text =
log
~to_console:(Config.show_progress_bar && not Config.quiet)
~to_file:true progress_file_fmts "%s@?" text
let progressbar_file () = progress_bar Config.log_analysis_file
let progressbar_procedure () = progress_bar Config.log_analysis_procedure
let progressbar_timeout_event failure_kind =
if Config.debug_mode then
match failure_kind with
| SymOp.FKtimeout ->
progress_bar Config.log_analysis_wallclock_timeout
| SymOp.FKsymops_timeout _ ->
progress_bar Config.log_analysis_symops_timeout
| SymOp.FKrecursion_timeout _ ->
progress_bar Config.log_analysis_recursion_timeout
| SymOp.FKcrash msg ->
progress_bar (Printf.sprintf "%s(%s)" Config.log_analysis_crash msg)
let user_warning fmt = log ~to_console:(not Config.quiet) user_warning_file_fmts fmt
let user_error fmt = log ~to_console:true user_error_file_fmts fmt

@ -23,15 +23,6 @@ val environment_info : ('a, F.formatter, unit) format -> 'a
val progress : ('a, F.formatter, unit) format -> 'a
(** print immediately to standard error unless --quiet is specified *)
val progressbar_file : unit -> unit
(** Progress bar: start of the analysis of a file. *)
val progressbar_procedure : unit -> unit
(** Progress bar: start of the analysis of a procedure. *)
val progressbar_timeout_event : SymOp.failure_kind -> unit
(** Progress bar: log a timeout event if in developer mode. *)
val result : ('a, F.formatter, unit) format -> 'a
(** Emit a result to stdout. Use only if the output format is stable and useful enough that it may
conceivably get piped to another program, ie, almost never (use [progress] instead otherwise).

@ -4,37 +4,217 @@
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*)
open! IStd
module F = Format
module L = Logging
type t = {mutable num_processes: int; jobs: int}
(** the state of the pool *)
type 'a t =
{ jobs: int
(** number of jobs running in parallel, i.e. number of children we are responsible for *)
; slots: (Pid.t * Out_channel.t) Array.t
(** array of child processes with their pids and channels we can use to send work down to
each child *)
; children_updates: Unix.File_descr.t
(** all the children send updates up the same pipe to the pool *)
; task_bar: TaskBar.t
; mutable tasks: 'a list (** work remaining to be done *)
; mutable idle_children: int
(** number of children currently ready for more work, but there are no tasks to send to
them *)
}
(** {2 Constants} *)
(** refresh rate of the task bar (worst case: it also refreshes on children updates) *)
let refresh_timeout =
let frames_per_second = 12 in
`After (Time_ns.Span.of_int_ms (1_000 / frames_per_second))
(** size in bytes of the buffer used to read messages coming from children --meant to be the
    standard pipe buffer size. Every marshalled [worker_message] must fit in this buffer.
    NOTE(review): the usual Linux pipe capacity is 65_536 bytes, one more than this value; confirm
    whether the difference is intended. *)
let buffer_size = 65_535
(** {2 parmap} *)
(** Messages from child processes to the parent process. Each message includes the identity of the
child sending the message as its index (slot) in the array [pool.slots].
LIMITATION: the messages must not be bigger than [buffer_size] once marshalled, or reading from
the pipe will crash in the parent process. This is a limitation of the way we read from the pipe
for now. To lift it, it should be possible to extend the buffer to the required length if we
notice that we are trying to read more than [buffer_size] for example. *)
type worker_message =
| UpdateStatus of int * Mtime.t * string
(** [(i, t, status)]: starting a task from slot [i], at start time [t], with description
[status]. Watch out that [status] must not be too close in length to [buffer_size]. *)
| Ready of int (** finished the given task, ready to receive messages *)
(** messages from the parent process down to worker processes *)
type 'a boss_message =
| Do of 'a (** a task to do *)
| GoHome (** all tasks done, prepare for teardown *)
let create ~jobs = {num_processes= 0; jobs}
(** convenience function to send data down pipes without forgetting to flush: [marshal_to_pipe fd x]
    marshals [x] (with no marshalling flags) onto the output channel [fd], then flushes [fd] so that
    the message does not linger in the channel buffer *)
let marshal_to_pipe fd x = Marshal.to_channel fd x [] ; Out_channel.flush fd
let incr counter = counter.num_processes <- counter.num_processes + 1
(** [really_read ?pos ~len fd ~buf] fills [buf] from offset [pos] (default 0) with exactly [len]
    bytes read from [fd], issuing as many [Unix.read] calls as it takes. Does nothing when
    [len <= 0].
    @raise End_of_file if [fd] runs dry before [len] bytes have been read *)
let rec really_read ?(pos= 0) ~len fd ~buf =
  if len > 0 then
    match Unix.read ~pos ~len fd ~buf with
    | 0 ->
        (* a read of 0 bytes means the other end of the pipe is gone *)
        raise End_of_file
    | n_read ->
        really_read ~pos:(pos + n_read) ~len:(len - n_read) fd ~buf
let decr counter = counter.num_processes <- counter.num_processes - 1
let wait counter =
match Unix.wait `Any with
| _, Ok _ ->
decr counter
| _, (Error _ as status) ->
let log_or_die = if Config.keep_going then L.internal_error else L.die InternalError in
log_or_die "Error in infer subprocess: %s@." (Unix.Exit_or_signal.to_string_hum status) ;
decr counter
(** main dispatch function that responds to messages from worker processes and updates the taskbar
periodically *)
let process_updates pool buffer =
let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in
(* Use select(2) so that we can both wait on the pipe of children updates and wait for a
timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time. *)
let {Unix.Select_fds.read= read_fds} =
Unix.select ~read:[pool.children_updates] ~write:[] ~except:[] ~timeout ()
in
match read_fds with
| _ :: _ :: _ ->
assert false
| [] ->
()
| [_updates_fd] ->
(* Read one OCaml value at a time. This is done by first reading the header of the marshalled
value (fixed size), then get the total size of the data from that header, then request a
read of the full OCaml value.
This way the buffer is used for only one OCaml value at a time. This is simpler (values do
not overlap across the end of a read and the beginning of another) and means we do not need
a large buffer as long as messages are never bigger than the buffer.
let wait_all counter = for _ = 1 to counter.num_processes do wait counter done
This works somewhat like [Marshal.from_channel] but uses the file descriptor directly
instead of an [in_channel]. Do *not* read from the pipe via an [in_channel] as they read
as much as possible eagerly. This can empty the pipe without us having a way to tell that
there is more to read anymore since the [select] call will return that there is nothing to
read. *)
really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ;
let data_size = Marshal.data_size buffer 0 in
really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ;
match Marshal.from_bytes buffer 0 with
| UpdateStatus (slot, t, status) ->
TaskBar.update_status pool.task_bar ~slot t status
| Ready slot ->
TaskBar.tasks_done_add pool.task_bar 1 ;
match pool.tasks with
| [] ->
TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ;
pool.idle_children <- pool.idle_children + 1
| x :: tasks ->
pool.tasks <- tasks ;
let pipe = snd (pool.slots).(slot) in
marshal_to_pipe pipe (Do x)
let should_wait counter = counter.num_processes >= counter.jobs
let start_child ~f ~pool x =
(** terminate all worker processes: send [GoHome] to every child and close its pipe, then wait for
    every child pid. Abnormal exit statuses are collected and reported only once all children have
    been waited for: logged as an internal error when [Config.keep_going] is set, fatal
    otherwise. *)
let wait_all pool =
(* tell all workers to go home *)
Array.iter pool.slots ~f:(fun (_, pipe) -> marshal_to_pipe pipe GoHome ; Out_channel.close pipe) ;
(* wait(2) all the pids one by one; the order doesn't matter since we want to wait for all of them
eventually anyway. *)
let errors =
Array.fold ~init:[] pool.slots ~f:(fun errors (pid, _) ->
match Unix.wait (`Pid pid) with
| _pid, Ok () ->
errors
| _pid, (Error _ as status) ->
(* Collect all children errors and die only at the end to avoid creating zombies. *)
status :: errors )
in
if not (List.is_empty errors) then
let log_or_die = if Config.keep_going then L.internal_error else L.die InternalError in
let pp_error f status =
F.fprintf f "Error in infer subprocess: %s@." (Unix.Exit_or_signal.to_string_hum status)
in
log_or_die "@[<v>%a@]%!" (Pp.seq ~print_env:Pp.text_break ~sep:"" pp_error) errors
(** worker loop: wait for tasks and run [f] on them until we are told to go home.

    Each iteration first sends [Ready slot] to the parent, then blocks on the next order:
    [Do stuff] runs [f stuff] and loops, [GoHome] returns. Note that [Ready] is also sent before the
    very first task, i.e. before any work has been done. *)
let rec child_loop ~slot send_to_parent receive_from_parent ~f =
send_to_parent (Ready slot) ;
match receive_from_parent () with
| Do stuff ->
(* TODO: error handling *)
f stuff ;
child_loop ~slot send_to_parent receive_from_parent ~f
| GoHome ->
()
(** Fork a new child and start it so that it is ready for work.
The child inherits [updates_w] to send updates up to the parent, and a new pipe is set up for
the parent to send instructions down to the child. *)
let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
let to_child_r, to_child_w = Unix.pipe () in
match Unix.fork () with
| `In_the_child ->
let[@warning "-26"] updates_r = Unix.close updates_r in
let[@warning "-26"] to_child_w = Unix.close to_child_w in
(* Pin to a core. [setcore] does the modulo <number of cores> for us. *)
Setcore.setcore slot ;
ProcessPoolState.in_child := true ;
f x ;
child_prelude () ;
let updates_oc = Unix.out_channel_of_descr updates_w in
let send_to_parent (message: worker_message) = marshal_to_pipe updates_oc message in
(* Function to send updates up the pipe to the parent instead of directly to the task
bar. This is because only the parent knows about all the children, hence it's in charge of
actually updating the task bar. *)
let update_status t status =
let status =
(* Truncate status if too big: it's pointless to spam the status bar with long status, and
also difficult to achieve technically over pipes (it's easier if all the messages fit
into a buffer of reasonable size). *)
if String.length status > 100 then String.subo ~len:100 status ^ "..." else status
in
send_to_parent (UpdateStatus (slot, t, status))
in
ProcessPoolState.update_status := update_status ;
let orders_ic = Unix.in_channel_of_descr to_child_r in
let receive_from_parent () = Marshal.from_channel orders_ic in
child_loop ~slot send_to_parent receive_from_parent ~f ;
Out_channel.close updates_oc ;
In_channel.close orders_ic ;
Pervasives.exit 0
| `In_the_parent _pid ->
incr pool ;
if should_wait pool then wait pool
| `In_the_parent pid ->
let[@warning "-26"] to_child_r = Unix.close to_child_r in
(pid, Unix.out_channel_of_descr to_child_w)
(** [create ~jobs ~child_prelude task_bar ~f] forks [jobs] children right away (one per slot, via
    [fork_child]) and returns the pool describing them. The pool starts with no tasks and no idle
    children; tasks are supplied later to [run]. *)
let create : jobs:int -> child_prelude:(unit -> unit) -> TaskBar.t -> f:('a -> unit) -> 'a t =
fun ~jobs ~child_prelude task_bar ~f ->
(* Pipe to communicate from children to parent. Only one pipe is needed: the messages sent by
children include the identifier of the child sending the message (its [slot]). This way there
is only one pipe to wait on for updates. *)
let (pipe_child_r, pipe_child_w) as status_pipe = Unix.pipe () in
let slots = Array.init jobs ~f:(fun slot -> fork_child ~child_prelude ~slot status_pipe ~f) in
(* we have forked the child processes and are now in the parent *)
(* the parent only reads from the updates pipe: close its copy of the write end *)
let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in
let children_updates = pipe_child_r in
{slots; children_updates; jobs; task_bar; tasks= []; idle_children= 0}
(** [run pool tasks] hands [tasks] out to the children of [pool] as they report [Ready], refreshing
    the task bar after each processed update (or timeout), until every child is idle. It then
    terminates all the children with [wait_all]. *)
let run pool tasks =
pool.tasks <- tasks ;
TaskBar.set_tasks_total pool.task_bar (List.length tasks) ;
TaskBar.tasks_done_reset pool.task_bar ;
(* Start with a negative number of completed tasks to account for the initial [Ready]
messages. All the children start by sending [Ready], which is interpreted by the parent process
as "one task has been completed". Starting with a negative number is a simple if hacky way to
account for these spurious "done" tasks. *)
TaskBar.tasks_done_add pool.task_bar (-pool.jobs) ;
(* allocate a buffer for reading children updates once for the whole run *)
let buffer = Bytes.create buffer_size in
(* wait for all children to run out of tasks *)
while pool.idle_children < pool.jobs do
process_updates pool buffer ; TaskBar.refresh pool.task_bar
done ;
wait_all pool

@ -7,15 +7,27 @@
open! IStd
(** Pool of processes to execute in parallel up to a number of jobs. *)
type t
(** Pool of parallel workers that can both receive tasks from the master process and start doing
tasks on their own. Unix pipes are used for communication, all while refreshing a task bar
periodically.
val create : jobs:int -> t
(** Create a new pool of processes *)
Due to ondemand analysis, workers may do tasks unprompted (eg, when analysing a procedure, a
process will typically end up analysing all its callees). Thus, children need to update the main
process (which is in charge of the task bar) whenever they start analysing a new procedure, and
whenever they resume analysing a previous procedure. This is more complicated than what, eg,
`ParMap` can handle because of the bidirectional flow between children and parents.
val start_child : f:('a -> unit) -> pool:t -> 'a -> unit
(** Start a new child process in the pool.
If all the jobs are taken, wait until one is free. *)
The children send "Ready" or "I'm working on task <some string>" messages that are used to
respectively send them more tasks ("Do x") or update the task bar with the description provided
by the child.
val wait_all : t -> unit
(** Wait until all the currently executing processes terminate *)
See also {!module-ProcessPoolState}. *)
(** A ['a t] process pool accepts tasks of type ['a]. ['a] will be marshalled over a Unix pipe.*)
type _ t
val create : jobs:int -> child_prelude:(unit -> unit) -> TaskBar.t -> f:('a -> unit) -> 'a t
(** Create a new pool of processes running [jobs] jobs in parallel *)
val run : 'a t -> 'a list -> unit
(** use the processes in the given process pool to run all the given tasks in parallel *)

@ -9,3 +9,5 @@ open! IStd
(** Keep track of whether the current execution is in a child process *)
let in_child = ref false
let update_status = ref (fun _ _ -> ())

@ -7,3 +7,7 @@
val in_child : bool ref
(** Keep track of whether the current execution is in a child process *)
val update_status : (Mtime.t -> string -> unit) ref
(** Ping the task bar whenever a new task is started with the start time and a description for the
task *)

@ -0,0 +1,154 @@
(*
* Copyright (c) 2018-present, Facebook, Inc.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*)
open! IStd
module L = Logging
(** default width, in characters, of the [#]/[.] part of the progress bar; the value itself is
    arbitrary *)
let progress_bar_total_size_default = 60
(** state of a multi-line task bar *)
type multiline_info =
{ jobs: int (** number of jobs running in parallel *)
; statuses: string Array.t
(** array of size [jobs] with a description of what the process is doing *)
; start_times: Mtime.t Array.t (** array of size [jobs] of start times for each process *)
; mutable tasks_done: int
(** number of tasks completed so far; the progress bar is not drawn while this is negative *)
; mutable tasks_total: int
(** total number of tasks to do; the progress bar is only drawn when this is positive *) }
(** a task bar; [create_multiline] yields [MultiLine] when both stdin and stderr are ttys and
    [NonInteractive] otherwise, [create_dummy] yields [Dummy] *)
type t =
| MultiLine of multiline_info (** interactive *)
| NonInteractive (** display terse progress, to use when output is redirected *)
| Dummy (** ignore everything *)
(** [pp_n c oc n] writes the character [c] to the channel [oc] [n] times; nothing is written when
    [n <= 0] *)
let pp_n c oc n =
  for _ = 1 to n do
    Out_channel.output_char oc c
  done
(** width, in characters, of the [#]/[.] part of the progress bar, computed on first use: the
    default width, capped by the terminal width when stdin is a tty *)
let progress_bar_total_size =
  lazy
    ( if not Unix.(isatty stdin) then progress_bar_total_size_default
    else
      let term_width, _term_height = ANSITerminal.size () in
      min progress_bar_total_size_default term_width )
(** [draw_progress_bar ~total ~don] prints exactly one line to stderr: the number of tasks done
    over the total, a bar of [#] (done) and [.] (remaining) characters, and the percentage done.

    [total] must be positive since it is used as a divisor; [refresh_multiline] only calls this
    function when that is the case. *)
let draw_progress_bar ~total ~don =
  let lazy progress_bar_total_size = progress_bar_total_size in
  let bar_done_size = don * progress_bar_total_size / total in
  let tasks_total_string = Int.to_string total in
  (* pad the "done" count to the width of the total so that the bar does not shift around *)
  let bar_tasks_num_size = String.length tasks_total_string in
  Printf.eprintf "%*d/%s [%a%a] %d%%" bar_tasks_num_size don tasks_total_string (pp_n '#')
    bar_done_size (pp_n '.')
    (progress_bar_total_size - bar_done_size)
    (don * 100 / total) ;
  (* The row we are drawing on may previously have held a job status longer than the bar (the bar
     only appears once [tasks_done] becomes non-negative): clear what is left of it, like
     [draw_job_status] does for its own line. *)
  ANSITerminal.erase Eol ;
  Out_channel.output_string stderr "\n"
(** [draw_job_status ~draw_time t ~status ~t0] prints one line to stderr describing a job: the time
    elapsed between [t0] and [t] in seconds (only if [draw_time]), then [status]. The rest of the
    terminal line is erased before moving to the next line. *)
let draw_job_status ~draw_time t ~status ~t0 =
  (* NOTE(review): the styled string is empty, so this call draws nothing visible; confirm whether
     a prefix glyph was meant to be printed here *)
  ANSITerminal.(prerr_string [Bold; magenta]) "" ;
  if draw_time then Printf.eprintf "[%4.1fs] " (Mtime.Span.to_s (Mtime.span t0 t)) ;
  Out_channel.output_string stderr status ;
  (* wipe the leftovers of a previous, longer status drawn on the same row *)
  ANSITerminal.erase Eol ;
  Out_channel.output_string stderr "\n"
(** redraw the whole multi-line display (the progress bar when applicable, then one status line per
    job), then move the cursor back up to the first line of the display so that the next refresh
    overwrites it *)
let refresh_multiline task_bar =
  ANSITerminal.move_bol () ;
  let with_progress_bar = task_bar.tasks_total > 0 && task_bar.tasks_done >= 0 in
  if with_progress_bar then
    draw_progress_bar ~total:task_bar.tasks_total ~don:task_bar.tasks_done ;
  let now = Mtime_clock.now () in
  (* With a single job no extra process is spawned, so nobody refreshes the task bar while the
     analysis runs and the elapsed time would always read 0: hide it rather than confuse users. *)
  let draw_time = task_bar.jobs > 1 in
  Array.iter2_exn task_bar.statuses task_bar.start_times ~f:(fun status t0 ->
      draw_job_status ~draw_time now ~status ~t0 ) ;
  (* one line per job, plus one for the progress bar if it was drawn *)
  let lines_printed = if with_progress_bar then task_bar.jobs + 1 else task_bar.jobs in
  ANSITerminal.move_cursor 0 (-lines_printed)
(** redraw the task bar; only multi-line task bars have anything to redraw *)
let refresh task_bar =
  match task_bar with
  | NonInteractive | Dummy ->
      ()
  | MultiLine multiline ->
      refresh_multiline multiline
(** [create_multiline ~jobs] creates a task bar with one status line per job. When stdin or stderr
    is not a tty, a [NonInteractive] task bar is returned instead. Otherwise every job starts out
    with status "idle" and the current time as start time, and the terminal is cleared below the
    cursor to make room for the display. *)
let create_multiline ~jobs =
  let interactive = Unix.(isatty stdin) && Unix.(isatty stderr) in
  if not interactive then NonInteractive
  else
    let now = Mtime_clock.now () in
    let multiline =
      { jobs
      ; statuses= Array.create ~len:jobs "idle"
      ; start_times= Array.create ~len:jobs now
      ; tasks_done= 0
      ; tasks_total= 0 }
    in
    ANSITerminal.erase Below ;
    MultiLine multiline
(** create a task bar that ignores every update and never prints anything *)
let create_dummy () = Dummy
(** record that the job in [slot] started doing [status] at time [t0]; nothing is redrawn until the
    next refresh *)
let update_status_multiline task_bar ~slot t0 status =
  (task_bar.statuses).(slot) <- status ;
  (task_bar.start_times).(slot) <- t0
(** [update_status task_bar ~slot t0 status] records on a multi-line task bar that [slot] started
    [status] at time [t0]; other kinds of task bars ignore status updates *)
let update_status task_bar ~slot t0 status =
  match task_bar with
  | NonInteractive | Dummy ->
      ()
  | MultiLine multiline ->
      update_status_multiline multiline ~slot t0 status
(** set the total number of tasks of a multi-line task bar to [n]; a no-op on other task bars *)
let set_tasks_total task_bar n =
  match task_bar with
  | NonInteractive | Dummy ->
      ()
  | MultiLine multiline ->
      multiline.tasks_total <- n
(** [tasks_done_add task_bar n] records that [n] more tasks have been completed. [n] may be
    negative: [ProcessPool.run] uses a negative amount to cancel out the [Ready] messages that
    children send before having done any work.

    - multi-line task bars add [n] to their counter of completed tasks;
    - non-interactive task bars print one [#] per completed task, hence nothing when [n <= 0];
    - dummy task bars ignore the call. *)
let tasks_done_add task_bar n =
  match task_bar with
  | MultiLine multiline ->
      multiline.tasks_done <- multiline.tasks_done + n
  | NonInteractive ->
      (* print as many marks as tasks completed: a correction (n <= 0) is not progress, and n > 1
         is more than one task *)
      if n > 0 then L.progress "%s%!" (String.make n '#')
  | Dummy ->
      ()
(** reset the number of completed tasks of a multi-line task bar to 0; a no-op on other task bars *)
let tasks_done_reset task_bar =
  match task_bar with
  | NonInteractive | Dummy ->
      ()
  | MultiLine multiline ->
      multiline.tasks_done <- 0
(** tear down the task bar. For a multi-line task bar: step past the first line of the display (so
    that the progress bar stays displayed), erase everything below it, and flush stderr. Other task
    bars have nothing to clean up. *)
let finish task_bar =
  match task_bar with
  | NonInteractive | Dummy ->
      ()
  | MultiLine _ ->
      (* leave the progress bar displayed *)
      Out_channel.output_string stderr "\n" ;
      ANSITerminal.erase Below ;
      Out_channel.flush stderr
(** whether the task bar is a multi-line one, i.e. one that needs to be refreshed periodically *)
let is_interactive task_bar =
  match task_bar with MultiLine _ -> true | NonInteractive | Dummy -> false

@ -0,0 +1,38 @@
(*
* Copyright (c) 2018-present, Facebook, Inc.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*)
open! IStd
(** a task bar, as created by [create_multiline] or [create_dummy] *)
type t
val refresh : t -> unit
(** draw the taskbar *)
val create_multiline : jobs:int -> t
(** creates a multiline task bar for running [jobs] jobs in parallel *)
val create_dummy : unit -> t
(** silent task bar *)
val update_status : t -> slot:int -> Mtime.t -> string -> unit
(** [update_status task_bar ~slot t status] records an event described by [status] on slot [slot]
started at time [t] *)
val set_tasks_total : t -> int -> unit
(** set the total number of tasks to do *)
val tasks_done_reset : t -> unit
(** record that 0 tasks have been completed so far *)
val tasks_done_add : t -> int -> unit
(** record that a number of tasks have been completed *)
val finish : t -> unit
(** tear down the task bar and ready the terminal for more output *)
val is_interactive : t -> bool
(** does the task bar expect periodic refresh? *)

@ -6,7 +6,6 @@
*)
open! IStd
module L = Logging
(** Handle timeout events *)
@ -118,6 +117,5 @@ let exe_timeout f x =
None )
~finally:resume_previous_timeout
with SymOp.Analysis_failure_exe kind ->
L.progressbar_timeout_event kind ;
Errdesc.warning_err (State.get_loc ()) "TIMEOUT: %a@." SymOp.pp_failure_kind kind ;
Some kind

@ -31,6 +31,60 @@ let exit = `In_general_prefer_using_Logging_exit_over_Pervasives_exit
module ANSITerminal : module type of ANSITerminal = struct
include ANSITerminal
(* from ANSITerminal_unix.ml but using stderr instead of stdout *)
(* Cursor *)
let set_cursor x y =
if Unix.(isatty stderr) then
if x <= 0 then ( if y > 0 then Printf.eprintf "\027[%id%!" y )
else if (* x > 0 *) y <= 0 then Printf.eprintf "\027[%iG%!" x
else Printf.eprintf "\027[%i;%iH%!" y x
let move_cursor x y =
if Unix.(isatty stderr) then (
if x > 0 then Printf.eprintf "\027[%iC%!" x
else if x < 0 then Printf.eprintf "\027[%iD%!" (-x) ;
if y > 0 then Printf.eprintf "\027[%iB%!" y
else if y < 0 then Printf.eprintf "\027[%iA%!" (-y) )
let save_cursor () = if Unix.(isatty stderr) then Printf.eprintf "\027[s%!"
let restore_cursor () = if Unix.(isatty stderr) then Printf.eprintf "\027[u%!"
let move_bol () =
Out_channel.output_string stderr "\r" ;
Out_channel.flush stderr
(* Erasing *)
let erase loc =
if Unix.(isatty stderr) then (
Out_channel.output_string stderr
( match loc with
| Eol ->
"\027[K"
| Above ->
"\027[1J"
| Below ->
"\027[0J"
| Screen ->
"\027[2J" ) ;
Out_channel.flush stderr )
(* Scrolling *)
let scroll lines =
if Unix.(isatty stderr) then
if lines > 0 then Printf.eprintf "\027[%iS%!" lines
else if lines < 0 then Printf.eprintf "\027[%iT%!" (-lines)
(* /from ANSITerminal_unix.ml but using stderr instead of stdout *)
(* more careful about when the channel is connected to a tty *)
let print_string =
if Unix.(isatty stdout) then print_string else fun _ -> Pervasives.print_string

Loading…
Cancel
Save