Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.139.104.140
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/sqlalchemy/orm/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/sqlalchemy/orm/strategies.py
# orm/strategies.py
# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""sqlalchemy.orm.interfaces.LoaderStrategy
   implementations, and related MapperOptions."""
from __future__ import absolute_import

import collections
import itertools

from . import attributes
from . import exc as orm_exc
from . import interfaces
from . import loading
from . import properties
from . import query
from . import unitofwork
from . import util as orm_util
from .base import _DEFER_FOR_STATE
from .base import _SET_DEFERRED_EXPIRED
from .interfaces import LoaderStrategy
from .interfaces import StrategizedProperty
from .session import _state_session
from .state import InstanceState
from .util import _none_set
from .util import aliased
from .. import event
from .. import exc as sa_exc
from .. import inspect
from .. import log
from .. import sql
from .. import util
from ..sql import util as sql_util
from ..sql import visitors


def _register_attribute(
    prop,
    mapper,
    useobject,
    compare_function=None,
    typecallable=None,
    callable_=None,
    proxy_property=None,
    active_history=False,
    impl_class=None,
    **kw
):
    """Register an attribute implementation for ``prop`` on ``mapper``
    and on each of its descendant mappers that still needs one.

    For every mapper in ``mapper.self_and_descendants`` whose ``_props``
    entry for ``prop.key`` is this very ``prop`` and whose class manager
    does not yet have an impl for that key,
    ``attributes.register_attribute_impl()`` is called and the assembled
    listener hooks are then invoked as ``hook(descriptor, prop)``.

    ``proxy_property`` is accepted but not used within this function.
    Additional keyword arguments are forwarded to
    ``attributes.register_attribute_impl()``.

    """
    extensions = list(util.to_list(prop.extension, default=[]))
    uselist = useobject and prop.uselist

    # hooks run in the order they are appended here
    hooks = []

    if useobject and prop.single_parent:
        hooks.append(single_parent_validator)

    if prop.key in prop.parent.validators:
        fn, opts = prop.parent.validators[prop.key]

        def _attach_validator(desc, prop):
            return orm_util._validator_events(desc, prop.key, fn, **opts)

        hooks.append(_attach_validator)

    if useobject:
        hooks.append(unitofwork.track_cascade_events)

        # backref listeners need to be assembled after the
        # single-parent validator and the mapper validator
        backref = prop.back_populates
        if backref and prop._effective_sync_backref:

            def _attach_backref(desc, prop):
                return attributes.backref_listeners(desc, backref, uselist)

            hooks.append(_attach_backref)

    # a single MapperProperty is shared down a class inheritance
    # hierarchy, so attribute instrumentation and backref events are set
    # up for each mapper down the hierarchy.
    #
    # typically "mapper" is the same as prop.parent, due to the way the
    # configure_mappers() process runs; however this is not strongly
    # enforced, and on a second configure_mappers() run the mapper here
    # might not be prop.parent.  A subclass mapper may also be handled
    # before its superclass mapper.  Since we can't depend on mappers not
    # already being set up, each one is checked individually.
    for m in mapper.self_and_descendants:
        if prop is not m._props.get(prop.key):
            continue
        if m.class_manager._attr_has_impl(prop.key):
            continue

        track_parent = useobject and (
            prop.single_parent or prop.direction is interfaces.ONETOMANY
        )
        fire_modified = not useobject or not prop.viewonly

        desc = attributes.register_attribute_impl(
            m.class_,
            prop.key,
            parent_token=prop,
            uselist=uselist,
            compare_function=compare_function,
            useobject=useobject,
            extension=extensions,
            trackparent=track_parent,
            typecallable=typecallable,
            callable_=callable_,
            active_history=active_history,
            impl_class=impl_class,
            send_modified_events=fire_modified,
            doc=prop.doc,
            **kw
        )

        for hook in hooks:
            hook(desc, prop)


@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
class UninstrumentedColumnLoader(LoaderStrategy):
    """Loader strategy for a MapperProperty that is not instrumented.

    This is often what the polymorphic_on argument of mapper() ends up
    with, when that argument is against the with_polymorphic selectable.

    """

    __slots__ = ("columns",)

    def __init__(self, parent, strategy_key):
        super(UninstrumentedColumnLoader, self).__init__(parent, strategy_key)
        self.columns = self.parent_property.columns

    def setup_query(
        self,
        context,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection=None,
        **kwargs
    ):
        """Append each of this strategy's columns to ``column_collection``,
        translated through ``adapter.columns`` when an adapter is given.
        """
        for column in self.columns:
            column_collection.append(
                adapter.columns[column] if adapter else column
            )

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Do nothing; no populators are registered by this strategy."""


@log.class_logger
@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
class ColumnLoader(LoaderStrategy):
    """Loading behavior for a plain (non-deferred, instrumented)
    :class:`.ColumnProperty`."""

    __slots__ = ("columns", "is_composite")

    def __init__(self, parent, strategy_key):
        super(ColumnLoader, self).__init__(parent, strategy_key)
        prop = self.parent_property
        self.columns = prop.columns
        self.is_composite = hasattr(prop, "composite_class")

    def setup_query(
        self,
        context,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        **kwargs
    ):
        """Add this property's columns to ``column_collection`` and record
        the first column in ``memoized_populators``, keyed by the property.

        Columns are translated through ``adapter.columns`` when an adapter
        is given.
        """
        for column in self.columns:
            column_collection.append(
                adapter.columns[column] if adapter else column
            )

        first = self.columns[0]
        memoized_populators[self.parent_property] = (
            adapter.columns[first] if adapter else first
        )

    def init_class_attribute(self, mapper):
        """Register the class-level attribute for this column property."""
        self.is_class_level = True
        first = self.columns[0]
        coltype = first.type
        # TODO: check all columns ?  check for foreign key as well?
        track_history = (
            self.parent_property.active_history
            or first.primary_key
            or mapper.version_id_col in set(self.columns)
        )

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=coltype.compare_values,
            active_history=track_history,
        )

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Register a "quick" populator for the first of our columns that
        the result can produce a getter for; if none can, register the
        key under "expire" instead.
        """
        for column in self.columns:
            if adapter:
                column = adapter.columns[column]
            getter = result._getter(column, False)
            if getter:
                populators["quick"].append((self.key, getter))
                return

        # none of the columns is present in the row
        populators["expire"].append((self.key, True))


@log.class_logger
@properties.ColumnProperty.strategy_for(query_expression=True)
class ExpressionColumnLoader(ColumnLoader):
    """Column loader for properties registered with
    ``query_expression=True``.

    The expression to load comes from the "expression" entry of the
    loader option's ``local_opts`` when present; otherwise the property's
    own columns are used, provided at least one of them does not compare
    equal to SQL NULL.
    """

    def __init__(self, parent, strategy_key):
        super(ExpressionColumnLoader, self).__init__(parent, strategy_key)

        sql_null = sql.null()
        # true as soon as one column is something other than NULL
        self._have_default_expression = not all(
            column.compare(sql_null)
            for column in self.parent_property.columns
        )

    def setup_query(
        self,
        context,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        **kwargs
    ):
        """Add the selected expression column(s) to ``column_collection``
        and memoize the first of them for the property; do nothing when
        there is neither an option-supplied nor a default expression.
        """
        if loadopt and "expression" in loadopt.local_opts:
            selected = [loadopt.local_opts["expression"]]
        elif self._have_default_expression:
            selected = self.parent_property.columns
        else:
            selected = None

        if selected is None:
            return

        for column in selected:
            column_collection.append(
                adapter.columns[column] if adapter else column
            )

        first = selected[0]
        memoized_populators[self.parent_property] = (
            adapter.columns[first] if adapter else first
        )

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Register a populator only when the loader option carries an
        "expression": "quick" if the result has a getter for it,
        otherwise "expire".
        """
        if not (loadopt and "expression" in loadopt.local_opts):
            return

        column = loadopt.local_opts["expression"]
        if adapter:
            column = adapter.columns[column]

        getter = result._getter(column, False)
        if getter:
            populators["quick"].append((self.key, getter))
        else:
            populators["expire"].append((self.key, True))

    def init_class_attribute(self, mapper):
        """Register the class-level attribute, with
        ``accepts_scalar_loader=False``."""
        self.is_class_level = True

        first_type = self.columns[0].type
        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=first_type.compare_values,
            accepts_scalar_loader=False,
        )


