Use pathlib.Path instead of strings

Dave Halter
2020-07-26 01:19:41 +02:00
parent 97c10facf7
commit ea6b01b968
5 changed files with 68 additions and 60 deletions
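
Every hunk below follows the same pattern: paths are built as pathlib.Path objects instead of strings, and the os.path helpers are swapped for the equivalent Path methods. A minimal before/after sketch (illustrative only, not taken from the changed files):

    import os
    from pathlib import Path

    # Before: build and expand the path as a plain string.
    old_style = os.path.expanduser(os.path.join('~', '.cache', 'parso'))

    # After: build a Path and expand it on the object.
    new_style = Path('~', '.cache', 'parso').expanduser()

    assert str(new_style) == old_style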

parso/cache.py

@@ -8,10 +8,9 @@ import platform
 import logging
 import warnings
 import pickle
+from pathlib import Path
 from typing import Dict, Any

 from parso.file_io import FileIO

 LOG = logging.getLogger(__name__)

 _CACHED_FILE_MINIMUM_SURVIVAL = 60 * 10  # 10 minutes
@@ -66,13 +65,12 @@ See: http://docs.python.org/3/library/sys.html#sys.implementation
 def _get_default_cache_path():
     if platform.system().lower() == 'windows':
-        dir_ = os.path.join(os.getenv('LOCALAPPDATA')
-                            or os.path.expanduser('~'), 'Parso', 'Parso')
+        dir_ = Path(os.getenv('LOCALAPPDATA') or os.path.expanduser('~'), 'Parso', 'Parso')
     elif platform.system().lower() == 'darwin':
-        dir_ = os.path.join('~', 'Library', 'Caches', 'Parso')
+        dir_ = Path('~', 'Library', 'Caches', 'Parso')
     else:
-        dir_ = os.path.join(os.getenv('XDG_CACHE_HOME') or '~/.cache', 'parso')
-    return os.path.expanduser(dir_)
+        dir_ = Path(os.getenv('XDG_CACHE_HOME') or '~/.cache', 'parso')
+    return dir_.expanduser()

 _default_cache_path = _get_default_cache_path()
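
A side note on the unconditional dir_.expanduser() at the end: Path() joins its arguments the way os.path.join() does, and expanduser() only rewrites a leading "~", so the Windows branch (which never starts with "~") passes through unchanged. A small illustration with made-up directories:

    from pathlib import Path

    print(Path('~', 'Library', 'Caches', 'Parso'))
    # '~/Library/Caches/Parso' on POSIX systems

    print(Path('~', '.cache', 'parso').expanduser())   # e.g. /home/alice/.cache/parso
    print(Path('/opt', 'data', 'parso').expanduser())  # /opt/data/parso, returned unchanged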
@@ -88,15 +86,15 @@ On Linux, if environment variable ``$XDG_CACHE_HOME`` is set,
 _CACHE_CLEAR_THRESHOLD = 60 * 60 * 24


-def _get_cache_clear_lock(cache_path=None):
+def _get_cache_clear_lock_path(cache_path=None):
     """
     The path where the cache lock is stored.

     The cache lock prevents continuous cache clearing and only allows garbage
     collection once a day (can be configured in _CACHE_CLEAR_THRESHOLD).
     """
-    cache_path = cache_path or _get_default_cache_path()
-    return FileIO(os.path.join(cache_path, "PARSO-CACHE-LOCK"))
+    cache_path = cache_path or _default_cache_path
+    return cache_path.joinpath("PARSO-CACHE-LOCK")


 parser_cache: Dict[str, Any] = {}
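
The renamed helper now returns a plain Path rather than a FileIO wrapper, so callers work with the lock file location directly. A usage sketch, assuming a typical default cache directory:

    from pathlib import Path

    cache_path = Path('~/.cache/parso').expanduser()      # assumed default location
    lock_path = cache_path.joinpath("PARSO-CACHE-LOCK")   # what the helper returns
    print(lock_path)    # e.g. /home/alice/.cache/parso/PARSO-CACHE-LOCK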
@@ -213,18 +211,15 @@ def clear_inactive_cache(
     inactivity_threshold=_CACHED_FILE_MAXIMUM_SURVIVAL,
 ):
     if cache_path is None:
-        cache_path = _get_default_cache_path()
-    if not os.path.exists(cache_path):
+        cache_path = _default_cache_path
+    if not cache_path.exists():
         return False
-    for version_path in os.listdir(cache_path):
-        version_path = os.path.join(cache_path, version_path)
-        if not os.path.isdir(version_path):
+    for dirname in os.listdir(cache_path):
+        version_path = cache_path.joinpath(dirname)
+        if not version_path.is_dir():
             continue
         for file in os.scandir(version_path):
-            if (
-                file.stat().st_atime + _CACHED_FILE_MAXIMUM_SURVIVAL
-                <= time.time()
-            ):
+            if file.stat().st_atime + _CACHED_FILE_MAXIMUM_SURVIVAL <= time.time():
                 try:
                     os.remove(file.path)
                 except OSError:  # silently ignore all failures
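
The cleanup loop keeps calling os.listdir(), os.scandir() and os.remove() because those functions accept any os.PathLike object; only the string joins and checks needed to become Path methods. A self-contained illustration using a throwaway temporary directory:

    import os
    import tempfile
    from pathlib import Path

    cache_path = Path(tempfile.mkdtemp())            # stand-in for the parso cache directory
    cache_path.joinpath('some-version-tag').mkdir()  # fake per-version subdirectory

    for dirname in os.listdir(cache_path):           # os.listdir() happily takes a Path
        version_path = cache_path.joinpath(dirname)
        print(version_path, version_path.is_dir())   # Path.is_dir() replaces os.path.isdir()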
@@ -233,14 +228,29 @@ def clear_inactive_cache(
     return True


+def _touch(path):
+    try:
+        os.utime(path, None)
+    except FileNotFoundError:
+        try:
+            file = open(path, 'a')
+            file.close()
+        except (OSError, IOError):  # TODO Maybe log this?
+            return False
+    return True
+
+
 def _remove_cache_and_update_lock(cache_path=None):
-    lock = _get_cache_clear_lock(cache_path=cache_path)
-    clear_lock_time = lock.get_last_modified()
+    lock_path = _get_cache_clear_lock_path(cache_path=cache_path)
+    try:
+        clear_lock_time = os.path.getmtime(lock_path)
+    except FileNotFoundError:
+        clear_lock_time = None
     if (
         clear_lock_time is None  # first time
         or clear_lock_time + _CACHE_CLEAR_THRESHOLD <= time.time()
     ):
-        if not lock._touch():
+        if not _touch(lock_path):
             # First make sure that as few as possible other cleanup jobs also
             # get started. There is still a race condition but it's probably
             # not a big problem.
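
The lock handling works on a Path as well: os.path.getmtime() accepts one, and the module-level _touch() above replaces the old FileIO._touch() method. A sketch of the gating logic with an illustrative lock location:

    import os
    import time
    from pathlib import Path

    lock_path = Path('/tmp/PARSO-CACHE-LOCK')           # illustrative path only
    try:
        clear_lock_time = os.path.getmtime(lock_path)   # getmtime() accepts a Path
    except FileNotFoundError:
        clear_lock_time = None                          # first run, no lock yet

    threshold = 60 * 60 * 24                            # one day, as in _CACHE_CLEAR_THRESHOLD
    if clear_lock_time is None or clear_lock_time + threshold <= time.time():
        lock_path.touch()                               # roughly what _touch() does
        print("lock refreshed, cache clearing may proceed")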
@@ -252,14 +262,14 @@ def _remove_cache_and_update_lock(cache_path=None):
 def _get_hashed_path(hashed_grammar, path, cache_path=None):
     directory = _get_cache_directory_path(cache_path=cache_path)

-    file_hash = hashlib.sha256(path.encode("utf-8")).hexdigest()
+    file_hash = hashlib.sha256(str(path).encode("utf-8")).hexdigest()
     return os.path.join(directory, '%s-%s.pkl' % (hashed_grammar, file_hash))


 def _get_cache_directory_path(cache_path=None):
     if cache_path is None:
         cache_path = _default_cache_path
-    directory = os.path.join(cache_path, _VERSION_TAG)
-    if not os.path.exists(directory):
+    directory = cache_path.joinpath(_VERSION_TAG)
+    if not directory.exists():
         os.makedirs(directory)
     return directory
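
One subtlety: a Path has no .encode() method, so the path is converted back to text with str() before hashing. For example (path chosen for illustration):

    import hashlib
    from pathlib import Path

    path = Path('/home/alice/project/module.py')
    file_hash = hashlib.sha256(str(path).encode("utf-8")).hexdigest()
    print(file_hash[:16])   # stable identifier used in the cached pickle's file name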

parso/file_io.py

@@ -1,8 +1,12 @@
 import os
+from pathlib import Path
+from typing import Union


 class FileIO:
-    def __init__(self, path):
+    def __init__(self, path: Union[os.PathLike, str]):
+        if isinstance(path, str):
+            path = Path(path)
         self.path = path

     def read(self):  # Returns bytes/str
@@ -21,17 +25,6 @@ class FileIO:
         except FileNotFoundError:
             return None

-    def _touch(self):
-        try:
-            os.utime(self.path, None)
-        except FileNotFoundError:
-            try:
-                file = open(self.path, 'a')
-                file.close()
-            except (OSError, IOError):  # TODO Maybe log this?
-                return False
-        return True
-
     def __repr__(self):
         return '%s(%s)' % (self.__class__.__name__, self.path)
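
With the new constructor both call styles end up storing a Path; a quick check (internal module, file name arbitrary):

    from pathlib import Path
    from parso.file_io import FileIO

    print(repr(FileIO('setup.py').path))        # str input is converted, e.g. PosixPath('setup.py')
    print(repr(FileIO(Path('setup.py')).path))  # Path input is stored as-is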

parso/grammar.py

@@ -1,6 +1,7 @@
 import hashlib
 import os
 from typing import Generic, TypeVar, Union, Dict, Optional, Any
+from pathlib import Path

 from parso._compatibility import is_pypy
 from parso.pgen2 import generate_grammar
@@ -49,11 +50,11 @@ class Grammar(Generic[_NodeT]):
               code: Union[str, bytes] = None,
               *,
               error_recovery=True,
-              path: str = None,
+              path: Union[os.PathLike, str] = None,
               start_symbol: str = None,
               cache=False,
               diff_cache=False,
-              cache_path: str = None,
+              cache_path: Union[os.PathLike, str] = None,
               file_io: FileIO = None) -> _NodeT:
         """
         If you want to parse a Python file you want to start here, most likely.
@@ -92,6 +93,11 @@ class Grammar(Generic[_NodeT]):
         if code is None and path is None and file_io is None:
             raise TypeError("Please provide either code or a path.")

+        if isinstance(path, str):
+            path = Path(path)
+        if isinstance(cache_path, str):
+            cache_path = Path(cache_path)
+
         if start_symbol is None:
             start_symbol = self._start_nonterminal
@@ -100,7 +106,7 @@ class Grammar(Generic[_NodeT]):
         if file_io is None:
             if code is None:
-                file_io = FileIO(path)
+                file_io = FileIO(path)  # type: ignore
             else:
                 file_io = KnownContentFileIO(path, code)
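
End to end, parse() now accepts either a str or a Path for both path and cache_path. A usage sketch, assuming a file named example.py exists in the working directory:

    from pathlib import Path

    import parso

    grammar = parso.load_grammar()
    module = grammar.parse(path=Path('example.py'), cache=True)   # a str would work equally well
    print(module.get_code()[:40])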