Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: more types #1061

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/watchdog/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ def dispatch(self, event: FileSystemEvent) -> None:
class LoggingEventHandler(FileSystemEventHandler):
"""Logs all the events captured."""

def __init__(self, logger: logging.Logger | None = None) -> None:
def __init__(self, *, logger: logging.Logger | None = None) -> None:
super().__init__()
self.logger = logger or logging.root

Expand Down
2 changes: 1 addition & 1 deletion src/watchdog/observers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@


class ObserverType(Protocol):
def __call__(self, *, timeout: int = ...) -> BaseObserver: ...
def __call__(self, *, timeout: float = ...) -> BaseObserver: ...


def _get_observer_cls() -> ObserverType:
Expand Down
32 changes: 15 additions & 17 deletions src/watchdog/observers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import contextlib
import queue
import threading
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING

Expand All @@ -12,8 +13,8 @@
if TYPE_CHECKING:
from watchdog.events import FileSystemEvent, FileSystemEventHandler

DEFAULT_EMITTER_TIMEOUT = 1 # in seconds.
DEFAULT_OBSERVER_TIMEOUT = 1 # in seconds.
DEFAULT_EMITTER_TIMEOUT = 1.0 # in seconds
DEFAULT_OBSERVER_TIMEOUT = 1.0 # in seconds


class EventQueue(SkipRepeatsQueue):
Expand Down Expand Up @@ -110,7 +111,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__()
Expand All @@ -120,7 +121,7 @@ def __init__(
self._event_filter = frozenset(event_filter) if event_filter is not None else None

@property
def timeout(self) -> int:
def timeout(self) -> float:
"""Blocking timeout for reading events."""
return self._timeout

Expand All @@ -141,7 +142,7 @@ def queue_event(self, event: FileSystemEvent) -> None:
if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter):
self._event_queue.put((event, self.watch))

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
"""Override this method to populate the event queue with events
per interval period.

Expand Down Expand Up @@ -171,13 +172,13 @@ class EventDispatcher(BaseThread):
stop_event = object()
"""Event inserted into the queue to signal a requested stop."""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__()
self._event_queue = EventQueue()
self._timeout = timeout

@property
def timeout(self) -> int:
def timeout(self) -> float:
"""Timeout value to construct emitters with."""
return self._timeout

Expand Down Expand Up @@ -217,12 +218,12 @@ def run(self) -> None:
class BaseObserver(EventDispatcher):
"""Base observer."""

def __init__(self, emitter_class: type[EventEmitter], *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, emitter_class: type[EventEmitter], *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(timeout=timeout)
self._emitter_class = emitter_class
self._lock = threading.RLock()
self._watches: set[ObservedWatch] = set()
self._handlers: dict[ObservedWatch, set[FileSystemEventHandler]] = {}
self._handlers: defaultdict[ObservedWatch, set[FileSystemEventHandler]] = defaultdict(set)
self._emitters: set[EventEmitter] = set()
self._emitter_for_watch: dict[ObservedWatch, EventEmitter] = {}

Expand All @@ -247,8 +248,6 @@ def _clear_emitters(self) -> None:
self._emitter_for_watch.clear()

def _add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None:
if watch not in self._handlers:
self._handlers[watch] = set()
self._handlers[watch].add(event_handler)

def _remove_handlers_for_watch(self, watch: ObservedWatch) -> None:
Expand Down Expand Up @@ -307,7 +306,7 @@ def schedule(
self._add_handler_for_watch(event_handler, watch)

# If we don't have an emitter for this watch already, create it.
if self._emitter_for_watch.get(watch) is None:
if watch not in self._emitter_for_watch:
emitter = self._emitter_class(self.event_queue, watch, timeout=self.timeout, event_filter=event_filter)
if self.is_alive():
emitter.start()
Expand Down Expand Up @@ -367,9 +366,7 @@ def unschedule(self, watch: ObservedWatch) -> None:
self._watches.remove(watch)

def unschedule_all(self) -> None:
"""Unschedules all watches and detaches all associated event
handlers.
"""
"""Unschedules all watches and detaches all associated event handlers."""
with self._lock:
self._handlers.clear()
self._clear_emitters()
Expand All @@ -382,13 +379,14 @@ def dispatch_events(self, event_queue: EventQueue) -> None:
entry = event_queue.get(block=True)
if entry is EventDispatcher.stop_event:
return

event, watch = entry

with self._lock:
# To allow unschedule/stop and safe removal of event handlers
# within event handlers itself, check if the handler is still
# registered after every dispatch.
for handler in list(self._handlers.get(watch, [])):
if handler in self._handlers.get(watch, []):
for handler in self._handlers[watch].copy():
if handler in self._handlers[watch]:
handler.dispatch(event)
event_queue.task_done()
6 changes: 3 additions & 3 deletions src/watchdog/observers/fsevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
suppress_history: bool = False,
) -> None:
Expand Down Expand Up @@ -156,7 +156,7 @@ def _is_meta_mod(event: _fsevents.NativeEvent) -> bool:
"""Returns True if the event indicates a change in metadata."""
return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change

def queue_events(self, timeout: int, events: list[_fsevents.NativeEvent]) -> None: # type: ignore[override]
def queue_events(self, timeout: float, events: list[_fsevents.NativeEvent]) -> None: # type: ignore[override]
if logger.getEffectiveLevel() <= logging.DEBUG:
for event in events:
flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True)
Expand Down Expand Up @@ -320,7 +320,7 @@ def _encode_path(self, path: bytes | str) -> bytes | str:


class FSEventsObserver(BaseObserver):
def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(FSEventsEmitter, timeout=timeout)

def schedule(
Expand Down
6 changes: 3 additions & 3 deletions src/watchdog/observers/fsevents2.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
):
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
Expand All @@ -197,7 +197,7 @@ def __init__(
def on_thread_stop(self) -> None:
self._fsevents.stop()

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
events = self._fsevents.read_events()
if events is None:
return
Expand Down Expand Up @@ -249,5 +249,5 @@ def queue_events(self, timeout: int) -> None:


class FSEventsObserver2(BaseObserver):
def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(FSEventsEmitter, timeout=timeout)
8 changes: 4 additions & 4 deletions src/watchdog/observers/inotify.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
Expand All @@ -123,7 +123,7 @@ def on_thread_stop(self) -> None:
self._inotify.close()
self._inotify = None

def queue_events(self, timeout: int, *, full_events: bool = False) -> None:
def queue_events(self, timeout: float, *, full_events: bool = False) -> None:
# If "full_events" is true, then the method will report unmatched move events as separate events
# This behavior is by default only called by a InotifyFullEmitter
if self._inotify is None:
Expand Down Expand Up @@ -238,7 +238,7 @@ class InotifyFullEmitter(InotifyEmitter):
Such move events will have a ``None`` value for the unmatched part.
"""