@log.class_logger
@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
@properties.ColumnProperty.strategy_for(do_nothing=True)
class DeferredColumnLoader(LoaderStrategy):
    """Loading behavior for a deferred :class:`.ColumnProperty`."""

    __slots__ = ("columns", "group")

    def __init__(self, parent, strategy_key):
        super(DeferredColumnLoader, self).__init__(parent, strategy_key)
        prop = self.parent_property
        if hasattr(prop, "composite_class"):
            raise NotImplementedError(
                "Deferred loading for composite types not implemented yet"
            )
        self.columns = prop.columns
        self.group = prop.group

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Register either an "expire" entry (class-level strategy) or a
        "new" populator that installs the property's deferred column
        loader (non class-level strategy).
        """
        # this path currently does not check the result
        # for the column; this is because in most cases we are
        # working just with the setup_query() directive which does
        # not support this, and the behavior here should be consistent.
        if self.is_class_level:
            populators["expire"].append((self.key, False))
        else:
            deferred_loader = self.parent_property._deferred_column_loader
            populators["new"].append((self.key, deferred_loader))

    def init_class_attribute(self, mapper):
        """Register the class-level attribute with ``_load_for_state`` as
        its loader callable."""
        self.is_class_level = True

        first_type = self.columns[0].type
        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=first_type.compare_values,
            callable_=self._load_for_state,
            expire_missing=False,
        )

    def setup_query(
        self,
        context,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        only_load_props=None,
        **kw
    ):
        """Either delegate to the non-deferred column strategy (when the
        column is being undeferred) or memoize a deferral marker for the
        property.

        The column counts as undeferred when the loader option has
        "undefer_pks" and our columns intersect the parent's
        ``_should_undefer_in_wildcard``, when the option undefers our
        group, or when our key is listed in ``only_load_props``.
        """
        undefer = False
        if loadopt:
            local_opts = loadopt.local_opts
            if "undefer_pks" in local_opts and set(
                self.columns
            ).intersection(self.parent._should_undefer_in_wildcard):
                undefer = True
            elif self.group and local_opts.get(
                "undefer_group_%s" % self.group, False
            ):
                undefer = True
        if not undefer and only_load_props and self.key in only_load_props:
            undefer = True

        if undefer:
            eager_strategy = self.parent_property._get_strategy(
                (("deferred", False), ("instrument", True))
            )
            eager_strategy.setup_query(
                context,
                query_entity,
                path,
                loadopt,
                adapter,
                column_collection,
                memoized_populators,
                **kw
            )
        elif self.is_class_level:
            memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
        else:
            memoized_populators[self.parent_property] = _DEFER_FOR_STATE

    def _load_for_state(self, state, passive):
        """Load the deferred column (or its whole deferral group) for
        ``state``.

        Returns ``attributes.ATTR_EMPTY`` when the state has no key,
        ``attributes.PASSIVE_NO_RESULT`` when ``passive`` lacks SQL_OK,
        and ``attributes.ATTR_WAS_SET`` after a successful load.  Raises
        DetachedInstanceError when the state has no session and
        ObjectDeletedError when the identity load returns None.
        """
        if not state.key:
            return attributes.ATTR_EMPTY

        if not passive & attributes.SQL_OK:
            return attributes.PASSIVE_NO_RESULT

        instance_mapper = state.manager.mapper

        if not self.group:
            candidates = [self.key]
        else:
            candidates = [
                p.key
                for p in instance_mapper.iterate_properties
                if isinstance(p, StrategizedProperty)
                and isinstance(p.strategy, DeferredColumnLoader)
                and p.group == self.group
            ]

        # narrow the keys down to just those which have no history
        unloaded = [k for k in candidates if k in state.unmodified]

        session = _state_session(state)
        if session is None:
            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "deferred load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )

        loaded = loading.load_on_ident(
            session.query(instance_mapper),
            state.key,
            only_load_props=unloaded,
            refresh_state=state,
        )
        if loaded is None:
            raise orm_exc.ObjectDeletedError(state)

        return attributes.ATTR_WAS_SET


class LoadDeferredColumns(object):
    """Serializable loader callable used by DeferredColumnLoader.

    Only the attribute key is stored; the strategy is looked up from the
    state's mapper at call time.
    """

    def __init__(self, key):
        self.key = key

    def __call__(self, state, passive=attributes.PASSIVE_OFF):
        prop = state.manager.mapper._props[self.key]
        deferred_strategy = prop._strategies[DeferredColumnLoader]
        return deferred_strategy._load_for_state(state, passive)


class AbstractRelationshipLoader(LoaderStrategy):
    """Base for LoaderStrategies which deal with related objects.

    Copies ``mapper``, ``entity``, ``target`` and ``uselist`` from the
    parent property onto the strategy.
    """

    __slots__ = ("mapper", "target", "uselist", "entity")

    def __init__(self, parent, strategy_key):
        super(AbstractRelationshipLoader, self).__init__(parent, strategy_key)
        prop = self.parent_property
        self.mapper = prop.mapper
        self.entity = prop.entity
        self.target = prop.target
        self.uselist = prop.uselist


@log.class_logger
@properties.RelationshipProperty.strategy_for(do_nothing=True)
class DoNothingLoader(LoaderStrategy):
    """Relationship loader that leaves the object's state untouched.

    Unlike NoLoader, this loader does not initialize the
    collection/attribute to empty/None, so the usual default LazyLoader
    takes effect.

    """


@log.class_logger
@properties.RelationshipProperty.strategy_for(lazy="noload")
@properties.RelationshipProperty.strategy_for(lazy=None)
class NoLoader(AbstractRelationshipLoader):
    """Loading behavior for a :class:`.RelationshipProperty` configured
    with "lazy=None" (or "noload").

    """

    __slots__ = ()

    def init_class_attribute(self, mapper):
        """Register the class-level attribute using the property's
        collection class."""
        self.is_class_level = True

        prop = self.parent_property
        _register_attribute(
            prop,
            mapper,
            useobject=True,
            typecallable=prop.collection_class,
        )

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Register a "new" populator that sets a scalar attribute to
        None, or initializes a collection attribute through its impl.
        """

        def invoke_no_load(state, dict_, row):
            if not self.uselist:
                dict_[self.key] = None
                return
            state.manager.get_impl(self.key).initialize(state, dict_)

        populators["new"].append((self.key, invoke_no_load))


