Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.128.31.117
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/sentry_sdk/integrations/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/sentry_sdk/integrations//socket.py
from __future__ import absolute_import

import socket
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import Integration

if MYPY:
    from socket import AddressFamily, SocketKind
    from typing import Tuple, Optional, Union, List

__all__ = ["SocketIntegration"]


class SocketIntegration(Integration):
    identifier = "socket"

    @staticmethod
    def setup_once():
        # type: () -> None
        """
        patches two of the most used functions of socket: create_connection and getaddrinfo(dns resolver)
        """
        _patch_create_connection()
        _patch_getaddrinfo()


def _get_span_description(host, port):
    # type: (Union[bytes, str, None], Union[str, int, None]) -> str

    try:
        host = host.decode()  # type: ignore
    except (UnicodeDecodeError, AttributeError):
        pass

    description = "%s:%s" % (host, port)  # type: ignore

    return description


def _patch_create_connection():
    # type: () -> None
    real_create_connection = socket.create_connection

    def create_connection(
        address,
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,  # type: ignore
        source_address=None,
    ):
        # type: (Tuple[Optional[str], int], Optional[float], Optional[Tuple[Union[bytearray, bytes, str], int]])-> socket.socket
        hub = Hub.current
        if hub.get_integration(SocketIntegration) is None:
            return real_create_connection(
                address=address, timeout=timeout, source_address=source_address
            )

        with hub.start_span(
            op=OP.SOCKET_CONNECTION,
            description=_get_span_description(address[0], address[1]),
        ) as span:
            span.set_data("address", address)
            span.set_data("timeout", timeout)
            span.set_data("source_address", source_address)

            return real_create_connection(
                address=address, timeout=timeout, source_address=source_address
            )

    socket.create_connection = create_connection  # type: ignore


def _patch_getaddrinfo():
    # type: () -> None
    real_getaddrinfo = socket.getaddrinfo

    def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
        # type: (Union[bytes, str, None], Union[str, int, None], int, int, int, int) -> List[Tuple[AddressFamily, SocketKind, int, str, Union[Tuple[str, int], Tuple[str, int, int, int]]]]
        hub = Hub.current
        if hub.get_integration(SocketIntegration) is None:
            return real_getaddrinfo(host, port, family, type, proto, flags)

        with hub.start_span(
            op=OP.SOCKET_DNS, description=_get_span_description(host, port)
        ) as span:
            span.set_data("host", host)
            span.set_data("port", port)

            return real_getaddrinfo(host, port, family, type, proto, flags)

    socket.getaddrinfo = getaddrinfo  # type: ignore

Youez - 2016 - github.com/yon3zu
LinuXploit