From 11300370eddc4b79563d73f5b69b9b95da369a06 Mon Sep 17 00:00:00 2001 From: Fernando Gasperi Jabalera Date: Thu, 23 Jan 2020 05:57:27 -0800 Subject: [PATCH] Add task_result type for scheduler analysis tasks Summary: The RestartScheduler needs to know if the worker finished it's task because: 1. there was no more work to do or 2. found that a needed Procname was already taken (this part is not yet implemented) This need was addressed by (i) making the functions that the workers execute return a value of task_result.t intead of unit and (ii) adding a constructor to the worker_message.t (FinishedTask). Reviewed By: ngorogiannis Differential Revision: D19467783 fbshipit-source-id: a76b02b6c --- infer/src/backend/RestartScheduler.ml | 6 +++--- infer/src/backend/SyntacticCallGraph.ml | 6 +++--- infer/src/backend/Tasks.ml | 4 +++- infer/src/base/ProcessPool.ml | 27 ++++++++++++++++--------- infer/src/base/ProcessPool.mli | 2 +- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/infer/src/backend/RestartScheduler.ml b/infer/src/backend/RestartScheduler.ml index 08cb4de44..69b7c10fc 100644 --- a/infer/src/backend/RestartScheduler.ml +++ b/infer/src/backend/RestartScheduler.ml @@ -8,10 +8,10 @@ open! IStd let of_list (lst : 'a list) : 'a ProcessPool.TaskGenerator.t = let content = Queue.of_list lst in - let length = ref (Queue.length content) in - let remaining_tasks () = !length in + let remaining = ref (Queue.length content) in + let remaining_tasks () = !remaining in let is_empty () = Queue.is_empty content in - let finished _finished_item = decr length in + let finished ~completed:_ _work = decr remaining in let next () = Queue.dequeue content in {remaining_tasks; is_empty; finished; next} diff --git a/infer/src/backend/SyntacticCallGraph.ml b/infer/src/backend/SyntacticCallGraph.ml index 6b1057668..3db902d15 100644 --- a/infer/src/backend/SyntacticCallGraph.ml +++ b/infer/src/backend/SyntacticCallGraph.ml @@ -92,13 +92,13 @@ let bottom_up sources : SchedulerTypes.target ProcessPool.TaskGenerator.t = CallGraph.flag syntactic_call_graph n.pname ; Some (Procname n.pname) in - let finished = function - | File _ -> - assert false + let finished ~completed:_ = function | Procname pname -> decr remaining ; scheduled := Procname.Set.remove pname !scheduled ; CallGraph.remove syntactic_call_graph pname + | File _ -> + L.die InternalError "Only Procnames are scheduled but File target was received" in let next () = (* do construction here, to avoid having the call graph into forked workers *) diff --git a/infer/src/backend/Tasks.ml b/infer/src/backend/Tasks.ml index e994c4687..9ed9e8d20 100644 --- a/infer/src/backend/Tasks.ml +++ b/infer/src/backend/Tasks.ml @@ -48,7 +48,9 @@ let run_sequentially ~(f : 'a doer) (tasks : 'a list) : unit = TaskBar.tasks_done_reset task_bar ; let rec run_tasks () = if not (task_generator.is_empty ()) then ( - Option.iter (task_generator.next ()) ~f:(fun t -> f t ; task_generator.finished t) ; + Option.iter (task_generator.next ()) ~f:(fun t -> + f t ; + task_generator.finished ~completed:true t ) ; TaskBar.set_remaining_tasks task_bar (task_generator.remaining_tasks ()) ; TaskBar.refresh task_bar ; run_tasks () ) diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index 336c8cd34..aceba3bc2 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -13,7 +13,7 @@ module TaskGenerator = struct type 'a t = { remaining_tasks: unit -> int ; is_empty: unit -> bool - ; finished: 'a -> unit + ; finished: completed:bool -> 'a -> unit ; next: unit -> 'a option } let chain (gen1 : 'a t) (gen2 : 'a t) : 'a t = @@ -24,7 +24,10 @@ module TaskGenerator = struct !gen1_returned_empty in let is_empty () = gen1_is_empty () && gen2.is_empty () in - let finished x = if gen1_is_empty () then gen2.finished x else gen1.finished x in + let finished ~completed work_item = + if gen1_is_empty () then gen2.finished ~completed work_item + else gen1.finished ~completed work_item + in let next x = if gen1_is_empty () then gen2.next x else gen1.next x in {remaining_tasks; is_empty; finished; next} @@ -34,7 +37,7 @@ module TaskGenerator = struct let length = ref (List.length lst) in let remaining_tasks () = !length in let is_empty () = List.is_empty !content in - let finished _finished_item = decr length in + let finished ~completed:_ _work_item = decr length in let next () = match !content with | [] -> @@ -98,7 +101,7 @@ type worker_message = | UpdateStatus of int * Mtime.t * string (** [(i, t, status)]: starting a task from slot [i], at start time [t], with description [status]. Watch out that [status] must not be too close in length to [buffer_size]. *) - | Ready of int + | Ready of {worker: int; completed: bool} (** Sent after finishing initializing or after finishing a given task. When received by master, this moves the worker state from [Initializing] or [Processing _] to [Idle]. *) | Crash of int (** there was an error and the child is no longer receiving messages *) @@ -260,12 +263,12 @@ let process_updates pool buffer = (* clean crash, give the child process a chance to cleanup *) Unix.wait (`Pid pid) |> ignore ; killall pool ~slot "see backtrace above" - | Ready slot -> + | Ready {worker= slot; completed} -> ( match pool.children_states.(slot) with - | Processing work -> - pool.tasks.finished work | Initializing -> () + | Processing work -> + pool.tasks.finished ~completed work | Idle -> L.die InternalError "Received a Ready message from an idle worker@." ) ; TaskBar.set_remaining_tasks pool.task_bar (pool.tasks.remaining_tasks ()) ; @@ -327,8 +330,9 @@ let wait_all pool = (** worker loop: wait for tasks and run [f] on them until we are told to go home *) -let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue = - send_to_parent (Ready slot) ; +let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue ~prev_completed + = + send_to_parent (Ready {worker= slot; completed= prev_completed}) ; match receive_from_parent () with | GoHome -> ( match epilogue () with @@ -356,7 +360,9 @@ let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilo (* crash hard, but first let the master know that we have crashed *) send_to_parent (Crash slot) ; true ) ) ) ; + (* This is temporary. prev_completed should contain the return value of f stuff *) child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue + ~prev_completed:true (** Fork a new child and start it so that it is ready for work. @@ -402,7 +408,8 @@ let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilog PerfEvent.(log (fun logger -> log_end_event logger ())) ; x in - child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue ; + child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilogue + ~prev_completed:true ; Out_channel.close updates_oc ; In_channel.close orders_ic ; Epilogues.run () ; diff --git a/infer/src/base/ProcessPool.mli b/infer/src/base/ProcessPool.mli index b41306ffc..074f13d82 100644 --- a/infer/src/base/ProcessPool.mli +++ b/infer/src/base/ProcessPool.mli @@ -15,7 +15,7 @@ module TaskGenerator : sig not a bug *) ; is_empty: unit -> bool (** when should the main loop of the task manager stop expecting new tasks *) - ; finished: 'a -> unit + ; finished: completed:bool -> 'a -> unit (** Process pool calls [finished x] when a worker finishes item [x]. This is only called if [next ()] has previously returned [Some x] and [x] was sent to a worker. *) ; next: unit -> 'a option