From caa7f9a7d77de1a7063ca4a6a9f60b7b516f7f86 Mon Sep 17 00:00:00 2001 From: Nikos Gorogiannis Date: Mon, 7 Dec 2020 12:37:36 -0800 Subject: [PATCH] [process pool] remove file locking protocol Summary: D17710123 (https://github.com/facebook/infer/commit/ec62fbefb20622e942f7d64be51445c087107723) introduced locking to protect the shared pipe to the originator of the process pool. D20158845 (https://github.com/facebook/infer/commit/a154c8c328ff96b1d174fc11822a1340fbecc554) changed the situation by creating a private pipe to the originator for each worker, so should have removed the locks. Reviewed By: ezgicicek Differential Revision: D25370445 fbshipit-source-id: e5f3e4b00 --- infer/src/base/ProcessPool.ml | 47 +++++++++++++++-------------------- infer/src/base/Utils.ml | 24 ------------------ infer/src/base/Utils.mli | 11 -------- 3 files changed, 20 insertions(+), 62 deletions(-) diff --git a/infer/src/base/ProcessPool.ml b/infer/src/base/ProcessPool.ml index 1e6dcefc6..0aba38e82 100644 --- a/infer/src/base/ProcessPool.ml +++ b/infer/src/base/ProcessPool.ml @@ -72,8 +72,7 @@ type ('work, 'final, 'result) t = ; children_updates: Unix.File_descr.t list (** each child has it's own pipe to send updates to the pool *) ; task_bar: TaskBar.t - ; tasks: ('work, 'result) TaskGenerator.t (** generator for work remaining to be done *) - ; file_lock: Utils.file_lock (** file lock for sending worker messages *) } + ; tasks: ('work, 'result) TaskGenerator.t (** generator for work remaining to be done *) } (** {2 Constants} *) @@ -113,14 +112,12 @@ type 'a boss_message = (** convenience function to send data down pipes without forgetting to flush *) -let marshal_to_pipe ?file_lock fd x = +let marshal_to_pipe fd x = PerfEvent.log (fun logger -> PerfEvent.log_begin_event logger ~categories:["sys"] ~name:"send to pipe" () ) ; - Option.iter file_lock ~f:(fun {Utils.lock} -> lock ()) ; Marshal.to_channel fd x [] ; (* Channel flush should be inside the critical section. *) Out_channel.flush fd ; - Option.iter file_lock ~f:(fun {Utils.unlock} -> unlock ()) ; PerfEvent.(log (fun logger -> log_end_event logger ())) @@ -374,7 +371,7 @@ let rec child_loop ~slot send_to_parent send_final receive_from_parent ~f ~epilo The child inherits [updates_w] to send updates up to the parent, and a new pipe is set up for the parent to send instructions down to the child. *) -let fork_child ~file_lock ~child_prologue ~slot (updates_r, updates_w) ~f ~epilogue = +let fork_child ~child_prologue ~slot (updates_r, updates_w) ~f ~epilogue = let to_child_r, to_child_w = Unix.pipe () in match Unix.fork () with | `In_the_child -> @@ -386,11 +383,9 @@ let fork_child ~file_lock ~child_prologue ~slot (updates_r, updates_w) ~f ~epilo ProcessPoolState.reset_pid () ; child_prologue () ; let updates_oc = Unix.out_channel_of_descr updates_w in - let send_to_parent (message : 'b worker_message) = - marshal_to_pipe ~file_lock updates_oc message - in + let send_to_parent (message : 'b worker_message) = marshal_to_pipe updates_oc message in let send_final (final_message : 'a final_worker_message) = - marshal_to_pipe ~file_lock updates_oc final_message + marshal_to_pipe updates_oc final_message in (* Function to send updates up the pipe to the parent instead of directly to the task bar. This is because only the parent knows about all the children, hence it's in charge of @@ -438,13 +433,12 @@ let create : -> tasks:(unit -> ('work, 'result) TaskGenerator.t) -> ('work, 'final, 'result) t = fun ~jobs ~child_prologue ~f ~child_epilogue ~tasks -> - let file_lock = Utils.create_file_lock () in let task_bar = TaskBar.create ~jobs in let children_pipes = create_pipes jobs in let slots = Array.init jobs ~f:(fun slot -> let child_pipe = List.nth_exn children_pipes slot in - fork_child ~file_lock ~child_prologue ~slot child_pipe ~f ~epilogue:child_epilogue ) + fork_child ~child_prologue ~slot child_pipe ~f ~epilogue:child_epilogue ) in ProcessPoolState.has_running_children := true ; Epilogues.register ~description:"Wait children processes exit" ~f:(fun () -> @@ -455,24 +449,23 @@ let create : (* we have forked the child processes and are now in the parent *) let children_updates = List.map children_pipes ~f:(fun (pipe_child_r, _) -> pipe_child_r) in let children_states = Array.create ~len:jobs Initializing in - {slots; children_updates; jobs; task_bar; tasks= tasks (); children_states; file_lock} + {slots; children_updates; jobs; task_bar; tasks= tasks (); children_states} let run pool = - Utils.with_file_lock ~file_lock:pool.file_lock ~f:(fun () -> - let total_tasks = pool.tasks.remaining_tasks () in - TaskBar.set_tasks_total pool.task_bar total_tasks ; - TaskBar.tasks_done_reset pool.task_bar ; - (* allocate a buffer for reading children updates once for the whole run *) - let buffer = Bytes.create buffer_size in - (* wait for all children to run out of tasks *) - while not (pool.tasks.is_empty () && all_children_idle pool) do - process_updates pool buffer ; - TaskBar.refresh pool.task_bar - done ; - let results = wait_all pool in - TaskBar.finish pool.task_bar ; - results ) + let total_tasks = pool.tasks.remaining_tasks () in + TaskBar.set_tasks_total pool.task_bar total_tasks ; + TaskBar.tasks_done_reset pool.task_bar ; + (* allocate a buffer for reading children updates once for the whole run *) + let buffer = Bytes.create buffer_size in + (* wait for all children to run out of tasks *) + while not (pool.tasks.is_empty () && all_children_idle pool) do + process_updates pool buffer ; + TaskBar.refresh pool.task_bar + done ; + let results = wait_all pool in + TaskBar.finish pool.task_bar ; + results let run pool = diff --git a/infer/src/base/Utils.ml b/infer/src/base/Utils.ml index eb9b7e6d0..d60036fea 100644 --- a/infer/src/base/Utils.ml +++ b/infer/src/base/Utils.ml @@ -238,30 +238,6 @@ let with_file_out file ~f = try_finally_swallow_timeout ~f ~finally -type file_lock = - { file: string - ; oc: Stdlib.out_channel - ; fd: Core.Unix.File_descr.t - ; lock: unit -> unit - ; unlock: unit -> unit } - -let create_file_lock () = - let file, oc = Core.Filename.open_temp_file "infer_lock" "" in - let fd = Core.Unix.openfile ~mode:[IStd.Unix.O_WRONLY] file in - let lock () = Core.Unix.lockf fd ~mode:IStd.Unix.F_LOCK ~len:IStd.Int64.zero in - let unlock () = Core.Unix.lockf fd ~mode:IStd.Unix.F_ULOCK ~len:IStd.Int64.zero in - {file; oc; fd; lock; unlock} - - -let with_file_lock ~file_lock:{file; oc; fd} ~f = - let finally () = - Core.Unix.close fd ; - Out_channel.close oc ; - Core.Unix.remove file - in - try_finally_swallow_timeout ~f ~finally - - let with_intermediate_temp_file_out file ~f = let temp_filename, temp_oc = Filename.open_temp_file ~in_dir:(Filename.dirname file) "infer" "" in let f () = f temp_oc in diff --git a/infer/src/base/Utils.mli b/infer/src/base/Utils.mli index cfc298ae7..e8a42c690 100644 --- a/infer/src/base/Utils.mli +++ b/infer/src/base/Utils.mli @@ -63,17 +63,6 @@ val with_file_in : string -> f:(In_channel.t -> 'a) -> 'a val with_file_out : string -> f:(Out_channel.t -> 'a) -> 'a -type file_lock = - { file: string - ; oc: Stdlib.out_channel - ; fd: Core.Unix.File_descr.t - ; lock: unit -> unit - ; unlock: unit -> unit } - -val create_file_lock : unit -> file_lock - -val with_file_lock : file_lock:file_lock -> f:(unit -> 'a) -> 'a - val with_intermediate_temp_file_out : string -> f:(Out_channel.t -> 'a) -> 'a (** like [with_file_out] but uses a fresh intermediate temporary file and rename to avoid write-write races *)