Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions src/dune_scheduler/event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type build_input_change =
| Invalidation of Memo.Invalidation.t

type t =
| File_watcher_task of (unit -> File_watcher.Event.t list)
| Build_inputs_changed of build_input_change Nonempty_list.t
| File_system_sync of File_watcher.Sync_id.t
| File_system_watcher_terminated
Expand All @@ -38,7 +37,6 @@ module Queue = struct

type t =
{ jobs_completed : (job * Proc.Process_info.t) Queue.t
; file_watcher_tasks : (unit -> File_watcher.Event.t list) Queue.t
; mutable invalidation_events : Invalidation_event.t list
; mutable shutdown_reasons : Shutdown.Reason.Set.t
; mutex : Mutex.t
Expand All @@ -60,7 +58,6 @@ module Queue = struct

let create () =
let jobs_completed = Queue.create () in
let file_watcher_tasks = Queue.create () in
let worker_tasks_completed = Queue.create () in
let invalidation_events = [] in
let shutdown_reasons = Shutdown.Reason.Set.empty in
Expand All @@ -69,7 +66,6 @@ module Queue = struct
let pending_jobs = 0 in
let pending_worker_tasks = 0 in
{ jobs_completed
; file_watcher_tasks
; invalidation_events
; shutdown_reasons
; mutex
Expand Down Expand Up @@ -140,10 +136,6 @@ module Queue = struct
Some Job_complete_ready
;;

let file_watcher_task q =
Option.map (Queue.pop q.file_watcher_tasks) ~f:(fun job -> File_watcher_task job)
;;

let invalidation q =
match q.invalidation_events with
| [] -> None
Expand Down Expand Up @@ -206,13 +198,12 @@ module Queue = struct
let events_in_order =
(* Event sources are listed in priority order. Signals are the
highest priority to maximize responsiveness to Ctrl+C.
[file_watcher_task], [worker_tasks_completed] and [invalidation] are
used for reacting to user input, so their latency is also important.
[invalidation] and [worker_tasks_completed] are used for reacting to
user input, so their latency is also important.
[jobs_completed] and [yield] are where the bulk of the work is done, so
they are the lowest priority to avoid starving other things. *)
Event_source.
[ shutdown
; file_watcher_task
; invalidation
; worker_tasks_completed
; (if Sys.win32 then jobs_completed else job_complete_ready)
Expand Down Expand Up @@ -280,10 +271,6 @@ module Queue = struct
q.shutdown_reasons <- Shutdown.Reason.Set.add q.shutdown_reasons signal)
;;

let send_file_watcher_task q job =
add_event q (fun q -> Queue.push q.file_watcher_tasks job)
;;

let pending_jobs q = q.pending_jobs
let pending_worker_tasks q = q.pending_worker_tasks
end
7 changes: 0 additions & 7 deletions src/dune_scheduler/event.mli
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type build_input_change =
| Invalidation of Memo.Invalidation.t

type t =
| File_watcher_task of (unit -> File_watcher.Event.t list)
| Build_inputs_changed of build_input_change Nonempty_list.t
| File_system_sync of File_watcher.Sync_id.t
| File_system_watcher_terminated
Expand Down Expand Up @@ -48,13 +47,7 @@ module Queue : sig
val send_worker_tasks_completed : t -> Fiber.fill list -> unit
val register_worker_task_started : t -> unit
val cancel_work_task_started : t -> unit
val send_file_watcher_task : t -> (unit -> File_watcher.Event.t list) -> unit

(** It's a bit weird to have both this and [send_file_watcher_task]. The
reason is that [send_file_watcher_task] uses [send_file_watcher_events]
internally. *)
val send_file_watcher_events : t -> File_watcher.Event.t list -> unit

val send_invalidation_event : t -> Memo.Invalidation.t -> unit
val send_job_completed : t -> job -> Proc.Process_info.t -> unit
val send_job_completed_ready : t -> unit
Expand Down
145 changes: 84 additions & 61 deletions src/dune_scheduler/file_watcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ module Event = struct
end

module Scheduler = struct
type t = { thread_safe_send_emit_events_job : (unit -> Event.t list) -> unit }
type t = { thread_safe_send_events : Event.t list -> unit }
end

module Watch_trie : sig
Expand Down Expand Up @@ -141,9 +141,18 @@ type kind =
| Inotify of Inotify.t
| Fswatch_win of { t : Fswatch_win.t }

type sync_table =
{ table : (string, Sync_id.t) Table.t
; mutex : Mutex.t
}

let create_sync_table () =
{ table = Table.create (module String) 64; mutex = Mutex.create () }
;;

type t =
{ kind : kind
; sync_table : (string, Sync_id.t) Table.t
; sync_table : sync_table
(* Pending fs sync operations indexed by the special sync filename. *)
}

Expand Down Expand Up @@ -215,24 +224,31 @@ let shutdown t =
module Fs_sync : sig
val special_dir_path : Path.Build.t Lazy.t
val special_dir : string Lazy.t
val emit : t -> Sync_id.t
val emit : sync_table -> Sync_id.t
val is_special_file : path_as_reported_by_file_watcher:string -> bool

(** fsevents always reports absolute paths. therefore, we need callers to make
an effort to determine if an absolute path is in fact in the build dir *)
val is_special_file_fsevents : Path.t -> bool

val consume_event : (string, 'a) Table.t -> string -> 'a option
val consume_event : sync_table -> string -> Sync_id.t option
end = struct
let special_dir_path = lazy (Path.Build.relative Path.Build.root ".sync")
let special_dir = lazy (Lazy.force special_dir_path |> Path.Build.to_string)

let emit t =
let emit sync_table =
let id = Sync_id.gen () in
let fn = id |> Sync_id.to_int |> string_of_int in
let path = Filename.concat (Lazy.force special_dir) fn in
Unix.close (Unix.openfile path [ O_WRONLY; O_CREAT; O_TRUNC; O_CLOEXEC ] 0o666);
Table.set t.sync_table fn id;
Mutex.protect sync_table.mutex (fun () ->
Table.set sync_table.table fn id;
match
Unix.close (Unix.openfile path [ O_WRONLY; O_CREAT; O_TRUNC; O_CLOEXEC ] 0o666)
with
| () -> ()
| exception exn ->
Table.remove sync_table.table fn;
Exn.reraise exn);
id
;;

Expand All @@ -243,14 +259,15 @@ end = struct
Filename.dirname path_as_reported_by_file_watcher = Lazy.force special_dir
;;

let consume_event table path =
let basename = Filename.basename path in
match Table.find table basename with
| None -> None
| Some id ->
Fpath.unlink_no_err path;
Table.remove table basename;
Some id
let consume_event sync_table path =
Mutex.protect sync_table.mutex (fun () ->
let basename = Filename.basename path in
match Table.find sync_table.table basename with
| None -> None
| Some id ->
Fpath.unlink_no_err path;
Table.remove sync_table.table basename;
Some id)
;;

let is_special_file_fsevents (path : Path.t) =
Expand Down Expand Up @@ -368,27 +385,32 @@ let spawn_external_watcher ~backend ~watch_exclusions =
;;

let create_inotifylib_watcher ~sync_table ~(scheduler : Scheduler.t) should_exclude =
Inotify.create ~modify_event_selector:`Closed_writable_fd ~emit_events:(fun events ->
scheduler.thread_safe_send_emit_events_job (fun () ->
List.concat_map events ~f:(fun event ->
let is_fs_sync_event_generated_by_dune =
match (event : Inotify.Event.t) with
| Modified path | Created path | Unlinked path ->
Option.some_if
(Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path)
path
| Moved _ | Queue_overflow -> None
in
let events =
match is_fs_sync_event_generated_by_dune with
| None -> process_inotify_event event should_exclude
| Some path ->
(match Fs_sync.consume_event sync_table path with
| None -> []
| Some id -> [ Event.Sync id ])
in
emit_events events;
events)))
Inotify.create
~mutex:sync_table.mutex
~modify_event_selector:`Closed_writable_fd
~emit_events:(fun events ->
let events =
List.concat_map events ~f:(fun event ->
let is_fs_sync_event_generated_by_dune =
match (event : Inotify.Event.t) with
| Modified path | Created path | Unlinked path ->
Option.some_if
(Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path)
path
| Moved _ | Queue_overflow -> None
in
let events =
match is_fs_sync_event_generated_by_dune with
| None -> process_inotify_event event should_exclude
| Some path ->
(match Fs_sync.consume_event sync_table path with
| None -> []
| Some id -> [ Event.Sync id ])
in
emit_events events;
events)
in
scheduler.thread_safe_send_events events)
;;

let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusions =
Expand All @@ -397,29 +419,30 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion
let event_mtx = Mutex.create () in
let event_cv = Condition.create () in
let res =
let sync_table = Table.create (module String) 64 in
let sync_table = create_sync_table () in
let pipe, pid = spawn_external_watcher ~backend ~watch_exclusions in
let (_ : Thread.t) =
Thread0.spawn ~name:"file-watcher" (fun () ->
let enqueue job =
let enqueue events =
Mutex.protect event_mtx (fun () ->
jobs := job :: !jobs;
jobs := events :: !jobs;
Condition.signal event_cv)
in
let rec loop () =
(* This job runs on the scheduler thread because it uses [sync_table]. *)
match input_line pipe with
| exception End_of_file -> enqueue (fun () -> [ Event.Watcher_terminated ])
| exception End_of_file -> enqueue [ Event.Watcher_terminated ]
| path_s ->
enqueue (fun () ->
let events =
if Fs_sync.is_special_file ~path_as_reported_by_file_watcher:path_s
then (
match Fs_sync.consume_event sync_table path_s with
| None -> []
| Some id -> [ Event.Sync id ])
else (
let path = Path.Expert.try_localize_external (Path.of_string path_s) in
[ Fs_memo_event (Fs_memo_event.create ~kind:File_changed ~path) ]));
[ Fs_memo_event (Fs_memo_event.create ~kind:File_changed ~path) ])
in
enqueue events;
loop ()
in
loop ())
Expand Down Expand Up @@ -449,8 +472,7 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion
jobs := [];
jobs_batch)
in
scheduler.thread_safe_send_emit_events_job (fun () ->
List.concat_map jobs_batch ~f:(fun job -> job ()));
scheduler.thread_safe_send_events (List.concat jobs_batch);
Thread.delay (Time.Span.to_secs debounce_interval);
buffer_thread ()
in
Expand All @@ -461,7 +483,7 @@ let create_external_fswatch ~(scheduler : Scheduler.t) ~backend ~watch_exclusion

let create_inotifylib ~scheduler ~should_exclude =
prepare_sync ();
let sync_table = Table.create (module String) 64 in
let sync_table = create_sync_table () in
let inotify = create_inotifylib_watcher ~sync_table ~scheduler should_exclude in
Inotify.add inotify (Lazy.force Fs_sync.special_dir);
{ kind = Inotify inotify; sync_table }
Expand All @@ -475,12 +497,14 @@ let fsevents_callback ?exclusion_paths (scheduler : Scheduler.t) ~f events =
| None -> fun _ -> false
| Some paths -> fun p -> List.mem paths p ~equal:Path.equal
in
scheduler.thread_safe_send_emit_events_job (fun () ->
let events =
List.filter_map events ~f:(fun event ->
let path =
Fsevents.Event.path event |> Path.of_string |> Path.Expert.try_localize_external
in
if skip_path path then None else f event path))
if skip_path path then None else f event path)
in
scheduler.thread_safe_send_events events
;;

