diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index 294b2c10a..51851d62b 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -38,7 +38,8 @@ type ('work, 'final) t = ; children_updates: Unix.File_descr.t (** all the children send updates up the same pipe to the pool *) ; task_bar: TaskBar.t - ; tasks: 'work task_generator (** generator for work remaining to be done *) } + ; tasks: 'work task_generator (** generator for work remaining to be done *) + ; file_lock: Utils.file_lock (** file lock for sending worker messages *) } (** {2 Constants} *) @@ -78,10 +79,13 @@ type 'a boss_message = | GoHome (** all tasks done, prepare for teardown *) (** convenience function to send data down pipes without forgetting to flush *) -let marshal_to_pipe fd x = + +let marshal_to_pipe ?file_lock fd x = PerfEvent.log (fun logger -> PerfEvent.log_begin_event logger ~categories:["sys"] ~name:"send to pipe" () ) ; + Option.iter file_lock ~f:(fun {Utils.lock} -> lock ()) ; Marshal.to_channel fd x [] ; + Option.iter file_lock ~f:(fun {Utils.unlock} -> unlock ()) ; Out_channel.flush fd ; PerfEvent.(log (fun logger -> log_end_event logger ())) @@ -324,7 +328,7 @@ let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilo The child inherits [updates_w] to send updates up to the parent, and a new pipe is set up for the parent to send instructions down to the child. *) -let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f ~epilogue = +let fork_child ~file_lock ~child_prelude ~slot (updates_r, updates_w) ~f ~epilogue = let to_child_r, to_child_w = Unix.pipe () in match Unix.fork () with | `In_the_child -> @@ -336,9 +340,11 @@ let fork_child ~child_prelude ~slot (updates_r, updates_w) ~f ~epilogue = ProcessPoolState.reset_pid () ; child_prelude () ; let updates_oc = Unix.out_channel_of_descr updates_w in - let send_to_parent (message : worker_message) = marshal_to_pipe updates_oc message in + let send_to_parent (message : worker_message) = + marshal_to_pipe ~file_lock updates_oc message + in let send_final (final_message : 'a final_worker_message) = - marshal_to_pipe updates_oc final_message + marshal_to_pipe ~file_lock updates_oc final_message in (* Function to send updates up the pipe to the parent instead of directly to the task bar. This is because only the parent knows about all the children, hence it's in charge of @@ -379,6 +385,7 @@ let create : -> tasks:'work task_generator -> ('work, 'final) t = fun ~jobs ~child_prelude ~f ~child_epilogue ~tasks -> + let file_lock = Utils.create_file_lock () in let task_bar = TaskBar.create ~jobs in (* Pipe to communicate from children to parent. Only one pipe is needed: the messages sent by children include the identifier of the child sending the message (its [slot]). This way there @@ -386,27 +393,28 @@ let create : let ((pipe_child_r, pipe_child_w) as status_pipe) = Unix.pipe () in let slots = Array.init jobs ~f:(fun slot -> - fork_child ~child_prelude ~slot status_pipe ~f ~epilogue:child_epilogue ) + fork_child ~file_lock ~child_prelude ~slot status_pipe ~f ~epilogue:child_epilogue ) in (* we have forked the child processes and are now in the parent *) let[@warning "-26"] pipe_child_w = Unix.close pipe_child_w in let children_updates = pipe_child_r in let children_states = Array.create ~len:jobs Initializing in - {slots; children_updates; jobs; task_bar; tasks; children_states} + {slots; children_updates; jobs; task_bar; tasks; children_states; file_lock} let run pool = - let total_tasks = pool.tasks.remaining_tasks () in - TaskBar.set_tasks_total pool.task_bar total_tasks ; - TaskBar.tasks_done_reset pool.task_bar ; - (* allocate a buffer for reading children updates once for the whole run *) - let buffer = Bytes.create buffer_size in - (* wait for all children to run out of tasks *) - while not (pool.tasks.is_empty () && all_children_idle pool) do - process_updates pool buffer ; TaskBar.refresh pool.task_bar - done ; - let results = wait_all pool in - TaskBar.finish pool.task_bar ; results + Utils.with_file_lock ~file_lock:pool.file_lock ~f:(fun () -> + let total_tasks = pool.tasks.remaining_tasks () in + TaskBar.set_tasks_total pool.task_bar total_tasks ; + TaskBar.tasks_done_reset pool.task_bar ; + (* allocate a buffer for reading children updates once for the whole run *) + let buffer = Bytes.create buffer_size in + (* wait for all children to run out of tasks *) + while not (pool.tasks.is_empty () && all_children_idle pool) do + process_updates pool buffer ; TaskBar.refresh pool.task_bar + done ; + let results = wait_all pool in + TaskBar.finish pool.task_bar ; results ) let run pool = diff --git a/infer/src/base/Utils.ml b/infer/src/base/Utils.ml index 88f22ca19..3c16fa451 100644 --- a/infer/src/base/Utils.ml +++ b/infer/src/base/Utils.ml @@ -191,6 +191,26 @@ let with_file_out file ~f = try_finally_swallow_timeout ~f ~finally +type file_lock = + { file: string + ; oc: Pervasives.out_channel + ; fd: Core.Unix.File_descr.t + ; lock: unit -> unit + ; unlock: unit -> unit } + +let create_file_lock () = + let file, oc = Core.Filename.open_temp_file "infer" "" in + let fd = Core.Unix.openfile ~mode:[IStd.Unix.O_WRONLY] file in + let lock () = Core.Unix.lockf fd ~mode:IStd.Unix.F_LOCK ~len:IStd.Int64.zero in + let unlock () = Core.Unix.lockf fd ~mode:IStd.Unix.F_ULOCK ~len:IStd.Int64.zero in + {file; oc; fd; lock; unlock} + + +let with_file_lock ~file_lock:{file; oc; fd} ~f = + let finally () = Core.Unix.close fd ; Out_channel.close oc ; Core.Unix.remove file in + try_finally_swallow_timeout ~f ~finally + + let with_intermediate_temp_file_out file ~f = let temp_filename, temp_oc = Filename.open_temp_file ~in_dir:(Filename.dirname file) "infer" "" diff --git a/infer/src/base/Utils.mli b/infer/src/base/Utils.mli index e4c91fa30..bea6ccbbe 100644 --- a/infer/src/base/Utils.mli +++ b/infer/src/base/Utils.mli @@ -56,6 +56,17 @@ val with_file_in : string -> f:(In_channel.t -> 'a) -> 'a val with_file_out : string -> f:(Out_channel.t -> 'a) -> 'a +type file_lock = + { file: string + ; oc: Pervasives.out_channel + ; fd: Core.Unix.File_descr.t + ; lock: unit -> unit + ; unlock: unit -> unit } + +val create_file_lock : unit -> file_lock + +val with_file_lock : file_lock:file_lock -> f:(unit -> 'a) -> 'a + val with_intermediate_temp_file_out : string -> f:(Out_channel.t -> 'a) -> 'a (** like [with_file_out] but uses a fresh intermediate temporary file and rename to avoid write-write races *)