From ad4c5ceb48360c4a82b5731bb7c568c4a179081c Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Sat, 23 May 2026 21:42:17 +0100 Subject: [PATCH] Send file watcher events directly to the scheduler File watcher callbacks now construct event lists before sending them through the scheduler hook, removing the intermediate queued thunk path. Protect fs sync bookkeeping with a mutex shared with the inotify watcher so sync events can be consumed safely from watcher threads. Signed-off-by: Rudi Grinberg --- src/dune_scheduler/event.ml | 17 +- src/dune_scheduler/event.mli | 7 - src/dune_scheduler/file_watcher.ml | 145 ++++++++++-------- src/dune_scheduler/file_watcher.mli | 7 +- src/dune_scheduler/inotify.ml | 4 +- src/dune_scheduler/inotify.mli | 7 +- src/dune_scheduler/scheduler.ml | 8 +- .../dune_file_watcher_tests_linux.ml | 8 +- .../dune_file_watcher_tests_macos.ml | 8 +- .../inotify_tests/inotify_tests.ml | 1 + 10 files changed, 105 insertions(+), 107 deletions(-) diff --git a/src/dune_scheduler/event.ml b/src/dune_scheduler/event.ml index bc971926580..be68892b9f9 100644 --- a/src/dune_scheduler/event.ml +++ b/src/dune_scheduler/event.ml @@ -19,7 +19,6 @@ type build_input_change = | Invalidation of Memo.Invalidation.t type t = - | File_watcher_task of (unit -> File_watcher.Event.t list) | Build_inputs_changed of build_input_change Nonempty_list.t | File_system_sync of File_watcher.Sync_id.t | File_system_watcher_terminated @@ -38,7 +37,6 @@ module Queue = struct type t = { jobs_completed : (job * Proc.Process_info.t) Queue.t - ; file_watcher_tasks : (unit -> File_watcher.Event.t list) Queue.t ; mutable invalidation_events : Invalidation_event.t list ; mutable shutdown_reasons : Shutdown.Reason.Set.t ; mutex : Mutex.t @@ -60,7 +58,6 @@ module Queue = struct let create () = let jobs_completed = Queue.create () in - let file_watcher_tasks = Queue.create () in let worker_tasks_completed = Queue.create () in let invalidation_events = [] in let shutdown_reasons = Shutdown.Reason.Set.empty in @@ -69,7 +66,6 @@ module Queue = struct let pending_jobs = 0 in let pending_worker_tasks = 0 in { jobs_completed - ; file_watcher_tasks ; invalidation_events ; shutdown_reasons ; mutex @@ -140,10 +136,6 @@ module Queue = struct Some Job_complete_ready ;; - let file_watcher_task q = - Option.map (Queue.pop q.file_watcher_tasks) ~f:(fun job -> File_watcher_task job) - ;; - let invalidation q = match q.invalidation_events with | [] -> None @@ -206,13 +198,12 @@ module Queue = struct let events_in_order = (* Event sources are listed in priority order. Signals are the highest priority to maximize responsiveness to Ctrl+C. - [file_watcher_task], [worker_tasks_completed] and [invalidation] are - used for reacting to user input, so their latency is also important. + [invalidation] and [worker_tasks_completed] are used for reacting to + user input, so their latency is also important. [jobs_completed] and [yield] are where the bulk of the work is done, so they are the lowest priority to avoid starving other things. *) Event_source. [ shutdown - ; file_watcher_task ; invalidation ; worker_tasks_completed ; (if Sys.win32 then jobs_completed else job_complete_ready) @@ -280,10 +271,6 @@ module Queue = struct q.shutdown_reasons <- Shutdown.Reason.Set.add q.shutdown_reasons signal) ;; - let send_file_watcher_task q job = - add_event q (fun q -> Queue.push q.file_watcher_tasks job) - ;; - let pending_jobs q = q.pending_jobs let pending_worker_tasks q = q.pending_worker_tasks end diff --git a/src/dune_scheduler/event.mli b/src/dune_scheduler/event.mli index 90bb863fea5..54983b4dba8 100644 --- a/src/dune_scheduler/event.mli +++ b/src/dune_scheduler/event.mli @@ -13,7 +13,6 @@ type build_input_change = | Invalidation of Memo.Invalidation.t type t = - | File_watcher_task of (unit -> File_watcher.Event.t list) | Build_inputs_changed of build_input_change Nonempty_list.t | File_system_sync of File_watcher.Sync_id.t | File_system_watcher_terminated @@ -48,13 +47,7 @@ module Queue : sig val send_worker_tasks_completed : t -> Fiber.fill list -> unit val register_worker_task_started : t -> unit val cancel_work_task_started : t -> unit - val send_file_watcher_task : t -> (unit -> File_watcher.Event.t list) -> unit - - (** It's a bit weird to have both this and [send_file_watcher_task]. The - reason is that [send_file_watcher_task] uses [send_file_watcher_events] - internally. *) val send_file_watcher_events : t -> File_watcher.Event.t list -> unit - val send_invalidation_event : t -> Memo.Invalidation.t -> unit val send_job_completed : t -> job -> Proc.Process_info.t -> unit val send_job_completed_ready : t -> unit diff --git a/src/dune_scheduler/file_watcher.ml b/src/dune_scheduler/file_watcher.ml index 5524175ba9f..46506e6550b 100644 --- a/src/dune_scheduler/file_watcher.ml +++ b/src/dune_scheduler/file_watcher.ml @@ -48,7 +48,7 @@ module Event = struct end module Scheduler = struct - type t = { thread_safe_send_emit_events_job : (unit -> Event.t list) -> unit } + type t = { thread_safe_send_events : Event.t list -> unit } end module Watch_trie : sig @@ -141,9 +141,18 @@ type kind = | Inotify of Inotify.t | Fswatch_win of { t : Fswatch_win.t } +type sync_table = + { table : (string, Sync_id.t) Table.t + ; mutex : Mutex.t + } + +let create_sync_table () = + { table = Table.create (module String) 64; mutex = Mutex.create () } +;; + type t = { kind : kind - ; sync_table : (string, Sync_id.t) Table.t + ; sync_table : sync_table (* Pending fs sync operations indexed by the special sync filename. *) } @@ -215,24 +224,31 @@ let shutdown t = module Fs_sync : sig val special_dir_path : Path.Build.t Lazy.t val special_dir : string Lazy.t - val emit : t -> Sync_id.t + val emit : sync_table -> Sync_id.t val is_special_file : path_as_reported_by_file_watcher:string -> bool (** fsevents always reports absolute paths. therefore, we need callers to make an effort to determine if an absolute path is in fact in the build dir *) val is_special_file_fsevents : Path.t -> bool - val consume_event : (string, 'a) Table.t -> string -> 'a option + val consume_event : sync_table -> string -> Sync_id.t option end = struct let special_dir_path = lazy (Path.Build.relative Path.Build.root ".sync") let special_dir = lazy (Lazy.force special_dir_path |> Path.Build.to_string) - let emit t = + let emit sync_table = let id = Sync_id.gen () in let fn = id |> Sync_id.to_int |> string_of_int in let path = Filename.concat (Lazy.force special_dir) fn in - Unix.close (Unix.openfile path [ O_WRONLY; O_CREAT; O_TRUNC; O_CLOEXEC ] 0o666); - Table.set t.sync_table fn id; + Mutex.protect sync_table.mutex (fun () -> + Table.set sync_table.table fn id; + match + Unix.close (Unix.openfile path [ O_WRONLY; O_CREAT; O_TRUNC; O_CLOEXEC ] 0o666) + with + | () -> () + | exception exn -> + Table.remove sync_table.table fn; + Exn.reraise exn); id ;; @@ -243,14 +259,15 @@ end = struct Filename.dirname path_as_reported_by_file_watcher = Lazy.force special_dir ;; - let consume_event table path = - let basename = Filename.basename path in - match Table.find table basename with - | None -> None - | Some id -> - Fpath.unlink_no_err path; - Table.remove table basename; - Some id + let consume_event sync_table path = + Mutex.protect sync_table.mutex (fun () -> + let basename = Filename.basename path in + match Table.find sync_table.table basename with + | None -> None + | Some id -> + Fpath.unlink_no_err path; + Table.remove sync_table.table basename; + Some id) ;; let is_special_file_fsevents (path : Path.t) = @@ -368,27 +385,32 @@ let spawn_external_watcher ~backend ~watch_exclusions = ;; let create_inotifylib_watcher ~sync_table ~(scheduler : Scheduler.t) should_exclude = - Inotify.create ~modify_event_selector:`Closed_writable_fd ~emit_events:(fun events -> - scheduler.thread_safe_send_emit_events_job (fun () -> - List.concat_map events ~f:(fun event -> - let is_fs_sync_event_generated_by_dune = - match (event : Inotify.Event.t) with - | Modified path | Created path | Unlinked path -> - Option.some_if - (Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path) - path - | Moved _ | Queue_overflow -> None - in - let events = - match is_fs_sync_event_generated_by_dune with - | None -> process_inotify_event event should_exclude - | Some path -> - (match Fs_sync.consume_event sync_table path with - | None -> [] - | Some id -> [ Event.Sync id ]) - in - emit_events events; - events))) + Inotify.create + ~mutex:sync_table.mutex + ~modify_event_selector:`Closed_writable_fd + ~emit_events:(fun events -> + let events = + List.concat_map events ~f:(fun event -> + let is_fs_sync_event_generated_by_dune = + match (event : Inotify.Event.t) with + | Modified path | Created path | Unlinked path -> + Option.some_if + (Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path) + path + | Moved _ | Queue_overflow -> None + in + let events = + match is_fs_sync_event_generated_by_dune with + | None -> process_inotify_event event should_exclude + | Some path -> + (match Fs_sync.consume_event sync_table path with + | None -> [] + | Some id -> [ Event.Sync id ]) + in + emit_events events; + events) + in + scheduler.thread_safe_send_events events) ;; let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusions = @@ -397,21 +419,20 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion let event_mtx = Mutex.create () in let event_cv = Condition.create () in let res = - let sync_table = Table.create (module String) 64 in + let sync_table = create_sync_table () in let pipe, pid = spawn_external_watcher ~backend ~watch_exclusions in let (_ : Thread.t) = Thread0.spawn ~name:"file-watcher" (fun () -> - let enqueue job = + let enqueue events = Mutex.protect event_mtx (fun () -> - jobs := job :: !jobs; + jobs := events :: !jobs; Condition.signal event_cv) in let rec loop () = - (* This job runs on the scheduler thread because it uses [sync_table]. *) match input_line pipe with - | exception End_of_file -> enqueue (fun () -> [ Event.Watcher_terminated ]) + | exception End_of_file -> enqueue [ Event.Watcher_terminated ] | path_s -> - enqueue (fun () -> + let events = if Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path_s then ( match Fs_sync.consume_event sync_table path_s with @@ -419,7 +440,9 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion | Some id -> [ Event.Sync id ]) else ( let path = Path.Expert.try_localize_external (Path.of_string path_s) in - [ Fs_memo_event (Fs_memo_event.create ~kind:File_changed ~path) ])); + [ Fs_memo_event (Fs_memo_event.create ~kind:File_changed ~path) ]) + in + enqueue events; loop () in loop ()) @@ -449,8 +472,7 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion jobs := []; jobs_batch) in - scheduler.thread_safe_send_emit_events_job (fun () -> - List.concat_map jobs_batch ~f:(fun job -> job ())); + scheduler.thread_safe_send_events (List.concat jobs_batch); Thread.delay (Time.Span.to_secs debounce_interval); buffer_thread () in @@ -461,7 +483,7 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion let create_inotifylib ~scheduler ~should_exclude = prepare_sync (); - let sync_table = Table.create (module String) 64 in + let sync_table = create_sync_table () in let inotify = create_inotifylib_watcher ~sync_table ~scheduler should_exclude in Inotify.add inotify (Lazy.force Fs_sync.special_dir); { kind = Inotify inotify; sync_table } @@ -475,12 +497,14 @@ let fsevents_callback ?exclusion_paths (scheduler : Scheduler.t) ~f events = | None -> fun _ -> false | Some paths -> fun p -> List.mem paths p ~equal:Path.equal in - scheduler.thread_safe_send_emit_events_job (fun () -> + let events = List.filter_map events ~f:(fun event -> let path = Fsevents.Event.path event |> Path.of_string |> Path.Expert.try_localize_external in - if skip_path path then None else f event path)) + if skip_path path then None else f event path) + in + scheduler.thread_safe_send_events events ;; let fsevents ?exclusion_paths ~latency ~paths scheduler f = @@ -515,7 +539,7 @@ let create_fsevents () = prepare_sync (); - let sync_table = Table.create (module String) 64 in + let sync_table = create_sync_table () in let sync = (* Keep the original event path for consuming the sync file; use the localized path only to check whether the event is in the build directory. *) @@ -587,7 +611,7 @@ let fswatch_win_callback ~(scheduler : Scheduler.t) ~sync_table ~should_exclude | Added | Modified -> (match Fs_sync.consume_event sync_table filename with | None -> () - | Some id -> scheduler.thread_safe_send_emit_events_job (fun () -> [ Sync id ])) + | Some id -> scheduler.thread_safe_send_events [ Sync id ]) | Removed | Renamed_new | Renamed_old -> ()) | path -> let normalized_filename = @@ -596,20 +620,19 @@ let fswatch_win_callback ~(scheduler : Scheduler.t) ~sync_table ~should_exclude (String.split_on_char ~sep:'\\' (String.lowercase_ascii filename)) in if not (should_exclude normalized_filename) - then - scheduler.thread_safe_send_emit_events_job (fun () -> - let kind = - match Fswatch_win.Event.action event with - | Added | Renamed_new -> Fs_memo_event.Created - | Removed | Renamed_old -> Deleted - | Modified -> File_changed - in - [ Fs_memo_event { kind; path } ]) + then ( + let kind = + match Fswatch_win.Event.action event with + | Added | Renamed_new -> Fs_memo_event.Created + | Removed | Renamed_old -> Deleted + | Modified -> File_changed + in + scheduler.thread_safe_send_events [ Fs_memo_event { kind; path } ]) ;; let create_fswatch_win ~(scheduler : Scheduler.t) ~debounce_interval:sleep ~should_exclude = - let sync_table = Table.create (module String) 64 in + let sync_table = create_sync_table () in let t = Fswatch_win.create () in Fswatch_win.add t (Path.to_absolute_filename Path.root); let (_ : Thread.t) = @@ -692,4 +715,4 @@ let add_watch t path = Ok ())) ;; -let emit_sync = Fs_sync.emit +let emit_sync t = Fs_sync.emit t.sync_table diff --git a/src/dune_scheduler/file_watcher.mli b/src/dune_scheduler/file_watcher.mli index 3f83ac679b1..6a6bc7c930f 100644 --- a/src/dune_scheduler/file_watcher.mli +++ b/src/dune_scheduler/file_watcher.mli @@ -54,10 +54,9 @@ end module Scheduler : sig (** Hook into the fiber scheduler. *) type t = - { thread_safe_send_emit_events_job : (unit -> Event.t list) -> unit - (** Send some events to the scheduler. The events are sent in the form - of a thunk to be executed on the scheduler thread, so that we can - do some bookkeeping that needs to happen there. *) + { thread_safe_send_events : Event.t list -> unit + (** Send some events to the scheduler. This function must be safe to call + from file watcher threads. *) } end diff --git a/src/dune_scheduler/inotify.ml b/src/dune_scheduler/inotify.ml index 69d7b104db5..c89a9ea9bbe 100644 --- a/src/dune_scheduler/inotify.ml +++ b/src/dune_scheduler/inotify.ml @@ -157,7 +157,7 @@ let pump_events t = () ;; -let create ~modify_event_selector ~emit_events = +let create ~mutex ~modify_event_selector ~emit_events = let fd = Inotify.create () |> Fd.unsafe_of_unix_file_descr in let watch_table = Table.create (module Inotify_watch) 10 in let modify_selector : Inotify.selector = @@ -167,7 +167,7 @@ let create ~modify_event_selector ~emit_events = in let t = { fd - ; mutex = Mutex.create () + ; mutex ; watch_table ; select_events = [ S_Create; S_Delete; modify_selector; S_Move_self; S_Moved_from; S_Moved_to ] diff --git a/src/dune_scheduler/inotify.mli b/src/dune_scheduler/inotify.mli index d9eaa814cd0..48efd88314c 100644 --- a/src/dune_scheduler/inotify.mli +++ b/src/dune_scheduler/inotify.mli @@ -51,9 +51,14 @@ type modify_event_selector = (** [create] creates an inotify watcher. + [mutex] is used for the inotify bookkeeping. This is useful when + [emit_events] needs to synchronize its own bookkeeping with inotify event + processing. + [emit_events] is called with events emitted by the watcher. *) val create - : modify_event_selector:modify_event_selector + : mutex:Mutex.t + -> modify_event_selector:modify_event_selector -> emit_events:(Event.t list -> unit) -> t diff --git a/src/dune_scheduler/scheduler.ml b/src/dune_scheduler/scheduler.ml index 9250afe9adc..cd29457c3c7 100644 --- a/src/dune_scheduler/scheduler.ml +++ b/src/dune_scheduler/scheduler.ml @@ -255,10 +255,6 @@ module Run_once = struct let rec iter (t : t) : Fiber.fill list = Console.Status_line.refresh (); match Event.Queue.next t.events with - | File_watcher_task job -> - let events = job () in - Event.Queue.send_file_watcher_events t.events events; - iter t | File_system_sync id -> (match Table.find t.fs_syncs id with | None -> iter t @@ -495,9 +491,7 @@ module Run = struct Some (File_watcher.create_default ~scheduler: - { thread_safe_send_emit_events_job = - (fun job -> Event_queue.send_file_watcher_task events job) - } + { thread_safe_send_events = Event_queue.send_file_watcher_events events } ~watch_exclusions:config.watch_exclusions ()) in diff --git a/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_linux.ml b/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_linux.ml index cfe94a7cb4f..7a97d4428a5 100644 --- a/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_linux.ml +++ b/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_linux.ml @@ -9,11 +9,9 @@ let%expect_test _ = let watcher = Dune_scheduler.File_watcher.create_default ~scheduler: - { thread_safe_send_emit_events_job = - (fun job -> - Mutex.protect mutex (fun () -> - let events = job () in - events_buffer := !events_buffer @ events)) + { thread_safe_send_events = + (fun events -> + Mutex.protect mutex (fun () -> events_buffer := !events_buffer @ events)) } ~watch_exclusions:[] () diff --git a/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_macos.ml b/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_macos.ml index 72acae3f6cb..359c2f1d413 100644 --- a/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_macos.ml +++ b/test/expect-tests/dune_file_watcher/dune_file_watcher_tests_macos.ml @@ -10,11 +10,9 @@ let%expect_test _ = Dune_scheduler.File_watcher.create_default ~fsevents_debounce:(Time.Span.of_secs 0.) ~scheduler: - { thread_safe_send_emit_events_job = - (fun job -> - Mutex.protect mutex (fun () -> - let events = job () in - events_buffer := !events_buffer @ events)) + { thread_safe_send_events = + (fun events -> + Mutex.protect mutex (fun () -> events_buffer := !events_buffer @ events)) } ~watch_exclusions:[] () diff --git a/test/expect-tests/inotify_tests/inotify_tests.ml b/test/expect-tests/inotify_tests/inotify_tests.ml index 975ebdfdf0f..b507bde7cbc 100644 --- a/test/expect-tests/inotify_tests/inotify_tests.ml +++ b/test/expect-tests/inotify_tests/inotify_tests.ml @@ -55,6 +55,7 @@ let watch, collect_events = let cond = Condition.create () in let inotify = Inotify.create + ~mutex ~modify_event_selector:`Closed_writable_fd ~emit_events:(fun file_events -> Mutex.protect mutex (fun () ->