Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.119.116.125
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/hc_python/lib64/python3.8/site-packages/sqlalchemy/util/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/hc_python/lib64/python3.8/site-packages/sqlalchemy/util//concurrency.py
# util/concurrency.py
# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: allow-untyped-defs, allow-untyped-calls

from __future__ import annotations

import asyncio  # noqa
import typing
from typing import Any
from typing import Callable
from typing import Coroutine
from typing import TypeVar

# Feature flag: set to True below only when ``import greenlet`` succeeds.
have_greenlet = False
# Text of the ImportError raised by ``import greenlet``; remains None when
# the import succeeds.
greenlet_error = None
try:
    import greenlet  # type: ignore[import-untyped,unused-ignore]  # noqa: F401,E501
except ImportError as e:
    # Keep the failure text; the fallback ``_not_implemented()`` helper
    # defined later in this module includes it in its error message.
    greenlet_error = str(e)
    pass
else:
    # greenlet is importable: pull the real implementations in from
    # ``_concurrency_py3k``.
    # NOTE(review): the ``X as X`` aliasing looks like the explicit
    # re-export convention understood by type checkers -- confirm.
    have_greenlet = True
    from ._concurrency_py3k import await_only as await_only
    from ._concurrency_py3k import await_fallback as await_fallback
    from ._concurrency_py3k import in_greenlet as in_greenlet
    from ._concurrency_py3k import greenlet_spawn as greenlet_spawn
    from ._concurrency_py3k import is_exit_exception as is_exit_exception
    from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock
    from ._concurrency_py3k import _Runner

_T = TypeVar("_T")


class _AsyncUtil:
    """Asyncio helper for the test suite / internal utilities only.

    When greenlet is importable the instance owns a ``_Runner``; otherwise
    no ``runner`` attribute is created and only the synchronous fallbacks
    in :meth:`run_in_greenlet` and :meth:`close` are usable.
    """

    def __init__(self) -> None:
        # Without greenlet there is nothing to set up.
        if not have_greenlet:
            return
        self.runner = _Runner()

    def run(
        self,
        fn: Callable[..., Coroutine[Any, Any, _T]],
        *args: Any,
        **kwargs: Any,
    ) -> _T:
        """Call ``fn`` and run the coroutine it returns on the runner."""
        # Resolve the runner method first so a missing runner fails before
        # ``fn`` is invoked.
        run_on_loop = self.runner.run
        return run_on_loop(fn(*args, **kwargs))

    def run_in_greenlet(
        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
    ) -> _T:
        """Run a sync function inside a greenlet; nested calls are allowed.

        ``fn`` is called directly when greenlet is unavailable or when the
        runner's loop is already running (the nested case).  Otherwise it is
        wrapped with ``greenlet_spawn`` and executed on the runner.
        """
        if not have_greenlet:
            return fn(*args, **kwargs)

        if self.runner.get_loop().is_running():
            return fn(*args, **kwargs)

        return self.runner.run(greenlet_spawn(fn, *args, **kwargs))

    def close(self) -> None:
        """Close the runner, if one was created."""
        if not have_greenlet:
            return
        self.runner.close()


if not typing.TYPE_CHECKING and not have_greenlet:

    def _not_implemented():
        """Raise ``ValueError`` stating that the greenlet library is required.

        Returns ``None`` without raising when ``have_greenlet`` is true.

        :raises ValueError: when ``have_greenlet`` is false.  The message
         always names the missing library and, when ``greenlet_error`` is
         non-empty, is followed by that import error text.
        """
        # this conditional is to prevent pylance from considering
        # greenlet_spawn() etc as "no return" and dimming out code below it
        if have_greenlet:
            return None

        # Build the message in two steps: a conditional expression binds
        # looser than ``%``, so a single ``"..." % err if err else ""``
        # expression yields an empty message whenever there is no error text.
        message = "the greenlet library is required to use this function."
        if greenlet_error:
            message = "%s %s" % (message, greenlet_error)
        raise ValueError(message)

    def is_exit_exception(e):  # noqa: F811
        """Return ``True`` when ``e`` is not an instance of ``Exception``."""
        if isinstance(e, Exception):
            return False
        return True

    def await_only(thing):  # type: ignore  # noqa: F811
        """Fallback ``await_only`` defined when greenlet is not importable.

        ``thing`` is ignored; the call is handed to ``_not_implemented()``.
        """
        return _not_implemented()

    def await_fallback(thing):  # type: ignore  # noqa: F811
        """Fallback ``await_fallback``: hand ``thing`` back unchanged."""
        return thing

    def in_greenlet():  # type: ignore  # noqa: F811
        """Fallback ``in_greenlet`` defined when greenlet is not importable.

        Hands the call to ``_not_implemented()``.
        """
        return _not_implemented()

    def greenlet_spawn(fn, *args, **kw):  # type: ignore  # noqa: F811
        """Fallback ``greenlet_spawn`` defined when greenlet is not importable.

        ``fn`` and its arguments are ignored; the call is handed to
        ``_not_implemented()``.
        """
        return _not_implemented()

    def AsyncAdaptedLock(*args, **kw):  # type: ignore  # noqa: F811
        """Fallback ``AsyncAdaptedLock`` defined when greenlet is missing.

        All arguments are ignored; the call is handed to
        ``_not_implemented()``.
        """
        return _not_implemented()

    def _util_async_run(fn, *arg, **kw):  # type: ignore  # noqa: F811
        """Fallback runner: call ``fn`` directly and return its result."""
        outcome = fn(*arg, **kw)
        return outcome

    def _util_async_run_coroutine_function(fn, *arg, **kw):  # type: ignore  # noqa: F811,E501
        """Fallback defined when greenlet is not importable.

        ``fn`` and its arguments are ignored; the call is handed to
        ``_not_implemented()``.
        """
        return _not_implemented()

Youez - 2016 - github.com/yon3zu
LinuXploit