Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/dune_engine/fs_memo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ let invalidate_path_and_its_parent path =
- Finally, the result of [dir_contents] queries can be updated without
calling [Path.Untracked.readdir_unsorted_with_kinds]: we know which file or
directory should be added to or removed from the result. *)
let handle_fs_event ({ kind; path } : Dune_scheduler.File_watcher.Fs_memo_event.t)
let handle_fs_event ({ kind; path } : Dune_scheduler.Event.Fs_memo_event.t)
: Memo.Invalidation.t
=
match Path.destruct_build_dir path with
Expand Down
1 change: 1 addition & 0 deletions src/dune_scheduler/dune_scheduler.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Scheduler = Scheduler
module Async_io = Async_io
module Event = Event
module File_watcher = File_watcher
module Shutdown = Shutdown

Expand Down
45 changes: 42 additions & 3 deletions src/dune_scheduler/event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,52 @@ let dyn_of_job { pid; is_process_group_leader; ivar } =
]
;;

module Fs_memo_event = struct
type kind =
| Created
| Deleted
| File_changed
| Unknown

let dyn_of_kind kind =
Dyn.string
(match kind with
| Created -> "Created"
| Deleted -> "Deleted"
| File_changed -> "File_changed"
| Unknown -> "Unknown")
;;

type t =
{ path : Path.t
; kind : kind
}

let to_dyn { path; kind } =
let open Dyn in
record [ "path", Path.to_dyn path; "kind", dyn_of_kind kind ]
;;

let create ~kind ~path = { path; kind }
end

module Sync_id = Id.Make ()

module File_watcher_event = struct
type t =
| Fs_memo_event of Fs_memo_event.t
| Queue_overflow
| Sync of Sync_id.t
| Watcher_terminated
end

type build_input_change =
| Fs_event of File_watcher.Fs_memo_event.t
| Fs_event of Fs_memo_event.t
| Invalidation of Memo.Invalidation.t

type t =
| Build_inputs_changed of build_input_change Nonempty_list.t
| File_system_sync of File_watcher.Sync_id.t
| File_system_sync of Sync_id.t
| File_system_watcher_terminated
| Shutdown of Shutdown.Reason.t
| Fiber_fill_ivar of Fiber.fill
Expand All @@ -29,7 +68,7 @@ type t =
module Invalidation_event = struct
type t =
| Invalidation of Memo.Invalidation.t
| Filesystem_event of File_watcher.Event.t
| Filesystem_event of File_watcher_event.t
end

module Queue = struct
Expand Down
40 changes: 37 additions & 3 deletions src/dune_scheduler/event.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,47 @@ type job =

val dyn_of_job : job -> Dyn.t

module Fs_memo_event : sig
type kind =
| Created
| Deleted
| File_changed
| Unknown

type t = private
{ path : Path.t
; kind : kind
}

val create : kind:kind -> path:Path.t -> t
val to_dyn : t -> Dyn.t
end

module Sync_id : sig
type t

val equal : t -> t -> bool
val hash : t -> int
val gen : unit -> t
val to_int : t -> int
val to_dyn : t -> Dyn.t
end

module File_watcher_event : sig
type t =
| Fs_memo_event of Fs_memo_event.t
| Queue_overflow
| Sync of Sync_id.t
| Watcher_terminated
end

type build_input_change =
| Fs_event of File_watcher.Fs_memo_event.t
| Fs_event of Fs_memo_event.t
| Invalidation of Memo.Invalidation.t

type t =
| Build_inputs_changed of build_input_change Nonempty_list.t
| File_system_sync of File_watcher.Sync_id.t
| File_system_sync of Sync_id.t
| File_system_watcher_terminated
| Shutdown of Shutdown.Reason.t
| Fiber_fill_ivar of Fiber.fill
Expand Down Expand Up @@ -47,7 +81,7 @@ module Queue : sig
val send_worker_tasks_completed : t -> Fiber.fill list -> unit
val register_worker_task_started : t -> unit
val cancel_work_task_started : t -> unit
val send_file_watcher_events : t -> File_watcher.Event.t list -> unit
val send_file_watcher_events : t -> File_watcher_event.t list -> unit
val send_invalidation_event : t -> Memo.Invalidation.t -> unit
val send_job_completed : t -> job -> Proc.Process_info.t -> unit
val send_job_completed_ready : t -> unit
Expand Down
111 changes: 37 additions & 74 deletions src/dune_scheduler/file_watcher.ml
Original file line number Diff line number Diff line change
@@ -1,51 +1,8 @@
open Import

module Fs_memo_event = struct
type kind =
| Created
| Deleted
| File_changed
| Unknown (** Treated conservatively as any possible event. *)

let dyn_of_kind kind =
Dyn.string
(match kind with
| Created -> "Created"
| Deleted -> "Deleted"
| File_changed -> "File_changed"
| Unknown -> "Unknown")
;;

type t =
{ path : Path.t
; kind : kind
}

