|
|
|
@ -94,39 +94,60 @@ let create_serializer (key : Key.t) : 'a serializer =
|
|
|
|
|
SymOp.try_finally
|
|
|
|
|
(fun () -> retry_exception ~timeout:1.0 ~catch_exn ~f:read ())
|
|
|
|
|
(fun () -> In_channel.close inc) in
|
|
|
|
|
|
|
|
|
|
let write_file_with_locking ?(delete=false) ~do_write fname =
|
|
|
|
|
let file_descr = Unix.openfile ~mode:[Unix.O_WRONLY; Unix.O_CREAT] fname in
|
|
|
|
|
let outc = Unix.out_channel_of_descr file_descr in
|
|
|
|
|
if Unix.flock file_descr Unix.Flock_command.lock_exclusive
|
|
|
|
|
then
|
|
|
|
|
begin
|
|
|
|
|
do_write outc;
|
|
|
|
|
flush outc;
|
|
|
|
|
ignore (Unix.flock file_descr Unix.Flock_command.unlock);
|
|
|
|
|
end;
|
|
|
|
|
Out_channel.close outc;
|
|
|
|
|
if delete
|
|
|
|
|
then
|
|
|
|
|
try Unix.unlink fname with
|
|
|
|
|
| Unix.Unix_error _ -> () in
|
|
|
|
|
|
|
|
|
|
let write_to_tmp_file fname data =
|
|
|
|
|
let fname_tmp = Filename.temp_file
|
|
|
|
|
~in_dir:(Filename.dirname fname) (Filename.basename fname) ".tmp" in
|
|
|
|
|
write_file_with_locking
|
|
|
|
|
fname_tmp
|
|
|
|
|
~do_write:(fun outc -> Marshal.to_channel outc (key, version, data) []);
|
|
|
|
|
fname_tmp in
|
|
|
|
|
|
|
|
|
|
(* The .lock file is used to synchronize the writers.
|
|
|
|
|
Once a lock on `file.lock` is obtained, the new data is written into it
|
|
|
|
|
Once a lock on `file.lock` is obtained, the new data is written into a temporary file
|
|
|
|
|
and rename is used to move it atomically to `file` *)
|
|
|
|
|
let execute_write_command_with_lock (fname : DB.filename) (cmd : 'a write_command) =
|
|
|
|
|
let fname_str = DB.filename_to_string fname in
|
|
|
|
|
let fname_str_lock = fname_str ^ ".lock" in
|
|
|
|
|
let file_descr_lock = Unix.openfile ~mode:[Unix.O_WRONLY; Unix.O_CREAT] fname_str_lock in
|
|
|
|
|
if (Unix.flock file_descr_lock Unix.Flock_command.lock_exclusive)
|
|
|
|
|
then
|
|
|
|
|
begin
|
|
|
|
|
let (data_to_write : 'a) = match cmd with
|
|
|
|
|
| Replace data ->
|
|
|
|
|
data
|
|
|
|
|
| Update upd ->
|
|
|
|
|
let old_data_opt =
|
|
|
|
|
if DB.file_exists fname
|
|
|
|
|
then
|
|
|
|
|
(* Because of locking, this should be the latest data written
|
|
|
|
|
by any writer, and can be used for updating *)
|
|
|
|
|
read_from_file fname
|
|
|
|
|
else
|
|
|
|
|
None in
|
|
|
|
|
upd old_data_opt in
|
|
|
|
|
|
|
|
|
|
let outc_lock = Unix.out_channel_of_descr file_descr_lock in
|
|
|
|
|
Marshal.to_channel outc_lock (key, version, data_to_write) [];
|
|
|
|
|
flush outc_lock;
|
|
|
|
|
(* Rename is atomic: the readers can only see one version of this file,
|
|
|
|
|
possibly stale but not corrupted. *)
|
|
|
|
|
Unix.rename ~src:fname_str_lock ~dst:fname_str;
|
|
|
|
|
ignore (Unix.flock file_descr_lock Unix.Flock_command.unlock);
|
|
|
|
|
Out_channel.close outc_lock
|
|
|
|
|
end in
|
|
|
|
|
|
|
|
|
|
write_file_with_locking
|
|
|
|
|
fname_str_lock
|
|
|
|
|
~delete:true
|
|
|
|
|
~do_write:(fun _outc ->
|
|
|
|
|
let (data_to_write : 'a) = match cmd with
|
|
|
|
|
| Replace data ->
|
|
|
|
|
data
|
|
|
|
|
| Update upd ->
|
|
|
|
|
let old_data_opt =
|
|
|
|
|
if DB.file_exists fname
|
|
|
|
|
then
|
|
|
|
|
(* Because of locking, this should be the latest data written
|
|
|
|
|
by any writer, and can be used for updating *)
|
|
|
|
|
read_from_file fname
|
|
|
|
|
else
|
|
|
|
|
None in
|
|
|
|
|
upd old_data_opt in
|
|
|
|
|
|
|
|
|
|
let fname_str_tmp = write_to_tmp_file fname_str data_to_write in
|
|
|
|
|
(* Rename is atomic: the readers can only see one version of this file,
|
|
|
|
|
possibly stale but not corrupted. *)
|
|
|
|
|
Unix.rename ~src:fname_str_tmp ~dst:fname_str) in
|
|
|
|
|
let write_to_file ~(data : 'a) (fname : DB.filename) =
|
|
|
|
|
execute_write_command_with_lock fname (Replace data) in
|
|
|
|
|
let update_file ~f (fname : DB.filename) =
|
|
|
|
|