diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index 2c38ddc7d..8fff41ccf 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -9,11 +9,13 @@ open! IStd module F = Format module L = Logging +type child_info = {pid: Pid.t; down_pipe: Out_channel.t} + (** the state of the pool *) type 'a t = { jobs: int (** number of jobs running in parallel, i.e. number of children we are responsible for *) - ; slots: (Pid.t * Out_channel.t) Array.t + ; slots: child_info Array.t (** array of child processes with their pids and channels we can use to send work down to each child *) ; children_updates: Unix.File_descr.t @@ -50,6 +52,7 @@ type worker_message = (** [(i, t, status)]: starting a task from slot [i], at start time [t], with description [status]. Watch out that [status] must not be too close in length to [buffer_size]. *) | Ready of int (** finished the given task, ready to receive messages *) + | Crash of int (** there was an error and the child is no longer receiving messages *) (** messages from the parent process down to worker processes *) type 'a boss_message = @@ -68,71 +71,100 @@ let rec really_read ?(pos= 0) ~len fd ~buf = really_read ~pos:(pos + read) ~len:(len - read) fd ~buf -(** main dispatch function that responds to messages from worker processes and updates the taskbar - periodically *) -let process_updates pool buffer = - let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in +(** return [true] if the [file_descr] is ready for reading after at most [timeout] has + elapsed *) +let wait_for_updates file_descr timeout = (* Use select(2) so that we can both wait on the pipe of children updates and wait for a timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time. *) let {Unix.Select_fds.read= read_fds} = - Unix.select ~read:[pool.children_updates] ~write:[] ~except:[] ~timeout () + Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout () in - match read_fds with - | _ :: _ :: _ -> - assert false - | [] -> - () - | [_updates_fd] -> - (* Read one OCaml value at a time. This is done by first reading the header of the marshalled - value (fixed size), then get the total size of the data from that header, then request a - read of the full OCaml value. - - This way the buffer is used for only one OCaml value at a time. This is simpler (values do - not overlap across the end of a read and the beginning of another) and means we do not need - a large buffer as long as messages are never bigger than the buffer. - - This works somewhat like [Marshal.from_channel] but uses the file descriptor directly - instead of an [in_channel]. Do *not* read from the pipe via an [in_channel] as they read - as much as possible eagerly. This can empty the pipe without us having a way to tell that - there is more to read anymore since the [select] call will return that there is nothing to - read. *) - really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ; - let data_size = Marshal.data_size buffer 0 in - really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; - match Marshal.from_bytes buffer 0 with - | UpdateStatus (slot, t, status) -> - TaskBar.update_status pool.task_bar ~slot t status - | Ready slot -> - TaskBar.tasks_done_add pool.task_bar 1 ; - match pool.tasks with - | [] -> - TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; - pool.idle_children <- pool.idle_children + 1 - | x :: tasks -> - pool.tasks <- tasks ; - let pipe = snd (pool.slots).(slot) in - marshal_to_pipe pipe (Do x) + match read_fds with _ :: _ :: _ -> assert false | [] -> false | [_file_descr] -> true + + +let killall pool ~slot status = + Array.iter pool.slots ~f:(fun {pid} -> + match Signal.send Signal.term (`Pid pid) with `Ok | `No_such_process -> () ) ; + Array.iter pool.slots ~f:(fun {pid} -> + try Unix.wait (`Pid pid) |> ignore with Unix.Unix_error (ECHILD, _, _) -> + (* some children may have died already, it's fine *) () ) ; + L.die InternalError "Subprocess %d: %s" slot status + + +let has_dead_child pool = + Unix.wait_nohang `Any + |> Option.map ~f:(fun (dead_pid, status) -> + ( Array.find_mapi_exn pool.slots ~f:(fun slot {pid} -> + if Pid.equal pid dead_pid then Some slot else None ) + , status ) ) + + +(** main dispatch function that responds to messages from worker processes and updates the taskbar + periodically *) +let process_updates pool buffer = + (* abort everything if some child has died unexpectedly *) + has_dead_child pool + |> Option.iter ~f:(fun (slot, status) -> + killall pool ~slot (Unix.Exit_or_signal.to_string_hum status) ) ; + let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in + if wait_for_updates pool.children_updates timeout then ( + (* Read one OCaml value at a time. This is done by first reading the header of the marshalled + value (fixed size), then get the total size of the data from that header, then request a + read of the full OCaml value. + + This way the buffer is used for only one OCaml value at a time. This is simpler (values do + not overlap across the end of a read and the beginning of another) and means we do not need + a large buffer as long as messages are never bigger than the buffer. + + This works somewhat like [Marshal.from_channel] but uses the file descriptor directly + instead of an [in_channel]. Do *not* read from the pipe via an [in_channel] as they read + as much as possible eagerly. This can empty the pipe without us having a way to tell that + there is more to read anymore since the [select] call will return that there is nothing to + read. *) + really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ; + let data_size = Marshal.data_size buffer 0 in + really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; + let update = Marshal.from_bytes buffer 0 in + match update with + | UpdateStatus (slot, t, status) -> + TaskBar.update_status pool.task_bar ~slot t status + | Crash slot -> + let {pid} = (pool.slots).(slot) in + (* clean crash, give the child process a chance to cleanup *) + Unix.wait (`Pid pid) |> ignore ; + killall pool ~slot "see backtrace above" + | Ready slot -> + TaskBar.tasks_done_add pool.task_bar 1 ; + match pool.tasks with + | [] -> + TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; + pool.idle_children <- pool.idle_children + 1 + | x :: tasks -> + pool.tasks <- tasks ; + let {down_pipe} = (pool.slots).(slot) in + marshal_to_pipe down_pipe (Do x) ) (** terminate all worker processes *) let wait_all pool = - (* tell all workers to go home *) - Array.iter pool.slots ~f:(fun (_, pipe) -> marshal_to_pipe pipe GoHome ; Out_channel.close pipe) ; - (* wait(2) all the pids one by one; the order doesn't matter since we want to wait for all of them - eventually anyway. *) + (* tell each alive worker to go home and wait(2) them, one by one; the order doesn't matter since + we want to wait for all of them eventually anyway. *) let errors = - Array.fold ~init:[] pool.slots ~f:(fun errors (pid, _) -> + Array.foldi ~init:[] pool.slots ~f:(fun slot errors {pid; down_pipe} -> + marshal_to_pipe down_pipe GoHome ; + Out_channel.close down_pipe ; match Unix.wait (`Pid pid) with | _pid, Ok () -> errors | _pid, (Error _ as status) -> (* Collect all children errors and die only at the end to avoid creating zombies. *) - status :: errors ) + (slot, status) :: errors ) in if not (List.is_empty errors) then let log_or_die = if Config.keep_going then L.internal_error else L.die InternalError in - let pp_error f status = - F.fprintf f "Error in infer subprocess: %s@." (Unix.Exit_or_signal.to_string_hum status) + let pp_error f (slot, status) = + F.fprintf f "Error in infer subprocess %d: %s@." slot + (Unix.Exit_or_signal.to_string_hum status) in log_or_die "@[%a@]%!" (Pp.seq ~print_env:Pp.text_break ~sep:"" pp_error) errors @@ -141,12 +173,20 @@ let wait_all pool = let rec child_loop ~slot send_to_parent receive_from_parent ~f = send_to_parent (Ready slot) ; match receive_from_parent () with - | Do stuff -> - (* TODO: error handling *) - f stuff ; - child_loop ~slot send_to_parent receive_from_parent ~f | GoHome -> () + | Do stuff -> + ( try f stuff with e -> + IExn.reraise_if e ~f:(fun () -> + if Config.keep_going then ( + L.internal_error "Error in subprocess %d: %a@." slot Exn.pp e ; + (* do not raise and continue accepting jobs *) + false ) + else ( + (* crash hard, but first let the master know that we have crashed *) + send_to_parent (Crash slot) ; + true ) ) ) ; + child_loop ~slot send_to_parent receive_from_parent ~f (** Fork a new child and start it so that it is ready for work. @@ -186,7 +226,7 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f = Pervasives.exit 0 | `In_the_parent pid -> let[@warning "-26"] to_child_r = Unix.close to_child_r in - (pid, Unix.out_channel_of_descr to_child_w) + {pid; down_pipe= Unix.out_channel_of_descr to_child_w} let create : jobs:int -> child_prelude:(unit -> unit) -> TaskBar.t -> f:('a -> unit) -> 'a t = @@ -214,7 +254,7 @@ let run pool tasks = (* allocate a buffer for reading children updates once for the whole run *) let buffer = Bytes.create buffer_size in (* wait for all children to run out of tasks *) - while pool.idle_children < pool.jobs do + while not (List.is_empty pool.tasks && pool.idle_children >= pool.jobs) do process_updates pool buffer ; TaskBar.refresh pool.task_bar done ; wait_all pool