From b0427c9390a792a0f2219236b0cc0f9205308329 Mon Sep 17 00:00:00 2001 From: Jules Villard Date: Fri, 22 Jun 2018 02:36:58 -0700 Subject: [PATCH] [taskbar] die when a child errors Summary: Previously we would block indefinitely waiting for that child to send us an update. Now errors in the child are caught and the main process dies, taking everyone down with it. When a child dies, it sends a "Crash" message to the parent, unless it died receiving a signal (like a segmentation fault, or an external signal). In that case, the OCaml runtime won't get a chance to notice its death and send the "Crash" message. Thus, always check if some child has died from under us as well. Reviewed By: mbouaziz Differential Revision: D8577095 fbshipit-source-id: 519992b --- infer/src/base/ProcessPool.ml | 150 +++++++++++++++++++++------------- 1 file changed, 95 insertions(+), 55 deletions(-) diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index 2c38ddc7d..8fff41ccf 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -9,11 +9,13 @@ open! IStd module F = Format module L = Logging +type child_info = {pid: Pid.t; down_pipe: Out_channel.t} + (** the state of the pool *) type 'a t = { jobs: int (** number of jobs running in parallel, i.e. number of children we are responsible for *) - ; slots: (Pid.t * Out_channel.t) Array.t + ; slots: child_info Array.t (** array of child processes with their pids and channels we can use to send work down to each child *) ; children_updates: Unix.File_descr.t @@ -50,6 +52,7 @@ type worker_message = (** [(i, t, status)]: starting a task from slot [i], at start time [t], with description [status]. Watch out that [status] must not be too close in length to [buffer_size]. *) | Ready of int (** finished the given task, ready to receive messages *) + | Crash of int (** there was an error and the child is no longer receiving messages *) (** messages from the parent process down to worker processes *) type 'a boss_message = @@ -68,71 +71,100 @@ let rec really_read ?(pos= 0) ~len fd ~buf = really_read ~pos:(pos + read) ~len:(len - read) fd ~buf -(** main dispatch function that responds to messages from worker processes and updates the taskbar - periodically *) -let process_updates pool buffer = - let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in +(** return [true] if the [file_descr] is ready for reading after at most [timeout] has + elapsed *) +let wait_for_updates file_descr timeout = (* Use select(2) so that we can both wait on the pipe of children updates and wait for a timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time. *) let {Unix.Select_fds.read= read_fds} = - Unix.select ~read:[pool.children_updates] ~write:[] ~except:[] ~timeout () + Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout () in - match read_fds with - | _ :: _ :: _ -> - assert false - | [] -> - () - | [_updates_fd] -> - (* Read one OCaml value at a time. This is done by first reading the header of the marshalled - value (fixed size), then get the total size of the data from that header, then request a - read of the full OCaml value. - - This way the buffer is used for only one OCaml value at a time. This is simpler (values do - not overlap across the end of a read and the beginning of another) and means we do not need - a large buffer as long as messages are never bigger than the buffer. - - This works somewhat like [Marshal.from_channel] but uses the file descriptor directly - instead of an [in_channel]. Do *not* read from the pipe via an [in_channel] as they read - as much as possible eagerly. This can empty the pipe without us having a way to tell that - there is more to read anymore since the [select] call will return that there is nothing to - read. *) - really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ; - let data_size = Marshal.data_size buffer 0 in - really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; - match Marshal.from_bytes buffer 0 with - | UpdateStatus (slot, t, status) -> - TaskBar.update_status pool.task_bar ~slot t status - | Ready slot -> - TaskBar.tasks_done_add pool.task_bar 1 ; - match pool.tasks with - | [] -> - TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; - pool.idle_children <- pool.idle_children + 1 - | x :: tasks -> - pool.tasks <- tasks ; - let pipe = snd (pool.slots).(slot) in - marshal_to_pipe pipe (Do x) + match read_fds with _ :: _ :: _ -> assert false | [] -> false | [_file_descr] -> true + + +let killall pool ~slot status = + Array.iter pool.slots ~f:(fun {pid} -> + match Signal.send Signal.term (`Pid pid) with `Ok | `No_such_process -> () ) ; + Array.iter pool.slots ~f:(fun {pid} -> + try Unix.wait (`Pid pid) |> ignore with Unix.Unix_error (ECHILD, _, _) -> + (* some children may have died already, it's fine *) () ) ; + L.die InternalError "Subprocess %d: %s" slot status + + +let has_dead_child pool = + Unix.wait_nohang `Any + |> Option.map ~f:(fun (dead_pid, status) -> + ( Array.find_mapi_exn pool.slots ~f:(fun slot {pid} -> + if Pid.equal pid dead_pid then Some slot else None ) + , status ) ) + + +(** main dispatch function that responds to messages from worker processes and updates the taskbar + periodically *) +let process_updates pool buffer = + (* abort everything if some child has died unexpectedly *) + has_dead_child pool + |> Option.iter ~f:(fun (slot, status) -> + killall pool ~slot (Unix.Exit_or_signal.to_string_hum status) ) ; + let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in + if wait_for_updates pool.children_updates timeout then ( + (* Read one OCaml value at a time. This is done by first reading the header of the marshalled + value (fixed size), then get the total size of the data from that header, then request a + read of the full OCaml value. + + This way the buffer is used for only one OCaml value at a time. This is simpler (values do + not overlap across the end of a read and the beginning of another) and means we do not need + a large buffer as long as messages are never bigger than the buffer. + + This works somewhat like [Marshal.from_channel] but uses the file descriptor directly + instead of an [in_channel]. Do *not* read from the pipe via an [in_channel] as they read + as much as possible eagerly. This can empty the pipe without us having a way to tell that + there is more to read anymore since the [select] call will return that there is nothing to + read. *) + really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ; + let data_size = Marshal.data_size buffer 0 in + really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; + let update = Marshal.from_bytes buffer 0 in + match update with + | UpdateStatus (slot, t, status) -> + TaskBar.update_status pool.task_bar ~slot t status + | Crash slot -> + let {pid} = (pool.slots).(slot) in + (* clean crash, give the child process a chance to cleanup *) + Unix.wait (`Pid pid) |> ignore ; + killall pool ~slot "see backtrace above" + | Ready slot -> + TaskBar.tasks_done_add pool.task_bar 1 ; + match pool.tasks with + | [] -> + TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; + pool.idle_children <- pool.idle_children + 1 + | x :: tasks -> + pool.tasks <- tasks ; + let {down_pipe} = (pool.slots).(slot) in + marshal_to_pipe down_pipe (Do x) ) (** terminate all worker processes *) let wait_all pool = - (* tell all workers to go home *) - Array.iter pool.slots ~f:(fun (_, pipe) -> marshal_to_pipe pipe GoHome ; Out_channel.close pipe) ; - (* wait(2) all the pids one by one; the order doesn't matter since we want to wait for all of them - eventually anyway. *) + (* tell each alive worker to go home and wait(2) them, one by one; the order doesn't matter since + we want to wait for all of them eventually anyway. *) let errors = - Array.fold ~init:[] pool.slots ~f:(fun errors (pid, _) -> + Array.foldi ~init:[] pool.slots ~f:(fun slot errors {pid; down_pipe} -> + marshal_to_pipe down_pipe GoHome ; + Out_channel.close down_pipe ; match Unix.wait (`Pid pid) with | _pid, Ok () -> errors | _pid, (Error _ as status) -> (* Collect all children errors and die only at the end to avoid creating zombies. *) - status :: errors ) + (slot, status) :: errors ) in if not (List.is_empty errors) then let log_or_die = if Config.keep_going then L.internal_error else L.die InternalError in - let pp_error f status = - F.fprintf f "Error in infer subprocess: %s@." (Unix.Exit_or_signal.to_string_hum status) + let pp_error f (slot, status) = + F.fprintf f "Error in infer subprocess %d: %s@." slot + (Unix.Exit_or_signal.to_string_hum status) in log_or_die "@[%a@]%!" (Pp.seq ~print_env:Pp.text_break ~sep:"" pp_error) errors @@ -141,12 +173,20 @@ let wait_all pool = let rec child_loop ~slot send_to_parent receive_from_parent ~f = send_to_parent (Ready slot) ; match receive_from_parent () with - | Do stuff -> - (* TODO: error handling *) - f stuff ; - child_loop ~slot send_to_parent receive_from_parent ~f | GoHome -> () + | Do stuff -> + ( try f stuff with e -> + IExn.reraise_if e ~f:(fun () -> + if Config.keep_going then ( + L.internal_error "Error in subprocess %d: %a@." slot Exn.pp e ; + (* do not raise and continue accepting jobs *) + false ) + else ( + (* crash hard, but first let the master know that we have crashed *) + send_to_parent (Crash slot) ; + true ) ) ) ; + child_loop ~slot send_to_parent receive_from_parent ~f (** Fork a new child and start it so that it is ready for work. @@ -186,7 +226,7 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f = Pervasives.exit 0 | `In_the_parent pid -> let[@warning "-26"] to_child_r = Unix.close to_child_r in - (pid, Unix.out_channel_of_descr to_child_w) + {pid; down_pipe= Unix.out_channel_of_descr to_child_w} let create : jobs:int -> child_prelude:(unit -> unit) -> TaskBar.t -> f:('a -> unit) -> 'a t = @@ -214,7 +254,7 @@ let run pool tasks = (* allocate a buffer for reading children updates once for the whole run *) let buffer = Bytes.create buffer_size in (* wait for all children to run out of tasks *) - while pool.idle_children < pool.jobs do + while not (List.is_empty pool.tasks && pool.idle_children >= pool.jobs) do process_updates pool buffer ; TaskBar.refresh pool.task_bar done ; wait_all pool