Add task_result type for scheduler analysis tasks

Summary:
The RestartScheduler needs to know if the worker finished it's task
because:
  1. there was no more work to do or
  2. found that a needed Procname was already taken (this part is not yet implemented)

This need was addressed by (i) making the functions that the workers execute
return a value of task_result.t intead of unit and (ii) adding a
constructor to the worker_message.t (FinishedTask).

Reviewed By: ngorogiannis

Differential Revision: D19467783

fbshipit-source-id: a76b02b6c
master
Fernando Gasperi Jabalera 5 years ago committed by Facebook Github Bot
parent fe736f4151
commit 11300370ed

@ -8,10 +8,10 @@ open! IStd
let of_list (lst : 'a list) : 'a ProcessPool.TaskGenerator.t =
let content = Queue.of_list lst in
let length = ref (Queue.length content) in
let remaining_tasks () = !length in
let remaining = ref (Queue.length content) in
let remaining_tasks () = !remaining in
let is_empty () = Queue.is_empty content in
let finished _finished_item = decr length in
let finished ~completed:_ _work = decr remaining in
let next () = Queue.dequeue content in
{remaining_tasks; is_empty; finished; next}

@ -92,13 +92,13 @@ let bottom_up sources : SchedulerTypes.target ProcessPool.TaskGenerator.t =
CallGraph.flag syntactic_call_graph n.pname ;
Some (Procname n.pname)
in
let finished = function
| File _ ->
assert false
let finished ~completed:_ = function
| Procname pname ->
decr remaining ;
scheduled := Procname.Set.remove pname !scheduled ;
CallGraph.remove syntactic_call_graph pname
| File _ ->
L.die InternalError "Only Procnames are scheduled but File target was received"
in
let next () =
(* do construction here, to avoid having the call graph into forked workers *)

@ -48,7 +48,9 @@ let run_sequentially ~(f : 'a doer) (tasks : 'a list) : unit =
TaskBar.tasks_done_reset task_bar ;
let rec run_tasks () =
if not (task_generator.is_empty ()) then (
Option.iter (task_generator.next ()) ~f:(fun t -> f t ; task_generator.finished t) ;
Option.iter (task_generator.next ()) ~f:(fun t ->
f t ;
task_generator.finished ~completed:true t ) ;
TaskBar.set_remaining_tasks task_bar (task_generator.remaining_tasks ()) ;
TaskBar.refresh task_bar ;
run_tasks () )

@ -13,7 +13,7 @@ module TaskGenerator = struct
type 'a t =
{ remaining_tasks: unit -> int
; is_empty: unit -> bool
; finished: 'a -> unit
; finished: completed:bool -> 'a -> unit
; next: unit -> 'a option }
let chain (gen1 : 'a t) (gen2 : 'a t) : 'a t =
@ -24,7 +24,10 @@ module TaskGenerator = struct
!gen1_returned_empty
in
let is_empty () = gen1_is_empty () && gen2.is_empty () in
let finished x = if gen1_is_empty () then gen2.finished x else gen1.finished x in
let finished ~completed work_item =
if gen1_is_empty () then gen2.finished ~completed work_item
else gen1.finished ~completed work_item
in
let next x = if gen1_is_empty () then gen2.next x else gen1.next x in
{remaining_tasks; is_empty; finished; next}
@ -34,7 +37,7 @@ module TaskGenerator = struct
let length = ref (List.length lst) in
let remaining_tasks () = !length in
let is_empty () = List.is_empty !content in
let finished _finished_item = decr length in
let finished ~completed:_ _work_item = decr length in
let next () =
match !content with
| [] ->
@ -98,7 +101,7 @@ type worker_message =
| UpdateStatus of int * Mtime.t * string
(** [(i, t, status)]: starting a task from slot [i], at start time [t], with description
[status]. Watch out that [status] must not be too close in length to [buffer_size]. *)
| Ready of int
| Ready of {worker: int; completed: bool}
(** Sent after finishing initializing or after finishing a given task. When received by
master, this moves the worker state from [Initializing] or [Processing _] to [Idle]. *)
| Crash of int (** there was an error and the child is no longer receiving messages *)
@ -260,12 +263,12 @@ let process_updates pool buffer =
(* clean crash, give the child process a chance to cleanup *)
Unix.wait (`Pid pid) |> ignore ;
killall pool ~slot "see backtrace above"
| Ready slot ->
| Ready {worker= slot; completed} ->
( match pool.children_states.(slot) with
| Processing work ->
pool.tasks.finished work
| Initializing ->
()
| Processing work ->
pool.tasks.finished ~completed work
| Idle ->
L.die InternalError "Received a Ready message from an idle worker@." ) ;
TaskBar.set_remaining_tasks pool.task_bar (pool.tasks.remaining_tasks ()) ;
@ -327,8 +330,9 @@ let wait_all pool =
(** worker loop: wait for tasks and run [f] on them until we are told to go home *)
let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue =
send_to_parent (Ready slot) ;
let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue ~prev_completed
=
send_to_parent (Ready {worker= slot; completed= prev_completed}) ;
match receive_from_parent () with
| GoHome -> (
match epilogue () with
@ -356,7 +360,9 @@ let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilo
(* crash hard, but first let the master know that we have crashed *)
send_to_parent (Crash slot) ;
true ) ) ) ;
(* This is temporary. prev_completed should contain the return value of f stuff *)
child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue
~prev_completed:true
(** Fork a new child and start it so that it is ready for work.
@ -402,7 +408,8 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilog
PerfEvent.(log (fun logger -> log_end_event logger ())) ;
x
in
child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue ;
child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue
~prev_completed:true ;
Out_channel.close updates_oc ;
In_channel.close orders_ic ;
Epilogues.run () ;

@ -15,7 +15,7 @@ module TaskGenerator : sig
not a bug *)
; is_empty: unit -> bool
(** when should the main loop of the task manager stop expecting new tasks *)
; finished: 'a -> unit
; finished: completed:bool -> 'a -> unit
(** Process pool calls [finished x] when a worker finishes item [x]. This is only called
if [next ()] has previously returned [Some x] and [x] was sent to a worker. *)
; next: unit -> 'a option

Loading…
Cancel
Save