diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index 0ed323394..9c1d899ad 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -60,7 +60,13 @@ type 'a boss_message = | GoHome (** all tasks done, prepare for teardown *) (** convenience function to send data down pipes without forgetting to flush *) -let marshal_to_pipe fd x = Marshal.to_channel fd x [] ; Out_channel.flush fd +let marshal_to_pipe fd x = + PerfEvent.log (fun logger -> + PerfEvent.log_begin_event logger ~categories:["sys"] ~name:"send to pipe" () ) ; + Marshal.to_channel fd x [] ; + Out_channel.flush fd ; + PerfEvent.(log (fun logger -> log_end_event logger ())) + (** like [Unix.read] but reads until [len] bytes have been read *) let rec really_read ?(pos = 0) ~len fd ~buf = @@ -73,13 +79,45 @@ let rec really_read ?(pos = 0) ~len fd ~buf = (** return [true] if the [file_descr] is ready for reading after at most [timeout] has elapsed *) -let wait_for_updates file_descr timeout = +let wait_for_updates pool buffer = + let file_descr = pool.children_updates in + let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in (* Use select(2) so that we can both wait on the pipe of children updates and wait for a timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time. *) let {Unix.Select_fds.read= read_fds} = Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout () in - match read_fds with _ :: _ :: _ -> assert false | [] -> false | [_file_descr] -> true + match read_fds with + | _ :: _ :: _ -> + assert false + | [] -> + (* not ready *) None + | [_file_descr] -> + (* Read one OCaml value at a time. This is done by first reading the header of the marshalled + value (fixed size), then get the total size of the data from that header, then request a + read of the full OCaml value. + + This way the buffer is used for only one OCaml value at a time. This is simpler (values do + not overlap across the end of a read and the beginning of another) and means we do not need + a large buffer as long as messages are never bigger than the buffer. + + This works somewhat like [Marshal.from_channel] but uses the file descriptor directly + instead of an [in_channel]. Do *not* read from the pipe via an [in_channel] as they read + as much as possible eagerly. This can empty the pipe without us having a way to tell that + there is more to read anymore since the [select] call will return that there is nothing to + read. *) + really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ; + let data_size = Marshal.data_size buffer 0 in + really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; + Some (Marshal.from_bytes buffer 0) + + +let wait_for_updates pool buffer = + PerfEvent.log (fun logger -> + PerfEvent.log_begin_event logger ~categories:["sys"] ~name:"wait for event" () ) ; + let update = wait_for_updates pool buffer in + PerfEvent.(log (fun logger -> log_end_event logger ())) ; + update let killall pool ~slot status = @@ -106,43 +144,26 @@ let process_updates pool buffer = has_dead_child pool |> Option.iter ~f:(fun (slot, status) -> killall pool ~slot (Unix.Exit_or_signal.to_string_hum status) ) ; - let timeout = if TaskBar.is_interactive pool.task_bar then refresh_timeout else `Never in - if wait_for_updates pool.children_updates timeout then ( - (* Read one OCaml value at a time. This is done by first reading the header of the marshalled - value (fixed size), then get the total size of the data from that header, then request a - read of the full OCaml value. - - This way the buffer is used for only one OCaml value at a time. This is simpler (values do - not overlap across the end of a read and the beginning of another) and means we do not need - a large buffer as long as messages are never bigger than the buffer. - - This works somewhat like [Marshal.from_channel] but uses the file descriptor directly - instead of an [in_channel]. Do *not* read from the pipe via an [in_channel] as they read - as much as possible eagerly. This can empty the pipe without us having a way to tell that - there is more to read anymore since the [select] call will return that there is nothing to - read. *) - really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ; - let data_size = Marshal.data_size buffer 0 in - really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; - let update = Marshal.from_bytes buffer 0 in - match update with - | UpdateStatus (slot, t, status) -> - TaskBar.update_status pool.task_bar ~slot t status - | Crash slot -> - let {pid} = pool.slots.(slot) in - (* clean crash, give the child process a chance to cleanup *) - Unix.wait (`Pid pid) |> ignore ; - killall pool ~slot "see backtrace above" - | Ready slot -> ( - TaskBar.tasks_done_add pool.task_bar 1 ; - match pool.tasks with - | [] -> - TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; - pool.idle_children <- pool.idle_children + 1 - | x :: tasks -> - pool.tasks <- tasks ; - let {down_pipe} = pool.slots.(slot) in - marshal_to_pipe down_pipe (Do x) ) ) + match wait_for_updates pool buffer with + | Some (UpdateStatus (slot, t, status)) -> + TaskBar.update_status pool.task_bar ~slot t status + | Some (Crash slot) -> + let {pid} = pool.slots.(slot) in + (* clean crash, give the child process a chance to cleanup *) + Unix.wait (`Pid pid) |> ignore ; + killall pool ~slot "see backtrace above" + | Some (Ready slot) -> ( + TaskBar.tasks_done_add pool.task_bar 1 ; + match pool.tasks with + | [] -> + TaskBar.update_status pool.task_bar ~slot (Mtime_clock.now ()) "idle" ; + pool.idle_children <- pool.idle_children + 1 + | x :: tasks -> + pool.tasks <- tasks ; + let {down_pipe} = pool.slots.(slot) in + marshal_to_pipe down_pipe (Do x) ) + | None -> + () (** terminate all worker processes *) @@ -220,7 +241,13 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f = in ProcessPoolState.update_status := update_status ; let orders_ic = Unix.in_channel_of_descr to_child_r in - let receive_from_parent () = Marshal.from_channel orders_ic in + let receive_from_parent () = + PerfEvent.log (fun logger -> + PerfEvent.log_begin_event logger ~categories:["sys"] ~name:"receive from pipe" () ) ; + let x = Marshal.from_channel orders_ic in + PerfEvent.(log (fun logger -> log_end_event logger ())) ; + x + in child_loop ~slot send_to_parent receive_from_parent ~f ; Out_channel.close updates_oc ; In_channel.close orders_ic ;