let to_dyn { path; kind } =
let open Dyn in
record [ "path", Path.to_dyn path; "kind", dyn_of_kind kind ]
;;

let create ~kind ~path =
(match Path.as_in_build_dir path with
| None -> ()
| Some dir ->
Code_error.raise
"Fs_memo.Event.create called on a build path"
[ "path", Path.Build.to_dyn dir ]);
{ path; kind }
;;
end

module Sync_id = Id.Make ()

module Event = struct
type t =
| Fs_memo_event of Fs_memo_event.t
| Queue_overflow
| Sync of Sync_id.t
| Watcher_terminated
end
module Scheduler_event = Event
module Fs_memo_event = Scheduler_event.Fs_memo_event
module Sync_id = Scheduler_event.Sync_id
module Event = Scheduler_event.File_watcher_event

module Watch_trie : sig
(** Specialized trie for fsevent watches *)
Expand Down Expand Up @@ -128,7 +85,7 @@ type kind =
| Fsevents of
{ mutable external_ : Fsevents.t Watch_trie.t
; dispatch_queue : Fsevents.Dispatch_queue.t
; send_events : Event.t list -> unit
; event_queue : Scheduler_event.Queue.t
; source : Fsevents.t
; sync : Fsevents.t
; latency : Time.Span.t
Expand Down Expand Up @@ -204,6 +161,12 @@ let trace_and_send_events send_events events =
send_events events
;;

let send_events event_queue events =
trace_and_send_events
(Scheduler_event.Queue.send_file_watcher_events event_queue)
events
;;

let shutdown t =
match t.kind with
| Fswatch { pid; _ } -> `Kill pid
Expand Down Expand Up @@ -381,7 +344,7 @@ let spawn_external_watcher ~backend ~watch_exclusions =
Unix.in_channel_of_descr r_stdout, pid
;;

let create_inotifylib_watcher ~sync_table ~send_events should_exclude =
let create_inotifylib_watcher ~sync_table ~event_queue should_exclude =
Inotify.create
~mutex:sync_table.mutex
~modify_event_selector:`Closed_writable_fd
Expand All @@ -403,10 +366,10 @@ let create_inotifylib_watcher ~sync_table ~send_events should_exclude =
| None -> []
| Some id -> [ Event.Sync id ]))
in
trace_and_send_events send_events events)
send_events event_queue events)
;;

let create_external_fswatch ~send_events ~backend ~watch_exclusions =
let create_external_fswatch ~event_queue ~backend ~watch_exclusions =
let debounce_interval = Time.Span.of_secs 0.5 in
let jobs = ref [] in
let event_mtx = Mutex.create () in
Expand Down Expand Up @@ -465,7 +428,7 @@ let create_external_fswatch ~send_events ~backend ~watch_exclusions =
jobs := [];
jobs_batch)
in
trace_and_send_events send_events (List.concat jobs_batch);
send_events event_queue (List.concat jobs_batch);
Thread.delay (Time.Span.to_secs debounce_interval);
buffer_thread ()
in
Expand All @@ -474,15 +437,15 @@ let create_external_fswatch ~send_events ~backend ~watch_exclusions =
res
;;

let create_inotifylib ~send_events ~should_exclude =
let create_inotifylib ~event_queue ~should_exclude =
prepare_sync ();
let sync_table = create_sync_table () in
let inotify = create_inotifylib_watcher ~sync_table ~send_events should_exclude in
let inotify = create_inotifylib_watcher ~sync_table ~event_queue should_exclude in
Inotify.add inotify (Lazy.force Fs_sync.special_dir);
{ kind = Inotify inotify; sync_table }
;;

let fsevents_callback ?exclusion_paths send_events ~f events =
let fsevents_callback ?exclusion_paths event_queue ~f events =
let skip_path =
(* excluding a [path] will exclude children under [path] but not [path]
itself. Hence we need to skip [path] manually *)
Expand All @@ -497,13 +460,13 @@ let fsevents_callback ?exclusion_paths send_events ~f events =
in
if skip_path path then None else f event path)
in
trace_and_send_events send_events events
send_events event_queue events
;;

let fsevents ?exclusion_paths ~latency ~paths send_events f =
let fsevents ?exclusion_paths ~latency ~paths event_queue f =
let fsevents =
let paths = List.map paths ~f:Path.to_absolute_filename in
Fsevents.create ~latency ~paths ~f:(fsevents_callback ?exclusion_paths send_events ~f)
Fsevents.create ~latency ~paths ~f:(fsevents_callback ?exclusion_paths event_queue ~f)
in
Option.iter exclusion_paths ~f:(fun paths ->
let paths = List.rev_map paths ~f:Path.to_absolute_filename in
Expand All @@ -522,10 +485,10 @@ let fsevents_standard_event ~should_exclude event path =
| Remove -> Deleted
| Modify -> if Fsevents.Event.kind event = File then File_changed else Unknown
in
Some (Event.Fs_memo_event { Fs_memo_event.kind; path }))
Some (Event.Fs_memo_event (Fs_memo_event.create ~kind ~path)))
;;

