[sqlite] write-server implementation

Summary:
Implementation of a write serializer for SQLite. Points of note:
- A Unix socket is used for communication. This avoids buffer-size limitations, since the marshalled objects we send for writing may exceed such limits (see the sketch after this list).
- No daemon is used when running under Buck or in genrule mode, as this usually means a single-threaded job capturing into the DB.
- While the daemon is running, read-only access is *not* enforced for other processes. This makes starting and stopping the daemon during Infer execution easier and more robust; in WAL mode it should not affect performance.
- This version is not economical with connections: it uses one per query (TODO).
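The following is a minimal sketch of the client side of a write, for illustration only: it uses plain stdlib channels rather than the Core wrappers in the diff, and [command] and [send_command] are hypothetical stand-ins for the real [DBWriter] types. One command is marshalled over the Unix-domain socket, then the client blocks until the daemon acknowledges:

    type command = Handshake | Terminate

    type response = Ack

    let send_command (addr : Unix.sockaddr) (cmd : command) =
      (* [Unix.open_connection] yields buffered channels over the socket, so large
         marshalled values are not constrained by any fixed buffer size *)
      let in_channel, out_channel = Unix.open_connection addr in
      Marshal.to_channel out_channel cmd [] ;
      flush out_channel ;
      (* block until the daemon has executed the command *)
      let (Ack : response) = Marshal.from_channel in_channel in
      (* closing the input channel also closes the shared file descriptor *)
      close_in in_channel

    (* e.g.: send_command (Unix.ADDR_UNIX "sqlite_write_socket") Handshake *)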

Reviewed By: jvillard

Differential Revision: D17077183

fbshipit-source-id: fa9877d6c
Author: Nikos Gorogiannis
Committed by: Facebook GitHub Bot
commit b8954e714e (parent 26823b22f1)

@@ -1684,6 +1684,10 @@ INTERNAL OPTIONS
        --sqlite-vfs-reset
            Cancel the effect of --sqlite-vfs.
+       --sqlite-write-daemon
+           Activates: Route all DB writes through a daemon process
+           (Conversely: --no-sqlite-write-daemon)
+
        --starvation-skip-analysis json
            Specify combinations of class/method list that should be skipped
            during starvation analysis
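As a hypothetical example invocation (the build command is illustrative; the flag is the one documented above), a single clang capture outside Buck could opt in with:

    infer capture --sqlite-write-daemon -- clang -c hello.c

Under Buck or genrule capture the daemon is skipped anyway, per the summary above.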

@@ -56,6 +56,8 @@ let add source_file cfg tenv integer_type_widths =
   (* NOTE: it's important to write attribute files to disk before writing cfgs to disk.
      OndemandCapture module relies on it - it uses existance of the cfg as a barrier to make
      sure that all attributes were written to disk (but not necessarily flushed) *)
+  if Config.sqlite_write_daemon then Cfg.store source_file cfg
+  else
     SqliteUtils.with_transaction (ResultsDatabase.get_database ()) ~f:(fun () ->
         Cfg.store source_file cfg ) ;
   DBWriter.add_source_file

@@ -6,24 +6,12 @@
 *)
 open! IStd
-module L = Logging
-
 type 'a doer = 'a -> unit

 type 'a task_generator = 'a ProcessPool.task_generator

-let fork_protect ~f x =
-  (* this is needed whenever a new process is started *)
-  BackendStats.reset () ;
-  Epilogues.reset () ;
-  EventLogger.prepare () ;
-  L.reset_formatters () ;
-  ResultsDatabase.new_database_connection () ;
-  (* get different streams of random numbers in each fork, in particular to lessen contention in
-     `Filename.mk_temp` *)
-  Random.self_init () ;
-  f x
+let fork_protect ~f x = BackendStats.reset () ; ForkUtils.protect ~f x

 module Runner = struct
   type ('work, 'final) t = ('work, 'final) ProcessPool.t

@@ -2269,6 +2269,11 @@ and sqlite_vfs =
   CLOpt.mk_string_opt ?default ~long:"sqlite-vfs" "VFS for SQLite"
+
+and sqlite_write_daemon =
+  CLOpt.mk_bool ~default:false "Route all DB writes through a daemon process"
+    ~long:"sqlite-write-daemon"
+
 and stats_report =
   CLOpt.mk_path_opt ~long:"stats-report" ~meta:"file"
     "Write a report of the analysis results to a file"
@@ -3176,6 +3181,8 @@ and sqlite_lock_timeout = !sqlite_lock_timeout
 and sqlite_vfs = !sqlite_vfs
+
+and sqlite_write_daemon = !sqlite_write_daemon
 and starvation = !starvation
 and starvation_skip_analysis = !starvation_skip_analysis

@@ -652,6 +652,8 @@ val sqlite_lock_timeout : int
 val sqlite_vfs : string option
+
+val sqlite_write_daemon : bool
 val starvation : bool
 val starvation_skip_analysis : Yojson.Basic.t

@@ -6,6 +6,8 @@
 *)
 open! IStd
+module L = Logging
+module F = Format
 module Implementation = struct
   let attribute_replace_statement =
@@ -170,6 +172,27 @@ module Command = struct
     | MarkAllSourceFilesStale
     | MergeDBs of {infer_out_src: string}
     | Vacuum
+    | Handshake
+    | Terminate
+
+  let to_string = function
+    | ReplaceAttributes _ ->
+        "ReplaceAttributes"
+    | AddSourceFile _ ->
+        "AddSourceFile"
+    | MarkAllSourceFilesStale ->
+        "MarkAllSourceFilesStale"
+    | MergeDBs _ ->
+        "MergeDBs"
+    | Vacuum ->
+        "Vacuum"
+    | Handshake ->
+        "Handshake"
+    | Terminate ->
+        "Terminate"
+
+  let pp fmt cmd = F.pp_print_string fmt (to_string cmd)
+
   let execute = function
     | ReplaceAttributes {pname_str; pname; akind; source_file; attributes; proc_desc; callees} ->
@@ -183,9 +206,108 @@ module Command = struct
         Implementation.merge_dbs ~infer_out_src
     | Vacuum ->
         Implementation.canonicalize ()
+    | Handshake ->
+        ()
+    | Terminate ->
+        ()
+end
+
+type response = Ack
+
+module Server = struct
+  (* General comment about socket/channel destruction: closing the in_channel associated with the
+     socket will close the file descriptor too, so closing also the out_channel sometimes throws an
+     exception. That's why in all code below only the input channel is ever closed. *)
+
+  let socket_name = "sqlite_write_socket"
+
+  let socket_addr = Unix.ADDR_UNIX socket_name
+
+  let socket_domain = Unix.domain_of_sockaddr socket_addr
+
+  (** Unix socket *paths* have a historical length limit of ~100 chars (!?*@&*$). However, this
+      only applies to the argument passed in the system call to create the socket, not to the
+      actual path. Thus a workaround is to cd into the parent dir of the socket and then use it,
+      hence this function. *)
+  let in_results_dir ~f = Utils.do_in_dir ~dir:Config.toplevel_results_dir ~f
+
+  let rec server_loop socket =
+    let client_sock, _client = Unix.accept socket in
+    let in_channel = Unix.in_channel_of_descr client_sock
+    and out_channel = Unix.out_channel_of_descr client_sock in
+    let command : Command.t = Marshal.from_channel in_channel in
+    L.debug Analysis Verbose "Sqlite write daemon: received command %a@." Command.pp command ;
+    Command.execute command ;
+    Marshal.to_channel out_channel Ack [] ;
+    Out_channel.flush out_channel ;
+    In_channel.close in_channel ;
+    L.debug Analysis Verbose "Sqlite write daemon: closing connection@." ;
+    match command with
+    | Terminate ->
+        L.debug Analysis Quiet "Sqlite write daemon: terminating@." ;
+        ()
+    | _ ->
+        server_loop socket
+
+  let socket_exists () = in_results_dir ~f:(fun () -> Sys.file_exists_exn socket_name)
+
+  let server () =
+    L.debug Analysis Quiet "Sqlite write daemon: starting up@." ;
+    if socket_exists () then L.die InternalError "Sqlite write daemon: socket already exists@." ;
+    let socket = Unix.socket ~domain:socket_domain ~kind:Unix.SOCK_STREAM ~protocol:0 in
+    in_results_dir ~f:(fun () -> Unix.bind socket ~addr:socket_addr) ;
+    (* [backlog] is (supposedly) the length of the queue for pending connections; there are no
+       rules about the implied behaviour though. Here we optimistically use the number of workers,
+       though even that is a guess. *)
+    Unix.listen socket ~backlog:Config.jobs ;
+    L.debug Analysis Quiet "Sqlite write daemon: set up complete, waiting for connections@." ;
+    let shutdown () = in_results_dir ~f:(fun () -> Unix.close socket ; Unix.remove socket_name) in
+    Utils.try_finally_swallow_timeout ~f:(fun () -> server_loop socket) ~finally:shutdown
+
+  let send cmd =
+    let in_channel, out_channel = in_results_dir ~f:(fun () -> Unix.open_connection socket_addr) in
+    Marshal.to_channel out_channel cmd [] ;
+    Out_channel.flush out_channel ;
+    let (Ack : response) = Marshal.from_channel in_channel in
+    In_channel.close in_channel
+
+  let rec retry ~pred ~timeout count =
+    if count < 0 then false
+    else if pred () then true
+    else (
+      Unix.nanosleep timeout |> ignore ;
+      retry ~pred ~timeout (count - 1) )
+
+  let start () =
+    match Unix.fork () with
+    | `In_the_child ->
+        ForkUtils.protect ~f:server () ; L.exit 0
+    | `In_the_parent _child_pid ->
+        (* wait for the socket to appear; try 5 times, with a 0.1 sec timeout each time; the
+           choice of numbers is completely arbitrary *)
+        if not (retry ~pred:socket_exists ~timeout:0.1 5) then
+          L.die InternalError "Sqlite write daemon never started@." ;
+        send Command.Handshake
 end

-let perform cmd = Command.execute cmd
+let server_running = ref false
+
+let perform cmd = if !server_running then Server.send cmd else Command.execute cmd
+
+let start () =
+  if not !server_running then (
+    Server.start () ;
+    server_running := true )
+
+let stop () =
+  if !server_running then (
+    Server.send Command.Terminate ;
+    server_running := false )
+
 let replace_attributes ~pname_str ~pname ~akind ~source_file ~attributes ~proc_desc ~callees =
   Command.ReplaceAttributes {pname_str; pname; akind; source_file; attributes; proc_desc; callees}

@@ -31,3 +31,7 @@ val merge_dbs : infer_out_src:string -> unit
 val canonicalize : unit -> unit
 (** put the database on disk in deterministic form *)
+
+val start : unit -> unit
+
+val stop : unit -> unit

@@ -0,0 +1,19 @@
+(*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * This source code is licensed under the MIT license found in the
+ * LICENSE file in the root directory of this source tree.
+ *)
+
+open! IStd
+module L = Logging
+
+let protect ~f x =
+  Epilogues.reset () ;
+  EventLogger.prepare () ;
+  L.reset_formatters () ;
+  ResultsDatabase.new_database_connection () ;
+  (* get different streams of random numbers in each fork, in particular to lessen contention in
+     `Filename.mk_temp` *)
+  Random.self_init () ;
+  f x

@@ -24,16 +24,10 @@ let shell = "sh"
 type server = {input: In_channel.t; output: Out_channel.t}

-(* Unix socket paths have a historical length limit of ~100 chars (!?*@&*$). However, this applies
-   to the argument passed in the system call to create the socket. Thus a workaround is to cd into
-   the parent dir of the socket and then create it, hence this function. *)
-let in_results_dir ~f =
-  let cwd = Unix.getcwd () in
-  let () = Unix.chdir results_dir in
-  let res = f () in
-  let () = Unix.chdir cwd in
-  res
+(** Unix socket *paths* have a historical length limit of ~100 chars (!?*@&*$). However, this only
+    applies to the argument passed in the system call to create the socket, not to the actual path.
+    Thus a workaround is to cd into the parent dir of the socket and then use it, hence this
+    function. *)
+let in_results_dir ~f = Utils.do_in_dir ~dir:results_dir ~f

 let fail_on response_line = L.die InternalError "Unexpected server response: %s" response_line

@@ -398,3 +398,9 @@ let timeit ~f =
   let ret_val = f () in
   let duration_ms = Mtime_clock.count start_time |> Mtime.Span.to_ms |> int_of_float in
   (ret_val, duration_ms)
+
+let do_in_dir ~dir ~f =
+  let cwd = Unix.getcwd () in
+  Unix.chdir dir ;
+  try_finally_swallow_timeout ~f ~finally:(fun () -> Unix.chdir cwd)

@@ -136,3 +136,6 @@ val yojson_lookup :
 val timeit : f:(unit -> 'a) -> 'a * int
 (** Returns the execution time of [f] in milliseconds together with its result *)
+
+val do_in_dir : dir:string -> f:(unit -> 'a) -> 'a
+(** executes [f] after cding into [dir] and then restores original cwd *)
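To illustrate how the new [Utils.do_in_dir] supports the socket path-length workaround, here is a hypothetical caller ([bind_short_path] is an invented name; stdlib [Unix.bind] is shown with its unlabelled signature, whereas the diff uses Core's labelled variant):

    (* bind to a short relative socket name while temporarily cd'd into [results_dir],
       sidestepping the ~100-char limit on sockaddr paths *)
    let bind_short_path socket ~results_dir =
      Utils.do_in_dir ~dir:results_dir ~f:(fun () ->
          Unix.bind socket (Unix.ADDR_UNIX "sqlite_write_socket") )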

@@ -576,7 +576,8 @@ let run_prologue mode =
        mono-threaded execution. *)
     Unix.unsetenv "MAKEFLAGS" ;
     (* disable the Buck daemon as changes in the Buck or infer config may be missed otherwise *)
-    Unix.putenv ~key:"NO_BUCKD" ~data:"1" ) ;
+    Unix.putenv ~key:"NO_BUCKD" ~data:"1" ;
+    if Config.(sqlite_write_daemon && not (buck || genrule_mode)) then DBWriter.start () ) ;
   ()

@@ -587,6 +588,7 @@ let run_prologue mode =
 let run_epilogue () =
   if CLOpt.is_originator then (
     if Config.developer_mode then StatsAggregator.generate_files () ;
+    if Config.sqlite_write_daemon then DBWriter.stop () ;
     if Config.fail_on_bug then fail_on_issue_epilogue () ;
     () ) ;
   if Config.buck_cache_mode then clean_results_dir () ;