let fsevents ?exclusion_paths ~latency ~paths scheduler f =
Expand Down Expand Up @@ -515,7 +539,7 @@ let create_fsevents
()
=
prepare_sync ();
let sync_table = Table.create (module String) 64 in
let sync_table = create_sync_table () in
let sync =
(* Keep the original event path for consuming the sync file; use the
localized path only to check whether the event is in the build directory. *)
Expand Down Expand Up @@ -587,7 +611,7 @@ let fswatch_win_callback ~(scheduler : Scheduler.t) ~sync_table ~should_exclude
| Added | Modified ->
(match Fs_sync.consume_event sync_table filename with
| None -> ()
| Some id -> scheduler.thread_safe_send_emit_events_job (fun () -> [ Sync id ]))
| Some id -> scheduler.thread_safe_send_events [ Sync id ])
| Removed | Renamed_new | Renamed_old -> ())
| path ->
let normalized_filename =
Expand All @@ -596,20 +620,19 @@ let fswatch_win_callback ~(scheduler : Scheduler.t) ~sync_table ~should_exclude
(String.split_on_char ~sep:'\\' (String.lowercase_ascii filename))
in
if not (should_exclude normalized_filename)
then
scheduler.thread_safe_send_emit_events_job (fun () ->
let kind =
match Fswatch_win.Event.action event with
| Added | Renamed_new -> Fs_memo_event.Created
| Removed | Renamed_old -> Deleted
| Modified -> File_changed
in
[ Fs_memo_event { kind; path } ])
then (
let kind =
match Fswatch_win.Event.action event with
| Added | Renamed_new -> Fs_memo_event.Created
| Removed | Renamed_old -> Deleted
| Modified -> File_changed
in
scheduler.thread_safe_send_events [ Fs_memo_event { kind; path } ])
;;