let create_fsevents ?(latency = Time.Span.of_secs 0.2) ~send_events ~should_exclude () =
let create_fsevents ?(latency = Time.Span.of_secs 0.2) ~event_queue ~should_exclude () =
prepare_sync ();
let sync_table = create_sync_table () in
let sync =
Expand All @@ -534,7 +497,7 @@ let create_fsevents ?(latency = Time.Span.of_secs 0.2) ~send_events ~should_excl
fsevents
~latency
~paths:[ Path.build (Lazy.force Fs_sync.special_dir_path) ]
send_events
event_queue
(fun event localized_path ->
let path = Fsevents.Event.path event in
if not (Fs_sync.is_special_file_fsevents localized_path)
Expand All @@ -554,7 +517,7 @@ let create_fsevents ?(latency = Time.Span.of_secs 0.2) ~send_events ~should_excl
:: ([ "_esy"; "_opam"; ".git"; ".hg" ]
|> List.rev_map ~f:(Path.relative (Path.source Path.Source.root)))
in
fsevents ~latency send_events ~exclusion_paths ~paths on_event
fsevents ~latency event_queue ~exclusion_paths ~paths on_event
in
let cv = Condition.create () in
let dispatch_queue_ref = ref None in
Expand All @@ -580,12 +543,12 @@ let create_fsevents ?(latency = Time.Span.of_secs 0.2) ~send_events ~should_excl
Option.value_exn !dispatch_queue_ref
in
{ kind =
Fsevents { latency; send_events; sync; source; external_; dispatch_queue; on_event }
Fsevents { latency; event_queue; sync; source; external_; dispatch_queue; on_event }
; sync_table
}
;;

let fswatch_win_callback ~send_events ~sync_table ~should_exclude event =
let fswatch_win_callback ~event_queue ~sync_table ~should_exclude event =
let filename =
let dir = Fswatch_win.Event.directory event in
Filename.concat dir (Fswatch_win.Event.path event)
Expand All @@ -599,7 +562,7 @@ let fswatch_win_callback ~send_events ~sync_table ~should_exclude event =
| Added | Modified ->
(match Fs_sync.consume_event sync_table filename with
| None -> ()
| Some id -> trace_and_send_events send_events [ Event.Sync id ])
| Some id -> send_events event_queue [ Event.Sync id ])
| Removed | Renamed_new | Renamed_old -> ())
| path ->
let normalized_filename =
Expand All @@ -615,10 +578,10 @@ let fswatch_win_callback ~send_events ~sync_table ~should_exclude event =
| Removed | Renamed_old -> Deleted
| Modified -> File_changed
in
trace_and_send_events send_events [ Event.Fs_memo_event { kind; path } ])
send_events event_queue [ Event.Fs_memo_event (Fs_memo_event.create ~kind ~path) ])
;;

let create_fswatch_win ~send_events ~debounce_interval:sleep ~should_exclude =
let create_fswatch_win ~event_queue ~debounce_interval:sleep ~should_exclude =
prepare_sync ();
let sync_table = create_sync_table () in
let t = Fswatch_win.create () in
Expand All @@ -628,24 +591,24 @@ let create_fswatch_win ~send_events ~debounce_interval:sleep ~should_exclude =
while true do
let events = Fswatch_win.wait t ~sleep in
List.iter
~f:(fswatch_win_callback ~send_events ~sync_table ~should_exclude)
~f:(fswatch_win_callback ~event_queue ~sync_table ~should_exclude)
events
done)
in
{ kind = Fswatch_win { t }; sync_table }
;;

let create_default ?fsevents_debounce ~watch_exclusions ~send_events () =
let create_default ?fsevents_debounce ~watch_exclusions ~event_queue () =
let should_exclude = create_should_exclude_predicate ~watch_exclusions in
match select_watcher_backend () with
| `Fswatch _ as backend ->
create_external_fswatch ~send_events ~backend ~watch_exclusions
create_external_fswatch ~event_queue ~backend ~watch_exclusions
| `Fsevents ->
create_fsevents ?latency:fsevents_debounce ~send_events ~should_exclude ()
| `Inotify_lib -> create_inotifylib ~send_events ~should_exclude
create_fsevents ?latency:fsevents_debounce ~event_queue ~should_exclude ()
| `Inotify_lib -> create_inotifylib ~event_queue ~should_exclude
| `Fswatch_win ->
create_fswatch_win
~send_events
~event_queue
~should_exclude
~debounce_interval:500 (* milliseconds *)
;;
Expand Down Expand Up @@ -705,7 +668,7 @@ let add_watch t path =
lazy
(fsevents
~latency:f.latency
f.send_events
f.event_queue
~paths:[ Path.external_ ext ]
f.on_event)
in
Expand Down
Loading
Loading