@log.class_logger
@properties.RelationshipProperty.strategy_for(lazy=True)
@properties.RelationshipProperty.strategy_for(lazy="select")
@properties.RelationshipProperty.strategy_for(lazy="raise")
@properties.RelationshipProperty.strategy_for(lazy="raise_on_sql")
@properties.RelationshipProperty.strategy_for(lazy="baked_select")
class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots):
    """Provide loading behavior for a :class:`.RelationshipProperty`
    with "lazy=True", that is loads when first accessed.

    The same class also serves the "select", "raise", "raise_on_sql" and
    "baked_select" lazy settings; "raise" and "raise_on_sql" are tracked
    through the ``_raise_always`` / ``_raise_on_sql`` flags.

    """

    __slots__ = (
        "_lazywhere",
        "_rev_lazywhere",
        "use_get",
        "is_aliased_class",
        "_bind_to_col",
        "_equated_columns",
        "_rev_bind_to_col",
        "_rev_equated_columns",
        "_simple_lazy_clause",
        "_raise_always",
        "_raise_on_sql",
        "_bakery",
    )

    def __init__(self, parent, strategy_key):
        """Build the forward and reverse lazy clauses from the property's
        join condition and decide whether a plain primary-key "get" can
        be used for loads (``use_get``).
        """
        super(LazyLoader, self).__init__(parent, strategy_key)
        self._raise_always = self.strategy_opts["lazy"] == "raise"
        self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql"

        self.is_aliased_class = inspect(self.entity).is_aliased_class

        join_condition = self.parent_property._join_condition
        (
            self._lazywhere,
            self._bind_to_col,
            self._equated_columns,
        ) = join_condition.create_lazy_clause()

        (
            self._rev_lazywhere,
            self._rev_bind_to_col,
            self._rev_equated_columns,
        ) = join_condition.create_lazy_clause(reverse_direction=True)

        self.logger.info("%s lazy loading clause %s", self, self._lazywhere)

        # determine if our "lazywhere" clause is the same as the mapper's
        # get() clause.  then we can just use mapper.get()
        #
        # TODO: the "not self.uselist" can be taken out entirely; a m2o
        # load that populates for a list (very unusual, but is possible with
        # the API) can still set for "None" and the attribute system will
        # populate as an empty list.
        self.use_get = (
            not self.is_aliased_class
            and not self.uselist
            and self.entity._get_clause[0].compare(
                self._lazywhere,
                use_proxies=True,
                equivalents=self.mapper._equivalent_columns,
            )
        )

        if self.use_get:
            # iterate over a snapshot of the keys, since the mapping is
            # added to inside the loop
            for col in list(self._equated_columns):
                if col in self.mapper._equivalent_columns:
                    for c in self.mapper._equivalent_columns[col]:
                        self._equated_columns[c] = self._equated_columns[col]

            self.logger.info(
                "%s will use query.get() to optimize instance loads", self
            )

    def init_class_attribute(self, mapper):
        """Register the class-level attribute with ``_load_for_state`` as
        its loader callable."""
        self.is_class_level = True

        active_history = (
            self.parent_property.active_history
            or self.parent_property.direction is not interfaces.MANYTOONE
            or not self.use_get
        )

        # MANYTOONE currently only needs the
        # "old" value for delete-orphan
        # cascades.  the required _SingleParentValidator
        # will enable active_history
        # in that case.  otherwise we don't need the
        # "old" value during backref operations.
        _register_attribute(
            self.parent_property,
            mapper,
            useobject=True,
            callable_=self._load_for_state,
            typecallable=self.parent_property.collection_class,
            active_history=active_history,
        )

    def _memoized_attr__simple_lazy_clause(self):
        """Return ``(criterion, params)`` for the forward lazy clause.

        ``params`` is a list of ``(bind key, column or None, value or
        None)`` tuples: bind parameters found in ``_bind_to_col`` carry
        their column, others without a callable carry their literal value.
        """
        criterion, bind_to_col = (self._lazywhere, self._bind_to_col)

        params = []

        def _reset_unique(bindparam):
            bindparam.unique = False

        # first pass: clear the "unique" flag on every bind parameter
        visitors.traverse(criterion, {}, {"bindparam": _reset_unique})

        def _collect_param(bindparam):
            if bindparam._identifying_key in bind_to_col:
                params.append(
                    (
                        bindparam.key,
                        bind_to_col[bindparam._identifying_key],
                        None,
                    )
                )
            elif bindparam.callable is None:
                params.append((bindparam.key, None, bindparam.value))

        # second pass: clone the criterion while collecting the parameters
        criterion = visitors.cloned_traverse(
            criterion, {}, {"bindparam": _collect_param}
        )

        return criterion, params

    def _generate_lazy_clause(self, state, passive):
        """Return the lazy criterion together with a dict of parameter
        values taken from ``state``.

        When ``state`` is None, the criterion adapted to NULL for all
        parameter keys is returned instead (a single value, not a tuple).
        """
        criterion, param_keys = self._simple_lazy_clause

        if state is None:
            return sql_util.adapt_criterion_to_null(
                criterion, [key for key, ident, value in param_keys]
            )

        mapper = self.parent_property.parent

        o = state.obj()  # strong ref
        dict_ = attributes.instance_dict(o)

        # strip the INIT_OK bit before fetching attribute values
        if passive & attributes.INIT_OK:
            passive ^= attributes.INIT_OK

        params = {}
        for key, ident, value in param_keys:
            if ident is not None:
                if passive and passive & attributes.LOAD_AGAINST_COMMITTED:
                    value = mapper._get_committed_state_attr_by_column(
                        state, dict_, ident, passive
                    )
                else:
                    value = mapper._get_state_attr_by_column(
                        state, dict_, ident, passive
                    )

            params[key] = value

        return criterion, params

    def _invoke_raise_load(self, state, passive, lazy):
        """Raise InvalidRequestError stating the attribute is unavailable
        due to the given ``lazy`` setting."""
        raise sa_exc.InvalidRequestError(
            "'%s' is not available due to lazy='%s'" % (self, lazy)
        )

    def _load_for_state(self, state, passive):
        """Loader callable for the attribute: return the related value
        for ``state`` or a sentinel describing why no load took place.

        May return ``attributes.ATTR_EMPTY``,
        ``attributes.PASSIVE_NO_RESULT``, ``attributes.NEVER_SET`` or
        None without emitting SQL; otherwise the result of
        ``_emit_lazyload()``.  Raises InvalidRequestError for
        lazy="raise" and DetachedInstanceError when the state has no
        session and NO_RAISE is not set.
        """

        if not state.key and (
            (
                not self.parent_property.load_on_pending
                and not state._load_pending
            )
            or not state.session_id
        ):
            return attributes.ATTR_EMPTY

        pending = not state.key
        primary_key_identity = None

        if (not passive & attributes.SQL_OK and not self.use_get) or (
            not passive & attributes.NON_PERSISTENT_OK and pending
        ):
            return attributes.PASSIVE_NO_RESULT

        if (
            # we were given lazy="raise"
            self._raise_always
            # the no_raise history-related flag was not passed
            and not passive & attributes.NO_RAISE
            and (
                # if we are use_get and related_object_ok is disabled,
                # which means we are at most looking in the identity map
                # for history purposes or otherwise returning
                # PASSIVE_NO_RESULT, don't raise.  This is also a
                # history-related flag
                not self.use_get
                or passive & attributes.RELATED_OBJECT_OK
            )
        ):

            self._invoke_raise_load(state, passive, "raise")

        session = _state_session(state)
        if not session:
            if passive & attributes.NO_RAISE:
                return attributes.PASSIVE_NO_RESULT

            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "lazy load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )

        # if we have a simple primary key load, check the
        # identity map without generating a Query at all
        if self.use_get:
            primary_key_identity = self._get_ident_for_use_get(
                session, state, passive
            )
            if attributes.PASSIVE_NO_RESULT in primary_key_identity:
                return attributes.PASSIVE_NO_RESULT
            elif attributes.NEVER_SET in primary_key_identity:
                return attributes.NEVER_SET

            if _none_set.issuperset(primary_key_identity):
                return None

            # look for this identity in the identity map.  Delegate to the
            # Query class in use, as it may have special rules for how it
            # does this, including how it decides what the correct
            # identity_token would be for this identity.
            instance = session.query()._identity_lookup(
                self.entity,
                primary_key_identity,
                passive=passive,
                lazy_loaded_from=state,
            )

            if instance is not None:
                if instance is attributes.PASSIVE_CLASS_MISMATCH:
                    return None
                else:
                    return instance
            elif (
                not passive & attributes.SQL_OK
                or not passive & attributes.RELATED_OBJECT_OK
            ):
                return attributes.PASSIVE_NO_RESULT

        return self._emit_lazyload(
            session, state, primary_key_identity, passive
        )

    def _get_ident_for_use_get(self, session, state, passive):
        """Return the list of values on ``state`` that correspond to the
        target mapper's primary key columns, read from committed state
        when LOAD_AGAINST_COMMITTED is set in ``passive``.
        """
        instance_mapper = state.manager.mapper

        if passive & attributes.LOAD_AGAINST_COMMITTED:
            get_attr = instance_mapper._get_committed_state_attr_by_column
        else:
            get_attr = instance_mapper._get_state_attr_by_column

        dict_ = state.dict

        return [
            get_attr(state, dict_, self._equated_columns[pk], passive=passive)
            for pk in self.mapper.primary_key
        ]

    @util.dependencies("sqlalchemy.ext.baked")
    def _memoized_attr__bakery(self, baked):
        """Return the baked-query bakery (size 50) used by this loader."""
        return baked.bakery(size=50)

    @util.dependencies("sqlalchemy.orm.strategy_options")
    def _emit_lazyload(
        self, strategy_options, session, state, primary_key_identity, passive
    ):
        """Build the baked lazy-load query for ``state`` and run it.

        ``strategy_options`` is not passed by callers; it is presumably
        supplied by the ``util.dependencies`` decorator.  Returns the
        result of a primary-key identity load when ``use_get`` is set;
        otherwise the list of rows for a collection, or the first row /
        None for a scalar (warning when more than one row came back).
        Raises InvalidRequestError for lazy="raise_on_sql".
        """
        # emit lazy load now using BakedQuery, to cut way down on the overhead
        # of generating queries.
        # there are two big things we are trying to guard against here:
        #
        # 1. two different lazy loads that need to have a different result,
        #    being cached on the same key.  The results between two lazy loads
        #    can be different due to the options passed to the query, which
        #    take effect for descendant objects.  Therefore we have to make
        #    sure paths and load options generate good cache keys, and if they
        #    don't, we don't cache.
        # 2. a lazy load that gets cached on a key that includes some
        #    "throwaway" object, like a per-query AliasedClass, meaning
        #    the cache key will never be seen again and the cache itself
        #    will fill up.   (the cache is an LRU cache, so while we won't
        #    run out of memory, it will perform terribly when it's full.  A
        #    warning is emitted if this occurs.)   We must prevent the
        #    generation of a cache key that is including a throwaway object
        #    in the key.

        # note that "lazy='select'" and "lazy=True" make two separate
        # lazy loaders.   Currently the LRU cache is local to the LazyLoader,
        # however add ourselves to the initial cache key just to future
        # proof in case it moves
        q = self._bakery(lambda session: session.query(self.entity), self)

        q.add_criteria(
            lambda q: q._adapt_all_clauses()._with_invoke_all_eagers(False),
            self.parent_property,
        )

        if not self.parent_property.bake_queries:
            q.spoil(full=True)

        if self.parent_property.secondary is not None:
            q.add_criteria(
                lambda q: q.select_from(
                    self.mapper, self.parent_property.secondary
                )
            )

        pending = not state.key

        # don't autoflush on pending
        if pending or passive & attributes.NO_AUTOFLUSH:
            q.add_criteria(lambda q: q.autoflush(False))

        if state.load_options:
            # here, if any of the options cannot return a cache key,
            # the BakedQuery "spoils" and caching will not occur.  a path
            # that features Cls.attribute.of_type(some_alias) will cancel
            # caching, for example, since "some_alias" is user-defined and
            # is usually a throwaway object.
            effective_path = state.load_path[self.parent_property]

            q._add_lazyload_options(state.load_options, effective_path)

        if self.use_get:
            if self._raise_on_sql:
                self._invoke_raise_load(state, passive, "raise_on_sql")

            return (
                q(session)
                .with_post_criteria(lambda q: q._set_lazyload_from(state))
                ._load_on_pk_identity(
                    session.query(self.mapper), primary_key_identity
                )
            )

        if self.parent_property.order_by:
            q.add_criteria(
                lambda q: q.order_by(
                    *util.to_list(self.parent_property.order_by)
                )
            )

        for rev in self.parent_property._reverse_property:
            # reverse props that are MANYTOONE are loading *this*
            # object from get(), so don't need to eager out to those.
            if (
                rev.direction is interfaces.MANYTOONE
                and rev._use_get
                and not isinstance(rev.strategy, LazyLoader)
            ):

                # bind "rev" as a default argument: the criteria callable
                # is stored and invoked after this loop has finished, so a
                # plain closure would see only the final value of "rev"
                # for every callable added here.
                q.add_criteria(
                    lambda q, rev=rev: q.options(
                        strategy_options.Load.for_existing_path(
                            q._current_path[rev.parent]
                        ).lazyload(rev.key)
                    )
                )

        lazy_clause, params = self._generate_lazy_clause(state, passive)

        if pending:
            if util.has_intersection(orm_util._none_set, params.values()):
                return None

        elif util.has_intersection(orm_util._never_set, params.values()):
            return None

        if self._raise_on_sql:
            self._invoke_raise_load(state, passive, "raise_on_sql")

        q.add_criteria(lambda q: q.filter(lazy_clause))

        # set parameters in the query such that we don't overwrite
        # parameters that are already set within it
        def set_default_params(q):
            params.update(q._params)
            q._params = params
            return q

        result = (
            q(session)
            .with_post_criteria(lambda q: q._set_lazyload_from(state))
            .with_post_criteria(set_default_params)
            .all()
        )
        if self.uselist:
            return result
        else:
            l = len(result)
            if l:
                if l > 1:
                    util.warn(
                        "Multiple rows returned with "
                        "uselist=False for lazily-loaded attribute '%s' "
                        % self.parent_property
                    )

                return result[0]
            else:
                return None

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Register a "new" populator that either installs a per-instance
        lazy callable (strategy is not class-level) or resets the
        attribute so the class-level loader runs again (when
        ``context.populate_existing`` or ``mapper.always_refresh`` is
        set).  Otherwise nothing is registered.
        """
        key = self.key

        if not self.is_class_level:
            # we are not the primary manager for this attribute
            # on this class - set up a
            # per-instance lazyloader, which will override the
            # class-level behavior.
            # this currently only happens when using a
            # "lazyload" option on a "no load"
            # attribute - "eager" attributes always have a
            # class-level lazyloader installed.
            set_lazy_callable = (
                InstanceState._instance_level_callable_processor
            )(mapper.class_manager, LoadLazyAttribute(key, self), key)

            populators["new"].append((self.key, set_lazy_callable))
        elif context.populate_existing or mapper.always_refresh:

            def reset_for_lazy_callable(state, dict_, row):
                # we are the primary manager for this attribute on
                # this class - reset its
                # per-instance attribute state, so that the class-level
                # lazy loader is
                # executed when next referenced on this instance.
                # this is needed in
                # populate_existing() types of scenarios to reset
                # any existing state.
                state._reset(dict_, key)

            populators["new"].append((self.key, reset_for_lazy_callable))


class LoadLazyAttribute(object):
    """serializable loader object used by LazyLoader

    Only the attribute key and the initiating strategy's
    ``strategy_key`` are stored; the strategy object itself is looked up
    again from the state's mapper each time the loader is called.

    """

    def __init__(self, key, initiating_strategy):
        self.strategy_key = initiating_strategy.strategy_key
        self.key = key

    def __call__(self, state, passive=attributes.PASSIVE_OFF):
        # resolve property, then strategy, from the mapper of the
        # instance being loaded, and delegate the load to it.
        prop = state.manager.mapper._props[self.key]
        return prop._strategies[self.strategy_key]._load_for_state(
            state, passive
        )


@properties.RelationshipProperty.strategy_for(lazy="immediate")
class ImmediateLoader(AbstractRelationshipLoader):
    """Relationship strategy registered for ``lazy="immediate"``.

    Adds nothing to the query; instead a "delayed" populator is queued
    which invokes ``get()`` on the attribute implementation for each
    state it is called with.

    """

    __slots__ = ()

    def init_class_attribute(self, mapper):
        """Delegate class-level setup to the ``lazy="select"`` strategy."""
        select_strategy = self.parent_property._get_strategy(
            (("lazy", "select"),)
        )
        select_strategy.init_class_attribute(mapper)

    def setup_query(
        self,
        context,
        entity,
        path,
        loadopt,
        adapter,
        column_collection=None,
        parentmapper=None,
        **kwargs
    ):
        """No-op: this strategy contributes nothing to the statement."""
        pass

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Queue a "delayed" populator that triggers the attribute load."""

        def load_immediate(state, dict_, row):
            impl = state.get_impl(self.key)
            impl.get(state, dict_)

        delayed = populators["delayed"]
        delayed.append((self.key, load_immediate))


