From b8954e714e319d105af8e4688a9cc146704ae80e Mon Sep 17 00:00:00 2001 From: Nikos Gorogiannis Date: Mon, 2 Sep 2019 05:18:08 -0700 Subject: [PATCH] [sqlite] write-server implementation Summary: Implementation of write-serializer for Sqlite. Points of note: - A Unix socket is used for communication. This avoids buffer-size limitations, as the objects we send for writing may exceed said limits. - No daemon is used if running under buck or in genrule mode, as this usually means a single-threaded job capturing into the DB. - When the daemon is running, read-only access is *not* enforced for other processes. This makes starting and stopping the daemon during Infer execution easier and more robust. In WAL mode this should not have any effect on performance. - This version is not economical with connections, it uses one per query, todo. Reviewed By: jvillard Differential Revision: D17077183 fbshipit-source-id: fa9877d6c --- infer/man/man1/infer-full.txt | 4 ++ infer/src/IR/SourceFiles.ml | 6 +- infer/src/backend/Tasks.ml | 14 +--- infer/src/base/Config.ml | 7 ++ infer/src/base/Config.mli | 2 + infer/src/base/DBWriter.ml | 124 +++++++++++++++++++++++++++++++- infer/src/base/DBWriter.mli | 4 ++ infer/src/base/ForkUtils.ml | 19 +++++ infer/src/base/Memcached.ml | 14 ++-- infer/src/base/Utils.ml | 6 ++ infer/src/base/Utils.mli | 3 + infer/src/integration/Driver.ml | 4 +- 12 files changed, 180 insertions(+), 27 deletions(-) create mode 100644 infer/src/base/ForkUtils.ml diff --git a/infer/man/man1/infer-full.txt b/infer/man/man1/infer-full.txt index ffbee92a8..6f538e551 100644 --- a/infer/man/man1/infer-full.txt +++ b/infer/man/man1/infer-full.txt @@ -1684,6 +1684,10 @@ INTERNAL OPTIONS --sqlite-vfs-reset Cancel the effect of --sqlite-vfs. + --sqlite-write-daemon + Activates: Route all DB writes through a daemon process + (Conversely: --no-sqlite-write-daemon) + --starvation-skip-analysis json Specify combinations of class/method list that should be skipped during starvation analysis diff --git a/infer/src/IR/SourceFiles.ml b/infer/src/IR/SourceFiles.ml index 4c7194e72..d0b8ceb67 100644 --- a/infer/src/IR/SourceFiles.ml +++ b/infer/src/IR/SourceFiles.ml @@ -56,8 +56,10 @@ let add source_file cfg tenv integer_type_widths = (* NOTE: it's important to write attribute files to disk before writing cfgs to disk. OndemandCapture module relies on it - it uses existance of the cfg as a barrier to make sure that all attributes were written to disk (but not necessarily flushed) *) - SqliteUtils.with_transaction (ResultsDatabase.get_database ()) ~f:(fun () -> - Cfg.store source_file cfg ) ; + if Config.sqlite_write_daemon then Cfg.store source_file cfg + else + SqliteUtils.with_transaction (ResultsDatabase.get_database ()) ~f:(fun () -> + Cfg.store source_file cfg ) ; DBWriter.add_source_file ~source_file:(SourceFile.SQLite.serialize source_file) ~tenv:(Tenv.SQLite.serialize tenv) diff --git a/infer/src/backend/Tasks.ml b/infer/src/backend/Tasks.ml index f8bd15dac..ab7582d21 100644 --- a/infer/src/backend/Tasks.ml +++ b/infer/src/backend/Tasks.ml @@ -6,24 +6,12 @@ *) open! IStd -module L = Logging type 'a doer = 'a -> unit type 'a task_generator = 'a ProcessPool.task_generator -let fork_protect ~f x = - (* this is needed whenever a new process is started *) - BackendStats.reset () ; - Epilogues.reset () ; - EventLogger.prepare () ; - L.reset_formatters () ; - ResultsDatabase.new_database_connection () ; - (* get different streams of random numbers in each fork, in particular to lessen contention in - `Filename.mk_temp` *) - Random.self_init () ; - f x - +let fork_protect ~f x = BackendStats.reset () ; ForkUtils.protect ~f x module Runner = struct type ('work, 'final) t = ('work, 'final) ProcessPool.t diff --git a/infer/src/base/Config.ml b/infer/src/base/Config.ml index 7c2203c7b..bf289adf9 100644 --- a/infer/src/base/Config.ml +++ b/infer/src/base/Config.ml @@ -2269,6 +2269,11 @@ and sqlite_vfs = CLOpt.mk_string_opt ?default ~long:"sqlite-vfs" "VFS for SQLite" +and sqlite_write_daemon = + CLOpt.mk_bool ~default:false "Route all DB writes through a daemon process" + ~long:"sqlite-write-daemon" + + and stats_report = CLOpt.mk_path_opt ~long:"stats-report" ~meta:"file" "Write a report of the analysis results to a file" @@ -3176,6 +3181,8 @@ and sqlite_lock_timeout = !sqlite_lock_timeout and sqlite_vfs = !sqlite_vfs +and sqlite_write_daemon = !sqlite_write_daemon + and starvation = !starvation and starvation_skip_analysis = !starvation_skip_analysis diff --git a/infer/src/base/Config.mli b/infer/src/base/Config.mli index 21933d1f9..63f44123e 100644 --- a/infer/src/base/Config.mli +++ b/infer/src/base/Config.mli @@ -652,6 +652,8 @@ val sqlite_lock_timeout : int val sqlite_vfs : string option +val sqlite_write_daemon : bool + val starvation : bool val starvation_skip_analysis : Yojson.Basic.t diff --git a/infer/src/base/DBWriter.ml b/infer/src/base/DBWriter.ml index fa0a87f08..4d2fcc03f 100644 --- a/infer/src/base/DBWriter.ml +++ b/infer/src/base/DBWriter.ml @@ -6,6 +6,8 @@ *) open! IStd +module L = Logging +module F = Format module Implementation = struct let attribute_replace_statement = @@ -170,6 +172,27 @@ module Command = struct | MarkAllSourceFilesStale | MergeDBs of {infer_out_src: string} | Vacuum + | Handshake + | Terminate + + let to_string = function + | ReplaceAttributes _ -> + "ReplaceAttributes" + | AddSourceFile _ -> + "AddSourceFile" + | MarkAllSourceFilesStale -> + "MarkAllSourceFilesStale" + | MergeDBs _ -> + "MergeDBs" + | Vacuum -> + "Vacuum" + | Handshake -> + "Handshake" + | Terminate -> + "Terminate" + + + let pp fmt cmd = F.pp_print_string fmt (to_string cmd) let execute = function | ReplaceAttributes {pname_str; pname; akind; source_file; attributes; proc_desc; callees} -> @@ -183,9 +206,108 @@ module Command = struct Implementation.merge_dbs ~infer_out_src | Vacuum -> Implementation.canonicalize () + | Handshake -> + () + | Terminate -> + () end -let perform cmd = Command.execute cmd +type response = Ack + +module Server = struct + (* General comment about socket/channel destruction: closing the in_channel associated with the socket + will close the file descriptor too, so closing also the out_channel sometimes throws an exception. + That's why in all code below only the input channel is ever closed. *) + + let socket_name = "sqlite_write_socket" + + let socket_addr = Unix.ADDR_UNIX socket_name + + let socket_domain = Unix.domain_of_sockaddr socket_addr + + (** Unix socket *paths* have a historical length limit of ~100 chars (!?*@&*$). However, this only applies + to the argument passed in the system call to create the socket, not to the actual path. + Thus a workaround is to cd into the parent dir of the socket and then use it, hence this function. *) + let in_results_dir ~f = Utils.do_in_dir ~dir:Config.toplevel_results_dir ~f + + let rec server_loop socket = + let client_sock, _client = Unix.accept socket in + let in_channel = Unix.in_channel_of_descr client_sock + and out_channel = Unix.out_channel_of_descr client_sock in + let command : Command.t = Marshal.from_channel in_channel in + L.debug Analysis Verbose "Sqlite write daemon: received command %a@." Command.pp command ; + Command.execute command ; + Marshal.to_channel out_channel Ack [] ; + Out_channel.flush out_channel ; + In_channel.close in_channel ; + L.debug Analysis Verbose "Sqlite write daemon: closing connection@." ; + match command with + | Terminate -> + L.debug Analysis Quiet "Sqlite write daemon: terminating@." ; + () + | _ -> + server_loop socket + + + let socket_exists () = in_results_dir ~f:(fun () -> Sys.file_exists_exn socket_name) + + let server () = + L.debug Analysis Quiet "Sqlite write daemon: starting up@." ; + if socket_exists () then L.die InternalError "Sqlite write daemon: socket already exists@." ; + let socket = Unix.socket ~domain:socket_domain ~kind:Unix.SOCK_STREAM ~protocol:0 in + in_results_dir ~f:(fun () -> Unix.bind socket ~addr:socket_addr) ; + (* [backlog] is (supposedly) the length of the queue for pending connections ; + there are no rules about the implied behaviour though. Here use optimistically + the number of workers, though even that is a guess. *) + Unix.listen socket ~backlog:Config.jobs ; + L.debug Analysis Quiet "Sqlite write daemon: set up complete, waiting for connections@." ; + let shutdown () = in_results_dir ~f:(fun () -> Unix.close socket ; Unix.remove socket_name) in + Utils.try_finally_swallow_timeout ~f:(fun () -> server_loop socket) ~finally:shutdown + + + let send cmd = + let in_channel, out_channel = in_results_dir ~f:(fun () -> Unix.open_connection socket_addr) in + Marshal.to_channel out_channel cmd [] ; + Out_channel.flush out_channel ; + let (Ack : response) = Marshal.from_channel in_channel in + In_channel.close in_channel + + + let rec retry ~pred ~timeout count = + if count < 0 then false + else if pred () then true + else ( + Unix.nanosleep timeout |> ignore ; + retry ~pred ~timeout (count - 1) ) + + + let start () = + match Unix.fork () with + | `In_the_child -> + ForkUtils.protect ~f:server () ; L.exit 0 + | `In_the_parent _child_pid -> + (* wait for socket to appear, try 5 times, with a 0.1 sec timeout each time ; + choice of numbers is completely arbitrary *) + if not (retry ~pred:socket_exists ~timeout:0.1 5) then + L.die InternalError "Sqlite write daemon never started@." ; + send Command.Handshake +end + +let server_running = ref false + +let perform cmd = if !server_running then Server.send cmd else Command.execute cmd + +let start () = + if not !server_running then ( + Server.start () ; + server_running := true ) + + +let stop () = + if !server_running then ( + Server.send Command.Terminate ; + server_running := false ) + let replace_attributes ~pname_str ~pname ~akind ~source_file ~attributes ~proc_desc ~callees = Command.ReplaceAttributes {pname_str; pname; akind; source_file; attributes; proc_desc; callees} diff --git a/infer/src/base/DBWriter.mli b/infer/src/base/DBWriter.mli index e7fefc32f..344732242 100644 --- a/infer/src/base/DBWriter.mli +++ b/infer/src/base/DBWriter.mli @@ -31,3 +31,7 @@ val merge_dbs : infer_out_src:string -> unit val canonicalize : unit -> unit (** put the database on disk in deterministic form *) + +val start : unit -> unit + +val stop : unit -> unit diff --git a/infer/src/base/ForkUtils.ml b/infer/src/base/ForkUtils.ml new file mode 100644 index 000000000..3b6c2cb3d --- /dev/null +++ b/infer/src/base/ForkUtils.ml @@ -0,0 +1,19 @@ +(* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + *) + +open! IStd +module L = Logging + +let protect ~f x = + Epilogues.reset () ; + EventLogger.prepare () ; + L.reset_formatters () ; + ResultsDatabase.new_database_connection () ; + (* get different streams of random numbers in each fork, in particular to lessen contention in + `Filename.mk_temp` *) + Random.self_init () ; + f x diff --git a/infer/src/base/Memcached.ml b/infer/src/base/Memcached.ml index 9eb0cd003..d24e66ace 100644 --- a/infer/src/base/Memcached.ml +++ b/infer/src/base/Memcached.ml @@ -24,16 +24,10 @@ let shell = "sh" type server = {input: In_channel.t; output: Out_channel.t} -(* Unix socket paths have a historical length limit of ~100 chars (!?*@&*$). However, this applies - to the argument passed in the system call to create the socket. Thus a workaround is to cd into - the parent dir of the socket and then create it, hence this function. *) -let in_results_dir ~f = - let cwd = Unix.getcwd () in - let () = Unix.chdir results_dir in - let res = f () in - let () = Unix.chdir cwd in - res - +(** Unix socket *paths* have a historical length limit of ~100 chars (!?*@&*$). However, this only applies + to the argument passed in the system call to create the socket, not to the actual path. + Thus a workaround is to cd into the parent dir of the socket and then use it, hence this function. *) +let in_results_dir ~f = Utils.do_in_dir ~dir:results_dir ~f let fail_on response_line = L.die InternalError "Unexpected server response: %s" response_line diff --git a/infer/src/base/Utils.ml b/infer/src/base/Utils.ml index bae4d0337..e41863fed 100644 --- a/infer/src/base/Utils.ml +++ b/infer/src/base/Utils.ml @@ -398,3 +398,9 @@ let timeit ~f = let ret_val = f () in let duration_ms = Mtime_clock.count start_time |> Mtime.Span.to_ms |> int_of_float in (ret_val, duration_ms) + + +let do_in_dir ~dir ~f = + let cwd = Unix.getcwd () in + Unix.chdir dir ; + try_finally_swallow_timeout ~f ~finally:(fun () -> Unix.chdir cwd) diff --git a/infer/src/base/Utils.mli b/infer/src/base/Utils.mli index e3fe74dc8..116056f81 100644 --- a/infer/src/base/Utils.mli +++ b/infer/src/base/Utils.mli @@ -136,3 +136,6 @@ val yojson_lookup : val timeit : f:(unit -> 'a) -> 'a * int (** Returns the execution time of [f] in milliseconds together with its result *) + +val do_in_dir : dir:string -> f:(unit -> 'a) -> 'a +(** executes [f] after cding into [dir] and then restores original cwd *) diff --git a/infer/src/integration/Driver.ml b/infer/src/integration/Driver.ml index 21206bbfb..4a650611f 100644 --- a/infer/src/integration/Driver.ml +++ b/infer/src/integration/Driver.ml @@ -576,7 +576,8 @@ let run_prologue mode = mono-threaded execution. *) Unix.unsetenv "MAKEFLAGS" ; (* disable the Buck daemon as changes in the Buck or infer config may be missed otherwise *) - Unix.putenv ~key:"NO_BUCKD" ~data:"1" ) ; + Unix.putenv ~key:"NO_BUCKD" ~data:"1" ; + if Config.(sqlite_write_daemon && not (buck || genrule_mode)) then DBWriter.start () ) ; () @@ -587,6 +588,7 @@ let run_prologue mode = let run_epilogue () = if CLOpt.is_originator then ( if Config.developer_mode then StatsAggregator.generate_files () ; + if Config.sqlite_write_daemon then DBWriter.stop () ; if Config.fail_on_bug then fail_on_issue_epilogue () ; () ) ; if Config.buck_cache_mode then clean_results_dir () ;