From 90617bc76af4de4030be7374f50a38143475e0d4 Mon Sep 17 00:00:00 2001 From: Maxim Kurnikov Date: Sat, 4 Jan 2020 16:52:13 +0300 Subject: [PATCH] QuerySet.as_manager() support --- mypy_django_plugin/main.py | 17 +- mypy_django_plugin/transformers/managers.py | 214 ++++++++++++++++-- .../managers/querysets/test_as_manager.yml | 42 ++++ 3 files changed, 247 insertions(+), 26 deletions(-) create mode 100644 test-data/typecheck/managers/querysets/test_as_manager.yml diff --git a/mypy_django_plugin/main.py b/mypy_django_plugin/main.py index f94af4a..5c6b569 100644 --- a/mypy_django_plugin/main.py +++ b/mypy_django_plugin/main.py @@ -20,7 +20,7 @@ from mypy_django_plugin.transformers import ( ) from mypy_django_plugin.transformers.managers import ( create_new_manager_class_from_from_queryset_method, -) + create_manager_class_from_as_manager_method, instantiate_anonymous_queryset_from_as_manager) from mypy_django_plugin.transformers.models import process_model_class @@ -124,6 +124,10 @@ class NewSemanalDjangoPlugin(Plugin): return 10, module, -1 def get_additional_deps(self, file: MypyFile) -> List[Tuple[int, str, int]]: + # load QuerySet and Manager together (for as_manager) + if file.fullname == 'django.db.models.query': + return [self._new_dependency('django.db.models.manager')] + # for settings if file.fullname == 'django.conf' and self.django_context.django_settings_module: return [self._new_dependency(self.django_context.django_settings_module)] @@ -213,6 +217,11 @@ class NewSemanalDjangoPlugin(Plugin): if info and info.has_base(fullnames.OPTIONS_CLASS_FULLNAME): return partial(meta.return_proper_field_type_from_get_field, django_context=self.django_context) + if method_name == 'as_manager': + info = self._get_typeinfo_or_none(class_fullname) + if info and info.has_base(fullnames.QUERYSET_CLASS_FULLNAME): + return instantiate_anonymous_queryset_from_as_manager + manager_classes = self._get_current_manager_bases() if class_fullname in manager_classes and method_name == 'create': return partial(init_create.redefine_and_typecheck_model_create, django_context=self.django_context) @@ -253,6 +262,12 @@ class NewSemanalDjangoPlugin(Plugin): info = self._get_typeinfo_or_none(class_name) if info and info.has_base(fullnames.BASE_MANAGER_CLASS_FULLNAME): return create_new_manager_class_from_from_queryset_method + if fullname.endswith('as_manager'): + class_name, _, _ = fullname.rpartition('.') + info = self._get_typeinfo_or_none(class_name) + if info and info.has_base(fullnames.QUERYSET_CLASS_FULLNAME): + return create_manager_class_from_as_manager_method + return None diff --git a/mypy_django_plugin/transformers/managers.py b/mypy_django_plugin/transformers/managers.py index c304273..6d43e4a 100644 --- a/mypy_django_plugin/transformers/managers.py +++ b/mypy_django_plugin/transformers/managers.py @@ -1,14 +1,16 @@ -from typing import Iterator, Tuple, Optional +from typing import Iterator, Tuple, Optional, Any, Dict from mypy.nodes import ( FuncDef, MemberExpr, NameExpr, RefExpr, StrExpr, TypeInfo, - PlaceholderNode, SymbolTableNode, GDEF -) -from mypy.plugin import ClassDefContext, DynamicClassDefContext -from mypy.types import AnyType, Instance, TypeOfAny + PlaceholderNode, SymbolTableNode, GDEF, + CallExpr, Context, Decorator, OverloadedFuncDef, SymbolTable) +from mypy.plugin import ClassDefContext, DynamicClassDefContext, MethodContext +from mypy.semanal import SemanticAnalyzer, is_valid_replacement, is_same_symbol +from mypy.types import AnyType, Instance, TypeOfAny, CallableType +from mypy.types import Type as MypyType from mypy.typevars import fill_typevars -from mypy_django_plugin.lib import fullnames, sem_helpers, helpers +from mypy_django_plugin.lib import fullnames, sem_helpers, helpers, chk_helpers def iter_all_custom_queryset_methods(derived_queryset_info: TypeInfo) -> Iterator[Tuple[str, FuncDef]]: @@ -20,19 +22,23 @@ def iter_all_custom_queryset_methods(derived_queryset_info: TypeInfo) -> Iterato yield name, sym.node -def resolve_callee_manager_info_or_exception(ctx: DynamicClassDefContext) -> Optional[TypeInfo]: +def generate_from_queryset_name(base_manager_info: TypeInfo, queryset_info: TypeInfo) -> str: + return base_manager_info.name + 'From' + queryset_info.name + + +def resolve_callee_info_or_exception(ctx: DynamicClassDefContext) -> Optional[TypeInfo]: callee = ctx.call.callee assert isinstance(callee, MemberExpr) assert isinstance(callee.expr, RefExpr) - callee_manager_info = callee.expr.node - if (callee_manager_info is None - or isinstance(callee_manager_info, PlaceholderNode)): - raise sem_helpers.IncompleteDefnException(f'Definition of base manager {callee_manager_info.fullname} ' + callee_info = callee.expr.node + if (callee_info is None + or isinstance(callee_info, PlaceholderNode)): + raise sem_helpers.IncompleteDefnException(f'Definition of base manager {callee_info.fullname} ' f'is incomplete.') - assert isinstance(callee_manager_info, TypeInfo) - return callee_manager_info + assert isinstance(callee_info, TypeInfo) + return callee_info def resolve_passed_queryset_info_or_exception(ctx: DynamicClassDefContext) -> Optional[TypeInfo]: @@ -75,28 +81,39 @@ def new_manager_typeinfo(ctx: DynamicClassDefContext, callee_manager_info: TypeI return new_manager_info +def get_generated_manager_fullname(call: CallExpr, base_manager_info: TypeInfo, queryset_info: TypeInfo) -> str: + if len(call.args) > 1: + # only for from_queryset() + expr = call.args[1] + assert isinstance(expr, StrExpr) + custom_manager_generated_name = expr.value + else: + custom_manager_generated_name = base_manager_info.name + 'From' + queryset_info.name + + custom_manager_generated_fullname = 'django.db.models.manager' + '.' + custom_manager_generated_name + return custom_manager_generated_fullname + + +def get_generated_managers_metadata(django_manager_info: TypeInfo) -> Dict[str, Any]: + return django_manager_info.metadata.setdefault('from_queryset_managers', {}) + + def record_new_manager_info_fullname_into_metadata(ctx: DynamicClassDefContext, new_manager_fullname: str, callee_manager_info: TypeInfo, queryset_info: TypeInfo, django_manager_info: TypeInfo) -> None: - if len(ctx.call.args) > 1: - expr = ctx.call.args[1] - assert isinstance(expr, StrExpr) - custom_manager_generated_name = expr.value - else: - custom_manager_generated_name = callee_manager_info.name + 'From' + queryset_info.name - - custom_manager_generated_fullname = 'django.db.models.manager' + '.' + custom_manager_generated_name - - metadata = django_manager_info.metadata.setdefault('from_queryset_managers', {}) + custom_manager_generated_fullname = get_generated_manager_fullname(ctx.call, + base_manager_info=callee_manager_info, + queryset_info=queryset_info) + metadata = get_generated_managers_metadata(django_manager_info) metadata[custom_manager_generated_fullname] = new_manager_fullname def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefContext) -> None: semanal_api = sem_helpers.get_semanal_api(ctx) try: - callee_manager_info = resolve_callee_manager_info_or_exception(ctx) + callee_manager_info = resolve_callee_info_or_exception(ctx) queryset_info = resolve_passed_queryset_info_or_exception(ctx) django_manager_info = resolve_django_manager_info_or_exception(ctx) except sem_helpers.IncompleteDefnException: @@ -116,7 +133,6 @@ def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefConte class_def_context = ClassDefContext(cls=new_manager_info.defn, reason=ctx.call, api=semanal_api) self_type = fill_typevars(new_manager_info) - # self_type = Instance(new_manager_info, []) try: for name, method_node in iter_all_custom_queryset_methods(queryset_info): @@ -147,3 +163,151 @@ def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefConte if (not added and not semanal_api.final_iteration): semanal_api.defer() + + +def add_symbol_table_node(api: SemanticAnalyzer, + name: str, + symbol: SymbolTableNode, + context: Optional[Context] = None, + symbol_table: Optional[SymbolTable] = None, + can_defer: bool = True, + escape_comprehensions: bool = False) -> bool: + """Add symbol table node to the currently active symbol table. + + Return True if we actually added the symbol, or False if we refused + to do so (because something is not ready or it was a no-op). + + Generate an error if there is an invalid redefinition. + + If context is None, unconditionally add node, since we can't report + an error. Note that this is used by plugins to forcibly replace nodes! + + TODO: Prevent plugins from replacing nodes, as it could cause problems? + + Args: + name: short name of symbol + symbol: Node to add + can_defer: if True, defer current target if adding a placeholder + context: error context (see above about None value) + """ + names = symbol_table or api.current_symbol_table(escape_comprehensions=escape_comprehensions) + existing = names.get(name) + if isinstance(symbol.node, PlaceholderNode) and can_defer: + api.defer(context) + if (existing is not None + and context is not None + and not is_valid_replacement(existing, symbol)): + # There is an existing node, so this may be a redefinition. + # If the new node points to the same node as the old one, + # or if both old and new nodes are placeholders, we don't + # need to do anything. + old = existing.node + new = symbol.node + if isinstance(new, PlaceholderNode): + # We don't know whether this is okay. Let's wait until the next iteration. + return False + if not is_same_symbol(old, new): + if isinstance(new, (FuncDef, Decorator, OverloadedFuncDef, TypeInfo)): + api.add_redefinition(names, name, symbol) + if not (isinstance(new, (FuncDef, Decorator)) + and api.set_original_def(old, new)): + api.name_already_defined(name, context, existing) + elif name not in api.missing_names and '*' not in api.missing_names: + names[name] = symbol + api.progress = True + return True + return False + + + +def create_manager_class_from_as_manager_method(ctx: DynamicClassDefContext) -> None: + semanal_api = sem_helpers.get_semanal_api(ctx) + try: + queryset_info = resolve_callee_info_or_exception(ctx) + django_manager_info = resolve_django_manager_info_or_exception(ctx) + except sem_helpers.IncompleteDefnException: + if not semanal_api.final_iteration: + semanal_api.defer() + return + else: + raise + + generic_param = AnyType(TypeOfAny.explicit) + generic_param_name = 'Any' + if (semanal_api.scope.classes + and semanal_api.scope.classes[-1].has_base(fullnames.MODEL_CLASS_FULLNAME)): + info = semanal_api.scope.classes[-1] # type: TypeInfo + generic_param = Instance(info, []) + generic_param_name = info.name + + new_manager_class_name = queryset_info.name + '_AsManager_' + generic_param_name + new_manager_info = helpers.new_typeinfo(new_manager_class_name, + bases=[Instance(django_manager_info, [generic_param])], + module_name=semanal_api.cur_mod_id) + new_manager_info.set_line(ctx.call) + + record_new_manager_info_fullname_into_metadata(ctx, + new_manager_info.fullname, + django_manager_info, + queryset_info, + django_manager_info) + + class_def_context = ClassDefContext(cls=new_manager_info.defn, + reason=ctx.call, api=semanal_api) + self_type = fill_typevars(new_manager_info) + + try: + for name, method_node in iter_all_custom_queryset_methods(queryset_info): + sem_helpers.copy_method_or_incomplete_defn_exception(class_def_context, + self_type, + new_method_name=name, + method_node=method_node) + except sem_helpers.IncompleteDefnException: + if not semanal_api.final_iteration: + semanal_api.defer() + return + else: + raise + + new_manager_sym = SymbolTableNode(GDEF, new_manager_info, plugin_generated=True) + + # context=None - forcibly replace old node + added = add_symbol_table_node(semanal_api, new_manager_class_name, new_manager_sym, + context=None, + symbol_table=semanal_api.globals) + if added: + # replace all references to the old manager Var everywhere + for _, module in semanal_api.modules.items(): + if module.fullname != semanal_api.cur_mod_id: + for sym_name, sym in module.names.items(): + if sym.fullname == new_manager_info.fullname: + module.names[sym_name] = new_manager_sym.copy() + + # we need another iteration to process methods + if (not added + and not semanal_api.final_iteration): + semanal_api.defer() + + +def instantiate_anonymous_queryset_from_as_manager(ctx: MethodContext) -> MypyType: + api = chk_helpers.get_typechecker_api(ctx) + django_manager_info = helpers.lookup_fully_qualified_typeinfo(api, fullnames.MANAGER_CLASS_FULLNAME) + assert django_manager_info is not None + + assert isinstance(ctx.type, CallableType) + assert isinstance(ctx.type.ret_type, Instance) + queryset_info = ctx.type.ret_type.type + + fullname = get_generated_manager_fullname(ctx.context, + base_manager_info=django_manager_info, + queryset_info=queryset_info) + metadata = get_generated_managers_metadata(django_manager_info) + if fullname not in metadata: + raise ValueError(f'{fullname!r} is not present in generated managers list') + + module_name, _, class_name = metadata[fullname].rpartition('.') + current_module = helpers.get_current_module(api) + assert module_name == current_module.fullname + + generated_manager_info = current_module.names[class_name].node + return fill_typevars(generated_manager_info) diff --git a/test-data/typecheck/managers/querysets/test_as_manager.yml b/test-data/typecheck/managers/querysets/test_as_manager.yml new file mode 100644 index 0000000..e08c021 --- /dev/null +++ b/test-data/typecheck/managers/querysets/test_as_manager.yml @@ -0,0 +1,42 @@ +- case: anonymous_queryset_from_as_manager_inside_model + main: | + from myapp.models import MyModel + + reveal_type(MyModel.objects) # N: Revealed type is 'myapp.models.MyQuerySet_AsManager_MyModel' + reveal_type(MyModel.objects.get()) # N: Revealed type is 'myapp.models.MyModel*' + reveal_type(MyModel.objects.queryset_method) # N: Revealed type is 'def () -> builtins.int' + reveal_type(MyModel.objects.queryset_method()) # N: Revealed type is 'builtins.int' + installed_apps: + - myapp + files: + - path: myapp/__init__.py + - path: myapp/models.py + content: | + from django.db import models + class MyQuerySet(models.QuerySet): + def queryset_method(self) -> int: + pass + class MyModel(models.Model): + objects = MyQuerySet.as_manager() + + +- case: as_manager_outside_model_parametrized_with_any + main: | + from myapp.models import NotModel, outside_objects + reveal_type(NotModel.objects) # N: Revealed type is 'myapp.models.MyQuerySet_AsManager_Any' + reveal_type(NotModel.objects.get()) # N: Revealed type is 'Any' + reveal_type(outside_objects) # N: Revealed type is 'myapp.models.MyQuerySet_AsManager_Any' + reveal_type(outside_objects.get()) # N: Revealed type is 'Any' + installed_apps: + - myapp + files: + - path: myapp/__init__.py + - path: myapp/models.py + content: | + from django.db import models + class MyQuerySet(models.QuerySet): + def queryset_method(self) -> int: + pass + outside_objects = MyQuerySet.as_manager() + class NotModel: + objects = MyQuerySet.as_manager()