Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.133.127.131
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/alembic/testing/suite/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/alembic/testing/suite/test_environment.py
import io

from ...migration import MigrationContext
from ...testing import assert_raises
from ...testing import config
from ...testing import eq_
from ...testing import is_
from ...testing import is_false
from ...testing import is_not_
from ...testing import is_true
from ...testing import ne_
from ...testing.fixtures import TestBase


class MigrationTransactionTest(TestBase):
    """Checks of ``MigrationContext`` transaction handling.

    Each test builds a context through :meth:`_fixture`, either bound to a
    connection obtained from ``config.db`` or, when ``as_sql`` is set, to
    in-memory string buffers, and then asserts on the connection's
    transaction state or on the SQL text that was emitted.
    """

    __backend__ = True

    conn = None

    def _fixture(self, opts):
        """Open a connection and configure a ``MigrationContext`` from *opts*.

        The connection is stored on ``self.conn`` and the context on
        ``self.context``.  When ``opts["as_sql"]`` is truthy the context is
        configured from the connection's dialect only, and one shared
        ``io.StringIO`` is installed as the output buffer of both the
        context and its impl.  Otherwise the context is configured with the
        connection itself.

        :param opts: dictionary passed to ``MigrationContext.configure`` as
         ``opts``.
        :return: the configured context (also available as ``self.context``).
        """
        connection = config.db.connect()
        self.conn = connection

        if not opts.get("as_sql", False):
            self.context = MigrationContext.configure(
                connection=connection, opts=opts
            )
            return self.context

        self.context = MigrationContext.configure(
            dialect=connection.dialect, opts=opts
        )
        # the context and its impl write into the same buffer so that
        # _assert_impl_steps() can read everything back from one place
        shared_buffer = io.StringIO()
        self.context.output_buffer = shared_buffer
        self.context.impl.output_buffer = shared_buffer
        return self.context

    def teardown_method(self):
        """Close the connection opened by :meth:`_fixture`, if there is one."""
        connection = self.conn
        if not connection:
            return
        connection.close()

    def test_proxy_transaction_rollback(self):
        """``rollback()`` on the per-migration proxy ends the transaction."""
        opts = {"transaction_per_migration": True, "transactional_ddl": True}
        ctx = self._fixture(opts)
        conn = self.conn

        is_false(conn.in_transaction())
        txn = ctx.begin_transaction(_per_migration=True)
        is_true(conn.in_transaction())
        txn.rollback()
        is_false(conn.in_transaction())

    def test_proxy_transaction_commit(self):
        """``commit()`` on the per-migration proxy ends the transaction."""
        opts = {"transaction_per_migration": True, "transactional_ddl": True}
        ctx = self._fixture(opts)
        conn = self.conn

        txn = ctx.begin_transaction(_per_migration=True)
        is_true(conn.in_transaction())
        txn.commit()
        is_false(conn.in_transaction())

    def test_proxy_transaction_contextmanager_commit(self):
        """Leaving the proxy's ``with`` block normally ends the transaction."""
        opts = {"transaction_per_migration": True, "transactional_ddl": True}
        ctx = self._fixture(opts)
        conn = self.conn

        txn = ctx.begin_transaction(_per_migration=True)
        is_true(conn.in_transaction())
        with txn:
            pass
        is_false(conn.in_transaction())

    def test_proxy_transaction_contextmanager_rollback(self):
        """An exception inside the proxy's ``with`` block propagates and
        leaves the connection outside of a transaction."""
        opts = {"transaction_per_migration": True, "transactional_ddl": True}
        ctx = self._fixture(opts)
        conn = self.conn

        txn = ctx.begin_transaction(_per_migration=True)
        is_true(conn.in_transaction())

        def fail_inside_block():
            with txn:
                raise Exception("hi")

        assert_raises(Exception, fail_inside_block)
        is_false(conn.in_transaction())

    def test_proxy_transaction_contextmanager_explicit_rollback(self):
        """An explicit ``rollback()`` inside the ``with`` block ends the
        transaction right away, and exiting the block keeps it ended."""
        opts = {"transaction_per_migration": True, "transactional_ddl": True}
        ctx = self._fixture(opts)
        conn = self.conn

        txn = ctx.begin_transaction(_per_migration=True)
        is_true(conn.in_transaction())

        with txn:
            is_true(conn.in_transaction())
            txn.rollback()
            is_false(conn.in_transaction())

        is_false(conn.in_transaction())

    def test_proxy_transaction_contextmanager_explicit_commit(self):
        """An explicit ``commit()`` inside the ``with`` block ends the
        transaction right away, and exiting the block keeps it ended."""
        opts = {"transaction_per_migration": True, "transactional_ddl": True}
        ctx = self._fixture(opts)
        conn = self.conn

        txn = ctx.begin_transaction(_per_migration=True)
        is_true(conn.in_transaction())

        with txn:
            is_true(conn.in_transaction())
            txn.commit()
            is_false(conn.in_transaction())

        is_false(conn.in_transaction())

    def test_transaction_per_migration_transactional_ddl(self):
        """With ``transaction_per_migration`` and transactional DDL, only the
        inner per-migration block puts the connection in a transaction."""
        opts = {"transaction_per_migration": True, "transactional_ddl": True}
        ctx = self._fixture(opts)
        conn = self.conn

        is_false(conn.in_transaction())

        with ctx.begin_transaction():
            is_false(conn.in_transaction())
            with ctx.begin_transaction(_per_migration=True):
                is_true(conn.in_transaction())

            is_false(conn.in_transaction())
        is_false(conn.in_transaction())

    def test_transaction_per_migration_non_transactional_ddl(self):
        """With ``transaction_per_migration`` and non-transactional DDL, only
        the inner per-migration block puts the connection in a transaction."""
        opts = {"transaction_per_migration": True, "transactional_ddl": False}
        ctx = self._fixture(opts)
        conn = self.conn

        is_false(conn.in_transaction())

        with ctx.begin_transaction():
            is_false(conn.in_transaction())
            with ctx.begin_transaction(_per_migration=True):
                is_true(conn.in_transaction())

            is_false(conn.in_transaction())
        is_false(conn.in_transaction())

    def test_transaction_per_all_transactional_ddl(self):
        """Without ``transaction_per_migration`` and with transactional DDL,
        the outer block holds the transaction for its whole duration."""
        ctx = self._fixture({"transactional_ddl": True})
        conn = self.conn

        is_false(conn.in_transaction())

        with ctx.begin_transaction():
            is_true(conn.in_transaction())
            with ctx.begin_transaction(_per_migration=True):
                is_true(conn.in_transaction())

            is_true(conn.in_transaction())
        is_false(conn.in_transaction())

    def test_transaction_per_all_non_transactional_ddl(self):
        """Without ``transaction_per_migration`` and with non-transactional
        DDL, only the inner per-migration block is in a transaction."""
        ctx = self._fixture({"transactional_ddl": False})
        conn = self.conn

        is_false(conn.in_transaction())

        with ctx.begin_transaction():
            is_false(conn.in_transaction())
            with ctx.begin_transaction(_per_migration=True):
                is_true(conn.in_transaction())

            is_false(conn.in_transaction())
        is_false(conn.in_transaction())

    def test_transaction_per_all_sqlmode(self):
        """In ``as_sql`` mode without ``transaction_per_migration``, BEGIN and
        COMMIT wrap the outer block when the impl has transactional DDL."""
        ctx = self._fixture({"as_sql": True})

        ctx.execute("step 1")
        with ctx.begin_transaction():
            ctx.execute("step 2")
            with ctx.begin_transaction(_per_migration=True):
                ctx.execute("step 3")

            ctx.execute("step 4")
        ctx.execute("step 5")

        if ctx.impl.transactional_ddl:
            expected = (
                "step 1",
                "BEGIN",
                "step 2",
                "step 3",
                "step 4",
                "COMMIT",
                "step 5",
            )
        else:
            expected = ("step 1", "step 2", "step 3", "step 4", "step 5")
        self._assert_impl_steps(*expected)

    def test_transaction_per_migration_sqlmode(self):
        """In ``as_sql`` mode with ``transaction_per_migration``, BEGIN and
        COMMIT wrap only the inner block when the impl has transactional
        DDL."""
        ctx = self._fixture({"as_sql": True, "transaction_per_migration": True})

        ctx.execute("step 1")
        with ctx.begin_transaction():
            ctx.execute("step 2")
            with ctx.begin_transaction(_per_migration=True):
                ctx.execute("step 3")

            ctx.execute("step 4")
        ctx.execute("step 5")

        if ctx.impl.transactional_ddl:
            expected = (
                "step 1",
                "step 2",
                "BEGIN",
                "step 3",
                "COMMIT",
                "step 4",
                "step 5",
            )
        else:
            expected = ("step 1", "step 2", "step 3", "step 4", "step 5")
        self._assert_impl_steps(*expected)

    @config.requirements.autocommit_isolation
    def test_autocommit_block(self):
        """``autocommit_block()`` nested in a per-migration transaction runs
        with AUTOCOMMIT isolation, and the transaction is back afterwards."""
        ctx = self._fixture({"transaction_per_migration": True})
        conn = self.conn

        is_false(conn.in_transaction())

        with ctx.begin_transaction():
            is_false(conn.in_transaction())
            with ctx.begin_transaction(_per_migration=True):
                is_true(conn.in_transaction())

                with ctx.autocommit_block():
                    # under 1.x the execution_options call yields a
                    # separate connection object; in "future" mode it is
                    # the very same connection, which reports a
                    # "transaction" block even though it is in autocommit
                    if self.is_sqlalchemy_future:
                        is_(ctx.connection, conn)
                    else:
                        is_not_(ctx.connection, conn)
                        is_false(conn.in_transaction())

                    block_options = ctx.connection._execution_options
                    eq_(block_options["isolation_level"], "AUTOCOMMIT")

                restored_level = ctx.connection._execution_options.get(
                    "isolation_level", None
                )
                ne_(restored_level, "AUTOCOMMIT")
                is_true(conn.in_transaction())

            is_false(conn.in_transaction())
        is_false(conn.in_transaction())

    @config.requirements.autocommit_isolation
    def test_autocommit_block_no_transaction(self):
        """``autocommit_block()`` used outside any transaction runs with
        AUTOCOMMIT isolation and leaves no transaction behind."""
        ctx = self._fixture({"transaction_per_migration": True})
        conn = self.conn

        is_false(conn.in_transaction())

        with ctx.autocommit_block():
            is_true(ctx.connection.in_transaction())

            # under 1.x the execution_options call yields a separate
            # connection object; in "future" mode it is the very same
            # connection, which reports a "transaction" block even though
            # it is in autocommit
            if self.is_sqlalchemy_future:
                is_(ctx.connection, conn)
            else:
                is_not_(ctx.connection, conn)
                is_false(conn.in_transaction())

            block_options = ctx.connection._execution_options
            eq_(block_options["isolation_level"], "AUTOCOMMIT")

        restored_level = ctx.connection._execution_options.get(
            "isolation_level", None
        )
        ne_(restored_level, "AUTOCOMMIT")

        is_false(conn.in_transaction())

    def test_autocommit_block_transactional_ddl_sqlmode(self):
        """In ``as_sql`` mode with transactional DDL, an autocommit block
        emits COMMIT before its statements and BEGIN after them."""
        opts = {
            "transaction_per_migration": True,
            "transactional_ddl": True,
            "as_sql": True,
        }
        ctx = self._fixture(opts)

        with ctx.begin_transaction():
            ctx.execute("step 1")
            with ctx.begin_transaction(_per_migration=True):
                ctx.execute("step 2")

                with ctx.autocommit_block():
                    ctx.execute("step 3")

                ctx.execute("step 4")

            ctx.execute("step 5")

        expected = (
            "step 1",
            "BEGIN",
            "step 2",
            "COMMIT",
            "step 3",
            "BEGIN",
            "step 4",
            "COMMIT",
            "step 5",
        )
        self._assert_impl_steps(*expected)

    def test_autocommit_block_nontransactional_ddl_sqlmode(self):
        """In ``as_sql`` mode with non-transactional DDL, no BEGIN or COMMIT
        is emitted around or inside an autocommit block."""
        opts = {
            "transaction_per_migration": True,
            "transactional_ddl": False,
            "as_sql": True,
        }
        ctx = self._fixture(opts)

        with ctx.begin_transaction():
            ctx.execute("step 1")
            with ctx.begin_transaction(_per_migration=True):
                ctx.execute("step 2")

                with ctx.autocommit_block():
                    ctx.execute("step 3")

                ctx.execute("step 4")

            ctx.execute("step 5")

        expected = ("step 1", "step 2", "step 3", "step 4", "step 5")
        self._assert_impl_steps(*expected)

    def _assert_impl_steps(self, *steps):
        """Assert that the SQL emitted so far equals a replay of *steps*.

        The text already written to ``self.context.output_buffer`` is
        captured first.  The impl is then pointed at a fresh buffer and each
        step is replayed through it: ``"BEGIN"`` via ``emit_begin()``,
        ``"COMMIT"`` via ``emit_commit()`` and anything else via
        ``_exec()``.  The two texts must be equal.

        :param steps: statement strings and ``"BEGIN"`` / ``"COMMIT"``
         markers, in the expected order.
        """
        emitted = self.context.output_buffer.getvalue()

        impl = self.context.impl
        replay_buffer = io.StringIO()
        impl.output_buffer = replay_buffer

        for step in steps:
            if step == "BEGIN":
                impl.emit_begin()
                continue
            if step == "COMMIT":
                impl.emit_commit()
                continue
            impl._exec(step)

        eq_(emitted, replay_buffer.getvalue())

Youez - 2016 - github.com/yon3zu
LinuXploit