[scheduler] consume all updates each time

Reviewed By: mbouaziz

Differential Revision: D15740858

fbshipit-source-id: 0d7fd2cab
master
Nikos Gorogiannis 6 years ago committed by Facebook Github Bot
parent d2eb3c8cc6
commit 7904ca31c0

@ -90,21 +90,23 @@ let rec really_read ?(pos = 0) ~len fd ~buf =
really_read ~pos:(pos + read) ~len:(len - read) fd ~buf really_read ~pos:(pos + read) ~len:(len - read) fd ~buf
(** return [true] if the [file_descr] is ready for reading after at most [timeout] has (** return a list of all updates coming from workers. The first update is expected for up to the
elapsed *) timeout [refresh_timeout]. After that, all already received updates are consumed but with zero timeout.
If there is none left, return the list. *)
let wait_for_updates pool buffer = let wait_for_updates pool buffer =
let rec aux acc ~timeout =
let file_descr = pool.children_updates in let file_descr = pool.children_updates in
(* Use select(2) so that we can both wait on the pipe of children updates and wait for a (* Use select(2) so that we can both wait on the pipe of children updates and wait for a
timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time, timeout. The timeout is for giving a chance to the taskbar of refreshing from time to time,
as well as for checking for new work where none were previously available. *) as well as for checking for new work where none were previously available. *)
let {Unix.Select_fds.read= read_fds} = let {Unix.Select_fds.read= read_fds} =
Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout:refresh_timeout () Unix.select ~read:[file_descr] ~write:[] ~except:[] ~timeout ()
in in
match read_fds with match read_fds with
| _ :: _ :: _ -> | _ :: _ :: _ ->
assert false assert false
| [] -> | [] ->
(* not ready *) None (* no updates, break loop *) acc
| [_file_descr] -> | [_file_descr] ->
(* Read one OCaml value at a time. This is done by first reading the header of the marshalled (* Read one OCaml value at a time. This is done by first reading the header of the marshalled
value (fixed size), then get the total size of the data from that header, then request a value (fixed size), then get the total size of the data from that header, then request a
@ -122,7 +124,9 @@ let wait_for_updates pool buffer =
really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ; really_read pool.children_updates ~buf:buffer ~len:Marshal.header_size ;
let data_size = Marshal.data_size buffer 0 in let data_size = Marshal.data_size buffer 0 in
really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; really_read pool.children_updates ~buf:buffer ~pos:Marshal.header_size ~len:data_size ;
Some (Marshal.from_bytes buffer 0) aux (Marshal.from_bytes buffer 0 :: acc) ~timeout:`Immediately
in
aux [] ~timeout:refresh_timeout |> List.rev
let wait_for_updates pool buffer = let wait_for_updates pool buffer =
@ -221,7 +225,7 @@ let process_updates pool buffer =
|> Option.iter ~f:(fun (slot, status) -> |> Option.iter ~f:(fun (slot, status) ->
killall pool ~slot (Unix.Exit_or_signal.to_string_hum status) ) ; killall pool ~slot (Unix.Exit_or_signal.to_string_hum status) ) ;
wait_for_updates pool buffer wait_for_updates pool buffer
|> Option.iter ~f:(function |> List.iter ~f:(function
| UpdateStatus (slot, t, status) -> | UpdateStatus (slot, t, status) ->
TaskBar.update_status pool.task_bar ~slot t status TaskBar.update_status pool.task_bar ~slot t status
| Crash slot -> | Crash slot ->

Loading…
Cancel
Save