From 98096f7fb27198f754c099dff485796f38ec4b84 Mon Sep 17 00:00:00 2001 From: Fernando Gasperi Jabalera Date: Wed, 4 Mar 2020 13:09:56 -0800 Subject: [PATCH] [ProcessPool] Prevent workers starvation Summary: The refactor from using 1 pipe for all worker-to-master communication to `n` pipes (one per worker) introduced the possibility of starving workers: the master process refreshed the set of readable file descriptors with `Unix.select` after every single message and always read from the first ready descriptor, so it drained all the messages from one pipe before moving to the next one. These changes aim to prevent that by reading one message from each ready pipe before refreshing the set of file descriptors to read from. Reviewed By: ngorogiannis Differential Revision: D20194924 fbshipit-source-id: 91a0fbc47 --- infer/src/base/ProcessPool.ml | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index cdf11690b..05086737b 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -147,7 +147,7 @@ let wait_for_updates pool buffer = match read_fds with | [] -> (* no updates, break loop *) acc - | file_descr :: _ -> + | _ -> (* Read one OCaml value at a time. This is done by first reading the header of the marshalled value (fixed size), then get the total size of the data from that header, then request a read of the full OCaml value. @@ -161,10 +161,15 @@ let wait_for_updates pool buffer = as much as possible eagerly. This can empty the pipe without us having a way to tell that there is more to read anymore since the [select] call will return that there is nothing to read. 
*) - really_read file_descr ~buf:buffer ~len:Marshal.header_size ; - let data_size = Marshal.data_size buffer 0 in - really_read file_descr ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; - aux (Marshal.from_bytes buffer 0 :: acc) ~timeout:`Immediately + let messages = + (* Read one message from each file descriptor for fairness *) + List.fold read_fds ~init:acc ~f:(fun msgs_acc file_descr -> + really_read file_descr ~buf:buffer ~len:Marshal.header_size ; + let data_size = Marshal.data_size buffer 0 in + really_read file_descr ~buf:buffer ~pos:Marshal.header_size ~len:data_size ; + Marshal.from_bytes buffer 0 :: msgs_acc ) + in + aux messages ~timeout:`Immediately in aux [] ~timeout:refresh_timeout |> List.rev