From c697222a0426ece928e0c78c16b64ec6c162381e Mon Sep 17 00:00:00 2001 From: Nikos Gorogiannis Date: Sat, 25 May 2019 02:23:55 -0700 Subject: [PATCH] [scheduler] fix deadlock introduced in D15373840 Reviewed By: mbouaziz Differential Revision: D15494101 fbshipit-source-id: 380cdfa7e --- infer/src/base/ProcessPool.ml | 50 ++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index ca4469282..5b8f9b5a4 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -11,6 +11,14 @@ module L = Logging type child_info = {pid: Pid.t; down_pipe: Out_channel.t} +(** The master's abstraction of state for workers. + See [worker_message] and [boss_message] below for transitions between states. + - [Initializing] is the state a newly-forked worker is in. + - [Idle] is the state a worker goes to after it finishes initializing, or finishes processing a work item. + - [Processing x] means the worker is currently processing [x]. +*) +type 'a child_state = Initializing | Idle | Processing of 'a + type 'a task_generator = {n_tasks: int; is_empty: unit -> bool; finished: 'a -> unit; next: unit -> 'a option} @@ -21,8 +29,7 @@ type 'a t = ; slots: child_info Array.t (** array of child processes with their pids and channels we can use to send work down to each child *) - ; pending_items: 'a option Array.t - (** array keeping sent tasks to children; used for feeding the generator a child finishes *) + ; children_states: 'a child_state Array.t (** array tracking the state of each worker *) ; children_updates: Unix.File_descr.t (** all the children send updates up the same pipe to the pool *) ; task_bar: TaskBar.t @@ -54,12 +61,15 @@ type worker_message = | UpdateStatus of int * Mtime.t * string (** [(i, t, status)]: starting a task from slot [i], at start time [t], with description [status]. Watch out that [status] must not be too close in length to [buffer_size]. 
*) - | Ready of int (** finished the given task, ready to receive messages *) + | Ready of int + (** Sent after finishing initializing or after finishing a given task. + When received by master, this moves the worker state from [Initializing] or [Processing _] to [Idle]. *) | Crash of int (** there was an error and the child is no longer receiving messages *) (** messages from the parent process down to worker processes *) type 'a boss_message = - | Do of 'a (** a task to do *) + | Do of 'a + (** [Do x] is sent only when the worker is [Idle], and moves worker state to [Processing x] *) | GoHome (** all tasks done, prepare for teardown *) (** convenience function to send data down pipes without forgetting to flush *) @@ -141,15 +151,16 @@ let has_dead_child pool = , status ) ) -let idle_children pool = - Array.fold pool.pending_items ~init:0 ~f:(fun acc -> function Some _ -> acc | None -> 1 + acc) +let child_is_idle = function Idle -> true | Initializing | Processing _ -> false +let all_children_idle pool = Array.for_all pool.children_states ~f:child_is_idle let send_work_to_child pool slot = + assert (child_is_idle pool.children_states.(slot)) ; pool.tasks.next () |> Option.iter ~f:(fun x -> let {down_pipe} = pool.slots.(slot) in - pool.pending_items.(slot) <- Some x ; + pool.children_states.(slot) <- Processing x ; marshal_to_pipe down_pipe (Do x) ) @@ -209,9 +220,6 @@ let process_updates pool buffer = has_dead_child pool |> Option.iter ~f:(fun (slot, status) -> killall pool ~slot (Unix.Exit_or_signal.to_string_hum status) ) ; - (* try to schedule more work if there is an idle worker *) - Array.findi pool.pending_items ~f:(fun _idx item -> Option.is_none item) - |> Option.iter ~f:(fun (idle_slot, _) -> send_work_to_child pool idle_slot) ; wait_for_updates pool buffer |> Option.iter ~f:(function | UpdateStatus (slot, t, status) -> @@ -224,9 +232,19 @@ let process_updates pool buffer = | Ready slot -> TaskBar.tasks_done_add pool.task_bar 1 ; TaskBar.update_status pool.task_bar ~slot 
(Mtime_clock.now ()) "idle" ; - Option.iter pool.pending_items.(slot) ~f:(fun work -> - pool.tasks.finished work ; pool.pending_items.(slot) <- None ) ; - send_work_to_child pool slot ) + ( match pool.children_states.(slot) with + | Processing work -> + pool.tasks.finished work + | Initializing -> + () + | Idle -> + L.die InternalError "Received a Ready message from an idle worker@." ) ; + pool.children_states.(slot) <- Idle ) ; + (* try to schedule more work if there are idle workers *) + if not (pool.tasks.is_empty ()) then + Array.iteri pool.children_states ~f:(fun slot state -> + match state with Idle -> send_work_to_child pool slot | Initializing | Processing _ -> () + ) (** terminate all worker processes *) @@ -334,8 +352,8 @@ let create : (* we have forked the child processes and are now in the parent *) let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in let children_updates = pipe_child_r in - let pending_items : 'a option Array.t = Array.create ~len:jobs None in - {slots; children_updates; jobs; task_bar; tasks; pending_items} + let children_states = Array.create ~len:jobs Initializing in + {slots; children_updates; jobs; task_bar; tasks; children_states} let run pool = @@ -349,7 +367,7 @@ let run pool = (* allocate a buffer for reading children updates once for the whole run *) let buffer = Bytes.create buffer_size in (* wait for all children to run out of tasks *) - while not (pool.tasks.is_empty () && idle_children pool >= pool.jobs) do + while not (pool.tasks.is_empty () && all_children_idle pool) do process_updates pool buffer ; TaskBar.refresh pool.task_bar done ; wait_all pool ;