Skip to content

Commit

Permalink
core: more types (#1061)
Browse files Browse the repository at this point in the history
  • Loading branch information
BoboTiG authored Aug 26, 2024
1 parent 6847b0e commit 516d4ac
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 109 deletions.
2 changes: 1 addition & 1 deletion src/watchdog/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ def dispatch(self, event: FileSystemEvent) -> None:
class LoggingEventHandler(FileSystemEventHandler):
"""Logs all the events captured."""

def __init__(self, logger: logging.Logger | None = None) -> None:
def __init__(self, *, logger: logging.Logger | None = None) -> None:
super().__init__()
self.logger = logger or logging.root

Expand Down
2 changes: 1 addition & 1 deletion src/watchdog/observers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@


class ObserverType(Protocol):
def __call__(self, *, timeout: int = ...) -> BaseObserver: ...
def __call__(self, *, timeout: float = ...) -> BaseObserver: ...


def _get_observer_cls() -> ObserverType:
Expand Down
32 changes: 15 additions & 17 deletions src/watchdog/observers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import contextlib
import queue
import threading
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING

Expand All @@ -12,8 +13,8 @@
if TYPE_CHECKING:
from watchdog.events import FileSystemEvent, FileSystemEventHandler

DEFAULT_EMITTER_TIMEOUT = 1 # in seconds.
DEFAULT_OBSERVER_TIMEOUT = 1 # in seconds.
DEFAULT_EMITTER_TIMEOUT = 1.0 # in seconds
DEFAULT_OBSERVER_TIMEOUT = 1.0 # in seconds


class EventQueue(SkipRepeatsQueue):
Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__()
Expand All @@ -120,7 +121,7 @@ def __init__(
self._event_filter = frozenset(event_filter) if event_filter is not None else None

@property
def timeout(self) -> int:
def timeout(self) -> float:
"""Blocking timeout for reading events."""
return self._timeout

Expand All @@ -141,7 +142,7 @@ def queue_event(self, event: FileSystemEvent) -> None:
if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter):
self._event_queue.put((event, self.watch))

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
"""Override this method to populate the event queue with events
per interval period.
Expand Down Expand Up @@ -171,13 +172,13 @@ class EventDispatcher(BaseThread):
stop_event = object()
"""Event inserted into the queue to signal a requested stop."""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__()
self._event_queue = EventQueue()
self._timeout = timeout

@property
def timeout(self) -> int:
def timeout(self) -> float:
"""Timeout value to construct emitters with."""
return self._timeout

Expand Down Expand Up @@ -217,12 +218,12 @@ def run(self) -> None:
class BaseObserver(EventDispatcher):
"""Base observer."""

def __init__(self, emitter_class: type[EventEmitter], *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, emitter_class: type[EventEmitter], *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(timeout=timeout)
self._emitter_class = emitter_class
self._lock = threading.RLock()
self._watches: set[ObservedWatch] = set()
self._handlers: dict[ObservedWatch, set[FileSystemEventHandler]] = {}
self._handlers: defaultdict[ObservedWatch, set[FileSystemEventHandler]] = defaultdict(set)
self._emitters: set[EventEmitter] = set()
self._emitter_for_watch: dict[ObservedWatch, EventEmitter] = {}

Expand All @@ -247,8 +248,6 @@ def _clear_emitters(self) -> None:
self._emitter_for_watch.clear()

def _add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None:
if watch not in self._handlers:
self._handlers[watch] = set()
self._handlers[watch].add(event_handler)

def _remove_handlers_for_watch(self, watch: ObservedWatch) -> None:
Expand Down Expand Up @@ -307,7 +306,7 @@ def schedule(
self._add_handler_for_watch(event_handler, watch)

# If we don't have an emitter for this watch already, create it.
if self._emitter_for_watch.get(watch) is None:
if watch not in self._emitter_for_watch:
emitter = self._emitter_class(self.event_queue, watch, timeout=self.timeout, event_filter=event_filter)
if self.is_alive():
emitter.start()
Expand Down Expand Up @@ -367,9 +366,7 @@ def unschedule(self, watch: ObservedWatch) -> None:
self._watches.remove(watch)

def unschedule_all(self) -> None:
"""Unschedules all watches and detaches all associated event
handlers.
"""
"""Unschedules all watches and detaches all associated event handlers."""
with self._lock:
self._handlers.clear()
self._clear_emitters()
Expand All @@ -382,13 +379,14 @@ def dispatch_events(self, event_queue: EventQueue) -> None:
entry = event_queue.get(block=True)
if entry is EventDispatcher.stop_event:
return

event, watch = entry

with self._lock:
# To allow unschedule/stop and safe removal of event handlers
# within event handlers itself, check if the handler is still
# registered after every dispatch.
for handler in list(self._handlers.get(watch, [])):
if handler in self._handlers.get(watch, []):
for handler in self._handlers[watch].copy():
if handler in self._handlers[watch]:
handler.dispatch(event)
event_queue.task_done()
6 changes: 3 additions & 3 deletions src/watchdog/observers/fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
suppress_history: bool = False,
) -> None:
Expand Down Expand Up @@ -156,7 +156,7 @@ def _is_meta_mod(event: _fsevents.NativeEvent) -> bool:
"""Returns True if the event indicates a change in metadata."""
return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change

def queue_events(self, timeout: int, events: list[_fsevents.NativeEvent]) -> None: # type: ignore[override]
def queue_events(self, timeout: float, events: list[_fsevents.NativeEvent]) -> None: # type: ignore[override]
if logger.getEffectiveLevel() <= logging.DEBUG:
for event in events:
flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True)
Expand Down Expand Up @@ -320,7 +320,7 @@ def _encode_path(self, path: bytes | str) -> bytes | str:


class FSEventsObserver(BaseObserver):
def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(FSEventsEmitter, timeout=timeout)

def schedule(
Expand Down
6 changes: 3 additions & 3 deletions src/watchdog/observers/fsevents2.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
):
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
Expand All @@ -197,7 +197,7 @@ def __init__(
def on_thread_stop(self) -> None:
self._fsevents.stop()

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
events = self._fsevents.read_events()
if events is None:
return
Expand Down Expand Up @@ -249,5 +249,5 @@ def queue_events(self, timeout: int) -> None:


class FSEventsObserver2(BaseObserver):
def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(FSEventsEmitter, timeout=timeout)
8 changes: 4 additions & 4 deletions src/watchdog/observers/inotify.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
Expand All @@ -123,7 +123,7 @@ def on_thread_stop(self) -> None:
self._inotify.close()
self._inotify = None

def queue_events(self, timeout: int, *, full_events: bool = False) -> None:
def queue_events(self, timeout: float, *, full_events: bool = False) -> None:
# If "full_events" is true, then the method will report unmatched move events as separate events
# This behavior is by default only called by a InotifyFullEmitter
if self._inotify is None:
Expand Down Expand Up @@ -238,7 +238,7 @@ class InotifyFullEmitter(InotifyEmitter):
Such move events will have a ``None`` value for the unmatched part.
"""

def queue_events(self, timeout: int, *, events: bool = True) -> None: # type: ignore[override]
def queue_events(self, timeout: float, *, events: bool = True) -> None: # type: ignore[override]
super().queue_events(timeout, full_events=events)


Expand All @@ -247,6 +247,6 @@ class InotifyObserver(BaseObserver):
calls to event handlers.
"""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT, generate_full_events: bool = False) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT, generate_full_events: bool = False) -> None:
cls = InotifyFullEmitter if generate_full_events else InotifyEmitter
super().__init__(cls, timeout=timeout)
10 changes: 5 additions & 5 deletions src/watchdog/observers/kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
stat: Callable = os.stat,
stat: Callable[[str], os.stat_result] = os.stat,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)

Expand Down Expand Up @@ -590,7 +590,7 @@ def _gen_renamed_events(
yield FileDeletedEvent(src_path)
yield self._parent_dir_modified(src_path)

def _read_events(self, timeout: float | None = None) -> list[select.kevent]:
def _read_events(self, timeout: float) -> list[select.kevent]:
"""Reads events from a call to the blocking
:meth:`select.kqueue.control()` method.
Expand All @@ -601,7 +601,7 @@ def _read_events(self, timeout: float | None = None) -> list[select.kevent]:
"""
return self._kq.control(self._descriptors.kevents, MAX_EVENTS, timeout=timeout)

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
"""Queues events by reading them from a call to the blocking
:meth:`select.kqueue.control()` method.
Expand Down Expand Up @@ -651,5 +651,5 @@ class KqueueObserver(BaseObserver):
calls to event handlers.
"""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(KqueueEmitter, timeout=timeout)
21 changes: 14 additions & 7 deletions src/watchdog/observers/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from watchdog.utils.dirsnapshot import DirectorySnapshot, DirectorySnapshotDiff, EmptyDirectorySnapshot

if TYPE_CHECKING:
from collections.abc import Iterator
from typing import Callable

from watchdog.events import FileSystemEvent
Expand All @@ -52,10 +53,10 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
stat: Callable = os.stat,
listdir: Callable = os.scandir,
stat: Callable[[str], os.stat_result] = os.stat,
listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
self._snapshot: DirectorySnapshot = EmptyDirectorySnapshot()
Expand All @@ -70,7 +71,7 @@ def __init__(
def on_thread_start(self) -> None:
self._snapshot = self._take_snapshot()

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
# We don't want to hit the disk continuously.
# timeout behaves like an interval for polling emitters.
if self.stopped_event.wait(timeout):
Expand Down Expand Up @@ -118,17 +119,23 @@ class PollingObserver(BaseObserver):
system changes.
"""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(PollingEmitter, timeout=timeout)


class PollingObserverVFS(BaseObserver):
"""File system independent observer that polls a directory to detect changes."""

def __init__(self, stat: Callable, listdir: Callable, *, polling_interval: int = 1) -> None:
def __init__(
self,
stat: Callable[[str], os.stat_result],
listdir: Callable[[str | None], Iterator[os.DirEntry]],
*,
polling_interval: int = 1,
) -> None:
""":param stat: stat function. See ``os.stat`` for details.
:param listdir: listdir function. See ``os.scandir`` for details.
:type polling_interval: float
:type polling_interval: int
:param polling_interval: interval in seconds between polling the file system.
"""
emitter_cls = partial(PollingEmitter, stat=stat, listdir=listdir)
Expand Down
6 changes: 3 additions & 3 deletions src/watchdog/observers/read_directory_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
Expand Down Expand Up @@ -66,7 +66,7 @@ def _read_events(self) -> list[WinAPINativeEvent]:
return []
return read_events(self._whandle, self.watch.path, recursive=self.watch.is_recursive)

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
winapi_events = self._read_events()
with self._lock:
last_renamed_src_path = ""
Expand Down Expand Up @@ -107,5 +107,5 @@ class WindowsApiObserver(BaseObserver):
calls to event handlers.
"""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(WindowsApiEmitter, timeout=timeout)
Loading

0 comments on commit 516d4ac

Please sign in to comment.