@log.class_logger
@properties.RelationshipProperty.strategy_for(lazy="subquery")
class SubqueryLoader(AbstractRelationshipLoader):
    __slots__ = ("join_depth",)

    def __init__(self, parent, strategy_key):
        """Initialize the strategy and copy ``join_depth`` from the
        relationship this strategy is attached to."""
        super(SubqueryLoader, self).__init__(parent, strategy_key)
        relationship = self.parent_property
        self.join_depth = relationship.join_depth

    def init_class_attribute(self, mapper):
        """Delegate class-level setup to the ``lazy="select"`` strategy."""
        select_strategy = self.parent_property._get_strategy(
            (("lazy", "select"),)
        )
        select_strategy.init_class_attribute(mapper)

    def setup_query(
        self,
        context,
        entity,
        path,
        loadopt,
        adapter,
        column_collection=None,
        parentmapper=None,
        **kwargs
    ):
        """Build the secondary "subquery load" query for this relationship.

        The finished query is not executed here; it is stored in
        ``context.attributes`` under this relationship path's
        ``"subquery"`` key, where ``create_row_processor`` looks for it.

        Returns early, storing nothing, when eager loads are disabled on
        ``context.query`` or when the cycle check below (``join_depth``,
        or the target mapper already present in the subquery path) rules
        the load out.  ``loadopt``, ``adapter``, ``column_collection``,
        ``parentmapper`` and ``kwargs`` are not used by this method.

        """

        if not context.query._enable_eagerloads:
            return
        elif context.query._yield_per:
            # NOTE(review): presumably raises, as subquery loading is not
            # compatible with yield_per - confirm against Query.
            context.query._no_yield_per("subquery")

        path = path[self.parent_property]

        # build up a path indicating the path from the leftmost
        # entity to the thing we're subquery loading.
        with_poly_entity = path.get(
            context.attributes, "path_with_polymorphic", None
        )
        if with_poly_entity is not None:
            effective_entity = with_poly_entity
        else:
            effective_entity = self.entity

        # a nested subquery load finds the path accumulated so far under
        # ("subquery_path", None); it is set on the new query's
        # _attributes further below.
        subq_path = context.attributes.get(
            ("subquery_path", None), orm_util.PathRegistry.root
        )

        subq_path = subq_path + path

        # if not via query option, check for
        # a cycle
        if not path.contains(context.attributes, "loader"):
            if self.join_depth:
                # the combined path length is halved before being
                # compared to join_depth
                if (
                    (
                        context.query._current_path.length
                        if context.query._current_path
                        else 0
                    )
                    + path.length
                ) / 2 > self.join_depth:
                    return
            elif subq_path.contains_mapper(self.mapper):
                return

        (
            leftmost_mapper,
            leftmost_attr,
            leftmost_relationship,
        ) = self._get_leftmost(subq_path)

        # nested subquery loads re-use the outermost original query,
        # which is passed along under ("orig_query", SubqueryLoader).
        orig_query = context.attributes.get(
            ("orig_query", SubqueryLoader), context.query
        )

        # generate a new Query from the original, then
        # produce a subquery from it.
        left_alias = self._generate_from_original_query(
            orig_query,
            leftmost_mapper,
            leftmost_attr,
            leftmost_relationship,
            entity.entity_zero,
        )

        # generate another Query that will join the
        # left alias to the target relationships.
        # basically doing a longhand
        # "from_self()".  (from_self() itself not quite industrial
        # strength enough for all contingencies...but very close)
        q = orig_query.session.query(effective_entity)
        q._attributes = {
            ("orig_query", SubqueryLoader): orig_query,
            ("subquery_path", None): subq_path,
        }

        q = q._set_enable_single_crit(False)
        to_join, local_attr, parent_alias = self._prep_for_joins(
            left_alias, subq_path
        )

        # the parent's local columns are added after the entity; the
        # loaded rows are later grouped on these trailing columns.
        q = q.add_columns(*local_attr)
        q = self._apply_joins(
            q, to_join, left_alias, parent_alias, effective_entity
        )

        q = self._setup_options(q, subq_path, orig_query, effective_entity)
        q = self._setup_outermost_orderby(q)

        # add new query to attributes to be picked up
        # by create_row_processor
        path.set(context.attributes, "subquery", q)

    def _get_leftmost(self, subq_path):
        """Return ``(mapper, attributes, relationship)`` describing the
        first (entity, property) pair of ``subq_path``.

        ``attributes`` are looked up on the leftmost entity, one per
        local column of the leftmost relationship.

        """
        elements = subq_path.path
        root_mapper = orm_util._class_to_mapper(elements[0])

        # determine attributes of the leftmost mapper
        is_own_property = (
            self.parent.isa(root_mapper)
            and self.parent_property is elements[1]
        )
        if is_own_property:
            leftmost_mapper = self.parent
            leftmost_prop = self.parent_property
        else:
            leftmost_mapper = root_mapper
            leftmost_prop = elements[1]

        leftmost_attr = []
        for col in leftmost_prop.local_columns:
            attr_name = leftmost_mapper._columntoproperty[col].key
            leftmost_attr.append(getattr(elements[0].entity, attr_name))

        return leftmost_mapper, leftmost_attr, leftmost_prop

    def _generate_from_original_query(
        self,
        orig_query,
        leftmost_mapper,
        leftmost_attr,
        leftmost_relationship,
        orig_entity,
    ):
        """Turn a copy of ``orig_query`` into a subquery selecting only
        ``leftmost_attr`` and return an alias of ``leftmost_mapper``
        against that subquery.

        ``orig_entity`` is accepted but not used here.

        """
        # reformat the original query
        # to look only for significant columns
        q = orig_query._clone().correlate(None)

        # set the query's "FROM" list explicitly to what the
        # FROM list would be in any case, as we will be limiting
        # the columns in the SELECT list which may no longer include
        # all entities mentioned in things like WHERE, JOIN, etc.
        if not q._from_obj:
            from_entities = {
                desc["entity"]
                for desc in orig_query.column_descriptions
                if desc["entity"] is not None
            }
            q._set_select_from(list(from_entities), False)

        # select from the identity columns of the outer (specifically, these
        # are the 'local_cols' of the property).  This will remove
        # other columns from the query that might suggest the right entity
        # which is why we do _set_select_from above.
        target_cols = q._adapt_col_list(leftmost_attr)
        q._set_entities(target_cols)

        distinct_target_key = leftmost_relationship.distinct_target_key

        if distinct_target_key is True:
            q._distinct = True
        elif distinct_target_key is None:
            # if target_cols refer to a non-primary key or only
            # part of a composite primary key, set the q as distinct
            tables = {col.table for col in target_cols}
            if any(
                not set(target_cols).issuperset(table.primary_key)
                for table in tables
            ):
                q._distinct = True

        if q._order_by is False:
            q._order_by = leftmost_mapper.order_by

        # don't need ORDER BY if no limit/offset
        if q._limit is None and q._offset is None:
            q._order_by = None

        # the original query now becomes a subquery
        # which we'll join onto.
        embedded = q.with_labels().subquery()
        return orm_util.AliasedClass(
            leftmost_mapper, embedded, use_mapper_path=True
        )

    def _prep_for_joins(self, left_alias, subq_path):
        """Work out the chain of joins implied by ``subq_path``.

        Returns ``(to_join, local_attr, parent_alias)``: a list of
        ``(entity, property key)`` pairs, the parent's local-column
        attributes taken from ``parent_alias``, and the entity that is
        the immediate parent of the relationship being loaded.

        """
        # figure out what's being joined.  a.k.a. the fun part
        pairs = list(subq_path.pairs())

        to_join = []
        if pairs:
            first_mapper, first_prop = pairs[0]
            to_join.append((first_mapper, first_prop.key))

        for (_, prev_prop), (mapper, prop) in zip(pairs, pairs[1:]):
            # look at the previous mapper in the chain -
            # if it is as or more specific than this prop's
            # mapper, use that instead.
            # note we have an assumption here that
            # the non-first element is always going to be a mapper,
            # not an AliasedClass
            prev_mapper = prev_prop.mapper
            chosen = prev_mapper if prev_mapper.isa(mapper) else mapper
            to_join.append((chosen, prop.key))

        # determine the immediate parent class we are joining from,
        # which needs to be aliased.
        if len(to_join) >= 2:
            info = inspect(to_join[-1][0])
            if info.is_aliased_class:
                parent_alias = info.entity
            else:
                # alias a plain mapper as we may be
                # joining multiple times
                parent_alias = orm_util.AliasedClass(
                    info.entity, use_mapper_path=True
                )
        else:
            # in the case of a one level eager load, this is the
            # leftmost "left_alias".
            parent_alias = left_alias

        local_attr = []
        for col in self.parent_property.local_columns:
            attr_name = self.parent._columntoproperty[col].key
            local_attr.append(getattr(parent_alias, attr_name))

        return to_join, local_attr, parent_alias

    def _apply_joins(
        self, q, to_join, left_alias, parent_alias, effective_entity
    ):
        """Join ``q`` along the ``(entity, key)`` chain in ``to_join``.

        The first hop starts from ``left_alias``, the last hop goes from
        ``parent_alias`` to ``effective_entity``; any hops in between are
        made through entities taken (or freshly aliased) from
        ``to_join``.  Returns the joined query.

        """
        hop_count = len(to_join)
        join_attrs = to_join

        if hop_count == 1:
            join_attrs = [
                getattr(left_alias, to_join[0][1]).of_type(effective_entity)
            ]
        elif hop_count == 2:
            first = getattr(left_alias, to_join[0][1]).of_type(parent_alias)
            last = getattr(parent_alias, to_join[-1][1]).of_type(
                effective_entity
            )
            join_attrs = [first, last]
        elif hop_count > 2:
            middle = []
            for item in to_join[1:-1]:
                if inspect(item[0]).is_aliased_class:
                    hop_entity = item[0].entity
                else:
                    hop_entity = orm_util.AliasedClass(item[0])
                middle.append((hop_entity, item[1]))

            # each middle hop targets the next middle entity; the final
            # middle hop targets parent_alias.
            hop_targets = [pair[0] for pair in middle[1:]] + [parent_alias]
            inner = [
                getattr(pair[0], pair[1]).of_type(target)
                for pair, target in zip(middle, hop_targets)
            ]

            first = getattr(left_alias, to_join[0][1]).of_type(
                inner[0].parent
            )
            last = getattr(parent_alias, to_join[-1][1]).of_type(
                effective_entity
            )
            join_attrs = [first] + inner + [last]

        for attr in join_attrs:
            q = q.join(attr, from_joinpoint=True)
        return q

    def _setup_options(self, q, subq_path, orig_query, effective_entity):
        """Carry the original query's options and ``_populate_existing``
        flag over to ``q``; ``effective_entity`` is not used here."""
        # propagate loader options etc. to the new query.
        # these will fire relative to subq_path.
        q = q._with_current_path(subq_path)._conditional_options(
            *orig_query._with_options
        )

        if orig_query._populate_existing:
            q._populate_existing = orig_query._populate_existing

        return q

    def _setup_outermost_orderby(self, q):
        """Apply the relationship's ``order_by``, if any, to ``q`` after
        adapting it through the query's first FROM object."""
        if not self.parent_property.order_by:
            return q

        # if there's an ORDER BY, alias it the same
        # way joinedloader does, but we have to pull out
        # the "eagerjoin" from the query.
        # this really only picks up the "secondary" table
        # right now.
        adapt = q._from_obj[0]._target_adapter.copy_and_process
        adapted_order_by = adapt(util.to_list(self.parent_property.order_by))
        return q.order_by(*adapted_order_by)

    class _SubqCollections(object):
        """Given a :class:`_query.Query` used to emit the "subquery load",
        provide a load interface that executes the query at the
        first moment a value is needed.

        """

        _data = None

        def __init__(self, subq):
            self.subq = subq

        def get(self, key, default):
            if self._data is None:
                self._load()
            return self._data.get(key, default)

        def _load(self):
            self._data = collections.defaultdict(list)
            for k, v in itertools.groupby(self.subq, lambda x: x[1:]):
                self._data[k].extend(vv[0] for vv in v)

        def loader(self, state, dict_, row):
            if self._data is None:
                self._load()

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Register populators that fill this attribute from the
        subquery stored for this path by ``setup_query``.

        Does nothing when no ``"subquery"`` entry exists for the path.
        Raises ``InvalidRequestError`` when the attribute implementation
        does not support population.

        """
        impl = self.parent.class_manager[self.key].impl
        if not impl.supports_population:
            raise sa_exc.InvalidRequestError(
                "'%s' does not support object "
                "population - eager loading cannot be applied." % self
            )

        prop_path = path[self.parent_property]

        subq = prop_path.get(context.attributes, "subquery")
        if subq is None:
            return

        assert subq.session is context.session, (
            "Subquery session doesn't refer to that of "
            "our context.  Are there broken context caching "
            "schemes being used?"
        )

        local_cols = self.parent_property.local_columns

        # cache the loaded collections in the context
        # so that inheriting mappers don't re-load when they
        # call upon create_row_processor again
        loaded = prop_path.get(context.attributes, "collections")
        if loaded is None:
            loaded = self._SubqCollections(subq)
            prop_path.set(context.attributes, "collections", loaded)

        if adapter:
            local_cols = [adapter.columns[col] for col in local_cols]

        if self.uselist:
            make_loader = self._create_collection_loader
        else:
            make_loader = self._create_scalar_loader
        make_loader(context, loaded, local_cols, populators)

    def _create_collection_loader(
        self, context, collections, local_cols, populators
    ):
        """Register "new"/"existing" (and optionally "eager") populators
        that set this attribute to the collection found for each row."""

        def load_collection_from_subq(state, dict_, row):
            lookup_key = tuple(row[col] for col in local_cols)
            members = collections.get(lookup_key, ())
            impl = state.get_impl(self.key)
            impl.set_committed_value(state, dict_, members)

        def load_collection_from_subq_existing_row(state, dict_, row):
            # leave an already-populated attribute alone
            if self.key in dict_:
                return
            load_collection_from_subq(state, dict_, row)

        populators["new"].append((self.key, load_collection_from_subq))
        populators["existing"].append(
            (self.key, load_collection_from_subq_existing_row)
        )

        if context.invoke_all_eagers:
            populators["eager"].append((self.key, collections.loader))

    def _create_scalar_loader(
        self, context, collections, local_cols, populators
    ):
        """Register "new"/"existing" (and optionally "eager") populators
        that set this attribute to the first value found for each row,
        warning when more than one value was loaded."""

        def load_scalar_from_subq(state, dict_, row):
            lookup_key = tuple(row[col] for col in local_cols)
            candidates = collections.get(lookup_key, (None,))
            if len(candidates) > 1:
                util.warn(
                    "Multiple rows returned with "
                    "uselist=False for eagerly-loaded attribute '%s' " % self
                )

            impl = state.get_impl(self.key)
            impl.set_committed_value(state, dict_, candidates[0])

        def load_scalar_from_subq_existing_row(state, dict_, row):
            # leave an already-populated attribute alone
            if self.key in dict_:
                return
            load_scalar_from_subq(state, dict_, row)

        populators["new"].append((self.key, load_scalar_from_subq))
        populators["existing"].append(
            (self.key, load_scalar_from_subq_existing_row)
        )

        if context.invoke_all_eagers:
            populators["eager"].append((self.key, collections.loader))


@log.class_logger
@properties.RelationshipProperty.strategy_for(lazy="joined")
@properties.RelationshipProperty.strategy_for(lazy=False)
class JoinedLoader(AbstractRelationshipLoader):
    """Provide loading behavior for a :class:`.RelationshipProperty`
    using joined eager loading.

    """

    __slots__ = "join_depth", "_aliased_class_pool"

    def __init__(self, parent, strategy_key):
        """Initialize the strategy with an empty alias pool and the
        relationship's ``join_depth``."""
        super(JoinedLoader, self).__init__(parent, strategy_key)
        self._aliased_class_pool = []
        self.join_depth = self.parent_property.join_depth

    def init_class_attribute(self, mapper):
        """Delegate class-level setup to the ``lazy="select"`` strategy."""
        select_strategy = self.parent_property._get_strategy(
            (("lazy", "select"),)
        )
        select_strategy.init_class_attribute(mapper)

    def setup_query(
        self,
        context,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection=None,
        parentmapper=None,
        chained_from_outerjoin=False,
        **kwargs
    ):
        """Set up joined eager loading of this relationship for the
        statement that's being constructed.

        Either re-uses a user-defined adapter (when ``loadopt`` carries
        an ``"eager_from_alias"`` option) or generates a row adapter and
        queues the eager join via ``_generate_row_adapter``, then sets up
        the target entity's columns through
        ``loading._setup_entity_query``.

        Returns early when eager loads are disabled on ``context.query``
        or when the cycle check (``join_depth``, or the target mapper
        already present in the path) rules the load out.

        Raises ``InvalidRequestError`` when a with_polymorphic entity is
        in use and ``context.secondary_columns`` contains ``None``.

        """

        if not context.query._enable_eagerloads:
            return
        elif context.query._yield_per and self.uselist:
            # NOTE(review): presumably raises, as joined collection
            # loading is not compatible with yield_per - confirm.
            context.query._no_yield_per("joined collection")

        path = path[self.parent_property]

        # placeholder only; reassigned unconditionally further below.
        with_polymorphic = None

        # False means "no user-defined adapter applies"; None is a
        # possible adapter value, hence the identity test below.
        user_defined_adapter = (
            self._init_user_defined_eager_proc(loadopt, context)
            if loadopt
            else False
        )

        if user_defined_adapter is not False:
            (
                clauses,
                adapter,
                add_to_collection,
            ) = self._setup_query_on_user_defined_adapter(
                context, query_entity, path, adapter, user_defined_adapter
            )
        else:
            # if not via query option, check for
            # a cycle
            if not path.contains(context.attributes, "loader"):
                if self.join_depth:
                    if path.length / 2 > self.join_depth:
                        return
                elif path.contains_mapper(self.mapper):
                    return

            (
                clauses,
                adapter,
                add_to_collection,
                chained_from_outerjoin,
            ) = self._generate_row_adapter(
                context,
                query_entity,
                path,
                loadopt,
                adapter,
                column_collection,
                parentmapper,
                chained_from_outerjoin,
            )

        with_poly_entity = path.get(
            context.attributes, "path_with_polymorphic", None
        )
        if with_poly_entity is not None:
            with_polymorphic = inspect(
                with_poly_entity
            ).with_polymorphic_mappers
        else:
            with_polymorphic = None

        # extend the path to the target entity before setting up its
        # columns.
        path = path[self.entity]

        loading._setup_entity_query(
            context,
            self.mapper,
            query_entity,
            path,
            clauses,
            add_to_collection,
            with_polymorphic=with_polymorphic,
            parentmapper=self.mapper,
            chained_from_outerjoin=chained_from_outerjoin,
        )

        if with_poly_entity is not None and None in set(
            context.secondary_columns
        ):
            raise sa_exc.InvalidRequestError(
                "Detected unaliased columns when generating joined "
                "load.  Make sure to use aliased=True or flat=True "
                "when using joined loading with with_polymorphic()."
            )

    def _init_user_defined_eager_proc(self, loadopt, context):
        """Return the adapter for a user-specified eager load, or False.

        ``False`` means ``loadopt`` carries no ``"eager_from_alias"``
        local option.  Otherwise the adapter (which may be ``None``) is
        computed once and memoized in ``context.attributes`` under the
        ``"user_defined_eager_row_processor"`` key of the option's parent
        path.

        """
        memo_key = "user_defined_eager_row_processor"

        # check if the opt applies at all
        if "eager_from_alias" not in loadopt.local_opts:
            return False

        path = loadopt.path.parent

        # the option applies.  re-use the adapter if it has already
        # been built up.
        adapter = path.get(context.attributes, memo_key, False)
        if adapter is not False:
            return adapter

        # otherwise figure it out.
        alias = loadopt.local_opts["eager_from_alias"]
        _, prop = path[-2:]

        if alias is not None:
            if isinstance(alias, str):
                alias = prop.target.alias(alias)
            adapter = sql_util.ColumnAdapter(
                alias, equivalents=prop.mapper._equivalent_columns
            )
        elif path.contains(context.attributes, "path_with_polymorphic"):
            with_poly_entity = path.get(
                context.attributes, "path_with_polymorphic"
            )
            adapter = orm_util.ORMAdapter(
                with_poly_entity,
                equivalents=prop.mapper._equivalent_columns,
            )
        else:
            adapter = context.query._polymorphic_adapters.get(
                prop.mapper, None
            )

        path.set(context.attributes, memo_key, adapter)
        return adapter

    def _setup_query_on_user_defined_adapter(
        self, context, entity, path, adapter, user_defined_adapter
    ):
        """Combine the user-defined adapter with the entity's own
        clause adapter.

        The incoming ``adapter`` argument is discarded and replaced by
        ``entity._get_entity_clauses(...)``.  Returns
        ``(user_defined_adapter, adapter, context.primary_columns)``.

        """
        # apply some more wrapping to the "user defined adapter"
        # if we are setting up the query for SQL render.
        adapter = entity._get_entity_clauses(context.query, context)

        if adapter:
            if user_defined_adapter:
                user_defined_adapter = user_defined_adapter.wrap(adapter)
            else:
                user_defined_adapter = adapter

            path.set(
                context.attributes,
                "user_defined_eager_row_processor",
                user_defined_adapter,
            )

        return user_defined_adapter, adapter, context.primary_columns

    def _gen_pooled_aliased_class(self, context):
        """Return the pooled ``AliasedClass`` for the next appearance of
        this loader's entity within ``context``, creating it on demand."""
        # keep a local pool of AliasedClass objects that get re-used.
        # we need one unique AliasedClass per query per appearance of our
        # entity in the query.
        entity_info = inspect(self.entity)
        alt_selectable = (
            entity_info.selectable if entity_info.is_aliased_class else None
        )

        # per-context counter of how many times this loader has asked
        # for an alias so far.
        counter_key = ("joinedloader_ac", self)
        attrs = context.attributes
        idx = attrs[counter_key] + 1 if counter_key in attrs else 0
        attrs[counter_key] = idx

        pool = self._aliased_class_pool
        if idx >= len(pool):
            target_mapper = self.mapper
            flat_alias = None
            if alt_selectable is not None:
                flat_alias = alt_selectable.alias(flat=True)

            to_adapt = orm_util.AliasedClass(
                target_mapper,
                alias=flat_alias,
                flat=True,
                use_mapper_path=True,
            )

            # load up the .columns collection on the Alias() before
            # the object becomes shared among threads.  this prevents
            # races for column identities.
            inspect(to_adapt).selectable.c

            pool.append(to_adapt)

        return pool[idx]

    def _generate_row_adapter(
        self,
        context,
        entity,
        path,
        loadopt,
        adapter,
        column_collection,
        parentmapper,
        chained_from_outerjoin,
    ):
        """Create the ORM adapter for the eager-joined target and queue
        ``_create_eager_join`` on ``context.create_eager_joins``.

        Returns ``(clauses, adapter, add_to_collection,
        chained_from_outerjoin)`` where ``add_to_collection`` is
        ``context.secondary_columns`` and ``chained_from_outerjoin`` is
        forced to ``True`` when this join is not an inner join.
        ``column_collection`` is not used here.

        """
        with_poly_entity = path.get(
            context.attributes, "path_with_polymorphic", None
        )
        to_adapt = (
            with_poly_entity
            if with_poly_entity
            else self._gen_pooled_aliased_class(context)
        )

        clauses = inspect(to_adapt)._memo(
            ("joinedloader_ormadapter", self),
            orm_util.ORMAdapter,
            to_adapt,
            equivalents=self.mapper._equivalent_columns,
            adapt_required=True,
            allow_label_resolve=False,
            anonymize_labels=True,
        )

        assert clauses.aliased_class is not None

        if self.parent_property.uselist:
            context.multi_row_eager_loaders = True

        # a per-query "innerjoin" option overrides the relationship's
        # own setting.
        if loadopt is None:
            innerjoin = self.parent_property.innerjoin
        else:
            innerjoin = loadopt.local_opts.get(
                "innerjoin", self.parent_property.innerjoin
            )

        if not innerjoin:
            # if this is an outer join, all non-nested eager joins from
            # this path must also be outer joins
            chained_from_outerjoin = True

        deferred_join = (
            self._create_eager_join,
            entity,
            path,
            adapter,
            parentmapper,
            clauses,
            innerjoin,
            chained_from_outerjoin,
        )
        context.create_eager_joins.append(deferred_join)

        add_to_collection = context.secondary_columns
        path.set(context.attributes, "eager_row_processor", clauses)

        return clauses, adapter, add_to_collection, chained_from_outerjoin

    def _create_eager_join(
        self,
        context,
        query_entity,
        path,
        adapter,
        parentmapper,
        clauses,
        innerjoin,
        chained_from_outerjoin,
    ):
        """Build the eager JOIN for this relationship and record it in
        ``context.eager_joins``.

        This method is queued, together with every argument after
        ``context``, on ``context.create_eager_joins`` by
        ``_generate_row_adapter``.
        NOTE(review): presumably invoked later by the Query's compile
        step with ``context`` prepended - confirm against the caller.

        Besides storing the join, this appends parent-side primaryjoin
        columns to ``context.primary_columns`` (only when
        ``parentmapper`` is falsy) and extends ``context.eager_order_by``
        with the relationship's adapted ``order_by``, if any.

        Raises ``InvalidRequestError`` when more than one FROM clause of
        the query matches ``query_entity.selectable``.

        """

        if parentmapper is None:
            localparent = query_entity.mapper
        else:
            localparent = parentmapper

        # whether or not the Query will wrap the selectable in a subquery,
        # and then attach eager load joins to that (i.e., in the case of
        # LIMIT/OFFSET etc.)
        should_nest_selectable = (
            context.multi_row_eager_loaders
            and context.query._should_nest_selectable
        )

        # key under which the join is stored in context.eager_joins:
        # either an index into context.from_clause or the query entity
        # itself.
        query_entity_key = None

        if (
            query_entity not in context.eager_joins
            and not should_nest_selectable
            and context.from_clause
        ):
            indexes = sql_util.find_left_clause_that_matches_given(
                context.from_clause, query_entity.selectable
            )

            if len(indexes) > 1:
                # for the eager load case, I can't reproduce this right
                # now.   For query.join() I can.
                raise sa_exc.InvalidRequestError(
                    "Can't identify which query entity in which to joined "
                    "eager load from.   Please use an exact match when "
                    "specifying the join path."
                )

            if indexes:
                clause = context.from_clause[indexes[0]]
                # join to an existing FROM clause on the query.
                # key it to its list index in the eager_joins dict.
                # Query._compile_context will adapt as needed and
                # append to the FROM clause of the select().
                query_entity_key, default_towrap = indexes[0], clause

        if query_entity_key is None:
            query_entity_key, default_towrap = (
                query_entity,
                query_entity.selectable,
            )

        # the left side to join from: a join already recorded under this
        # key, else the default chosen above.
        towrap = context.eager_joins.setdefault(
            query_entity_key, default_towrap
        )

        if adapter:
            if getattr(adapter, "aliased_class", None):
                # joining from an adapted entity.  The adapted entity
                # might be a "with_polymorphic", so resolve that to our
                # specific mapper's entity before looking for our attribute
                # name on it.
                efm = inspect(adapter.aliased_class)._entity_for_mapper(
                    localparent
                    if localparent.isa(self.parent)
                    else self.parent
                )

                # look for our attribute on the adapted entity, else fall back
                # to our straight property
                onclause = getattr(efm.entity, self.key, self.parent_property)
            else:
                # adapter without an aliased class: alias the parent
                # against the adapter's selectable and look the
                # attribute up there.
                onclause = getattr(
                    orm_util.AliasedClass(
                        self.parent, adapter.selectable, use_mapper_path=True
                    ),
                    self.key,
                    self.parent_property,
                )

        else:
            onclause = self.parent_property

        assert clauses.aliased_class is not None

        # the join is attached on the outside of ``towrap`` unless this
        # is a "nested" inner join chained from an outer join.
        attach_on_outside = (
            not chained_from_outerjoin
            or not innerjoin
            or innerjoin == "unnested"
            or query_entity.entity_zero.represents_outer_join
        )

        if attach_on_outside:
            # this is the "classic" eager join case.
            eagerjoin = orm_util._ORMJoin(
                towrap,
                clauses.aliased_class,
                onclause,
                isouter=not innerjoin
                or query_entity.entity_zero.represents_outer_join
                or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
                _left_memo=self.parent,
                _right_memo=self.mapper,
            )
        else:
            # all other cases are innerjoin=='nested' approach
            eagerjoin = self._splice_nested_inner_join(
                path, towrap, clauses, onclause
            )

        context.eager_joins[query_entity_key] = eagerjoin

        # send a hint to the Query as to where it may "splice" this join
        eagerjoin.stop_on = query_entity.selectable

        if not parentmapper:
            # for parentclause that is the non-eager end of the join,
            # ensure all the parent cols in the primaryjoin are actually
            # in the
            # columns clause (i.e. are not deferred), so that aliasing applied
            # by the Query propagates those columns outward.
            # This has the effect
            # of "undefering" those columns.
            for col in sql_util._find_columns(
                self.parent_property.primaryjoin
            ):
                if localparent.persist_selectable.c.contains_column(col):
                    if adapter:
                        col = adapter.columns[col]
                    context.primary_columns.append(col)

        if self.parent_property.order_by:
            # adapt the relationship's ORDER BY to the join's target
            # before adding it to the query-wide eager ORDER BY.
            context.eager_order_by += (
                eagerjoin._target_adapter.copy_and_process
            )(util.to_list(self.parent_property.order_by))

    def _splice_nested_inner_join(
        self, path, join_obj, clauses, onclause, splicing=False
    ):
        """Recursively rebuild ``join_obj`` with a new inner join to
        ``clauses.aliased_class`` spliced in.

        The splice point is the non-join element of the tree whose memo
        (the ``_left_memo`` / ``_right_memo`` of the join containing it)
        is ``path[-2]``.

        ``splicing`` is ``False`` only on the outermost call; recursive
        calls pass the memo of the side being descended into.  Returns
        the rebuilt join, or ``None`` from a recursive call when the
        splice point is not within ``join_obj``.

        """

        if splicing is False:
            # first call is always handed a join object
            # from the outside
            assert isinstance(join_obj, orm_util._ORMJoin)
        elif isinstance(join_obj, sql.selectable.FromGrouping):
            # look through the grouping at the element it wraps
            return self._splice_nested_inner_join(
                path, join_obj.element, clauses, onclause, splicing
            )
        elif not isinstance(join_obj, orm_util._ORMJoin):
            # a leaf of the join tree: splice here if its memo is the
            # parent element of the path.
            if path[-2] is splicing:
                return orm_util._ORMJoin(
                    join_obj,
                    clauses.aliased_class,
                    onclause,
                    isouter=False,
                    _left_memo=splicing,
                    _right_memo=path[-1].mapper,
                )
            else:
                # only here if splicing == True
                return None

        # search the right side first, then fall back to the left side.
        target_join = self._splice_nested_inner_join(
            path, join_obj.right, clauses, onclause, join_obj._right_memo
        )
        if target_join is None:
            right_splice = False
            target_join = self._splice_nested_inner_join(
                path, join_obj.left, clauses, onclause, join_obj._left_memo
            )
            if target_join is None:
                # should only return None when recursively called,
                # e.g. splicing==True
                assert (
                    splicing is not False
                ), "assertion failed attempting to produce joined eager loads"
                return None
        else:
            right_splice = True

        if right_splice:
            # for a right splice, attempt to flatten out
            # a JOIN b JOIN c JOIN .. to avoid needless
            # parenthesis nesting
            if not join_obj.isouter and not target_join.isouter:
                eagerjoin = join_obj._splice_into_center(target_join)
            else:
                eagerjoin = orm_util._ORMJoin(
                    join_obj.left,
                    target_join,
                    join_obj.onclause,
                    isouter=join_obj.isouter,
                    _left_memo=join_obj._left_memo,
                )
        else:
            # left splice: the rebuilt left side replaces join_obj.left
            eagerjoin = orm_util._ORMJoin(
                target_join,
                join_obj.right,
                join_obj.onclause,
                isouter=join_obj.isouter,
                _right_memo=join_obj._right_memo,
            )

        # carry the spliced join's target adapter up to the rebuilt join
        eagerjoin._target_adapter = target_join._target_adapter
        return eagerjoin

    def _create_eager_adapter(self, context, result, adapter, path, loadopt):
        user_defined_adapter = (
            self._init_user_defined_eager_proc(loadopt, context)
            if loadopt
            else False
        )

        if user_defined_adapter is not False:
            decorator = user_defined_adapter
            # user defined eagerloads are part of the "primary"
            # portion of the load.
            # the adapters applied to the Query should be honored.
            if context.adapter and decorator:
                decorator = decorator.wrap(context.adapter)
            elif context.adapter:
                decorator = context.adapter
        else:
            decorator = path.get(context.attributes, "eager_row_processor")
            if decorator is None:
                return False

        if self.mapper._result_has_identity_key(result, decorator):
            return decorator
        else:
            # no identity key - don't return a row
            # processor, will cause a degrade to lazy
            return False

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Set up row population for this joined eager load.

        If an eager adapter is available, an instance processor for the
        related mapper is built and scalar or collection populators are
        registered.  Otherwise the work is handed to the
        ``lazy="select"`` strategy of the same relationship.

        :raises sqlalchemy.exc.InvalidRequestError: if the attribute
         implementation does not support population.
        """
        attr_impl = self.parent.class_manager[self.key].impl
        if not attr_impl.supports_population:
            raise sa_exc.InvalidRequestError(
                "'%s' does not support object "
                "population - eager loading cannot be applied." % self
            )

        our_path = path[self.parent_property]

        eager_adapter = self._create_eager_adapter(
            context, result, adapter, our_path, loadopt
        )

        if eager_adapter is False:
            # no usable adapter; fall back to the lazy loader
            lazy_strategy = self.parent_property._get_strategy(
                (("lazy", "select"),)
            )
            lazy_strategy.create_row_processor(
                context, path, loadopt, mapper, result, adapter, populators
            )
            return

        key = self.key

        _instance = loading._instance_processor(
            self.mapper,
            context,
            result,
            our_path[self.entity],
            eager_adapter,
        )

        if self.uselist:
            self._create_collection_loader(
                context, key, _instance, populators
            )
        else:
            self._create_scalar_loader(context, key, _instance, populators)

    def _create_collection_loader(self, context, key, _instance, populators):
        def load_collection_from_joined_new_row(state, dict_, row):
            collection = attributes.init_state_collection(state, dict_, key)
            result_list = util.UniqueAppender(
                collection, "append_without_event"
            )
            context.attributes[(state, key)] = result_list
            inst = _instance(row)
            if inst is not None:
                result_list.append(inst)

        def load_collection_from_joined_existing_row(state, dict_, row):
            if (state, key) in context.attributes:
                result_list = context.attributes[(state, key)]
            else:
                # appender_key can be absent from context.attributes
                # with isnew=False when self-referential eager loading
                # is used; the same instance may be present in two
                # distinct sets of result columns
                collection = attributes.init_state_collection(
                    state, dict_, key
                )
                result_list = util.UniqueAppender(
                    collection, "append_without_event"
                )
                context.attributes[(state, key)] = result_list
            inst = _instance(row)
            if inst is not None:
                result_list.append(inst)

        def load_collection_from_joined_exec(state, dict_, row):
            _instance(row)

        populators["new"].append(
            (self.key, load_collection_from_joined_new_row)
        )
        populators["existing"].append(
            (self.key, load_collection_from_joined_existing_row)
        )
        if context.invoke_all_eagers:
            populators["eager"].append(
                (self.key, load_collection_from_joined_exec)
            )

    def _create_scalar_loader(self, context, key, _instance, populators):
        def load_scalar_from_joined_new_row(state, dict_, row):
            # set a scalar object instance directly on the parent
            # object, bypassing InstrumentedAttribute event handlers.
            dict_[key] = _instance(row)

        def load_scalar_from_joined_existing_row(state, dict_, row):
            # call _instance on the row, even though the object has
            # been created, so that we further descend into properties
            existing = _instance(row)

            # conflicting value already loaded, this shouldn't happen
            if key in dict_:
                if existing is not dict_[key]:
                    util.warn(
                        "Multiple rows returned with "
                        "uselist=False for eagerly-loaded attribute '%s' "
                        % self
                    )
            else:
                # this case is when one row has multiple loads of the
                # same entity (e.g. via aliasing), one has an attribute
                # that the other doesn't.
                dict_[key] = existing

        def load_scalar_from_joined_exec(state, dict_, row):
            _instance(row)

        populators["new"].append((self.key, load_scalar_from_joined_new_row))
        populators["existing"].append(
            (self.key, load_scalar_from_joined_existing_row)
        )
        if context.invoke_all_eagers:
            populators["eager"].append(
                (self.key, load_scalar_from_joined_exec)
            )


@log.class_logger
@properties.RelationshipProperty.strategy_for(lazy="selectin")
class SelectInLoader(AbstractRelationshipLoader, util.MemoizedSlots):
    """Relationship loader strategy registered for ``lazy="selectin"``.

    Related objects are loaded after the parent objects, using a second
    query that filters with an IN expression against a bound parameter
    named ``primary_keys``.  The keys are sent in chunks of at most
    ``_chunksize`` values per query.
    """

    __slots__ = (
        "join_depth",
        "omit_join",
        "_parent_alias",
        "_query_info",
        "_fallback_query_info",
        "_bakery",
    )

    # Precomputed description of how the IN query is built:
    #
    # load_only_child - True only for the many-to-one "omit join" form
    #   (see _init_for_omit_join_m2o); the related rows are then queried
    #   by their own primary key and matched back via _load_via_child().
    # load_with_join - True only for the form built by _init_for_join(),
    #   which selects from an alias of the parent and joins to the
    #   related entity.
    # in_expr - the column, or tuple_() of columns, on the left side of
    #   the IN.
    # pk_cols - columns placed into the "pk" Bundle of the query; they
    #   identify which parent key each result row belongs to.
    # zero_idx - True when in_expr is a single column, in which case each
    #   key tuple is reduced to its first element before being bound.
    # child_lookup_cols - for the many-to-one form, the columns read from
    #   each parent state to obtain the related object's identity;
    #   None in the other forms.
    query_info = collections.namedtuple(
        "queryinfo",
        [
            "load_only_child",
            "load_with_join",
            "in_expr",
            "pk_cols",
            "zero_idx",
            "child_lookup_cols",
        ],
    )

    # maximum number of keys bound to the IN parameter per query
    _chunksize = 500

    def __init__(self, parent, strategy_key):
        """Decide whether the join to the parent can be omitted and
        precompute the matching ``query_info``.

        ``omit_join`` is taken from the relationship when it is set
        there; otherwise it is derived from the ``lazy="select"``
        strategy of the same relationship.
        """
        super(SelectInLoader, self).__init__(parent, strategy_key)
        self.join_depth = self.parent_property.join_depth
        is_m2o = self.parent_property.direction is interfaces.MANYTOONE

        if self.parent_property.omit_join is not None:
            self.omit_join = self.parent_property.omit_join
        else:
            lazyloader = self.parent_property._get_strategy(
                (("lazy", "select"),)
            )
            if is_m2o:
                self.omit_join = lazyloader.use_get
            else:
                # the join can be omitted when the parent's "get" clause
                # compares equal to the lazy loader's reverse lazy clause
                self.omit_join = self.parent._get_clause[0].compare(
                    lazyloader._rev_lazywhere,
                    use_proxies=True,
                    equivalents=self.parent._equivalent_columns,
                )

        if self.omit_join:
            if is_m2o:
                self._query_info = self._init_for_omit_join_m2o()
                # used by _load_for_path() when a parent state does not
                # have its foreign key value loaded
                self._fallback_query_info = self._init_for_join()
            else:
                self._query_info = self._init_for_omit_join()
        else:
            self._query_info = self._init_for_join()

    def _init_for_omit_join(self):
        """Build the ``query_info`` for the non-many-to-one "omit join"
        form, where the IN expression is against the remote columns
        paired with the parent's primary key columns.
        """
        pk_to_fk = dict(
            self.parent_property._join_condition.local_remote_pairs
        )
        # also map columns equivalent to each local column to the same
        # remote column
        pk_to_fk.update(
            (equiv, pk_to_fk[k])
            for k in list(pk_to_fk)
            for equiv in self.parent._equivalent_columns.get(k, ())
        )

        # pk_cols and fk_cols are the same list object here
        pk_cols = fk_cols = [
            pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
        ]
        if len(fk_cols) > 1:
            in_expr = sql.tuple_(*fk_cols)
            zero_idx = False
        else:
            in_expr = fk_cols[0]
            zero_idx = True

        return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)

    def _init_for_omit_join_m2o(self):
        """Build the ``query_info`` for the many-to-one "omit join" form,
        where the IN expression is against the related mapper's primary
        key.
        """
        pk_cols = self.mapper.primary_key
        if len(pk_cols) > 1:
            in_expr = sql.tuple_(*pk_cols)
            zero_idx = False
        else:
            in_expr = pk_cols[0]
            zero_idx = True

        lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
        # for each related primary key column, the column the lazy loader
        # equates it with
        lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]

        return self.query_info(
            True, False, in_expr, pk_cols, zero_idx, lookup_cols
        )

    def _init_for_join(self):
        """Build the ``query_info`` for the joined form, where the IN
        expression is against the primary key of an alias of the parent
        class.  The alias is stored in ``self._parent_alias``.
        """
        self._parent_alias = aliased(self.parent.class_)
        pa_insp = inspect(self._parent_alias)
        pk_cols = [
            pa_insp._adapt_element(col) for col in self.parent.primary_key
        ]
        if len(pk_cols) > 1:
            in_expr = sql.tuple_(*pk_cols)
            zero_idx = False
        else:
            in_expr = pk_cols[0]
            zero_idx = True
        return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)

    def init_class_attribute(self, mapper):
        """Delegate class attribute setup to the ``lazy="select"``
        strategy of the same relationship.
        """
        self.parent_property._get_strategy(
            (("lazy", "select"),)
        ).init_class_attribute(mapper)

    @util.dependencies("sqlalchemy.ext.baked")
    def _memoized_attr__bakery(self, baked):
        """Return the baked query bakery (``size=50``) for this loader."""
        return baked.bakery(size=50)

    def create_row_processor(
        self, context, path, loadopt, mapper, result, adapter, populators
    ):
        """Register ``_load_for_path`` as a post-load callable for ``path``.

        Nothing is registered when the path's last entity is not of the
        parent mapper's type, when a post-load entry already exists for
        this path and property, or when the join depth / recursion checks
        below say to stop.

        :raises sqlalchemy.exc.InvalidRequestError: if the attribute
         implementation does not support population.
        """
        if not self.parent.class_manager[self.key].impl.supports_population:
            raise sa_exc.InvalidRequestError(
                "'%s' does not support object "
                "population - eager loading cannot be applied." % self
            )

        # the path prefixed with the query's current path, if any
        selectin_path = (
            context.query._current_path or orm_util.PathRegistry.root
        ) + path

        if not orm_util._entity_isa(path[-1], self.parent):
            return

        if loading.PostLoad.path_exists(
            context, selectin_path, self.parent_property
        ):
            return

        path_w_prop = path[self.parent_property]
        selectin_path_w_prop = selectin_path[self.parent_property]

        # build up a path indicating the path from the leftmost
        # entity to the thing we're subquery loading.
        with_poly_entity = path_w_prop.get(
            context.attributes, "path_with_polymorphic", None
        )

        if with_poly_entity is not None:
            effective_entity = with_poly_entity
        else:
            effective_entity = self.entity

        # these checks apply only when no "loader" entry is present in
        # context.attributes for this path
        if not path_w_prop.contains(context.attributes, "loader"):
            if self.join_depth:
                # stop once the path is deeper than join_depth
                if selectin_path_w_prop.length / 2 > self.join_depth:
                    return
            elif selectin_path_w_prop.contains_mapper(self.mapper):
                # without a join_depth, stop when the related mapper is
                # already present in the path
                return

        loading.PostLoad.callable_for_path(
            context,
            selectin_path,
            self.parent,
            self.parent_property,
            self._load_for_path,
            effective_entity,
        )

    @util.dependencies("sqlalchemy.ext.baked")
    def _load_for_path(
        self, baked, context, path, states, load_only, effective_entity
    ):
        """Build the IN query for this relationship and populate ``states``.

        :param context: the query context of the parent load.
        :param path: path of the parent entity.
        :param states: iterable of ``(state, overwrite)`` pairs for the
         parent objects.
        :param load_only: optional collection of attribute keys; when
         given and ``self.key`` is not in it, nothing is loaded.
        :param effective_entity: entity to load the related objects as.
        """

        if load_only and self.key not in load_only:
            return

        query_info = self._query_info

        if query_info.load_only_child:
            # maps related identity tuple -> parents referring to it
            our_states = collections.defaultdict(list)
            none_states = []

            mapper = self.parent

            for state, overwrite in states:
                state_dict = state.dict
                related_ident = tuple(
                    mapper._get_state_attr_by_column(
                        state,
                        state_dict,
                        lk,
                        passive=attributes.PASSIVE_NO_FETCH,
                    )
                    for lk in query_info.child_lookup_cols
                )
                # if the loaded parent objects do not have the foreign key
                # to the related item loaded, then degrade into the joined
                # version of selectinload
                if attributes.PASSIVE_NO_RESULT in related_ident:
                    query_info = self._fallback_query_info
                    break

                # organize states into lists keyed to particular foreign
                # key values.
                if None not in related_ident:
                    our_states[related_ident].append(
                        (state, state_dict, overwrite)
                    )
                else:
                    # For FK values that have None, add them to a
                    # separate collection that will be populated separately
                    none_states.append((state, state_dict, overwrite))

        # note the above conditional may have changed query_info
        if not query_info.load_only_child:
            # list of (parent identity, state, dict, overwrite); this
            # replaces any partially built dictionary from above
            our_states = [
                (state.key[1], state, state.dict, overwrite)
                for state, overwrite in states
            ]

        pk_cols = query_info.pk_cols
        in_expr = query_info.in_expr

        if not query_info.load_with_join:
            # in "omit join" mode, the primary key column and the
            # "in" expression are in terms of the related entity.  So
            # if the related entity is polymorphic or otherwise aliased,
            # we need to adapt our "pk_cols" and "in_expr" to that
            # entity.   in non-"omit join" mode, these are against the
            # parent entity and do not need adaption.
            insp = inspect(effective_entity)
            if insp.is_aliased_class:
                pk_cols = [insp._adapt_element(col) for col in pk_cols]
                in_expr = insp._adapt_element(in_expr)
                # NOTE(review): pk_cols is adapted a second time here;
                # this looks redundant with the line two above - confirm
                # before removing.
                pk_cols = [insp._adapt_element(col) for col in pk_cols]

        q = self._bakery(
            lambda session: session.query(
                query.Bundle("pk", *pk_cols), effective_entity
            ),
            self,
        )

        if not query_info.load_with_join:
            # the Bundle we have in the "omit_join" case is against raw, non
            # annotated columns, so to ensure the Query knows its primary
            # entity, we add it explicitly.  If we made the Bundle against
            # annotated columns, we hit a performance issue in this specific
            # case, which is detailed in issue #4347.
            q.add_criteria(lambda q: q.select_from(effective_entity))
        else:
            # in the non-omit_join case, the Bundle is against the annotated/
            # mapped column of the parent entity, but the #4347 issue does not
            # occur in this case.
            pa = self._parent_alias
            q.add_criteria(
                lambda q: q.select_from(pa).join(
                    getattr(pa, self.parent_property.key).of_type(
                        effective_entity
                    )
                )
            )

        # NOTE(review): both branches below are textually identical.
        # Presumably they are kept separate so that each form of the
        # query uses a distinct lambda for the baked query cache -
        # confirm before merging them.
        if query_info.load_only_child:
            q.add_criteria(
                lambda q: q.filter(
                    in_expr.in_(sql.bindparam("primary_keys", expanding=True))
                )
            )
        else:
            q.add_criteria(
                lambda q: q.filter(
                    in_expr.in_(sql.bindparam("primary_keys", expanding=True))
                )
            )

        orig_query = context.query

        # carry the original query's options over to the new query,
        # relative to this relationship's path
        q._add_lazyload_options(
            orig_query._with_options, path[self.parent_property]
        )

        if orig_query._populate_existing:
            q.add_criteria(lambda q: q.populate_existing())

        if self.parent_property.order_by:
            if not query_info.load_with_join:
                # "insp" was assigned above under this same condition
                eager_order_by = self.parent_property.order_by
                if insp.is_aliased_class:
                    eager_order_by = [
                        insp._adapt_element(elem) for elem in eager_order_by
                    ]
                q.add_criteria(lambda q: q.order_by(*eager_order_by))
            else:

                def _setup_outermost_orderby(q):
                    # imitate the same method that subquery eager loading uses,
                    # looking for the adapted "secondary" table
                    eagerjoin = q._from_obj[0]

                    return q.order_by(
                        *eagerjoin._target_adapter.copy_and_process(
                            util.to_list(self.parent_property.order_by)
                        )
                    )

                q.add_criteria(_setup_outermost_orderby)

        if query_info.load_only_child:
            self._load_via_child(
                our_states, none_states, query_info, q, context
            )
        else:
            self._load_via_parent(our_states, query_info, q, context)

    def _load_via_child(self, our_states, none_states, query_info, q, context):
        """Populate parents in the many-to-one "omit join" form.

        :param our_states: dictionary mapping a related identity tuple to
         a list of ``(state, dict_, overwrite)`` for the parents that
         refer to it.
        :param none_states: list of ``(state, dict_, overwrite)`` for
         parents whose lookup columns contain ``None``.
        :param query_info: the ``query_info`` in use.
        :param q: the baked query; called with the session and given the
         ``primary_keys`` parameter.
        :param context: the query context supplying ``session``.
        """
        uselist = self.uselist

        # this sort is really for the benefit of the unit tests
        our_keys = sorted(our_states)
        while our_keys:
            chunk = our_keys[0 : self._chunksize]
            our_keys = our_keys[self._chunksize :]
            # maps the "pk" Bundle value of each row to the related object
            data = {
                k: v
                for k, v in q(context.session).params(
                    primary_keys=[
                        key[0] if query_info.zero_idx else key for key in chunk
                    ]
                )
            }

            for key in chunk:
                # for a real foreign key and no concurrent changes to the
                # DB while running this method, "key" is always present in
                # data.  However, for primaryjoins without real foreign keys
                # a non-None primaryjoin condition may still refer to no
                # related object.
                related_obj = data.get(key, None)
                for state, dict_, overwrite in our_states[key]:
                    # keep an already-present value unless asked to
                    # overwrite it
                    if not overwrite and self.key in dict_:
                        continue

                    state.get_impl(self.key).set_committed_value(
                        state,
                        dict_,
                        related_obj if not uselist else [related_obj],
                    )
        # populate none states with empty value / collection
        for state, dict_, overwrite in none_states:
            if not overwrite and self.key in dict_:
                continue

            # note it's OK if this is a uselist=True attribute, the empty
            # collection will be populated
            state.get_impl(self.key).set_committed_value(state, dict_, None)

    def _load_via_parent(self, our_states, query_info, q, context):
        """Populate parents in the forms keyed by parent identity.

        :param our_states: list of
         ``(key, state, state_dict, overwrite)`` for the parent objects.
        :param query_info: the ``query_info`` in use.
        :param q: the baked query; called with the session and given the
         ``primary_keys`` parameter.
        :param context: the query context supplying ``session``.
        """
        uselist = self.uselist
        _empty_result = () if uselist else None

        while our_states:
            chunk = our_states[0 : self._chunksize]
            our_states = our_states[self._chunksize :]

            primary_keys = [
                key[0] if query_info.zero_idx else key
                for key, state, state_dict, overwrite in chunk
            ]

            # group consecutive rows by their "pk" Bundle value; rows for
            # the same key that are not adjacent still end up in the same
            # list because of extend()
            data = collections.defaultdict(list)
            for k, v in itertools.groupby(
                q(context.session).params(primary_keys=primary_keys),
                lambda x: x[0],
            ):
                data[k].extend(vv[1] for vv in v)

            for key, state, state_dict, overwrite in chunk:

                # keep an already-present value unless asked to
                # overwrite it
                if not overwrite and self.key in state_dict:
                    continue

                collection = data.get(key, _empty_result)

                if not uselist and collection:
                    if len(collection) > 1:
                        util.warn(
                            "Multiple rows returned with "
                            "uselist=False for eagerly-loaded "
                            "attribute '%s' " % self
                        )
                    state.get_impl(self.key).set_committed_value(
                        state, state_dict, collection[0]
                    )
                else:
                    # note that empty tuple set on uselist=False sets the
                    # value to None
                    state.get_impl(self.key).set_committed_value(
                        state, state_dict, collection
                    )


def single_parent_validator(desc, prop):
    """Attach "append" and "set" listeners to ``desc`` that enforce a
    single parent for values assigned via ``prop``.

    The listeners raise :class:`sqlalchemy.exc.InvalidRequestError` when
    a value that already has a parent through this attribute is given to
    another one; otherwise they return the value unchanged.
    """

    def _do_check(state, value, oldvalue, initiator):
        # only non-None values coming from this very attribute are checked
        if value is None or initiator.key != prop.key:
            return value

        already_parented = initiator.hasparent(
            attributes.instance_state(value)
        )
        if already_parented and oldvalue is not value:
            raise sa_exc.InvalidRequestError(
                "Instance %s is already associated with an instance "
                "of %s via its %s attribute, and is only allowed a "
                "single parent."
                % (orm_util.instance_str(value), state.class_, prop),
                code="bbf1",
            )
        return value

    def append(state, value, initiator):
        # an append has no previous value to compare against
        return _do_check(state, value, None, initiator)

    def set_(state, value, oldvalue, initiator):
        return _do_check(state, value, oldvalue, initiator)

    listen_kw = dict(raw=True, retval=True, active_history=True)
    event.listen(desc, "append", append, **listen_kw)
    event.listen(desc, "set", set_, **listen_kw)

Youez - 2016 - github.com/yon3zu
LinuXploit