diff --git a/conftest.py b/conftest.py index 4d0b927..51b3edc 100644 --- a/conftest.py +++ b/conftest.py @@ -3,6 +3,7 @@ import tempfile import shutil import logging import os +from pathlib import Path import pytest @@ -28,7 +29,7 @@ def clean_parso_cache(): """ old = cache._default_cache_path tmp = tempfile.mkdtemp(prefix='parso-test-') - cache._default_cache_path = tmp + cache._default_cache_path = Path(tmp) yield cache._default_cache_path = old shutil.rmtree(tmp) diff --git a/parso/cache.py b/parso/cache.py index d96bb7b..dd0f496 100644 --- a/parso/cache.py +++ b/parso/cache.py @@ -8,10 +8,9 @@ import platform import logging import warnings import pickle +from pathlib import Path from typing import Dict, Any -from parso.file_io import FileIO - LOG = logging.getLogger(__name__) _CACHED_FILE_MINIMUM_SURVIVAL = 60 * 10 # 10 minutes @@ -66,13 +65,12 @@ See: http://docs.python.org/3/library/sys.html#sys.implementation def _get_default_cache_path(): if platform.system().lower() == 'windows': - dir_ = os.path.join(os.getenv('LOCALAPPDATA') - or os.path.expanduser('~'), 'Parso', 'Parso') + dir_ = Path(os.getenv('LOCALAPPDATA') or os.path.expanduser('~'), 'Parso', 'Parso') elif platform.system().lower() == 'darwin': - dir_ = os.path.join('~', 'Library', 'Caches', 'Parso') + dir_ = Path('~', 'Library', 'Caches', 'Parso') else: - dir_ = os.path.join(os.getenv('XDG_CACHE_HOME') or '~/.cache', 'parso') + dir_ = Path(os.getenv('XDG_CACHE_HOME') or '~/.cache', 'parso') return dir_.expanduser() _default_cache_path = _get_default_cache_path() @@ -88,15 +86,15 @@ On Linux, if environment variable ``$XDG_CACHE_HOME`` is set, _CACHE_CLEAR_THRESHOLD = 60 * 60 * 24 -def _get_cache_clear_lock(cache_path=None): +def _get_cache_clear_lock_path(cache_path=None): """ The path where the cache lock is stored. Cache lock will prevent continous cache clearing and only allow garbage collection once a day (can be configured in _CACHE_CLEAR_THRESHOLD). 
""" - cache_path = cache_path or _get_default_cache_path() - return FileIO(os.path.join(cache_path, "PARSO-CACHE-LOCK")) + cache_path = cache_path or _default_cache_path + return cache_path.joinpath("PARSO-CACHE-LOCK") parser_cache: Dict[str, Any] = {} @@ -213,18 +211,15 @@ def clear_inactive_cache( inactivity_threshold=_CACHED_FILE_MAXIMUM_SURVIVAL, ): if cache_path is None: - cache_path = _get_default_cache_path() - if not os.path.exists(cache_path): + cache_path = _default_cache_path + if not cache_path.exists(): return False - for version_path in os.listdir(cache_path): - version_path = os.path.join(cache_path, version_path) - if not os.path.isdir(version_path): + for dirname in os.listdir(cache_path): + version_path = cache_path.joinpath(dirname) + if not version_path.is_dir(): continue for file in os.scandir(version_path): - if ( - file.stat().st_atime + _CACHED_FILE_MAXIMUM_SURVIVAL - <= time.time() - ): + if file.stat().st_atime + inactivity_threshold <= time.time(): try: os.remove(file.path) except OSError: # silently ignore all failures @@ -233,14 +228,29 @@ def clear_inactive_cache( return True +def _touch(path): + try: + os.utime(path, None) + except FileNotFoundError: + try: + file = open(path, 'a') + file.close() + except (OSError, IOError): # TODO Maybe log this? + return False + return True + + def _remove_cache_and_update_lock(cache_path=None): - lock = _get_cache_clear_lock(cache_path=cache_path) - clear_lock_time = lock.get_last_modified() + lock_path = _get_cache_clear_lock_path(cache_path=cache_path) + try: + clear_lock_time = os.path.getmtime(lock_path) + except FileNotFoundError: + clear_lock_time = None if ( clear_lock_time is None # first time or clear_lock_time + _CACHE_CLEAR_THRESHOLD <= time.time() ): - if not lock._touch(): + if not _touch(lock_path): # First make sure that as few as possible other cleanup jobs also # get started. There is still a race condition but it's probably # not a big problem. 
@@ -252,14 +262,14 @@ def _remove_cache_and_update_lock(cache_path=None): def _get_hashed_path(hashed_grammar, path, cache_path=None): directory = _get_cache_directory_path(cache_path=cache_path) - file_hash = hashlib.sha256(path.encode("utf-8")).hexdigest() + file_hash = hashlib.sha256(str(path).encode("utf-8")).hexdigest() return os.path.join(directory, '%s-%s.pkl' % (hashed_grammar, file_hash)) def _get_cache_directory_path(cache_path=None): if cache_path is None: cache_path = _default_cache_path - directory = os.path.join(cache_path, _VERSION_TAG) - if not os.path.exists(directory): + directory = cache_path.joinpath(_VERSION_TAG) + if not directory.exists(): os.makedirs(directory) return directory diff --git a/parso/file_io.py b/parso/file_io.py index 8874466..568ce9d 100644 --- a/parso/file_io.py +++ b/parso/file_io.py @@ -1,8 +1,12 @@ import os +from pathlib import Path +from typing import Union class FileIO: - def __init__(self, path): + def __init__(self, path: Union[os.PathLike, str]): + if isinstance(path, str): + path = Path(path) self.path = path def read(self): # Returns bytes/str @@ -21,17 +25,6 @@ class FileIO: except FileNotFoundError: return None - def _touch(self): - try: - os.utime(self.path, None) - except FileNotFoundError: - try: - file = open(self.path, 'a') - file.close() - except (OSError, IOError): # TODO Maybe log this? 
- return False - return True - def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self.path) diff --git a/parso/grammar.py b/parso/grammar.py index f0e6684..28df8d6 100644 --- a/parso/grammar.py +++ b/parso/grammar.py @@ -1,6 +1,7 @@ import hashlib import os from typing import Generic, TypeVar, Union, Dict, Optional, Any +from pathlib import Path from parso._compatibility import is_pypy from parso.pgen2 import generate_grammar @@ -49,11 +50,11 @@ class Grammar(Generic[_NodeT]): code: Union[str, bytes] = None, *, error_recovery=True, - path: str = None, + path: Optional[Union[os.PathLike, str]] = None, start_symbol: str = None, cache=False, diff_cache=False, - cache_path: str = None, + cache_path: Optional[Union[os.PathLike, str]] = None, file_io: FileIO = None) -> _NodeT: """ If you want to parse a Python file you want to start here, most likely. @@ -92,6 +93,11 @@ class Grammar(Generic[_NodeT]): if code is None and path is None and file_io is None: raise TypeError("Please provide either code or a path.") + if isinstance(path, str): + path = Path(path) + if isinstance(cache_path, str): + cache_path = Path(cache_path) + if start_symbol is None: start_symbol = self._start_nonterminal @@ -100,7 +106,7 @@ class Grammar(Generic[_NodeT]): if file_io is None: if code is None: - file_io = FileIO(path) + file_io = FileIO(path) # type: ignore else: file_io = KnownContentFileIO(path, code) diff --git a/test/test_cache.py b/test/test_cache.py index d5e4397..bfdbaf5 100644 --- a/test/test_cache.py +++ b/test/test_cache.py @@ -3,13 +3,12 @@ Test all things related to the ``jedi.cache`` module. 
""" import os -import os.path - import pytest import time +from pathlib import Path from parso.cache import (_CACHED_FILE_MAXIMUM_SURVIVAL, _VERSION_TAG, - _get_cache_clear_lock, _get_hashed_path, + _get_cache_clear_lock_path, _get_hashed_path, _load_from_file_system, _NodeCacheItem, _remove_cache_and_update_lock, _save_to_file_system, load_module, parser_cache, try_to_save_module) @@ -30,9 +29,8 @@ skip_pypy = pytest.mark.skipif( def isolated_parso_cache(monkeypatch, tmpdir): """Set `parso.cache._default_cache_path` to a temporary directory during the test. """ - cache_path = str(os.path.join(str(tmpdir), "__parso_cache")) + cache_path = Path(str(tmpdir), "__parso_cache") monkeypatch.setattr(cache, '_default_cache_path', cache_path) - monkeypatch.setattr(cache, '_get_default_cache_path', lambda *args, **kwargs: cache_path) return cache_path @@ -42,13 +40,13 @@ def test_modulepickling_change_cache_dir(tmpdir): See: `#168 `_ """ - dir_1 = str(tmpdir.mkdir('first')) - dir_2 = str(tmpdir.mkdir('second')) + dir_1 = Path(str(tmpdir.mkdir('first'))) + dir_2 = Path(str(tmpdir.mkdir('second'))) item_1 = _NodeCacheItem('bla', []) item_2 = _NodeCacheItem('bla', []) - path_1 = 'fake path 1' - path_2 = 'fake path 2' + path_1 = Path('fake path 1') + path_2 = Path('fake path 2') hashed_grammar = load_grammar()._hashed _save_to_file_system(hashed_grammar, path_1, item_1, cache_path=dir_1) @@ -86,7 +84,7 @@ def test_modulepickling_simulate_deleted_cache(tmpdir): module = 'fake parser' # Create the file - path = tmpdir.dirname + '/some_path' + path = Path(str(tmpdir.dirname), 'some_path') with open(path, 'w'): pass io = file_io.FileIO(path) @@ -134,7 +132,7 @@ class _FixedTimeFileIO(file_io.KnownContentFileIO): @pytest.mark.parametrize('diff_cache', [False, True]) @pytest.mark.parametrize('use_file_io', [False, True]) def test_cache_last_used_update(diff_cache, use_file_io): - p = '/path/last-used' + p = Path('/path/last-used') parser_cache.clear() # Clear, because then it's easier 
to find stuff. parse('somecode', cache=True, path=p) node_cache_item = next(iter(parser_cache.values()))[p] @@ -157,21 +155,21 @@ def test_inactive_cache(tmpdir, isolated_parso_cache): test_subjects = "abcdef" for path in test_subjects: parse('somecode', cache=True, path=os.path.join(str(tmpdir), path)) - raw_cache_path = os.path.join(isolated_parso_cache, _VERSION_TAG) - assert os.path.exists(raw_cache_path) - paths = os.listdir(raw_cache_path) + raw_cache_path = isolated_parso_cache.joinpath(_VERSION_TAG) + assert raw_cache_path.exists() + dir_names = os.listdir(raw_cache_path) a_while_ago = time.time() - _CACHED_FILE_MAXIMUM_SURVIVAL old_paths = set() - for path in paths[:len(test_subjects) // 2]: # make certain number of paths old - os.utime(os.path.join(raw_cache_path, path), (a_while_ago, a_while_ago)) - old_paths.add(path) + for dir_name in dir_names[:len(test_subjects) // 2]: # make certain number of paths old + os.utime(raw_cache_path.joinpath(dir_name), (a_while_ago, a_while_ago)) + old_paths.add(dir_name) # nothing should be cleared while the lock is on - assert os.path.exists(_get_cache_clear_lock().path) + assert _get_cache_clear_lock_path().exists() _remove_cache_and_update_lock() # it shouldn't clear anything assert len(os.listdir(raw_cache_path)) == len(test_subjects) assert old_paths.issubset(os.listdir(raw_cache_path)) - os.utime(_get_cache_clear_lock().path, (a_while_ago, a_while_ago)) + os.utime(_get_cache_clear_lock_path(), (a_while_ago, a_while_ago)) _remove_cache_and_update_lock() assert len(os.listdir(raw_cache_path)) == len(test_subjects) // 2 assert not old_paths.intersection(os.listdir(raw_cache_path))