Implement support for <QuerySet>.as_manager() (#1025)

* Implement support for `<QuerySet>.as_manager()`

* fixup! Implement support for `<QuerySet>.as_manager()`

* fixup! fixup! Implement support for `<QuerySet>.as_manager()`
This commit is contained in:
Petter Friberg
2022-09-29 14:05:25 +02:00
committed by GitHub
parent 1f2e406972
commit 54d5835f66
8 changed files with 582 additions and 82 deletions

View File

@@ -15,7 +15,8 @@ from mypy.nodes import (
TypeInfo,
Var,
)
from mypy.plugin import AttributeContext, DynamicClassDefContext, SemanticAnalyzerPluginInterface
from mypy.plugin import AttributeContext, DynamicClassDefContext
from mypy.semanal import SemanticAnalyzer
from mypy.semanal_shared import has_placeholder
from mypy.types import AnyType, CallableType, Instance, ProperType
from mypy.types import Type as MypyType
@@ -150,7 +151,6 @@ def get_method_type_from_reverse_manager(
def resolve_manager_method_from_instance(instance: Instance, method_name: str, ctx: AttributeContext) -> MypyType:
api = helpers.get_typechecker_api(ctx)
method_type = get_method_type_from_dynamic_manager(
api, method_name, instance
@@ -164,9 +164,11 @@ def resolve_manager_method(ctx: AttributeContext) -> MypyType:
A 'get_attribute_hook' that is intended to be invoked whenever the TypeChecker encounters
an attribute on a class that has 'django.db.models.BaseManager' as a base.
"""
# Skip (method) type that is currently something other than Any
# Skip (method) type that is currently something other than Any of type `implementation_artifact`
if not isinstance(ctx.default_attr_type, AnyType):
return ctx.default_attr_type
elif ctx.default_attr_type.type_of_any != TypeOfAny.implementation_artifact:
return ctx.default_attr_type
# (Current state is:) We wouldn't end up here when looking up a method from a custom _manager_.
# That's why we only attempt to lookup the method for either a dynamically added or reverse manager.
@@ -197,12 +199,12 @@ def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefConte
return
# Don't redeclare the manager class if we've already defined it.
manager_node = semanal_api.lookup_current_scope(ctx.name)
if manager_node and isinstance(manager_node.node, TypeInfo):
manager_sym = semanal_api.lookup_current_scope(ctx.name)
if manager_sym and isinstance(manager_sym.node, TypeInfo):
# This is just a deferral run where our work is already finished
return
new_manager_info = create_manager_info_from_from_queryset_call(ctx.api, ctx.call, ctx.name)
new_manager_info = create_manager_info_from_from_queryset_call(semanal_api, ctx.call, ctx.name)
if new_manager_info is None:
if not ctx.api.final_iteration:
ctx.api.defer()
@@ -212,8 +214,17 @@ def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefConte
helpers.add_new_manager_base(semanal_api, new_manager_info.fullname)
def register_dynamically_created_manager(fullname: str, manager_name: str, manager_base: TypeInfo) -> None:
manager_base.metadata.setdefault("from_queryset_managers", {})
# The `__module__` value of the manager type created by Django's
# `.from_queryset` is `django.db.models.manager`. But we put new type(s) in the
# module currently being processed, so we'll map those together through metadata.
runtime_fullname = ".".join(["django.db.models.manager", manager_name])
manager_base.metadata["from_queryset_managers"][runtime_fullname] = fullname
def create_manager_info_from_from_queryset_call(
api: SemanticAnalyzerPluginInterface, call_expr: CallExpr, name: Optional[str] = None
api: SemanticAnalyzer, call_expr: CallExpr, name: Optional[str] = None
) -> Optional[TypeInfo]:
"""
Extract manager and queryset TypeInfo from a from_queryset call.
@@ -247,30 +258,48 @@ def create_manager_info_from_from_queryset_call(
else:
manager_name = f"{base_manager_info.name}From{queryset_info.name}"
try:
new_manager_info = create_manager_class(api, base_manager_info, name or manager_name, call_expr.line)
except helpers.IncompleteDefnException:
return None
# Always look in global scope, as that's where we'll declare dynamic manager classes
manager_sym = api.globals.get(manager_name)
if (
manager_sym is not None
and isinstance(manager_sym.node, TypeInfo)
and manager_sym.node.has_base(base_manager_info.fullname)
and manager_sym.node.metadata.get("django", {}).get("from_queryset_manager") == queryset_info.fullname
):
# Reuse an identical, already generated, manager
new_manager_info = manager_sym.node
else:
# Create a new `TypeInfo` instance for the manager type
try:
new_manager_info = create_manager_class(
api=api,
base_manager_info=base_manager_info,
name=manager_name,
line=call_expr.line,
with_unique_name=name is not None and name != manager_name,
)
except helpers.IncompleteDefnException:
return None
popuplate_manager_from_queryset(new_manager_info, queryset_info)
manager_fullname = ".".join(["django.db.models.manager", manager_name])
base_manager_info = new_manager_info.mro[1]
base_manager_info.metadata.setdefault("from_queryset_managers", {})
base_manager_info.metadata["from_queryset_managers"][manager_fullname] = new_manager_info.fullname
populate_manager_from_queryset(new_manager_info, queryset_info)
register_dynamically_created_manager(
fullname=new_manager_info.fullname,
manager_name=manager_name,
manager_base=base_manager_info,
)
# Add the new manager to the current module
module = api.modules[api.cur_mod_id]
module.names[name or manager_name] = SymbolTableNode(
GDEF, new_manager_info, plugin_generated=True, no_serialize=False
)
if name is not None and name != new_manager_info.name:
# Unless names are equal, there's 2 symbol names that needs the manager info
module.names[name] = SymbolTableNode(GDEF, new_manager_info, plugin_generated=True)
module.names[new_manager_info.name] = SymbolTableNode(GDEF, new_manager_info, plugin_generated=True)
return new_manager_info
def create_manager_class(
api: SemanticAnalyzerPluginInterface, base_manager_info: TypeInfo, name: str, line: int
api: SemanticAnalyzer, base_manager_info: TypeInfo, name: str, line: int, with_unique_name: bool
) -> TypeInfo:
base_manager_instance = fill_typevars(base_manager_info)
@@ -280,17 +309,24 @@ def create_manager_class(
if any(has_placeholder(type_var) for type_var in base_manager_info.defn.type_vars):
raise helpers.IncompleteDefnException
manager_info = helpers.create_type_info(name, api.cur_mod_id, bases=[base_manager_instance])
if with_unique_name:
manager_info = helpers.add_new_class_for_module(
module=api.modules[api.cur_mod_id],
name=name,
bases=[base_manager_instance],
)
else:
manager_info = helpers.create_type_info(name, api.cur_mod_id, bases=[base_manager_instance])
manager_info.line = line
manager_info.type_vars = base_manager_info.type_vars
manager_info.defn.type_vars = base_manager_info.defn.type_vars
manager_info.defn.line = line
manager_info.metaclass_type = manager_info.calculate_metaclass_type()
return manager_info
def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeInfo) -> None:
def populate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeInfo) -> None:
"""
Add methods from the QuerySet class to the manager.
"""
@@ -318,7 +354,7 @@ def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeI
helpers.add_new_sym_for_info(
manager_info,
name=name,
sym_type=AnyType(TypeOfAny.special_form),
sym_type=AnyType(TypeOfAny.implementation_artifact),
)
# For methods on BaseManager that return a queryset we need to update
@@ -330,5 +366,103 @@ def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeI
helpers.add_new_sym_for_info(
manager_info,
name=method_name,
sym_type=AnyType(TypeOfAny.special_form),
sym_type=AnyType(TypeOfAny.implementation_artifact),
)
def create_new_manager_class_from_as_manager_method(ctx: DynamicClassDefContext) -> None:
"""
Insert a new manager class node for a
```
<manager name> = <QuerySet>.as_manager()
```
"""
semanal_api = helpers.get_semanal_api(ctx)
# Don't redeclare the manager class if we've already defined it.
manager_node = semanal_api.lookup_current_scope(ctx.name)
if manager_node and manager_node.type is not None:
# This is just a deferral run where our work is already finished
return
manager_sym = semanal_api.lookup_fully_qualified_or_none(fullnames.MANAGER_CLASS_FULLNAME)
assert manager_sym is not None
manager_base = manager_sym.node
if manager_base is None:
if not semanal_api.final_iteration:
semanal_api.defer()
return
assert isinstance(manager_base, TypeInfo)
callee = ctx.call.callee
assert isinstance(callee, MemberExpr)
assert isinstance(callee.expr, RefExpr)
queryset_info = callee.expr.node
if queryset_info is None:
if not semanal_api.final_iteration:
semanal_api.defer()
return
assert isinstance(queryset_info, TypeInfo)
manager_class_name = manager_base.name + "From" + queryset_info.name
current_module = semanal_api.modules[semanal_api.cur_mod_id]
existing_sym = current_module.names.get(manager_class_name)
if (
existing_sym is not None
and isinstance(existing_sym.node, TypeInfo)
and existing_sym.node.has_base(fullnames.MANAGER_CLASS_FULLNAME)
and existing_sym.node.metadata.get("django", {}).get("from_queryset_manager") == queryset_info.fullname
):
# Reuse an identical, already generated, manager
new_manager_info = existing_sym.node
else:
# Create a new `TypeInfo` instance for the manager type
try:
new_manager_info = create_manager_class(
api=semanal_api,
base_manager_info=manager_base,
name=manager_class_name,
line=ctx.call.line,
with_unique_name=True,
)
except helpers.IncompleteDefnException:
if not semanal_api.final_iteration:
semanal_api.defer()
return
populate_manager_from_queryset(new_manager_info, queryset_info)
register_dynamically_created_manager(
fullname=new_manager_info.fullname,
manager_name=manager_class_name,
manager_base=manager_base,
)
# So that the plugin will reparameterize the manager when it is constructed inside of a Model definition
helpers.add_new_manager_base(semanal_api, new_manager_info.fullname)
# Whenever `<QuerySet>.as_manager()` isn't called at class level, we want to ensure
# that the variable is an instance of our generated manager. Instead of the return
# value of `.as_manager()`. Though model argument is populated as `Any`.
# `transformers.models.AddManagers` will populate a model's manager(s), when it
# finds it on class level.
var = Var(name=ctx.name, type=Instance(new_manager_info, [AnyType(TypeOfAny.from_omitted_generics)]))
var.info = new_manager_info
var._fullname = f"{current_module.fullname}.{ctx.name}"
var.is_inferred = True
# Note: Order of `add_symbol_table_node` calls matters. Depending on what level
# we've found the `.as_manager()` call. Point here being that we want to replace the
# `.as_manager` return value with our newly created manager.
assert semanal_api.add_symbol_table_node(
ctx.name, SymbolTableNode(semanal_api.current_symbol_kind(), var, plugin_generated=True)
)
# Add the new manager to the current module
assert semanal_api.add_symbol_table_node(
# We'll use `new_manager_info.name` instead of `manager_class_name` here
# to handle possible name collisions, as it's unique.
new_manager_info.name,
# Note that the generated manager type is always inserted at module level
SymbolTableNode(GDEF, new_manager_info, plugin_generated=True),
)