[ProcessPool] Prevent workers starvation

Summary: The refactor from using 1 pipe for all worker-to-master communication to `n` (one per worker) introduced the possibility of starving workers because the master process read all the messages from one pipe (refreshing the file descriptors to read from with `Unix.select`) before moving to the next one. These changes aim to prevent that by reading one message from all available pipes before refreshing the file descriptors to read from.

Reviewed By: ngorogiannis

Differential Revision: D20194924

fbshipit-source-id: 91a0fbc47
master
Fernando Gasperi Jabalera 5 years ago committed by Facebook Github Bot
parent 5e7b0caaef
commit 98096f7fb2

@ -147,7 +147,7 @@ let wait_for_updates pool buffer =
match read_fds with match read_fds with
| [] -> | [] ->
(* no updates, break loop *) acc (* no updates, break loop *) acc
| file_descr :: _ -> | _ ->
(* Read one OCaml value at a time. This is done by first reading the header of the marshalled (* Read one OCaml value at a time. This is done by first reading the header of the marshalled
value (fixed size), then get the total size of the data from that header, then request a value (fixed size), then get the total size of the data from that header, then request a
read of the full OCaml value. read of the full OCaml value.
@ -161,10 +161,15 @@ let wait_for_updates pool buffer =
as much as possible eagerly. This can empty the pipe without us having a way to tell that as much as possible eagerly. This can empty the pipe without us having a way to tell that
there is more to read anymore since the [select] call will return that there is nothing to there is more to read anymore since the [select] call will return that there is nothing to
read. *) read. *)
let messages =
(* Read one message from each file descriptor for fairness *)
List.fold read_fds ~init:acc ~f:(fun msgs_acc file_descr ->
really_read file_descr ~buf:buffer ~len:Marshal.header_size ; really_read file_descr ~buf:buffer ~len:Marshal.header_size ;
let data_size = Marshal.data_size buffer 0 in let data_size = Marshal.data_size buffer 0 in
really_read file_descr ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; really_read file_descr ~buf:buffer ~pos:Marshal.header_size ~len:data_size ;
aux (Marshal.from_bytes buffer 0 :: acc) ~timeout:`Immediately Marshal.from_bytes buffer 0 :: msgs_acc )
in
aux messages ~timeout:`Immediately
in in
aux [] ~timeout:refresh_timeout |> List.rev aux [] ~timeout:refresh_timeout |> List.rev

Loading…
Cancel
Save