def queue_events(self, timeout: int, *, events: bool = True) -> None: # type: ignore[override]
def queue_events(self, timeout: float, *, events: bool = True) -> None: # type: ignore[override]
super().queue_events(timeout, full_events=events)


Expand All @@ -247,6 +247,6 @@ class InotifyObserver(BaseObserver):
calls to event handlers.
"""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT, generate_full_events: bool = False) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT, generate_full_events: bool = False) -> None:
cls = InotifyFullEmitter if generate_full_events else InotifyEmitter
super().__init__(cls, timeout=timeout)
10 changes: 5 additions & 5 deletions src/watchdog/observers/kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
stat: Callable = os.stat,
stat: Callable[[str], os.stat_result] = os.stat,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)

Expand Down Expand Up @@ -590,7 +590,7 @@ def _gen_renamed_events(
yield FileDeletedEvent(src_path)
yield self._parent_dir_modified(src_path)

def _read_events(self, timeout: float | None = None) -> list[select.kevent]:
def _read_events(self, timeout: float) -> list[select.kevent]:
"""Reads events from a call to the blocking
:meth:`select.kqueue.control()` method.

Expand All @@ -601,7 +601,7 @@ def _read_events(self, timeout: float | None = None) -> list[select.kevent]:
"""
return self._kq.control(self._descriptors.kevents, MAX_EVENTS, timeout=timeout)

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
"""Queues events by reading them from a call to the blocking
:meth:`select.kqueue.control()` method.

Expand Down Expand Up @@ -651,5 +651,5 @@ class KqueueObserver(BaseObserver):
calls to event handlers.
"""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(KqueueEmitter, timeout=timeout)
21 changes: 14 additions & 7 deletions src/watchdog/observers/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from watchdog.utils.dirsnapshot import DirectorySnapshot, DirectorySnapshotDiff, EmptyDirectorySnapshot

if TYPE_CHECKING:
from collections.abc import Iterator
from typing import Callable

from watchdog.events import FileSystemEvent
Expand All @@ -52,10 +53,10 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
stat: Callable = os.stat,
listdir: Callable = os.scandir,
stat: Callable[[str], os.stat_result] = os.stat,
listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
self._snapshot: DirectorySnapshot = EmptyDirectorySnapshot()
Expand All @@ -70,7 +71,7 @@ def __init__(
def on_thread_start(self) -> None:
self._snapshot = self._take_snapshot()

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
# We don't want to hit the disk continuously.
# timeout behaves like an interval for polling emitters.
if self.stopped_event.wait(timeout):
Expand Down Expand Up @@ -118,17 +119,23 @@ class PollingObserver(BaseObserver):
system changes.
"""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(PollingEmitter, timeout=timeout)


class PollingObserverVFS(BaseObserver):
"""File system independent observer that polls a directory to detect changes."""

def __init__(self, stat: Callable, listdir: Callable, *, polling_interval: int = 1) -> None:
def __init__(
self,
stat: Callable[[str], os.stat_result],
listdir: Callable[[str | None], Iterator[os.DirEntry]],
*,
polling_interval: int = 1,
) -> None:
""":param stat: stat function. See ``os.stat`` for details.
:param listdir: listdir function. See ``os.scandir`` for details.
:type polling_interval: float
:type polling_interval: int
:param polling_interval: interval in seconds between polling the file system.
"""
emitter_cls = partial(PollingEmitter, stat=stat, listdir=listdir)
Expand Down
6 changes: 3 additions & 3 deletions src/watchdog/observers/read_directory_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(
event_queue: EventQueue,
watch: ObservedWatch,
*,
timeout: int = DEFAULT_EMITTER_TIMEOUT,
timeout: float = DEFAULT_EMITTER_TIMEOUT,
event_filter: list[type[FileSystemEvent]] | None = None,
) -> None:
super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
Expand Down Expand Up @@ -66,7 +66,7 @@ def _read_events(self) -> list[WinAPINativeEvent]:
return []
return read_events(self._whandle, self.watch.path, recursive=self.watch.is_recursive)

def queue_events(self, timeout: int) -> None:
def queue_events(self, timeout: float) -> None:
winapi_events = self._read_events()
with self._lock:
last_renamed_src_path = ""
Expand Down Expand Up @@ -107,5 +107,5 @@ class WindowsApiObserver(BaseObserver):
calls to event handlers.
"""

def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None:
def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(WindowsApiEmitter, timeout=timeout)
Loading