[utils] move parts of utils into their own submodules to avoid future dependency cycle

Summary:
This will be needed higher up in the stack because the new `ProcessPool` module
will need to call into `Logging` to refresh the logging formatters to get the
right PID when writing to the log file.

+remove dead code `iter_parallel`

Reviewed By: jberdine

Differential Revision: D5165130

fbshipit-source-id: 95c949b
master
Jules Villard 8 years ago committed by Facebook Github Bot
parent f4b9bb3e3b
commit 0404641ab3

@ -141,7 +141,7 @@ let stats () =
}
let register_report_at_exit file =
Utils.register_epilogue (fun () ->
Epilogues.register ~f:(fun () ->
try
let json_stats = to_json (stats ()) in
try

@ -50,21 +50,21 @@ let run t =
module Runner = struct
type runner =
{ pool : Utils.ProcessPool.t;
{ pool : ProcessPool.t;
all_continuations : closure Queue.t }
let create ~jobs =
{ pool = Utils.ProcessPool.create ~jobs;
{ pool = ProcessPool.create ~jobs;
all_continuations = Queue.create () }
let start runner ~tasks =
let pool = runner.pool in
Queue.enqueue_all runner.all_continuations (Queue.to_list tasks.continuations);
List.iter
~f:(fun x -> Utils.ProcessPool.start_child ~f:(fun f -> f ()) ~pool x)
~f:(fun x -> ProcessPool.start_child ~f:(fun f -> f ()) ~pool x)
tasks.closures
let complete runner =
Utils.ProcessPool.wait_all runner.pool;
ProcessPool.wait_all runner.pool;
Queue.iter ~f:(fun f -> f ()) runner.all_continuations
end

@ -0,0 +1,34 @@
(*
* Copyright (c) 2017 - present Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*)
open! IStd
module F = Format
(* Run the epilogues when we get SIGINT (Control-C). We do not want to mask SIGINT unless at least
one epilogue has been registered, so make this value lazy. *)
let activate_run_epilogues_on_signal = lazy (
let run_epilogues_on_signal s =
F.eprintf "*** %s: Caught %s, time to die@." (Filename.basename Sys.executable_name)
(Signal.to_string s);
(* Epilogues are registered with [at_exit] so exiting will make them run. *)
exit 0 in
Signal.Expert.handle Signal.int run_epilogues_on_signal
)
let register ~f desc =
let f_no_exn () =
if not !ProcessPool.in_child then
try
f ()
with exn ->
F.eprintf "Error while running epilogue %s:@ %a.@ Powering through...@." desc Exn.pp exn in
(* We call `exit` in a bunch of places, so register the epilogues with [at_exit]. *)
Pervasives.at_exit f_no_exn;
(* Register signal masking. *)
Lazy.force activate_run_epilogues_on_signal

@ -0,0 +1,13 @@
(*
* Copyright (c) 2017 - present Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*)
open! IStd
(** Register a function to run when the program exits or is interrupted. Registered functions are
run in the reverse order in which they were registered. *)
val register : f:(unit -> unit) -> string -> unit

@ -68,8 +68,8 @@ let create_log_file command name_prefix =
if Config.print_logs then (
dup_formatter file_fmt Format.err_formatter
);
Utils.register_epilogue
(fun () -> close_log_file (lazy file_fmt) (lazy chan) (lazy file))
Epilogues.register
~f:(fun () -> close_log_file (lazy file_fmt) (lazy chan) (lazy file))
"log files flushing";
(file_fmt, chan, file)

@ -0,0 +1,52 @@
(*
* Copyright (c) 2017 - present Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*)
open! IStd
(* Keep track of whether the current execution is in a child process *)
let in_child = ref false
type t =
{
mutable num_processes : int;
jobs : int;
}
let create ~jobs =
{
num_processes = 0;
jobs;
}
let incr counter =
counter.num_processes <- counter.num_processes + 1
let decr counter =
counter.num_processes <- counter.num_processes - 1
let wait counter =
let _ = Unix.wait `Any in
decr counter
let wait_all counter =
for _ = 1 to counter.num_processes do
wait counter
done
let should_wait counter =
counter.num_processes >= counter.jobs
let start_child ~f ~pool x =
match Unix.fork () with
| `In_the_child ->
in_child := true;
f x;
exit 0
| `In_the_parent _pid ->
incr pool;
if should_wait pool
then wait pool

@ -0,0 +1,24 @@
(*
* Copyright (c) 2017 - present Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*)
open! IStd
(** Pool of processes to execute in parallel up to a number of jobs. *)
type t
(** Create a new pool of processes *)
val create : jobs:int -> t
(** Start a new child process in the pool.
If all the jobs are taken, wait until one is free. *)
val start_child : f:('a -> unit) -> pool:t -> 'a -> unit
(** Wait until all the currently executing processes terminate *)
val wait_all : t -> unit
val in_child : bool ref

@ -296,78 +296,3 @@ let compare_versions v1 v2 =
let lv1 = int_list_of_version v1 in
let lv2 = int_list_of_version v2 in
[%compare : int list] lv1 lv2
(* Run the epilogues when we get SIGINT (Control-C). We do not want to mask SIGINT unless at least
one epilogue has been registered, so make this value lazy. *)
let activate_run_epilogues_on_signal = lazy (
let run_epilogues_on_signal s =
F.eprintf "*** %s: Caught %s, time to die@." (Filename.basename Sys.executable_name)
(Signal.to_string s);
(* Epilogues are registered with [at_exit] so exiting will make them run. *)
exit 0 in
Signal.Expert.handle Signal.int run_epilogues_on_signal
)
(* Keep track of whether the current execution is in a child process *)
let in_child = ref false
module ProcessPool = struct
type t =
{
mutable num_processes : int;
jobs : int;
}
let create ~jobs =
{
num_processes = 0;
jobs;
}
let incr counter =
counter.num_processes <- counter.num_processes + 1
let decr counter =
counter.num_processes <- counter.num_processes - 1
let wait counter =
let _ = Unix.wait `Any in
decr counter
let wait_all counter =
for _ = 1 to counter.num_processes do
wait counter
done
let should_wait counter =
counter.num_processes >= counter.jobs
let start_child ~f ~pool x =
match Unix.fork () with
| `In_the_child ->
in_child := true;
f x;
exit 0
| `In_the_parent _pid ->
incr pool;
if should_wait pool
then wait pool
end
let iteri_parallel ~f ?(jobs=1) l =
let pool = ProcessPool.create ~jobs in
List.iteri ~f:(fun i x -> ProcessPool.start_child ~f:(f i) ~pool x) l;
ProcessPool.wait_all pool
let iter_parallel ~f ?(jobs=1) l =
iteri_parallel ~f:(fun _ x -> f x) ~jobs l
let register_epilogue f desc =
let f_no_exn () =
if not !in_child then
try f ()
with exn ->
F.eprintf "Error while running epilogue %s:@ %a.@ Powering through...@." desc Exn.pp exn in
(* We call `exit` in a bunch of places, so register the epilogues with [at_exit]. *)
Pervasives.at_exit f_no_exn;
(* Register signal masking. *)
Lazy.force activate_run_epilogues_on_signal

@ -83,28 +83,3 @@ val suppress_stderr2 : ('a -> 'b -> 'c) -> 'a -> 'b -> 'c
-1 if v1 is older than v2 and 0 if they are the same version.
The versions are strings of the shape "n.m.t", the order is lexicographic. *)
val compare_versions : string -> string -> int
(** Like List.iter but operates in parallel up to a number of jobs *)
val iter_parallel : f:('a -> unit) -> ?jobs:int -> 'a list -> unit
(** Like List.iteri but operates in parallel up to a number of jobs *)
val iteri_parallel : f:(int -> 'a -> unit) -> ?jobs:int -> 'a list -> unit
(** Pool of processes to execute in parallel up to a number of jobs. *)
module ProcessPool : sig
type t
(** Create a new pool of processes *)
val create : jobs:int -> t
(** Start a new child process in the pool.
If all the jobs are taken, wait until one is free. *)
val start_child : f:('a -> unit) -> pool:t -> 'a -> unit
(** Wait until all the currently executing processes terminate *)
val wait_all : t -> unit
end
(** Register a function to run when the program exits or is interrupted. Registered functions are
run in the reverse order in which they were registered. *)
val register_epilogue : (unit -> unit) -> string -> unit

@ -132,13 +132,13 @@ let add_profile_to_pom_in_directory dir =
let infer_pom_path = dir ^/ "pom.xml.infer" in
add_infer_profile maven_pom_path infer_pom_path;
Unix.rename ~src:maven_pom_path ~dst:saved_pom_path;
Utils.register_epilogue
(fun () -> Unix.rename ~src:saved_pom_path ~dst:maven_pom_path)
Epilogues.register
~f:(fun () -> Unix.rename ~src:saved_pom_path ~dst:maven_pom_path)
"restoring Maven's pom.xml to its original state";
Unix.rename ~src:infer_pom_path ~dst:maven_pom_path;
if Config.debug_mode || Config.stats_mode then
Utils.register_epilogue
(fun () -> Unix.rename ~src:maven_pom_path ~dst:infer_pom_path)
Epilogues.register
~f:(fun () -> Unix.rename ~src:maven_pom_path ~dst:infer_pom_path)
"saving infer's pom.xml"
let capture ~prog ~args =

Loading…
Cancel
Save