mirror of
https://github.com/davidhalter/django-stubs.git
synced 2025-12-09 05:24:53 +08:00
Add support for BaseManager.from_queryset() (#251)
* add support for BaseManager.from_queryset() * cleanups * lint fixes
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
from collections import OrderedDict
|
||||
from typing import List, Tuple, Type
|
||||
from typing import Dict, Optional, Type, cast
|
||||
|
||||
from django.db.models.base import Model
|
||||
from django.db.models.fields import DateField, DateTimeField
|
||||
@@ -8,10 +7,10 @@ from django.db.models.fields.reverse_related import (
|
||||
ManyToManyRel, ManyToOneRel, OneToOneRel,
|
||||
)
|
||||
from mypy.nodes import ARG_STAR2, Argument, Context, FuncDef, TypeInfo, Var
|
||||
from mypy.plugin import ClassDefContext, SemanticAnalyzerPluginInterface
|
||||
from mypy.plugin import ClassDefContext
|
||||
from mypy.plugins import common
|
||||
from mypy.plugins.common import add_method
|
||||
from mypy.types import AnyType, CallableType, Instance
|
||||
from mypy.semanal import SemanticAnalyzer
|
||||
from mypy.types import AnyType, Instance
|
||||
from mypy.types import Type as MypyType
|
||||
from mypy.types import TypeOfAny
|
||||
|
||||
@@ -22,19 +21,22 @@ from mypy_django_plugin.transformers.fields import get_field_descriptor_types
|
||||
|
||||
|
||||
class ModelClassInitializer:
|
||||
api: SemanticAnalyzerPluginInterface
|
||||
api: SemanticAnalyzer
|
||||
|
||||
def __init__(self, ctx: ClassDefContext, django_context: DjangoContext):
|
||||
self.api = ctx.api
|
||||
self.api = cast(SemanticAnalyzer, ctx.api)
|
||||
self.model_classdef = ctx.cls
|
||||
self.django_context = django_context
|
||||
self.ctx = ctx
|
||||
|
||||
def lookup_typeinfo(self, fullname: str) -> Optional[TypeInfo]:
|
||||
return helpers.lookup_fully_qualified_typeinfo(self.api, fullname)
|
||||
|
||||
def lookup_typeinfo_or_incomplete_defn_error(self, fullname: str) -> TypeInfo:
|
||||
sym = self.api.lookup_fully_qualified_or_none(fullname)
|
||||
if sym is None or not isinstance(sym.node, TypeInfo):
|
||||
info = self.lookup_typeinfo(fullname)
|
||||
if info is None:
|
||||
raise helpers.IncompleteDefnException(f'No {fullname!r} found')
|
||||
return sym.node
|
||||
return info
|
||||
|
||||
def lookup_class_typeinfo_or_incomplete_defn_error(self, klass: type) -> TypeInfo:
|
||||
fullname = helpers.get_class_fullname(klass)
|
||||
@@ -135,11 +137,64 @@ class AddRelatedModelsId(ModelClassInitializer):
|
||||
|
||||
|
||||
class AddManagers(ModelClassInitializer):
|
||||
def _is_manager_any(self, typ: Instance) -> bool:
|
||||
return typ.type.fullname == fullnames.MANAGER_CLASS_FULLNAME and type(typ.args[0]) == AnyType
|
||||
def has_any_parametrized_manager_as_base(self, info: TypeInfo) -> bool:
|
||||
for base in helpers.iter_bases(info):
|
||||
if self.is_any_parametrized_manager(base):
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_any_parametrized_manager(self, typ: Instance) -> bool:
|
||||
return typ.type.fullname == fullnames.MANAGER_CLASS_FULLNAME and isinstance(typ.args[0], AnyType)
|
||||
|
||||
def get_generated_manager_mappings(self, base_manager_fullname: str) -> Dict[str, str]:
|
||||
base_manager_info = self.lookup_typeinfo(base_manager_fullname)
|
||||
if (base_manager_info is None
|
||||
or 'from_queryset_managers' not in base_manager_info.metadata):
|
||||
return {}
|
||||
return base_manager_info.metadata['from_queryset_managers']
|
||||
|
||||
def create_new_model_parametrized_manager(self, name: str, base_manager_info: TypeInfo) -> Instance:
|
||||
bases = []
|
||||
for original_base in base_manager_info.bases:
|
||||
if self.is_any_parametrized_manager(original_base):
|
||||
if original_base.type is None:
|
||||
raise helpers.IncompleteDefnException()
|
||||
|
||||
original_base = helpers.reparametrize_instance(original_base,
|
||||
[Instance(self.model_classdef.info, [])])
|
||||
bases.append(original_base)
|
||||
|
||||
current_module = self.api.modules[self.model_classdef.info.module_name]
|
||||
custom_manager_info = helpers.add_new_class_for_module(current_module,
|
||||
name=name, bases=bases)
|
||||
# copy fields to a new manager
|
||||
new_cls_def_context = ClassDefContext(cls=custom_manager_info.defn,
|
||||
reason=self.ctx.reason,
|
||||
api=self.api)
|
||||
custom_manager_type = Instance(custom_manager_info, [Instance(self.model_classdef.info, [])])
|
||||
|
||||
for name, sym in base_manager_info.names.items():
|
||||
# replace self type with new class, if copying method
|
||||
if isinstance(sym.node, FuncDef):
|
||||
helpers.copy_method_to_another_class(new_cls_def_context,
|
||||
self_type=custom_manager_type,
|
||||
new_method_name=name,
|
||||
method_node=sym.node)
|
||||
continue
|
||||
|
||||
new_sym = sym.copy()
|
||||
if isinstance(new_sym.node, Var):
|
||||
new_var = Var(name, type=sym.type)
|
||||
new_var.info = custom_manager_info
|
||||
new_var._fullname = custom_manager_info.fullname + '.' + name
|
||||
new_sym.node = new_var
|
||||
custom_manager_info.names[name] = new_sym
|
||||
|
||||
return custom_manager_type
|
||||
|
||||
def run_with_model_cls(self, model_cls: Type[Model]) -> None:
|
||||
for manager_name, manager in model_cls._meta.managers_map.items():
|
||||
manager_class_name = manager.__class__.__name__
|
||||
manager_fullname = helpers.get_class_fullname(manager.__class__)
|
||||
try:
|
||||
manager_info = self.lookup_typeinfo_or_incomplete_defn_error(manager_fullname)
|
||||
@@ -147,78 +202,33 @@ class AddManagers(ModelClassInitializer):
|
||||
if not self.api.final_iteration:
|
||||
raise exc
|
||||
else:
|
||||
continue
|
||||
base_manager_fullname = helpers.get_class_fullname(manager.__class__.__bases__[0])
|
||||
generated_managers = self.get_generated_manager_mappings(base_manager_fullname)
|
||||
if manager_fullname not in generated_managers:
|
||||
# not a generated manager, continue with the loop
|
||||
continue
|
||||
real_manager_fullname = generated_managers[manager_fullname]
|
||||
manager_info = self.lookup_typeinfo(real_manager_fullname) # type: ignore
|
||||
if manager_info is None:
|
||||
continue
|
||||
manager_class_name = real_manager_fullname.rsplit('.', maxsplit=1)[1]
|
||||
|
||||
if manager_name not in self.model_classdef.info.names:
|
||||
manager_type = Instance(manager_info, [Instance(self.model_classdef.info, [])])
|
||||
self.add_new_node_to_model_class(manager_name, manager_type)
|
||||
else:
|
||||
# creates new MODELNAME_MANAGERCLASSNAME class that represents manager parametrized with current model
|
||||
has_manager_any_base = any(self._is_manager_any(base) for base in manager_info.bases)
|
||||
if has_manager_any_base:
|
||||
custom_model_manager_name = manager.model.__name__ + '_' + manager.__class__.__name__
|
||||
if not self.has_any_parametrized_manager_as_base(manager_info):
|
||||
continue
|
||||
|
||||
try:
|
||||
bases = []
|
||||
for original_base in manager_info.bases:
|
||||
if self._is_manager_any(original_base):
|
||||
if original_base.type is None:
|
||||
raise helpers.IncompleteDefnException()
|
||||
custom_model_manager_name = manager.model.__name__ + '_' + manager_class_name
|
||||
try:
|
||||
custom_manager_type = self.create_new_model_parametrized_manager(custom_model_manager_name,
|
||||
base_manager_info=manager_info)
|
||||
except helpers.IncompleteDefnException:
|
||||
continue
|
||||
|
||||
original_base = helpers.reparametrize_instance(original_base,
|
||||
[Instance(self.model_classdef.info, [])])
|
||||
bases.append(original_base)
|
||||
except helpers.IncompleteDefnException as exc:
|
||||
if not self.api.final_iteration:
|
||||
raise exc
|
||||
else:
|
||||
continue
|
||||
|
||||
current_module = self.api.modules[self.model_classdef.info.module_name]
|
||||
custom_manager_info = helpers.add_new_class_for_module(current_module,
|
||||
custom_model_manager_name,
|
||||
bases=bases,
|
||||
fields=OrderedDict())
|
||||
# copy fields to a new manager
|
||||
new_cls_def_context = ClassDefContext(cls=custom_manager_info.defn,
|
||||
reason=self.ctx.reason,
|
||||
api=self.api)
|
||||
custom_manager_type = Instance(custom_manager_info, [Instance(self.model_classdef.info, [])])
|
||||
|
||||
for name, sym in manager_info.names.items():
|
||||
# replace self type with new class, if copying method
|
||||
if isinstance(sym.node, FuncDef):
|
||||
arguments, return_type = self.prepare_new_method_arguments(sym.node)
|
||||
add_method(new_cls_def_context,
|
||||
name,
|
||||
args=arguments,
|
||||
return_type=return_type,
|
||||
self_type=custom_manager_type)
|
||||
continue
|
||||
|
||||
new_sym = sym.copy()
|
||||
if isinstance(new_sym.node, Var):
|
||||
new_var = Var(name, type=sym.type)
|
||||
new_var.info = custom_manager_info
|
||||
new_var._fullname = custom_manager_info.fullname + '.' + name
|
||||
new_sym.node = new_var
|
||||
custom_manager_info.names[name] = new_sym
|
||||
|
||||
self.add_new_node_to_model_class(manager_name, custom_manager_type)
|
||||
|
||||
def prepare_new_method_arguments(self, node: FuncDef) -> Tuple[List[Argument], MypyType]:
|
||||
arguments = []
|
||||
for argument in node.arguments[1:]:
|
||||
if argument.type_annotation is None:
|
||||
argument.type_annotation = AnyType(TypeOfAny.unannotated)
|
||||
arguments.append(argument)
|
||||
|
||||
if isinstance(node.type, CallableType):
|
||||
return_type = node.type.ret_type
|
||||
else:
|
||||
return_type = AnyType(TypeOfAny.unannotated)
|
||||
|
||||
return arguments, return_type
|
||||
self.add_new_node_to_model_class(manager_name, custom_manager_type)
|
||||
|
||||
|
||||
class AddDefaultManagerAttribute(ModelClassInitializer):
|
||||
|
||||
Reference in New Issue
Block a user