let create_fswatch_win ~(scheduler : Scheduler.t) ~debounce_interval:sleep ~should_exclude
=
let sync_table = Table.create (module String) 64 in
let sync_table = create_sync_table () in
let t = Fswatch_win.create () in
Fswatch_win.add t (Path.to_absolute_filename Path.root);
let (_ : Thread.t) =
Expand Down Expand Up @@ -692,4 +715,4 @@ let add_watch t path =
Ok ()))
;;

let emit_sync = Fs_sync.emit
let emit_sync t = Fs_sync.emit t.sync_table
7 changes: 3 additions & 4 deletions src/dune_scheduler/file_watcher.mli
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@ end
module Scheduler : sig
(** Hook into the fiber scheduler. *)
type t =
{ thread_safe_send_emit_events_job : (unit -> Event.t list) -> unit
(** Send some events to the scheduler. The events are sent in the form
of a thunk to be executed on the scheduler thread, so that we can
do some bookkeeping that needs to happen there. *)
{ thread_safe_send_events : Event.t list -> unit
(** Send some events to the scheduler. This function must be safe to call
from file watcher threads. *)
}
end

Expand Down
4 changes: 2 additions & 2 deletions src/dune_scheduler/inotify.ml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ let pump_events t =
()
;;

let create ~modify_event_selector ~emit_events =
let create ~mutex ~modify_event_selector ~emit_events =
let fd = Inotify.create () |> Fd.unsafe_of_unix_file_descr in
let watch_table = Table.create (module Inotify_watch) 10 in
let modify_selector : Inotify.selector =
Expand All @@ -167,7 +167,7 @@ let create ~modify_event_selector ~emit_events =
in
let t =
{ fd
; mutex = Mutex.create ()
; mutex
; watch_table
; select_events =
[ S_Create; S_Delete; modify_selector; S_Move_self; S_Moved_from; S_Moved_to ]
Expand Down
Loading
Loading