[scheduler] fix deadlock introduced in D15373840

Reviewed By: mbouaziz

Differential Revision: D15494101

fbshipit-source-id: 380cdfa7e
master
Nikos Gorogiannis 6 years ago committed by Facebook Github Bot
parent bc082da199
commit c697222a04

@ -11,6 +11,14 @@ module L = Logging
type child_info = {pid: Pid.t; down_pipe: Out_channel.t} type child_info = {pid: Pid.t; down_pipe: Out_channel.t}
(** The master's abstraction of state for workers.
See [worker_message] and [boss_message] below for transitions between states.
- [Initializing] is the state a newly-forked worker is in.
- [Idle] is the state a worker goes to after it finishes initializing, or finishes processing a work item.
- [Processing x] means the worker is currently processing [x].
*)
type 'a child_state = Initializing | Idle | Processing of 'a
type 'a task_generator = type 'a task_generator =
{n_tasks: int; is_empty: unit -> bool; finished: 'a -> unit; next: unit -> 'a option} {n_tasks: int; is_empty: unit -> bool; finished: 'a -> unit; next: unit -> 'a option}
@ -21,8 +29,7 @@ type 'a t =
; slots: child_info Array.t ; slots: child_info Array.t
(** array of child processes with their pids and channels we can use to send work down to (** array of child processes with their pids and channels we can use to send work down to
each child *) each child *)
; pending_items: 'a option Array.t ; children_states: 'a child_state Array.t (** array tracking the state of each worker *)
(** array keeping sent tasks to children; used for feeding the generator a child finishes *)
; children_updates: Unix.File_descr.t ; children_updates: Unix.File_descr.t
(** all the children send updates up the same pipe to the pool *) (** all the children send updates up the same pipe to the pool *)
; task_bar: TaskBar.t ; task_bar: TaskBar.t
@ -54,12 +61,15 @@ type worker_message =
| UpdateStatus of int * Mtime.t * string | UpdateStatus of int * Mtime.t * string
(** [(i, t, status)]: starting a task from slot [i], at start time [t], with description (** [(i, t, status)]: starting a task from slot [i], at start time [t], with description
[status]. Watch out that [status] must not be too close in length to [buffer_size]. *) [status]. Watch out that [status] must not be too close in length to [buffer_size]. *)
| Ready of int (** finished the given task, ready to receive messages *) | Ready of int
(** Sent after finishing initializing or after finishing a given task.
When received by master, this moves the worker state from [Initializing] or [Processing _] to [Idle]. *)
| Crash of int (** there was an error and the child is no longer receiving messages *) | Crash of int (** there was an error and the child is no longer receiving messages *)
(** messages from the parent process down to worker processes *) (** messages from the parent process down to worker processes *)
type 'a boss_message = type 'a boss_message =
| Do of 'a (** a task to do *) | Do of 'a
(** [Do x] is sent only when the worker is [Idle], and moves worker state to [Processing x] *)
| GoHome (** all tasks done, prepare for teardown *) | GoHome (** all tasks done, prepare for teardown *)
(** convenience function to send data down pipes without forgetting to flush *) (** convenience function to send data down pipes without forgetting to flush *)
@ -141,15 +151,16 @@ let has_dead_child pool =
, status ) ) , status ) )
let idle_children pool = let child_is_idle = function Idle _ -> true | _ -> false
Array.fold pool.pending_items ~init:0 ~f:(fun acc -> function Some _ -> acc | None -> 1 + acc)
let all_children_idle pool = Array.for_all pool.children_states ~f:child_is_idle
let send_work_to_child pool slot = let send_work_to_child pool slot =
assert (child_is_idle pool.children_states.(slot)) ;
pool.tasks.next () pool.tasks.next ()
|> Option.iter ~f:(fun x -> |> Option.iter ~f:(fun x ->
let {down_pipe} = pool.slots.(slot) in let {down_pipe} = pool.slots.(slot) in
pool.pending_items.(slot) <- Some x ; pool.children_states.(slot) <- Processing x ;
marshal_to_pipe down_pipe (Do x) ) marshal_to_pipe down_pipe (Do x) )
@ -209,9 +220,6 @@ let process_updates pool buffer =
has_dead_child pool has_dead_child pool
|> Option.iter ~f:(fun (slot, status) -> |> Option.iter ~f:(fun (slot, status) ->
killall pool ~slot (Unix.Exit_or_signal.to_string_hum status) ) ; killall pool ~slot (Unix.Exit_or_signal.to_string_hum status) ) ;
(* try to schedule more work if there is an idle worker *)
Array.findi pool.pending_items ~f:(fun _idx item -> Option.is_none item)
|> Option.iter ~f:(fun (idle_slot, _) -> send_work_to_child pool idle_slot) ;
wait_for_updates pool buffer wait_for_updates pool buffer
|> Option.iter ~f:(function |> Option.iter ~f:(function
| UpdateStatus (slot, t, status) -> | UpdateStatus (slot, t, status) ->
@ -224,9 +232,19 @@ let process_updates pool buffer =
| Ready slot -> | Ready slot ->
TaskBar.tasks_done_add pool.task_bar 1 ; TaskBar.tasks_done_add pool.task_bar 1 ;
TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ;
Option.iter pool.pending_items.(slot) ~f:(fun work -> ( match pool.children_states.(slot) with
pool.tasks.finished work ; pool.pending_items.(slot) <- None ) ; | Processing work ->
send_work_to_child pool slot ) pool.tasks.finished work
| Initializing ->
()
| Idle ->
L.die InternalError "Received a Ready message from an idle worker@." ) ;
pool.children_states.(slot) <- Idle ) ;
(* try to schedule more work if there are idle workers *)
if not (pool.tasks.is_empty ()) then
Array.iteri pool.children_states ~f:(fun slot state ->
match state with Idle -> send_work_to_child pool slot | Initializing | Processing _ -> ()
)
(** terminate all worker processes *) (** terminate all worker processes *)
@ -334,8 +352,8 @@ let create :
(* we have forked the child processes and are now in the parent *) (* we have forked the child processes and are now in the parent *)
let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in
let children_updates = pipe_child_r in let children_updates = pipe_child_r in
let pending_items : 'a option Array.t = Array.create ~len:jobs None in let children_states = Array.create ~len:jobs Initializing in
{slots; children_updates; jobs; task_bar; tasks; pending_items} {slots; children_updates; jobs; task_bar; tasks; children_states}
let run pool = let run pool =
@ -349,7 +367,7 @@ let run pool =
(* allocate a buffer for reading children updates once for the whole run *) (* allocate a buffer for reading children updates once for the whole run *)
let buffer = Bytes.create buffer_size in let buffer = Bytes.create buffer_size in
(* wait for all children to run out of tasks *) (* wait for all children to run out of tasks *)
while not (pool.tasks.is_empty () && idle_children pool >= pool.jobs) do while not (pool.tasks.is_empty () && all_children_idle pool) do
process_updates pool buffer ; TaskBar.refresh pool.task_bar process_updates pool buffer ; TaskBar.refresh pool.task_bar
done ; done ;
wait_all pool ; wait_all pool ;

Loading…
Cancel
Save