import json
import logging
import os
import re
import time
import types
from enum import IntEnum
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class Change(IntEnum):
    """Simple enum representing the different types of changes to a file.

    Members are plain integers (``IntEnum``), so they compare equal to their numeric values.
    """

    ADDED = 1
    MODIFIED = 2
    DELETED = 3
class Watcher:
    """Poll a directory and report files that were added, modified or deleted.

    Changes are detected by comparing the modification time of every matching file with the value
    recorded during the previous poll. They are returned from :meth:`watch` and, when registered through
    :meth:`on`, also passed to a callback as a dict with the keys ``'type'`` and ``'file'``.
    """

    _directory = os.getcwd()  # The root directory to watch.
    _recursive = False
    _polling_interval = 1000  # The time between polls, in milliseconds.
    _file_pattern = r".+"  # The file pattern to match entries that are files.
    _directory_pattern = r".+"  # The directory pattern to match entries that are directories.
    _cache = False  # If True, we create a '.grigori' file which stores the files on shutdown.
    _callback_added = None
    _callback_modified = None
    _callback_deleted = None
    _files = {}  # Class-level fallback only; __init__ gives every instance its own dict.

    def __init__(self, directory: str, recursive: bool = _recursive, polling_interval: int = _polling_interval,
                 file_pattern: str = _file_pattern, directory_pattern: str = _directory_pattern,
                 cache: bool = _cache) -> None:
        """Configure the watcher and, if requested, restore the file list from the cache.

        :param directory: The root directory to watch.
        :param recursive: Whether sub-directories are walked as well.
        :param polling_interval: The time between polls, in milliseconds.
        :param file_pattern: Regular expression (applied with ``re.match``) a file name must match.
        :param directory_pattern: Regular expression (applied with ``re.match``) a directory name must match
            before it is walked recursively.
        :param cache: Whether the '.grigori' cache file in the directory is loaded on start and saved on stop.
        """
        self._directory = directory
        self._recursive = recursive
        self._polling_interval = polling_interval
        self._file_pattern = file_pattern
        self._directory_pattern = directory_pattern
        self._cache = cache
        # Start from a private dict so instances never share the class-level one.
        self._files = {}

        # Check if we should use cache.
        if self._cache:
            self._load_cache()

    def _load_cache(self) -> None:
        """Load a list of files from the cache file.

        Caching is switched off for this session when the watched directory does not exist. A cache that is
        not valid JSON, or that is not a mapping of paths to numeric modification times, is ignored.
        """
        if not os.path.exists(self._directory):
            self._cache = False
            logger.warning("turned off caching for this session because the directory: '%s' does not exist",
                           self._directory)
            return

        cache_file = os.path.join(self._directory, ".grigori")
        if not os.path.isfile(cache_file):
            return

        try:
            with open(cache_file, "r") as fh:
                cached = json.load(fh)
        except (json.JSONDecodeError, UnicodeDecodeError):
            logger.error("cache contains invalid JSON, it will not be used")
            return

        # _poll calls .keys() on the file list and _walk compares its values with mtimes, so anything
        # other than a {path: number} mapping would crash later on.
        if not isinstance(cached, dict) or not all(isinstance(mtime, (int, float)) for mtime in cached.values()):
            logger.error("cache does not contain a mapping of files to modification times, it will not be used")
            return

        self._files = cached

    def _save_cache(self) -> None:
        """Save the list of files to the cache file.

        :raises OSError: If the cache file cannot be written.
        """
        cache_file = os.path.join(self._directory, ".grigori")
        with open(cache_file, "w") as fh:
            json.dump(self._files, fh)

    def on(self, change_type: "Change", callback: types.FunctionType) -> None:
        """Register a callback function for a type of change.

        Only one callback is kept per change type; registering again replaces the previous one.

        :param change_type: The type of the change.
        :param callback: A function to call when a change of the given type occurs. It receives the change
            as a dict with the keys 'type' and 'file'.
        """
        if change_type == Change.ADDED:
            self._callback_added = callback
        elif change_type == Change.MODIFIED:
            self._callback_modified = callback
        elif change_type == Change.DELETED:
            self._callback_deleted = callback
        else:
            # Previously dropped silently, which made a mistyped change type very hard to notice.
            logger.warning("ignored callback for unknown change type: %r", change_type)

    def watch(self) -> types.GeneratorType:
        """Keep polling for file changes and yield them.

        A KeyboardInterrupt raised while polling or sleeping ends the generator quietly. When caching is
        enabled the file list is saved however the generator ends (interrupt, error, or the consumer
        closing it), not only on a KeyboardInterrupt raised inside the generator.

        :return: A generator that yields a list of changes.
        """
        try:
            while True:
                yield self._poll()
                time.sleep(self._polling_interval / 1000)
        except KeyboardInterrupt:
            logger.warning("stopped watching due to KeyboardInterrupt")
        finally:
            if self._cache:
                self._save_cache()

    def _poll(self) -> list:
        """Look for changes, then look for deleted files.

        :return: A list of changes, each a dict with the keys 'type' and 'file'.
        """
        changes = []
        files = {}
        self._walk(self._directory, changes, files)

        # Everything known from the previous poll that was not seen this time has been deleted.
        # Sorted so the order of the reported deletions is deterministic.
        for file in sorted(self._files.keys() - files.keys()):
            self._record(Change.DELETED, file, changes, self._callback_deleted)

        self._files = files
        return changes

    def _record(self, change_type: "Change", file: str, changes: list, callback) -> None:
        """Append a change to the list of changes and pass it to the callback, if one is registered."""
        change = {
            "type": change_type,
            "file": file,
        }
        changes.append(change)
        if callback is not None:
            callback(change)

    def _walk(self, directory: str, changes: list, files: dict) -> None:
        """Walk through a directory to find changes in files.

        :param directory: The directory to walk through.
        :param changes: A list that tracks the changes during the walks in a poll.
        :param files: A dict of files (path to modification time) that are found during this poll. Used to
            compare to the dict from the previous poll to find deleted files.
        """
        if not os.path.isdir(directory):
            logger.warning("directory '%s' does not exist", directory)
            return

        with os.scandir(directory) as scanner:
            for entry in scanner:
                if entry.name == ".grigori":  # Filter cache file
                    continue
                if self._is_temporary_file(entry.path):  # Filter out IDE temporary files.
                    continue

                if entry.is_dir():
                    if self._recursive and re.match(self._directory_pattern, entry.name):
                        self._walk(entry.path, changes, files)
                    continue

                if not re.match(self._file_pattern, entry.name):
                    continue

                try:
                    modified_at = entry.stat().st_mtime
                except OSError:
                    # The file vanished (or became unreadable) between listing and stat. Skip it; if it
                    # was known before, the deleted-files comparison in _poll reports it.
                    logger.debug("skipped '%s' because it could not be inspected", entry.path)
                    continue

                files[entry.path] = modified_at  # Save in new dict, so we can compare for deleted.

                if entry.path not in self._files:  # The file is not in the files list, so we added it.
                    self._record(Change.ADDED, entry.path, changes, self._callback_added)
                elif modified_at > self._files[entry.path]:  # Already known and newer, so we modified it.
                    self._record(Change.MODIFIED, entry.path, changes, self._callback_modified)

    def wait(self) -> None:
        """Hacky method to use the 'watch' method without a 'for loop'.

        Drains the generator; changes are only observable through the registered callbacks.
        """
        for _ in self.watch():
            pass

    @staticmethod
    def _is_temporary_file(file: str) -> bool:
        """Check if the file given is a temporary file.

        :param file: The file to check.
        :return: True if the file is temporary, False if not.
        """
        # JetBrains editors append ___jb_[tmp/old/bak]___ to temporary file names.
        jetbrains_markers = ("___jb_tmp___", "___jb_old___", "___jb_bak___")
        if any(marker in file for marker in jetbrains_markers):
            return True

        # VIM and more append ~ to temporary file names.
        return file.endswith("~")