diff --git a/src/dune_scheduler/event.ml b/src/dune_scheduler/event.ml index bc971926580..be68892b9f9 100644 --- a/src/dune_scheduler/event.ml +++ b/src/dune_scheduler/event.ml @@ -19,7 +19,6 @@ type build_input_change = | Invalidation of Memo.Invalidation.t type t = - | File_watcher_task of (unit -> File_watcher.Event.t list) | Build_inputs_changed of build_input_change Nonempty_list.t | File_system_sync of File_watcher.Sync_id.t | File_system_watcher_terminated @@ -38,7 +37,6 @@ module Queue = struct type t = { jobs_completed : (job * Proc.Process_info.t) Queue.t - ; file_watcher_tasks : (unit -> File_watcher.Event.t list) Queue.t ; mutable invalidation_events : Invalidation_event.t list ; mutable shutdown_reasons : Shutdown.Reason.Set.t ; mutex : Mutex.t @@ -60,7 +58,6 @@ module Queue = struct let create () = let jobs_completed = Queue.create () in - let file_watcher_tasks = Queue.create () in let worker_tasks_completed = Queue.create () in let invalidation_events = [] in let shutdown_reasons = Shutdown.Reason.Set.empty in @@ -69,7 +66,6 @@ module Queue = struct let pending_jobs = 0 in let pending_worker_tasks = 0 in { jobs_completed - ; file_watcher_tasks ; invalidation_events ; shutdown_reasons ; mutex @@ -140,10 +136,6 @@ module Queue = struct Some Job_complete_ready ;; - let file_watcher_task q = - Option.map (Queue.pop q.file_watcher_tasks) ~f:(fun job -> File_watcher_task job) - ;; - let invalidation q = match q.invalidation_events with | [] -> None @@ -206,13 +198,12 @@ module Queue = struct let events_in_order = (* Event sources are listed in priority order. Signals are the highest priority to maximize responsiveness to Ctrl+C. - [file_watcher_task], [worker_tasks_completed] and [invalidation] are - used for reacting to user input, so their latency is also important. + [invalidation] and [worker_tasks_completed] are used for reacting to + user input, so their latency is also important. [jobs_completed] and [yield] are where the bulk of the work is done, so they are the lowest priority to avoid starving other things. *) Event_source. [ shutdown - ; file_watcher_task ; invalidation ; worker_tasks_completed ; (if Sys.win32 then jobs_completed else job_complete_ready) @@ -280,10 +271,6 @@ module Queue = struct q.shutdown_reasons <- Shutdown.Reason.Set.add q.shutdown_reasons signal) ;; - let send_file_watcher_task q job = - add_event q (fun q -> Queue.push q.file_watcher_tasks job) - ;; - let pending_jobs q = q.pending_jobs let pending_worker_tasks q = q.pending_worker_tasks end diff --git a/src/dune_scheduler/event.mli b/src/dune_scheduler/event.mli index 90bb863fea5..54983b4dba8 100644 --- a/src/dune_scheduler/event.mli +++ b/src/dune_scheduler/event.mli @@ -13,7 +13,6 @@ type build_input_change = | Invalidation of Memo.Invalidation.t type t = - | File_watcher_task of (unit -> File_watcher.Event.t list) | Build_inputs_changed of build_input_change Nonempty_list.t | File_system_sync of File_watcher.Sync_id.t | File_system_watcher_terminated @@ -48,13 +47,7 @@ module Queue : sig val send_worker_tasks_completed : t -> Fiber.fill list -> unit val register_worker_task_started : t -> unit val cancel_work_task_started : t -> unit - val send_file_watcher_task : t -> (unit -> File_watcher.Event.t list) -> unit - - (** It's a bit weird to have both this and [send_file_watcher_task]. The - reason is that [send_file_watcher_task] uses [send_file_watcher_events] - internally. *) val send_file_watcher_events : t -> File_watcher.Event.t list -> unit - val send_invalidation_event : t -> Memo.Invalidation.t -> unit val send_job_completed : t -> job -> Proc.Process_info.t -> unit val send_job_completed_ready : t -> unit diff --git a/src/dune_scheduler/file_watcher.ml b/src/dune_scheduler/file_watcher.ml index 5524175ba9f..46506e6550b 100644 --- a/src/dune_scheduler/file_watcher.ml +++ b/src/dune_scheduler/file_watcher.ml @@ -48,7 +48,7 @@ module Event = struct end module Scheduler = struct - type t = { thread_safe_send_emit_events_job : (unit -> Event.t list) -> unit } + type t = { thread_safe_send_events : Event.t list -> unit } end module Watch_trie : sig @@ -141,9 +141,18 @@ type kind = | Inotify of Inotify.t | Fswatch_win of { t : Fswatch_win.t } +type sync_table = + { table : (string, Sync_id.t) Table.t + ; mutex : Mutex.t + } + +let create_sync_table () = + { table = Table.create (module String) 64; mutex = Mutex.create () } +;; + type t = { kind : kind - ; sync_table : (string, Sync_id.t) Table.t + ; sync_table : sync_table (* Pending fs sync operations indexed by the special sync filename. *) } @@ -215,24 +224,31 @@ let shutdown t = module Fs_sync : sig val special_dir_path : Path.Build.t Lazy.t val special_dir : string Lazy.t - val emit : t -> Sync_id.t + val emit : sync_table -> Sync_id.t val is_special_file : path_as_reported_by_file_watcher:string -> bool (** fsevents always reports absolute paths. therefore, we need callers to make an effort to determine if an absolute path is in fact in the build dir *) val is_special_file_fsevents : Path.t -> bool - val consume_event : (string, 'a) Table.t -> string -> 'a option + val consume_event : sync_table -> string -> Sync_id.t option end = struct let special_dir_path = lazy (Path.Build.relative Path.Build.root ".sync") let special_dir = lazy (Lazy.force special_dir_path |> Path.Build.to_string) - let emit t = + let emit sync_table = let id = Sync_id.gen () in let fn = id |> Sync_id.to_int |> string_of_int in let path = Filename.concat (Lazy.force special_dir) fn in - Unix.close (Unix.openfile path [ O_WRONLY; O_CREAT; O_TRUNC; O_CLOEXEC ] 0o666); - Table.set t.sync_table fn id; + Mutex.protect sync_table.mutex (fun () -> + Table.set sync_table.table fn id; + match + Unix.close (Unix.openfile path [ O_WRONLY; O_CREAT; O_TRUNC; O_CLOEXEC ] 0o666) + with + | () -> () + | exception exn -> + Table.remove sync_table.table fn; + Exn.reraise exn); id ;; @@ -243,14 +259,15 @@ end = struct Filename.dirname path_as_reported_by_file_watcher = Lazy.force special_dir ;; - let consume_event table path = - let basename = Filename.basename path in - match Table.find table basename with - | None -> None - | Some id -> - Fpath.unlink_no_err path; - Table.remove table basename; - Some id + let consume_event sync_table path = + Mutex.protect sync_table.mutex (fun () -> + let basename = Filename.basename path in + match Table.find sync_table.table basename with + | None -> None + | Some id -> + Fpath.unlink_no_err path; + Table.remove sync_table.table basename; + Some id) ;; let is_special_file_fsevents (path : Path.t) = @@ -368,27 +385,32 @@ let spawn_external_watcher ~backend ~watch_exclusions = ;; let create_inotifylib_watcher ~sync_table ~(scheduler : Scheduler.t) should_exclude = - Inotify.create ~modify_event_selector:`Closed_writable_fd ~emit_events:(fun events -> - scheduler.thread_safe_send_emit_events_job (fun () -> - List.concat_map events ~f:(fun event -> - let is_fs_sync_event_generated_by_dune = - match (event : Inotify.Event.t) with - | Modified path | Created path | Unlinked path -> - Option.some_if - (Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path) - path - | Moved _ | Queue_overflow -> None - in - let events = - match is_fs_sync_event_generated_by_dune with - | None -> process_inotify_event event should_exclude - | Some path -> - (match Fs_sync.consume_event sync_table path with - | None -> [] - | Some id -> [ Event.Sync id ]) - in - emit_events events; - events))) + Inotify.create + ~mutex:sync_table.mutex + ~modify_event_selector:`Closed_writable_fd + ~emit_events:(fun events -> + let events = + List.concat_map events ~f:(fun event -> + let is_fs_sync_event_generated_by_dune = + match (event : Inotify.Event.t) with + | Modified path | Created path | Unlinked path -> + Option.some_if + (Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path) + path + | Moved _ | Queue_overflow -> None + in + let events = + match is_fs_sync_event_generated_by_dune with + | None -> process_inotify_event event should_exclude + | Some path -> + (match Fs_sync.consume_event sync_table path with + | None -> [] + | Some id -> [ Event.Sync id ]) + in + emit_events events; + events) + in + scheduler.thread_safe_send_events events) ;; let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusions = @@ -397,21 +419,20 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion let event_mtx = Mutex.create () in let event_cv = Condition.create () in let res = - let sync_table = Table.create (module String) 64 in + let sync_table = create_sync_table () in let pipe, pid = spawn_external_watcher ~backend ~watch_exclusions in let (_ : Thread.t) = Thread0.spawn ~name:"file-watcher" (fun () -> - let enqueue job = + let enqueue events = Mutex.protect event_mtx (fun () -> - jobs := job :: !jobs; + jobs := events :: !jobs; Condition.signal event_cv) in let rec loop () = - (* This job runs on the scheduler thread because it uses [sync_table]. *) match input_line pipe with - | exception End_of_file -> enqueue (fun () -> [ Event.Watcher_terminated ]) + | exception End_of_file -> enqueue [ Event.Watcher_terminated ] | path_s -> - enqueue (fun () -> + let events = if Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path_s then ( match Fs_sync.consume_event sync_table path_s with @@ -419,7 +440,9 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion | Some id -> [ Event.Sync id ]) else ( let path = Path.Expert.try_localize_external (Path.of_string path_s) in - [ Fs_memo_event (Fs_memo_event.create ~kind:File_changed ~path) ])); + [ Fs_memo_event (Fs_memo_event.create ~kind:File_changed ~path) ]) + in + enqueue events; loop () in loop ()) @@ -449,8 +472,7 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion jobs := []; jobs_batch) in - scheduler.thread_safe_send_emit_events_job (fun () -> - List.concat_map jobs_batch ~f:(fun job -> job ())); + scheduler.thread_safe_send_events (List.concat jobs_batch); Thread.delay (Time.Span.to_secs debounce_interval); buffer_thread () in @@ -461,7 +483,7 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion let create_inotifylib ~scheduler ~should_exclude = prepare_sync (); - let sync_table = Table.create (module String) 64 in + let sync_table = create_sync_table () in let inotify = create_inotifylib_watcher ~sync_table ~scheduler should_exclude in Inotify.add inotify (Lazy.force Fs_sync.special_dir); { kind = Inotify inotify; sync_table } @@ -475,12 +497,14 @@ let fsevents_callback ?exclusion_paths (scheduler : Scheduler.t) ~f events = | None -> fun _ -> false | Some paths -> fun p -> List.mem paths p ~equal:Path.equal in - scheduler.thread_safe_send_emit_events_job (fun () -> + let events = List.filter_map events ~f:(fun event -> let path = Fsevents.Event.path event |> Path.of_string |> Path.Expert.try_localize_external in - if skip_path path then None else f event path)) + if skip_path path then None else f event path) + in + scheduler.thread_safe_send_events events ;; let fsevents ?exclusion_paths ~latency ~paths scheduler f = @@ -515,7 +539,7 @@ let create_fsevents () = prepare_sync (); - let sync_table = Table.create (module String) 64 in + let sync_table = create_sync_table () in let sync = (* Keep the original event path for consuming the sync file; use the localized path only to check whether the event is in the build directory. *) @@ -587,7 +611,7 @@ let fswatch_win_callback ~(scheduler : Scheduler.t) ~sync_table ~should_exclude | Added | Modified -> (match Fs_sync.consume_event sync_table filename with | None -> () - | Some id -> scheduler.thread_safe_send_emit_events_job (fun () -> [ Sync id ])) + | Some id -> scheduler.thread_safe_send_events [ Sync id ]) | Removed | Renamed_new | Renamed_old -> ()) | path -> let normalized_filename = @@ -596,20 +620,19 @@ let fswatch_win_callback ~(scheduler : Scheduler.t) ~sync_table ~should_exclude (String.split_on_char ~sep:'\\' (String.lowercase_ascii filename)) in if not (should_exclude normalized_filename) - then - scheduler.thread_safe_send_emit_events_job (fun () -> - let kind = - match Fswatch_win.Event.action event with - | Added | Renamed_new -> Fs_memo_event.Created - | Removed | Renamed_old -> Deleted - | Modified -> File_changed - in - [ Fs_memo_event { kind; path } ]) + then ( + let kind = + match Fswatch_win.Event.action event with + | Added | Renamed_new -> Fs_memo_event.Created + | Removed | Renamed_old -> Deleted + | Modified -> File_changed + in + scheduler.thread_safe_send_events [ Fs_memo_event { kind; path } ]) ;; let create_fswatch_win ~(scheduler : Scheduler.t) ~debounce_interval:sleep ~should_exclude = - let sync_table = Table.create (module String) 64 in + let sync_table = create_sync_table () in let t = Fswatch_win.create () in Fswatch_win.add t (Path.to_absolute_filename Path.root); let (_ : Thread.t) = @@ -692,4 +715,4 @@ let add_watch t path = Ok ())) ;; -let emit_sync = Fs_sync.emit +let emit_sync t = Fs_sync.emit t.sync_table diff --git a/src/dune_scheduler/file_watcher.mli b/src/dune_scheduler/file_watcher.mli index 3f83ac679b1..6a6bc7c930f 100644 --- a/src/dune_scheduler/file_watcher.mli +++ b/src/dune_scheduler/file_watcher.mli @@ -54,10 +54,9 @@ end module Scheduler : sig (** Hook into the fiber scheduler. *) type t = - { thread_safe_send_emit_events_job : (unit -> Event.t list) -> unit - (** Send some events to the scheduler. The events are sent in the form - of a thunk to be executed on the scheduler thread, so that we can - do some bookkeeping that needs to happen there. *) + { thread_safe_send_events : Event.t list -> unit + (** Send some events to the scheduler. This function must be safe to call + from file watcher threads. *) } end diff --git a/src/dune_scheduler/inotify.ml b/src/dune_scheduler/inotify.ml index 69d7b104db5..c89a9ea9bbe 100644 --- a/src/dune_scheduler/inotify.ml +++ b/src/dune_scheduler/inotify.ml @@ -157,7 +157,7 @@ let pump_events t = () ;; -let create ~modify_event_selector ~emit_events = +let create ~mutex ~modify_event_selector ~emit_events = let fd = Inotify.create () |> Fd.unsafe_of_unix_file_descr in let watch_table = Table.create (module Inotify_watch) 10 in let modify_selector : Inotify.selector = @@ -167,7 +167,7 @@ let create ~modify_event_selector ~emit_events = in let t = { fd - ; mutex = Mutex.create () + ; mutex ; watch_table ; select_events = [ S_Create; S_Delete; modify_selector; S_Move_self; S_Moved_from; S_Moved_to ] diff --git a/src/dune_scheduler/inotify.mli b/src/dune_scheduler/inotify.mli index d9eaa814cd0..48efd88314c 100644 --- a/src/dune_scheduler/inotify.mli +++ b/src/dune_scheduler/inotify.mli @@ -51,9 +51,14 @@ type modify_event_selector = (** [create] creates an inotify watcher. + [mutex] is used for the inotify bookkeeping. This is useful when + [emit_events] needs to synchronize its own bookkeeping with inotify event + processing. + [emit_events] is called with events emitted by the watcher. *) val create - : modify_event_selector:modify_event_selector + : mutex:Mutex.t + -> modify_event_selector:modify_event_selector -> emit_events:(Event.t list -> unit) -> t diff --git a/src/dune_scheduler/scheduler.ml b/src/dune_scheduler/scheduler.ml index 9250afe9adc..cd29457c3c7 100644 --- a/src/dune_scheduler/scheduler.ml +++ b/src/dune_scheduler/scheduler.ml @@ -255,10 +255,6 @@ module Run_once = struct let rec iter (t : t) : Fiber.fill list = Console.Status_line.refresh (); match Event.Queue.next t.events with - | File_watcher_task job -> - let events = job () in - Event.Queue.send_file_watcher_events t.events events; - iter t | File_system_sync id -> (match Table.find t.fs_syncs id with | None -> iter t @@ -495,9 +491,7 @@ module Run = struct Some (File_watcher.create_default ~scheduler: - { thread_safe_send_emit_events_job = - (fun job -> Event_queue.send_file_watcher_task events job) - } + { thread_safe_send_events = Event_queue.send_file_watcher_events events } ~watch_exclusions:config.watch_exclusions ()) in diff --git a/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_linux.ml b/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_linux.ml index cfe94a7cb4f..7a97d4428a5 100644 --- a/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_linux.ml +++ b/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_linux.ml @@ -9,11 +9,9 @@ let%expect_test _ = let watcher = Dune_scheduler.File_watcher.create_default ~scheduler: - { thread_safe_send_emit_events_job = - (fun job -> - Mutex.protect mutex (fun () -> - let events = job () in - events_buffer := !events_buffer @ events)) + { thread_safe_send_events = + (fun events -> + Mutex.protect mutex (fun () -> events_buffer := !events_buffer @ events)) } ~watch_exclusions:[] () diff --git a/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_macos.ml b/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_macos.ml index 72acae3f6cb..359c2f1d413 100644 --- a/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_macos.ml +++ b/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_macos.ml @@ -10,11 +10,9 @@ let%expect_test _ = Dune_scheduler.File_watcher.create_default ~fsevents_debounce:(Time.Span.of_secs 0.) ~scheduler: - { thread_safe_send_emit_events_job = - (fun job -> - Mutex.protect mutex (fun () -> - let events = job () in - events_buffer := !events_buffer @ events)) + { thread_safe_send_events = + (fun events -> + Mutex.protect mutex (fun () -> events_buffer := !events_buffer @ events)) } ~watch_exclusions:[] () diff --git a/test/expect-tests/inotify_tests/inotify_tests.ml b/test/expect-tests/inotify_tests/inotify_tests.ml index 975ebdfdf0f..b507bde7cbc 100644 --- a/test/expect-tests/inotify_tests/inotify_tests.ml +++ b/test/expect-tests/inotify_tests/inotify_tests.ml @@ -55,6 +55,7 @@ let watch, collect_events = let cond = Condition.create () in let inotify = Inotify.create + ~mutex ~modify_event_selector:`Closed_writable_fd ~emit_events:(fun file_events -> Mutex.protect mutex (fun () ->