[scheduler] [restart] Reschedule incomplete tasks

Summary:
[scheduler] [restart] Reschedule incomplete tasks

1. Lock procedures before analysing them, unlock them afterwards.
2. Raise when the lock isn't available.
3. Reschedule the task when it fails.

Reviewed By: ngorogiannis

Differential Revision: D19601424

fbshipit-source-id: 138acdb56
master
Fernando Gasperi Jabalera 5 years ago committed by Facebook Github Bot
parent 8199ed1555
commit c0a990c47c

@ -5,25 +5,20 @@
* LICENSE file in the root directory of this source tree. * LICENSE file in the root directory of this source tree.
*) *)
open! IStd open! IStd
module L = Logging
[@@@warning "-60"]
module ProcLocker : sig module ProcLocker : sig
val setup : unit -> unit val setup : unit -> unit
[@@warning "-32"]
(** This should be called once before trying to lock Anything. *) (** This should be called once before trying to lock Anything. *)
val try_lock : Procname.t -> bool val try_lock : Procname.t -> bool
[@@warning "-32"] (** true = the lock belongs to the calling process. false = the lock belongs to a different worker *)
(** true = the lock belongs to the calling process false = the lock belongs to a different worker *)
val unlock : Procname.t -> unit val unlock : Procname.t -> unit
[@@warning "-32"]
(** This will work as a cleanup function because after calling unlock all the workers that need an (** This will work as a cleanup function because after calling unlock all the workers that need an
unlocked Proc should find it's summary already Cached. Throws if the lock had not been taken. *) unlocked Proc should find it's summary already Cached. Throws if the lock had not been taken. *)
val clean : unit -> unit val clean : unit -> unit
[@@warning "-32"]
(** This should be called when locks will no longer be used to remove any files or state that's (** This should be called when locks will no longer be used to remove any files or state that's
not necessary. *) not necessary. *)
end = struct end = struct
@ -41,7 +36,7 @@ let of_list (lst : 'a list) : 'a ProcessPool.TaskGenerator.t =
let remaining = ref (Queue.length content) in let remaining = ref (Queue.length content) in
let remaining_tasks () = !remaining in let remaining_tasks () = !remaining in
let is_empty () = Queue.is_empty content in let is_empty () = Queue.is_empty content in
let finished ~completed:_ _work = decr remaining in let finished ~completed work = if completed then decr remaining else Queue.enqueue content work in
let next () = Queue.dequeue content in let next () = Queue.dequeue content in
{remaining_tasks; is_empty; finished; next} {remaining_tasks; is_empty; finished; next}
@ -66,3 +61,39 @@ let make_with_procs_from sources =
let make sources = let make sources =
ProcessPool.TaskGenerator.chain (make_with_procs_from sources) (FileScheduler.make sources) ProcessPool.TaskGenerator.chain (make_with_procs_from sources) (FileScheduler.make sources)
let locked_procs = Stack.create ()
let unlock_all () = Stack.until_empty locked_procs ProcLocker.unlock
let record_locked_proc (pname : Procname.t) = Stack.push locked_procs pname
let if_restart_scheduler f =
match Config.scheduler with File | SyntacticCallGraph -> () | Restart -> f ()
let lock_exn pname =
if_restart_scheduler (fun () ->
if ProcLocker.try_lock pname then record_locked_proc pname
else (
unlock_all () ;
raise ProcessPool.ProcnameAlreadyLocked ) )
let unlock pname =
if_restart_scheduler (fun () ->
match Stack.pop locked_procs with
| None ->
L.die InternalError "Trying to unlock %s but it does not appear to be locked.@."
(Procname.to_string pname)
| Some stack_pname when not (Procname.equal pname stack_pname) ->
L.die InternalError "Trying to unlock %s but top of stack is %s.@."
(Procname.to_string pname) (Procname.to_string stack_pname)
| Some _ ->
ProcLocker.unlock pname )
let setup () = ProcLocker.setup ()
let clean () = ProcLocker.clean ()

@ -6,4 +6,12 @@
*) *)
open! IStd open! IStd
val setup : unit -> unit
val clean : unit -> unit
val lock_exn : Procname.t -> unit
val unlock : Procname.t -> unit
val make : SourceFile.t list -> SchedulerTypes.target ProcessPool.TaskGenerator.t val make : SourceFile.t list -> SchedulerTypes.target ProcessPool.TaskGenerator.t

@ -34,7 +34,9 @@ module Runner = struct
Stdlib.flush_all () ; Stdlib.flush_all () ;
(* Compact heap before forking *) (* Compact heap before forking *)
Gc.compact () ; Gc.compact () ;
ProcessPool.run runner RestartScheduler.setup () ;
let results = ProcessPool.run runner in
RestartScheduler.clean () ; results
end end
let run_sequentially ~(f : 'a doer) (tasks : 'a list) : unit = let run_sequentially ~(f : 'a doer) (tasks : 'a list) : unit =

@ -337,10 +337,14 @@ let analyze_callee ?caller_summary callee =
if callee_should_be_analyzed callee then if callee_should_be_analyzed callee then
match get_callee_proc_desc callee with match get_callee_proc_desc callee with
| Some callee_pdesc -> | Some callee_pdesc ->
Some RestartScheduler.lock_exn callee_pname ;
(run_proc_analysis let callee_summary =
~caller_pdesc:(Option.map ~f:Summary.get_proc_desc caller_summary) run_proc_analysis
callee_pdesc) ~caller_pdesc:(Option.map ~f:Summary.get_proc_desc caller_summary)
callee_pdesc
in
RestartScheduler.unlock callee_pname ;
Some callee_summary
| None -> | None ->
Summary.OnDisk.get callee_pname Summary.OnDisk.get callee_pname
else ( else (

@ -9,6 +9,8 @@ open! IStd
module F = Format module F = Format
module L = Logging module L = Logging
exception ProcnameAlreadyLocked
module TaskGenerator = struct module TaskGenerator = struct
type 'a t = type 'a t =
{ remaining_tasks: unit -> int { remaining_tasks: unit -> int
@ -349,20 +351,24 @@ let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilo
send_final (FinalCrash slot) ; send_final (FinalCrash slot) ;
true ) ) ) true ) ) )
| Do stuff -> | Do stuff ->
( try f stuff let result =
with e -> try f stuff ; true with
IExn.reraise_if e ~f:(fun () -> | ProcnameAlreadyLocked ->
if Config.keep_going then ( false
L.internal_error "Error in subprocess %d: %a@." slot Exn.pp e ; | e ->
(* do not raise and continue accepting jobs *) IExn.reraise_if e ~f:(fun () ->
false ) if Config.keep_going then (
else ( L.internal_error "Error in subprocess %d: %a@." slot Exn.pp e ;
(* crash hard, but first let the master know that we have crashed *) (* do not raise and continue accepting jobs *)
send_to_parent (Crash slot) ; false )
true ) ) ) ; else (
(* This is temporary. prev_completed should contain the return value of f stuff *) (* crash hard, but first let the master know that we have crashed *)
send_to_parent (Crash slot) ;
true ) ) ;
true
in
child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue
~prev_completed:true ~prev_completed:result
(** Fork a new child and start it so that it is ready for work. (** Fork a new child and start it so that it is ready for work.

@ -7,6 +7,8 @@
open! IStd open! IStd
exception ProcnameAlreadyLocked
module TaskGenerator : sig module TaskGenerator : sig
(** abstraction for generating jobs *) (** abstraction for generating jobs *)
type 'a t = type 'a t =

Loading…
Cancel
Save