diff --git a/infer/src/Makefile b/infer/src/Makefile
index 0f8bcf4b5..9938bcd40 100644
--- a/infer/src/Makefile
+++ b/infer/src/Makefile
@@ -162,7 +162,7 @@ endif
 clusters:=base clang java IR
 
 ml_src_files:=$(shell find . -not -path "./*stubs*" -regex '\./[a-zA-Z].*\.ml\(i\)*')
 
-inc_flags:=$(foreach dir,$(shell find . -type d),-I $(dir))
+inc_flags:=$(foreach dir,$(shell find . -not -path './_build*' -type d),-I $(dir))
 root_flags:=$(foreach root,$(roots),-r $(root))
 cluster_flags:=$(foreach cluster,$(clusters),-c $(cluster))
diff --git a/infer/src/backend/InferAnalyze.ml b/infer/src/backend/InferAnalyze.ml
index 38daee123..a0a5d3cd3 100644
--- a/infer/src/backend/InferAnalyze.ml
+++ b/infer/src/backend/InferAnalyze.ml
@@ -11,24 +11,13 @@ open! IStd
 module L = Logging
 
 (** Create tasks to analyze an execution environment *)
-let create_exe_env_tasks source_file exe_env : Tasks.t =
-  L.progressbar_file () ;
-  Summary.clear_cache () ;
-  Typ.Procname.SQLite.clear_cache () ;
-  Random.self_init () ;
-  [ (fun () ->
-        Callbacks.analyze_file exe_env source_file ;
-        if Config.write_html then Printer.write_all_html_files source_file ) ]
-
-
-(** Create tasks to analyze a cluster *)
-let create_source_file_tasks (source_file: SourceFile.t) : Tasks.t =
+let analyze_source_file : SourceFile.t Tasks.doer =
+ fun source_file ->
   let exe_env = Exe_env.mk () in
   L.(debug Analysis Medium) "@\nProcessing '%a'@." SourceFile.pp source_file ;
-  create_exe_env_tasks source_file exe_env
-
+  Callbacks.analyze_file exe_env source_file ;
+  if Config.write_html then Printer.write_all_html_files source_file
 
-let analyze_source_file source_file = Tasks.run (create_source_file_tasks source_file)
 
 let output_json_makefile_stats clusters =
   let num_files = List.length clusters in
@@ -43,23 +32,6 @@ let output_json_makefile_stats clusters =
   Yojson.Basic.pretty_to_channel f file_stats
 
 
-let print_legend () =
-  L.progress "Starting analysis...@\n" ;
-  L.progress "@\n" ;
-  L.progress "legend:@." ;
-  L.progress " \"%s\" analyzing a file@\n" Config.log_analysis_file ;
-  L.progress " \"%s\" analyzing a procedure@\n" Config.log_analysis_procedure ;
-  if Config.debug_mode then (
-    L.progress " \"%s\" analyzer crashed@\n" Config.log_analysis_crash ;
-    L.progress " \"%s\" timeout: procedure analysis took too much time@\n"
-      Config.log_analysis_wallclock_timeout ;
-    L.progress " \"%s\" timeout: procedure analysis took too many symbolic execution steps@\n"
-      Config.log_analysis_symops_timeout ;
-    L.progress " \"%s\" timeout: procedure analysis took too many recursive iterations@\n"
-      Config.log_analysis_recursion_timeout ) ;
-  L.progress "@\n@?"
-
-
 let source_file_should_be_analyzed ~changed_files source_file =
   (* whether [fname] is one of the [changed_files] *)
   let is_changed_file = Option.map changed_files ~f:(SourceFile.Set.mem source_file) in
@@ -106,19 +78,16 @@ let main ~changed_files =
         else "" )
     (if Int.equal n_source_files 1 then "" else "s")
     Config.results_dir ;
-  print_legend () ;
+  (* empty all caches to minimize the process heap to have less work to do when forking *)
+  Summary.clear_cache () ;
+  Typ.Procname.SQLite.clear_cache () ;
+  Random.self_init () ;
   if Int.equal Config.jobs 1 then (
-    List.iter ~f:analyze_source_file source_files_to_analyze ;
+    Tasks.run_sequentially ~f:analyze_source_file source_files_to_analyze ;
     L.progress "@\nAnalysis finished in %as@." Pp.elapsed_time () )
   else (
    L.environment_info "Parallel jobs: %d@." Config.jobs ;
    (* Prepare tasks one cluster at a time while executing in parallel *)
-    let runner = Tasks.Runner.create ~jobs:Config.jobs in
-    let analyze source_file =
-      let tasks = create_source_file_tasks source_file in
-      let aggregate_tasks = Tasks.aggregate ~size:Config.procedures_per_process tasks in
-      Tasks.Runner.start runner ~tasks:aggregate_tasks
-    in
-    List.iter ~f:analyze source_files_to_analyze ;
-    Tasks.Runner.complete runner ) ;
+    let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:analyze_source_file in
+    Tasks.Runner.run runner ~tasks:source_files_to_analyze ) ;
  output_json_makefile_stats source_files_to_analyze
diff --git a/infer/src/backend/Tasks.ml b/infer/src/backend/Tasks.ml
index d0e408a90..4323bd7df 100644
--- a/infer/src/backend/Tasks.ml
+++ b/infer/src/backend/Tasks.ml
@@ -8,18 +8,23 @@ open! IStd
 module L = Logging
 
-type task = unit -> unit
+type 'a doer = 'a -> unit
+
+let run_sequentially ~(f: 'a doer) (tasks: 'a list) : unit =
+  let task_bar =
+    if Config.show_progress_bar then TaskBar.create_multiline ~jobs:1 else TaskBar.create_dummy ()
+  in
+  (ProcessPoolState.update_status :=
+     fun t status ->
+       TaskBar.update_status task_bar ~slot:0 t status ;
+       TaskBar.refresh task_bar) ;
+  TaskBar.set_tasks_total task_bar (List.length tasks) ;
+  TaskBar.tasks_done_reset task_bar ;
+  List.iter
+    ~f:(fun task -> f task ; TaskBar.tasks_done_add task_bar 1 ; TaskBar.refresh task_bar)
+    tasks ;
+  TaskBar.finish task_bar
 
-type t = task list
-
-(** Aggregate closures into groups of the given size *)
-let aggregate ~size t =
-  let group_to_closure group () = List.iter ~f:(fun closure -> closure ()) group in
-  let group_size = if size > 0 then size else List.length t / Config.jobs in
-  if group_size > 1 then List.chunks_of t ~length:group_size |> List.map ~f:group_to_closure else t
-
-
-let run t = List.iter ~f:(fun f -> f ()) t
 
 let fork_protect ~f x =
   EventLogger.prepare () ;
@@ -29,18 +34,26 @@ let fork_protect ~f x =
 
 module Runner = struct
-  type tasks = t
-
-  type t = {pool: ProcessPool.t}
-
-  let create ~jobs = {pool= ProcessPool.create ~jobs}
-
-  let start runner ~tasks =
-    let pool = runner.pool in
+  type 'a t = {pool: 'a ProcessPool.t; task_bar: TaskBar.t}
+
+  let create ~jobs ~f =
+    let task_bar =
+      if Config.show_progress_bar then TaskBar.create_multiline ~jobs else TaskBar.create_dummy ()
+    in
+    { pool=
+        ProcessPool.create ~jobs
+          ~child_prelude:
+            ((* hack: run post-fork bookkeeping stuff by passing a dummy function to [fork_protect] *)
+             fork_protect ~f:(fun () -> () ))
+          task_bar ~f
+    ; task_bar }
+
+
+  let run runner ~tasks =
    (* Flush here all buffers to avoid passing unflushed data to forked processes, leading to
       duplication *)
    Pervasives.flush_all () ;
-    List.iter ~f:(fun x -> ProcessPool.start_child ~f:(fun f -> fork_protect ~f ()) ~pool x) tasks
-
-
-  let complete runner = ProcessPool.wait_all runner.pool
+    (* Compact heap before forking *)
+    Gc.compact () ;
+    ProcessPool.run runner.pool tasks ;
+    TaskBar.finish runner.task_bar
 end
diff --git a/infer/src/backend/Tasks.mli b/infer/src/backend/Tasks.mli
index 4a3c1ca37..cb539d921 100644
--- a/infer/src/backend/Tasks.mli
+++ b/infer/src/backend/Tasks.mli
@@ -7,33 +7,21 @@ open! IStd
 
-(** Each task executes a closure *)
-type task = unit -> unit
+type 'a doer = 'a -> unit
 
-(** A sequence of tasks that can be executed in parallel *)
-type t = task list
-
-val aggregate : size:int -> t -> t
-(** Aggregate closures into groups of the given size *)
-
-val run : t -> unit
-(** Run the closures *)
+val run_sequentially : f:'a doer -> 'a list -> unit
+(** Run the tasks sequentially *)
 
 val fork_protect : f:('a -> 'b) -> 'a -> 'b
 (** does the bookkeeping necessary to safely execute an infer function [f] after a call to fork(2) *)
 
 (** A runner accepts new tasks repeatedly for parallel execution *)
 module Runner : sig
-  type tasks = t
+  type 'a t
 
-  type t
-
-  val create : jobs:int -> t
+  val create : jobs:int -> f:'a doer -> 'a t
   (** Create a runner running [jobs] jobs in parallel *)
 
-  val start : t -> tasks:tasks -> unit
-  (** Start the given tasks with the runner *)
-
-  val complete : t -> unit
-  (** Wait until all the outstanding tasks are completed *)
+  val run : 'a t -> tasks:'a list -> unit
+  (** Start the given tasks with the runner and wait until completion *)
 end
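The new Tasks interface trades the old list-of-closures model for a polymorphic 'a doer plus a Runner that owns both the process pool and the task bar. A minimal sketch of how a caller is expected to drive it, modeled on the InferAnalyze.ml hunks above ([process_item] and [item] are hypothetical placeholders, not part of the patch):

    (* sketch only: run [process_item] over [items], sequentially or with [Config.jobs] workers *)
    let run_all (items : item list) =
      if Int.equal Config.jobs 1 then Tasks.run_sequentially ~f:process_item items
      else
        let runner = Tasks.Runner.create ~jobs:Config.jobs ~f:process_item in
        Tasks.Runner.run runner ~tasks:items

The sequential path reuses the same task bar machinery with a single slot, so the caller no longer has to special-case progress reporting.
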
diff --git a/infer/src/backend/ondemand.ml b/infer/src/backend/ondemand.ml
index 78eec5d2f..2707b90c4 100644
--- a/infer/src/backend/ondemand.ml
+++ b/infer/src/backend/ondemand.ml
@@ -28,6 +28,10 @@ let unset_callbacks () = callbacks_ref := None
 
 let nesting = ref 0
 
+(* Remember what the last status sent was so that we can update the status correctly when entering
+   and exiting nested ondemand analyses. In particular we need to remember the original time. *)
+let current_taskbar_status : (Mtime.t * string) option ref = ref None
+
 let is_active, add_active, remove_active =
   let currently_analyzed = ref Typ.Procname.Set.empty in
   let is_active proc_name = Typ.Procname.Set.mem proc_name !currently_analyzed
@@ -79,6 +83,8 @@ type global_state =
   ; footprint_mode: bool
   ; html_formatter: F.formatter
   ; name_generator: Ident.NameGenerator.t
+  ; proc_analysis_time: (Mtime.Span.t * string) option
+        (** the time elapsed doing [status] so far *)
   ; symexec_state: State.t }
 
 let save_global_state () =
@@ -90,6 +96,9 @@ let save_global_state () =
   ; footprint_mode= !Config.footprint
   ; html_formatter= !Printer.curr_html_formatter
   ; name_generator= Ident.NameGenerator.get_current ()
+  ; proc_analysis_time=
+      Option.map !current_taskbar_status ~f:(fun (t0, status) ->
+          (Mtime.span t0 (Mtime_clock.now ()), status) )
   ; symexec_state= State.save_state () }
 
 
@@ -101,6 +110,14 @@ let restore_global_state st =
   Printer.curr_html_formatter := st.html_formatter ;
   Ident.NameGenerator.set_current st.name_generator ;
   State.restore_state st.symexec_state ;
+  current_taskbar_status :=
+    Option.map st.proc_analysis_time ~f:(fun (suspended_span, status) ->
+        (* forget about the time spent doing a nested analysis and resend the status of the outer
+           analysis with the updated "original" start time *)
+        let new_t0 = Mtime.sub_span (Mtime_clock.now ()) suspended_span in
+        let new_t0 = Option.value_exn new_t0 in
+        !ProcessPoolState.update_status new_t0 status ;
+        (new_t0, status) ) ;
   Timeout.resume_previous_timeout ()
 
 
@@ -113,7 +130,6 @@ let run_proc_analysis analyze_proc ~caller_pdesc callee_pdesc =
       "Elapsed analysis time: %a: %a@\n" Typ.Procname.pp callee_pname Mtime.Span.pp
       (Mtime_clock.count start_time)
   in
-  L.progressbar_procedure () ;
   if Config.trace_ondemand then
    L.progress "[%d] run_proc_analysis %a -> %a@." !nesting (Pp.option Typ.Procname.pp)
      (Option.map caller_pdesc ~f:Procdesc.get_proc_name)
@@ -170,7 +186,17 @@ let run_proc_analysis analyze_proc ~caller_pdesc callee_pdesc =
 
 let analyze_proc ?caller_pdesc callee_pdesc =
   let callbacks = Option.value_exn !callbacks_ref in
-  Some (run_proc_analysis callbacks.analyze_ondemand ~caller_pdesc callee_pdesc)
+  (* wrap [callbacks.analyze_ondemand] to update the status bar *)
+  let analyze_proc summary pdesc =
+    let proc_name = Procdesc.get_proc_name callee_pdesc in
+    let source_file = (Procdesc.get_attributes callee_pdesc).ProcAttributes.translation_unit in
+    let t0 = Mtime_clock.now () in
+    let status = F.asprintf "%a: %a" SourceFile.pp source_file Typ.Procname.pp proc_name in
+    current_taskbar_status := Some (t0, status) ;
+    !ProcessPoolState.update_status t0 status ;
+    callbacks.analyze_ondemand summary pdesc
+  in
+  Some (run_proc_analysis analyze_proc ~caller_pdesc callee_pdesc)
 
 
 let analyze_proc_desc ~caller_pdesc callee_pdesc =
diff --git a/infer/src/base/Config.ml b/infer/src/base/Config.ml
index 3709c845d..f079e8f9a 100644
--- a/infer/src/base/Config.ml
+++ b/infer/src/base/Config.ml
@@ -205,19 +205,6 @@ let lint_issues_dir_name = "lint_issues"
 
 let linters_failed_sentinel_filename = "linters_failed_sentinel"
 
-(** letters used in the analysis output *)
-let log_analysis_file = "F"
-
-let log_analysis_procedure = "."
-
-let log_analysis_wallclock_timeout = "T"
-
-let log_analysis_symops_timeout = "S"
-
-let log_analysis_recursion_timeout = "R"
-
-let log_analysis_crash = "C"
-
 let manual_buck_compilation_db = "BUCK COMPILATION DATABASE OPTIONS"
 
 let manual_buck_flavors = "BUCK FLAVORS OPTIONS"
@@ -456,13 +443,7 @@ let linters_def_default_file = linters_def_dir ^/ "linters.al"
 
 let wrappers_dir = lib_dir ^/ "wrappers"
 
-let ncpu =
-  try
-    Utils.with_process_in "getconf _NPROCESSORS_ONLN 2>/dev/null" (fun chan ->
-        Scanf.bscanf (Scanf.Scanning.from_channel chan) "%d" (fun n -> n) )
-    |> fst
-  with _ -> 1
-
+let ncpu = Setcore.numcores ()
 
 let os_type = match Sys.os_type with "Win32" -> Win32 | "Cygwin" -> Cygwin | _ -> Unix
@@ -2755,7 +2736,7 @@ and procedures_filter = !procedures_filter
 
 and procedures_name = !procedures_name
 
-and procedures_per_process = !procedures_per_process
+and[@warning "-32"] procedures_per_process = !procedures_per_process
 
 and procedures_source_file = !procedures_source_file
diff --git a/infer/src/base/Config.mli b/infer/src/base/Config.mli
index 3b9a8d011..aac2b3f2b 100644
--- a/infer/src/base/Config.mli
+++ b/infer/src/base/Config.mli
@@ -126,18 +126,6 @@ val linters_failed_sentinel_filename : string
 
 val load_average : float option
 
-val log_analysis_crash : string
-
-val log_analysis_file : string
-
-val log_analysis_procedure : string
-
-val log_analysis_recursion_timeout : string
-
-val log_analysis_symops_timeout : string
-
-val log_analysis_wallclock_timeout : string
-
 val max_widens : int
 
 val meet_level : int
@@ -540,8 +528,6 @@ val print_using_diff : bool
 
 val printf_args : bool
 
-val procedures_per_process : int
-
 val procs_csv : string option
 
 val project_root : string
diff --git a/infer/src/base/Logging.ml b/infer/src/base/Logging.ml
index 1e5bfefe5..a62257d7f 100644
--- a/infer/src/base/Logging.ml
+++ b/infer/src/base/Logging.ml
@@ -201,29 +201,6 @@ let phase fmt = log ~to_console:false phase_file_fmts fmt
 
 let progress fmt = log ~to_console:(not Config.quiet) progress_file_fmts fmt
 
-let progress_bar text =
-  log
-    ~to_console:(Config.show_progress_bar && not Config.quiet)
-    ~to_file:true progress_file_fmts "%s@?" text
-
-
-let progressbar_file () = progress_bar Config.log_analysis_file
-
-let progressbar_procedure () = progress_bar Config.log_analysis_procedure
-
-let progressbar_timeout_event failure_kind =
-  if Config.debug_mode then
-    match failure_kind with
-    | SymOp.FKtimeout ->
-        progress_bar Config.log_analysis_wallclock_timeout
-    | SymOp.FKsymops_timeout _ ->
-        progress_bar Config.log_analysis_symops_timeout
-    | SymOp.FKrecursion_timeout _ ->
-        progress_bar Config.log_analysis_recursion_timeout
-    | SymOp.FKcrash msg ->
-        progress_bar (Printf.sprintf "%s(%s)" Config.log_analysis_crash msg)
-
-
 let user_warning fmt = log ~to_console:(not Config.quiet) user_warning_file_fmts fmt
 
 let user_error fmt = log ~to_console:true user_error_file_fmts fmt
diff --git a/infer/src/base/Logging.mli b/infer/src/base/Logging.mli
index 17c85a39e..30d48533b 100644
--- a/infer/src/base/Logging.mli
+++ b/infer/src/base/Logging.mli
@@ -23,15 +23,6 @@ val environment_info : ('a, F.formatter, unit) format -> 'a
 val progress : ('a, F.formatter, unit) format -> 'a
 (** print immediately to standard error unless --quiet is specified *)
 
-val progressbar_file : unit -> unit
-(** Progress bar: start of the analysis of a file. *)
-
-val progressbar_procedure : unit -> unit
-(** Progress bar: start of the analysis of a procedure. *)
-
-val progressbar_timeout_event : SymOp.failure_kind -> unit
-(** Progress bar: log a timeout event if in developer mode. *)
-
 val result : ('a, F.formatter, unit) format -> 'a
 (** Emit a result to stdout. Use only if the output format is stable and useful enough that it may
    conceivably get piped to another program, ie, almost never (use [progress] instead otherwise).
diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml
index bd1acdcf8..2c38ddc7d 100644
--- a/infer/src/base/ProcessPool.ml
+++ b/infer/src/base/ProcessPool.ml
@@ -4,37 +4,217 @@
  * This source code is licensed under the MIT license found in the
  * LICENSE file in the root directory of this source tree.
  *)
+
 open! IStd
+module F = Format
 module L = Logging
 
-type t = {mutable num_processes: int; jobs: int}
+(** the state of the pool *)
+type 'a t =
+  { jobs: int
+        (** number of jobs running in parallel, i.e. number of children we are responsible for *)
+  ; slots: (Pid.t * Out_channel.t) Array.t
+        (** array of child processes with their pids and channels we can use to send work down to
+            each child *)
+  ; children_updates: Unix.File_descr.t
+        (** all the children send updates up the same pipe to the pool *)
+  ; task_bar: TaskBar.t
+  ; mutable tasks: 'a list  (** work remaining to be done *)
+  ; mutable idle_children: int
+        (** number of children currently ready for more work, but there are no tasks to send to
+            them *)
+  }
+
+(** {2 Constants} *)
+
+(** refresh rate of the task bar (worst case: it also refreshes on children updates) *)
+let refresh_timeout =
+  let frames_per_second = 12 in
+  `After (Time_ns.Span.of_int_ms (1_000 / frames_per_second))
+
+
+(** size of the buffer for communicating with children --standard pipe buffer size *)
+let buffer_size = 65_535
+
+(** {2 parmap} *)
+
+(** Messages from child processes to the parent process. Each message includes the identity of the
+    child sending the message as its index (slot) in the array [pool.slots].
+
+    LIMITATION: the messages must not be bigger than [buffer_size] once marshalled, or reading from
+    the pipe will crash in the parent process. This is a limitation of the way we read from the
+    pipe for now. To lift it, it should be possible to extend the buffer to the required length if
+    we notice that we are trying to read more than [buffer_size] for example. *)
+type worker_message =
+  | UpdateStatus of int * Mtime.t * string
+      (** [(i, t, status)]: starting a task from slot [i], at start time [t], with description
+          [status]. Watch out that [status] must not be too close in length to [buffer_size]. *)
+  | Ready of int  (** finished the given task, ready to receive messages *)
+
+(** messages from the parent process down to worker processes *)
+type 'a boss_message =
+  | Do of 'a  (** a task to do *)
+  | GoHome  (** all tasks done, prepare for teardown *)
 
-let create ~jobs = {num_processes= 0; jobs}
+(** convenience function to send data down pipes without forgetting to flush *)
+let marshal_to_pipe fd x = Marshal.to_channel fd x [] ; Out_channel.flush fd
 
-let incr counter = counter.num_processes <- counter.num_processes + 1
+(** like [Unix.read] but reads until [len] bytes have been read *)
+let rec really_read ?(pos= 0) ~len fd ~buf =
+  if len <= 0 then ()
+  else
+    let read = Unix.read ~pos ~len fd ~buf in
+    if Int.equal read 0 then raise End_of_file ;
+    really_read ~pos:(pos + read) ~len:(len - read) fd ~buf
 
-let decr counter = counter.num_processes <- counter.num_processes - 1
 
-let wait counter =
-  match Unix.wait `Any with
-  | _, Ok _ ->
-      decr counter
-  | _, (Error _ as status) ->
-      let log_or_die = if Config.keep_going then L.internal_error else L.die InternalError in
-      log_or_die "Error in infer subprocess: %s@." (Unix.Exit_or_signal.to_string_hum status) ;
-      decr counter
+(** main dispatch function that responds to messages from worker processes and updates the taskbar
+    periodically *)
+let process_updates pool buffer =
+  let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in
+  (* Use select(2) so that we can both wait on the pipe of children updates and wait for a
+     timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time. *)
+  let {Unix.Select_fds.read= read_fds} =
+    Unix.select ~read:[pool.children_updates] ~write:[] ~except:[] ~timeout ()
+  in
+  match read_fds with
+  | _ :: _ :: _ ->
+      assert false
+  | [] ->
+      ()
+  | [_updates_fd] ->
+      (* Read one OCaml value at a time. This is done by first reading the header of the marshalled
+         value (fixed size), then get the total size of the data from that header, then request a
+         read of the full OCaml value.
+
+         This way the buffer is used for only one OCaml value at a time. This is simpler (values do
+         not overlap across the end of a read and the beginning of another) and means we do not
+         need a large buffer as long as messages are never bigger than the buffer.
+
+         This works somewhat like [Marshal.from_channel] but uses the file descriptor directly
+         instead of an [in_channel]. Do *not* read from the pipe via an [in_channel] as they read
+         as much as possible eagerly. This can empty the pipe without us having a way to tell that
+         there is more to read anymore since the [select] call will return that there is nothing to
+         read. *)
+      really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ;
+      let data_size = Marshal.data_size buffer 0 in
+      really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ;
+      match Marshal.from_bytes buffer 0 with
+      | UpdateStatus (slot, t, status) ->
+          TaskBar.update_status pool.task_bar ~slot t status
+      | Ready slot ->
+          TaskBar.tasks_done_add pool.task_bar 1 ;
+          match pool.tasks with
+          | [] ->
+              TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ;
+              pool.idle_children <- pool.idle_children + 1
+          | x :: tasks ->
+              pool.tasks <- tasks ;
+              let pipe = snd (pool.slots).(slot) in
+              marshal_to_pipe pipe (Do x)
 
-let wait_all counter = for _ = 1 to counter.num_processes do wait counter done
-
-let should_wait counter = counter.num_processes >= counter.jobs
-
-let start_child ~f ~pool x =
+
+(** terminate all worker processes *)
+let wait_all pool =
+  (* tell all workers to go home *)
+  Array.iter pool.slots ~f:(fun (_, pipe) -> marshal_to_pipe pipe GoHome ; Out_channel.close pipe) ;
+  (* wait(2) all the pids one by one; the order doesn't matter since we want to wait for all of
+     them eventually anyway. *)
+  let errors =
+    Array.fold ~init:[] pool.slots ~f:(fun errors (pid, _) ->
+        match Unix.wait (`Pid pid) with
+        | _pid, Ok () ->
+            errors
+        | _pid, (Error _ as status) ->
+            (* Collect all children errors and die only at the end to avoid creating zombies. *)
+            status :: errors )
+  in
+  if not (List.is_empty errors) then
+    let log_or_die = if Config.keep_going then L.internal_error else L.die InternalError in
+    let pp_error f status =
+      F.fprintf f "Error in infer subprocess: %s@." (Unix.Exit_or_signal.to_string_hum status)
+    in
+    log_or_die "@[%a@]%!" (Pp.seq ~print_env:Pp.text_break ~sep:"" pp_error) errors
+
+
+(** worker loop: wait for tasks and run [f] on them until we are told to go home *)
+let rec child_loop ~slot send_to_parent receive_from_parent ~f =
+  send_to_parent (Ready slot) ;
+  match receive_from_parent () with
+  | Do stuff ->
+      (* TODO: error handling *)
+      f stuff ;
+      child_loop ~slot send_to_parent receive_from_parent ~f
+  | GoHome ->
+      ()
+
+
+(** Fork a new child and start it so that it is ready for work.
+
+    The child inherits [updates_w] to send updates up to the parent, and a new pipe is set up for
+    the parent to send instructions down to the child. *)
+let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f =
+  let to_child_r, to_child_w = Unix.pipe () in
   match Unix.fork () with
   | `In_the_child ->
+      let[@warning "-26"] updates_r = Unix.close updates_r in
+      let[@warning "-26"] to_child_w = Unix.close to_child_w in
+      (* Pin to a core. [setcore] does the modulo for us. *)
+      Setcore.setcore slot ;
       ProcessPoolState.in_child := true ;
-      f x ;
+      child_prelude () ;
+      let updates_oc = Unix.out_channel_of_descr updates_w in
+      let send_to_parent (message: worker_message) = marshal_to_pipe updates_oc message in
+      (* Function to send updates up the pipe to the parent instead of directly to the task
+         bar. This is because only the parent knows about all the children, hence it's in charge of
+         actually updating the task bar. *)
+      let update_status t status =
+        let status =
+          (* Truncate status if too big: it's pointless to spam the status bar with long status,
+             and also difficult to achieve technically over pipes (it's easier if all the messages
+             fit into a buffer of reasonable size). *)
+          if String.length status > 100 then String.subo ~len:100 status ^ "..." else status
+        in
+        send_to_parent (UpdateStatus (slot, t, status))
+      in
+      ProcessPoolState.update_status := update_status ;
+      let orders_ic = Unix.in_channel_of_descr to_child_r in
+      let receive_from_parent () = Marshal.from_channel orders_ic in
+      child_loop ~slot send_to_parent receive_from_parent ~f ;
+      Out_channel.close updates_oc ;
+      In_channel.close orders_ic ;
       Pervasives.exit 0
-  | `In_the_parent _pid ->
-      incr pool ;
-      if should_wait pool then wait pool
+  | `In_the_parent pid ->
+      let[@warning "-26"] to_child_r = Unix.close to_child_r in
+      (pid, Unix.out_channel_of_descr to_child_w)
+
+
+let create : jobs:int -> child_prelude:(unit -> unit) -> TaskBar.t -> f:('a -> unit) -> 'a t =
+  fun ~jobs ~child_prelude task_bar ~f ->
+    (* Pipe to communicate from children to parent. Only one pipe is needed: the messages sent by
+       children include the identifier of the child sending the message (its [slot]). This way
+       there is only one pipe to wait on for updates. *)
+    let (pipe_child_r, pipe_child_w) as status_pipe = Unix.pipe () in
+    let slots = Array.init jobs ~f:(fun slot -> fork_child ~child_prelude ~slot status_pipe ~f) in
+    (* we have forked the child processes and are now in the parent *)
+    let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in
+    let children_updates = pipe_child_r in
+    {slots; children_updates; jobs; task_bar; tasks= []; idle_children= 0}
+
+
+let run pool tasks =
+  pool.tasks <- tasks ;
+  TaskBar.set_tasks_total pool.task_bar (List.length tasks) ;
+  TaskBar.tasks_done_reset pool.task_bar ;
+  (* Start with a negative number of completed tasks to account for the initial [Ready]
+     messages. All the children start by sending [Ready], which is interpreted by the parent
+     process as "one task has been completed". Starting with a negative number is a simple if hacky
+     way to account for these spurious "done" tasks. *)
+  TaskBar.tasks_done_add pool.task_bar (-pool.jobs) ;
+  (* allocate a buffer for reading children updates once for the whole run *)
+  let buffer = Bytes.create buffer_size in
+  (* wait for all children to run out of tasks *)
+  while pool.idle_children < pool.jobs do
+    process_updates pool buffer ; TaskBar.refresh pool.task_bar
+  done ;
+  wait_all pool
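Because the parent reads at most one marshalled value per wakeup, the framing in process_updates comes down to: read the fixed-size Marshal header, learn the payload size from it, read exactly that many more bytes, then unmarshal. A condensed sketch of that read path, reusing really_read from above ([read_one_message] is an illustrative name, not part of the patch):

    (* sketch: read a single marshalled value from [fd] into [buf], as process_updates does *)
    let read_one_message fd ~buf =
      really_read fd ~buf ~len:Marshal.header_size ;
      let data_size = Marshal.data_size buf 0 in
      really_read fd ~buf ~pos:Marshal.header_size ~len:data_size ;
      Marshal.from_bytes buf 0

This is why the buffer only ever has to hold one message at a time, and why messages larger than buffer_size are not supported yet.
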
diff --git a/infer/src/base/ProcessPool.mli b/infer/src/base/ProcessPool.mli
index 5673e90ab..23f5412a1 100644
--- a/infer/src/base/ProcessPool.mli
+++ b/infer/src/base/ProcessPool.mli
@@ -7,15 +7,27 @@ open! IStd
 
-(** Pool of processes to execute in parallel up to a number of jobs. *)
-type t
+(** Pool of parallel workers that can both receive tasks from the master process and start doing
+    tasks on their own. Unix pipes are used for communication, all while refreshing a task bar
+    periodically.
 
-val create : jobs:int -> t
-(** Create a new pool of processes *)
+    Due to ondemand analysis, workers may do tasks unprompted (eg, when analysing a procedure, a
+    process will typically end up analysing all its callees). Thus, children need to update the
+    main process (which is in charge of the task bar) whenever they start analysing a new
+    procedure, and whenever they resume analysing a previous procedure. This is more complicated
+    than what, eg, `ParMap` can handle because of the bidirectional flow between children and
+    parents.
 
-val start_child : f:('a -> unit) -> pool:t -> 'a -> unit
-(** Start a new child process in the pool.
-    If all the jobs are taken, wait until one is free. *)
+    The children send "Ready" or "I'm working on task ..." messages that are used either to send
+    them more tasks ("Do x") or to update the task bar with the description provided by the child.
 
-val wait_all : t -> unit
-(** Wait until all the currently executing processes terminate *)
+    See also {!module-ProcessPoolState}. *)
+
+(** A ['a t] process pool accepts tasks of type ['a]. ['a] will be marshalled over a Unix pipe.*)
+type _ t
+
+val create : jobs:int -> child_prelude:(unit -> unit) -> TaskBar.t -> f:('a -> unit) -> 'a t
+(** Create a new pool of processes running [jobs] jobs in parallel *)
+
+val run : 'a t -> 'a list -> unit
+(** use the processes in the given process pool to run all the given tasks in parallel *)
diff --git a/infer/src/base/ProcessPoolState.ml b/infer/src/base/ProcessPoolState.ml
index f8bbe0e93..8ccc81ddf 100644
--- a/infer/src/base/ProcessPoolState.ml
+++ b/infer/src/base/ProcessPoolState.ml
@@ -9,3 +9,5 @@ open! IStd
 
 (** Keep track of whether the current execution is in a child process *)
 let in_child = ref false
+
+let update_status = ref (fun _ _ -> ())
diff --git a/infer/src/base/ProcessPoolState.mli b/infer/src/base/ProcessPoolState.mli
index 8748a887a..56aedc4b3 100644
--- a/infer/src/base/ProcessPoolState.mli
+++ b/infer/src/base/ProcessPoolState.mli
@@ -7,3 +7,7 @@
 
 val in_child : bool ref
 (** Keep track of whether the current execution is in a child process *)
+
+val update_status : (Mtime.t -> string -> unit) ref
+(** Ping the task bar whenever a new task is started with the start time and a description for the
+    task *)
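ProcessPoolState.update_status is the one hook shared by the sequential runner (Tasks.run_sequentially), the forked workers (fork_child), and ondemand.ml: whoever owns the task bar installs it, and analysis code only ever calls the current value of the ref. A hedged sketch of the calling side, mirroring the wrapper added in ondemand.ml ([with_status] is a made-up helper):

    (* sketch: publish a status line before running [f], whatever task bar implementation is installed *)
    let with_status status ~f =
      let t0 = Mtime_clock.now () in
      !ProcessPoolState.update_status t0 status ;
      f ()
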
diff --git a/infer/src/base/TaskBar.ml b/infer/src/base/TaskBar.ml
new file mode 100644
index 000000000..760622637
--- /dev/null
+++ b/infer/src/base/TaskBar.ml
@@ -0,0 +1,154 @@
+(*
+ * Copyright (c) 2018-present, Facebook, Inc.
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ *)
+
+open! IStd
+module L = Logging
+
+(** arbitrary *)
+let progress_bar_total_size_default = 60
+
+(** state of a multi-line task bar *)
+type multiline_info =
+  { jobs: int  (** number of jobs running in parallel *)
+  ; statuses: string Array.t
+        (** array of size [jobs] with a description of what the process is doing *)
+  ; start_times: Mtime.t Array.t  (** array of size [jobs] of start times for each process *)
+  ; mutable tasks_done: int
+  ; mutable tasks_total: int }
+
+type t =
+  | MultiLine of multiline_info  (** interactive *)
+  | NonInteractive  (** display terse progress, to use when output is redirected *)
+  | Dummy  (** ignore everything *)
+
+(** print [c] [n] times *)
+let rec pp_n c oc n =
+  if n > 0 then (
+    Out_channel.output_char oc c ;
+    pp_n c oc (n - 1) )
+
+
+let progress_bar_total_size =
+  lazy
+    ( if Unix.(isatty stdin) then
+        let term_width, _ = ANSITerminal.size () in
+        min progress_bar_total_size_default term_width
+      else progress_bar_total_size_default )
+
+
+let draw_progress_bar ~total ~don =
+  let lazy progress_bar_total_size = progress_bar_total_size in
+  let bar_done_size = don * progress_bar_total_size / total in
+  let tasks_total_string = Int.to_string total in
+  let bar_tasks_num_size = String.length tasks_total_string in
+  Printf.eprintf "%*d/%s [%a%a] %d%%\n" bar_tasks_num_size don tasks_total_string (pp_n '#')
+    bar_done_size (pp_n '.')
+    (progress_bar_total_size - bar_done_size)
+    (don * 100 / total)
+
+
+let draw_job_status ~draw_time t ~status ~t0 =
+  ANSITerminal.(prerr_string [Bold; magenta]) "⊢ " ;
+  ( if draw_time then
+      let time_running = Mtime.span t0 t |> Mtime.Span.to_s in
+      Printf.eprintf "[%4.1fs] " time_running ) ;
+  Out_channel.output_string stderr status ;
+  ANSITerminal.erase Eol ;
+  Out_channel.output_string stderr "\n"
+
+
+let refresh_multiline task_bar =
+  ANSITerminal.move_bol () ;
+  let should_draw_progress_bar = task_bar.tasks_total > 0 && task_bar.tasks_done >= 0 in
+  if should_draw_progress_bar then
+    draw_progress_bar ~total:task_bar.tasks_total ~don:task_bar.tasks_done ;
+  let t = Mtime_clock.now () in
+  let draw_time =
+    (* When there is only 1 job we are careful not to spawn processes needlessly, thus there is no
+       one to refresh the task bar while the analysis is running and the time displayed will always
+       be 0. Avoid confusion by not displaying the time in that case. *)
+    task_bar.jobs > 1
+  in
+  Array.iter2_exn task_bar.statuses task_bar.start_times ~f:(fun status t0 ->
+      draw_job_status ~draw_time t ~status ~t0 ) ;
+  let lines_printed =
+    let progress_bar = if should_draw_progress_bar then 1 else 0 in
+    task_bar.jobs + progress_bar
+  in
+  ANSITerminal.move_cursor 0 (-lines_printed) ;
+  ()
+
+
+let refresh = function MultiLine t -> refresh_multiline t | NonInteractive | Dummy -> ()
+
+let create_multiline ~jobs =
+  if Unix.(isatty stdin) && Unix.(isatty stderr) then (
+    let t0 = Mtime_clock.now () in
+    let task_bar =
+      { jobs
+      ; statuses= Array.create ~len:jobs "idle"
+      ; start_times= Array.create ~len:jobs t0
+      ; tasks_done= 0
+      ; tasks_total= 0 }
+    in
+    ANSITerminal.erase Below ; MultiLine task_bar )
+  else NonInteractive
+
+
+let create_dummy () = Dummy
+
+let update_status_multiline task_bar ~slot:job t0 status =
+  (task_bar.statuses).(job) <- status ;
+  (task_bar.start_times).(job) <- t0 ;
+  ()
+
+
+let update_status task_bar ~slot t0 status =
+  match task_bar with
+  | MultiLine t ->
+      update_status_multiline t ~slot t0 status
+  | NonInteractive | Dummy ->
+      ()
+
+
+let set_tasks_total task_bar n =
+  match task_bar with
+  | MultiLine multiline ->
+      multiline.tasks_total <- n
+  | NonInteractive | Dummy ->
+      ()
+
+
+let tasks_done_add task_bar n =
+  match task_bar with
+  | MultiLine multiline ->
+      multiline.tasks_done <- multiline.tasks_done + n
+  | NonInteractive ->
+      L.progress "#%!"
+  | Dummy ->
+      ()
+
+
+let tasks_done_reset task_bar =
+  match task_bar with
+  | MultiLine multiline ->
+      multiline.tasks_done <- 0
+  | NonInteractive | Dummy ->
+      ()
+
+
+let finish = function
+  | MultiLine _ ->
+      (* leave the progress bar displayed *)
+      Out_channel.output_string stderr "\n" ;
+      ANSITerminal.erase Below ;
+      Out_channel.flush stderr
+  | NonInteractive | Dummy ->
+      ()
+
+
+let is_interactive = function MultiLine _ -> true | NonInteractive | Dummy -> false
diff --git a/infer/src/base/TaskBar.mli b/infer/src/base/TaskBar.mli
new file mode 100644
index 000000000..cbb026038
--- /dev/null
+++ b/infer/src/base/TaskBar.mli
@@ -0,0 +1,38 @@
+(*
+ * Copyright (c) 2018-present, Facebook, Inc.
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ *)
+
+open! IStd
+
+type t
+
+val refresh : t -> unit
+(** draw the taskbar *)
+
+val create_multiline : jobs:int -> t
+(** creates a multiline task bar for running [jobs] jobs in parallel *)
+
+val create_dummy : unit -> t
+(** silent task bar *)
+
+val update_status : t -> slot:int -> Mtime.t -> string -> unit
+(** [update_status task_bar ~slot t status] records an event described by [status] on slot [slot]
+    started at time [t] *)
+
+val set_tasks_total : t -> int -> unit
+(** set the total number of tasks to do *)
+
+val tasks_done_reset : t -> unit
+(** record that 0 tasks have been completed so far *)
+
+val tasks_done_add : t -> int -> unit
+(** record that a number of tasks have been completed *)
+
+val finish : t -> unit
+(** tear down the task bar and ready the terminal for more output *)
+
+val is_interactive : t -> bool
+(** does the task bar expect periodic refresh? *)
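Putting the TaskBar.mli API together, a single-slot consumer looks much like Tasks.run_sequentially above. This sketch assumes a [work] function and a list of status strings that are not part of the patch:

    (* sketch: drive a task bar from a single process, one slot, one task per description *)
    let run_with_task_bar ~work descriptions =
      let task_bar = TaskBar.create_multiline ~jobs:1 in
      TaskBar.set_tasks_total task_bar (List.length descriptions) ;
      TaskBar.tasks_done_reset task_bar ;
      List.iter descriptions ~f:(fun status ->
          TaskBar.update_status task_bar ~slot:0 (Mtime_clock.now ()) status ;
          TaskBar.refresh task_bar ;
          work status ;
          TaskBar.tasks_done_add task_bar 1 ) ;
      TaskBar.finish task_bar

create_multiline falls back to the NonInteractive variant when stdin or stderr is not a tty, so the same code degrades gracefully when output is redirected.
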
diff --git a/infer/src/biabduction/Timeout.ml b/infer/src/biabduction/Timeout.ml
index 10fd44310..d7adff3d7 100644
--- a/infer/src/biabduction/Timeout.ml
+++ b/infer/src/biabduction/Timeout.ml
@@ -6,7 +6,6 @@
  *)
 
 open! IStd
-module L = Logging
 
 (** Handle timeout events *)
@@ -118,6 +117,5 @@ let exe_timeout f x =
         None )
       ~finally:resume_previous_timeout
   with SymOp.Analysis_failure_exe kind ->
-    L.progressbar_timeout_event kind ;
    Errdesc.warning_err (State.get_loc ()) "TIMEOUT: %a@." SymOp.pp_failure_kind kind ;
    Some kind
diff --git a/infer/src/istd/IStd.ml b/infer/src/istd/IStd.ml
index 4766372cd..681f103be 100644
--- a/infer/src/istd/IStd.ml
+++ b/infer/src/istd/IStd.ml
@@ -31,6 +31,60 @@ let exit = `In_general_prefer_using_Logging_exit_over_Pervasives_exit
 
 module ANSITerminal : module type of ANSITerminal = struct
   include ANSITerminal
 
+  (* from ANSITerminal_unix.ml but using stderr instead of stdout *)
+  (* Cursor *)
+
+  let set_cursor x y =
+    if Unix.(isatty stderr) then
+      if x <= 0 then ( if y > 0 then Printf.eprintf "\027[%id%!" y )
+      else if (* x > 0 *) y <= 0 then Printf.eprintf "\027[%iG%!" x
+      else Printf.eprintf "\027[%i;%iH%!" y x
+
+
+  let move_cursor x y =
+    if Unix.(isatty stderr) then (
+      if x > 0 then Printf.eprintf "\027[%iC%!" x
+      else if x < 0 then Printf.eprintf "\027[%iD%!" (-x) ;
+      if y > 0 then Printf.eprintf "\027[%iB%!" y
+      else if y < 0 then Printf.eprintf "\027[%iA%!" (-y) )
+
+
+  let save_cursor () = if Unix.(isatty stderr) then Printf.eprintf "\027[s%!"
+
+  let restore_cursor () = if Unix.(isatty stderr) then Printf.eprintf "\027[u%!"
+
+  let move_bol () =
+    Out_channel.output_string stderr "\r" ;
+    Out_channel.flush stderr
+
+
+  (* Erasing *)
+
+  let erase loc =
+    if Unix.(isatty stderr) then (
+      Out_channel.output_string stderr
+        ( match loc with
+        | Eol ->
+            "\027[K"
+        | Above ->
+            "\027[1J"
+        | Below ->
+            "\027[0J"
+        | Screen ->
+            "\027[2J" ) ;
+      Out_channel.flush stderr )
+
+
+  (* Scrolling *)
+
+  let scroll lines =
+    if Unix.(isatty stderr) then
+      if lines > 0 then Printf.eprintf "\027[%iS%!" lines
+      else if lines < 0 then Printf.eprintf "\027[%iT%!" (-lines)
+
+
+  (* /from ANSITerminal_unix.ml but using stderr instead of stdout *)
+
   (* more careful about when the channel is connected to a tty *)
   let print_string = if Unix.(isatty stdout) then print_string else fun _ -> Pervasives.print_string