Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.219.190.131
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/hc_python/lib64/python3.8/site-packages/sentry_sdk/integrations/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/hc_python/lib64/python3.8/site-packages/sentry_sdk/integrations//clickhouse_driver.py
import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.tracing import Span
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled

from typing import TYPE_CHECKING, TypeVar

# Hack to get new Python features working in older versions
# without introducing a hard dependency on `typing_extensions`
# from: https://stackoverflow.com/a/71944042/300572
if TYPE_CHECKING:
    from typing import ParamSpec, Callable
else:
    # Fake ParamSpec
    class ParamSpec:
        def __init__(self, _):
            self.args = None
            self.kwargs = None

    # Callable[anything] will return None
    class _Callable:
        def __getitem__(self, _):
            return None

    # Make instances
    Callable = _Callable()


try:
    import clickhouse_driver  # type: ignore[import-not-found]

except ImportError:
    raise DidNotEnable("clickhouse-driver not installed.")

if clickhouse_driver.VERSION < (0, 2, 0):
    raise DidNotEnable("clickhouse-driver >= 0.2.0 required")


class ClickhouseDriverIntegration(Integration):
    identifier = "clickhouse_driver"
    origin = f"auto.db.{identifier}"

    @staticmethod
    def setup_once() -> None:
        # Every query is done using the Connection's `send_query` function
        clickhouse_driver.connection.Connection.send_query = _wrap_start(
            clickhouse_driver.connection.Connection.send_query
        )

        # If the query contains parameters then the send_data function is used to send those parameters to clickhouse
        clickhouse_driver.client.Client.send_data = _wrap_send_data(
            clickhouse_driver.client.Client.send_data
        )

        # Every query ends either with the Client's `receive_end_of_query` (no result expected)
        # or its `receive_result` (result expected)
        clickhouse_driver.client.Client.receive_end_of_query = _wrap_end(
            clickhouse_driver.client.Client.receive_end_of_query
        )
        if hasattr(clickhouse_driver.client.Client, "receive_end_of_insert_query"):
            # In 0.2.7, insert queries are handled separately via `receive_end_of_insert_query`
            clickhouse_driver.client.Client.receive_end_of_insert_query = _wrap_end(
                clickhouse_driver.client.Client.receive_end_of_insert_query
            )
        clickhouse_driver.client.Client.receive_result = _wrap_end(
            clickhouse_driver.client.Client.receive_result
        )


P = ParamSpec("P")
T = TypeVar("T")


def _wrap_start(f: Callable[P, T]) -> Callable[P, T]:
    @ensure_integration_enabled(ClickhouseDriverIntegration, f)
    def _inner(*args: P.args, **kwargs: P.kwargs) -> T:
        connection = args[0]
        query = args[1]
        query_id = args[2] if len(args) > 2 else kwargs.get("query_id")
        params = args[3] if len(args) > 3 else kwargs.get("params")

        span = sentry_sdk.start_span(
            op=OP.DB,
            name=query,
            origin=ClickhouseDriverIntegration.origin,
        )

        connection._sentry_span = span  # type: ignore[attr-defined]

        _set_db_data(span, connection)

        span.set_data("query", query)

        if query_id:
            span.set_data("db.query_id", query_id)

        if params and should_send_default_pii():
            span.set_data("db.params", params)

        # run the original code
        ret = f(*args, **kwargs)

        return ret

    return _inner


def _wrap_end(f: Callable[P, T]) -> Callable[P, T]:
    def _inner_end(*args: P.args, **kwargs: P.kwargs) -> T:
        res = f(*args, **kwargs)
        instance = args[0]
        span = getattr(instance.connection, "_sentry_span", None)  # type: ignore[attr-defined]

        if span is not None:
            if res is not None and should_send_default_pii():
                span.set_data("db.result", res)

            with capture_internal_exceptions():
                span.scope.add_breadcrumb(
                    message=span._data.pop("query"), category="query", data=span._data
                )

            span.finish()

        return res

    return _inner_end


def _wrap_send_data(f: Callable[P, T]) -> Callable[P, T]:
    def _inner_send_data(*args: P.args, **kwargs: P.kwargs) -> T:
        instance = args[0]  # type: clickhouse_driver.client.Client
        data = args[2]
        span = getattr(instance.connection, "_sentry_span", None)

        if span is not None:
            _set_db_data(span, instance.connection)

            if should_send_default_pii():
                db_params = span._data.get("db.params", [])
                db_params.extend(data)
                span.set_data("db.params", db_params)

        return f(*args, **kwargs)

    return _inner_send_data


def _set_db_data(
    span: Span, connection: clickhouse_driver.connection.Connection
) -> None:
    span.set_data(SPANDATA.DB_SYSTEM, "clickhouse")
    span.set_data(SPANDATA.SERVER_ADDRESS, connection.host)
    span.set_data(SPANDATA.SERVER_PORT, connection.port)
    span.set_data(SPANDATA.DB_NAME, connection.database)
    span.set_data(SPANDATA.DB_USER, connection.user)

Youez - 2016 - github.com/yon3zu
LinuXploit