403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.15.182.217
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/hc_python/lib64/python3.8/site-packages/sentry_sdk/integrations/django/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/hc_python/lib64/python3.8/site-packages/sentry_sdk/integrations/django//caching.py
import functools
from typing import TYPE_CHECKING
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
from urllib3.util import parse_url as urlparse

from django import VERSION as DJANGO_VERSION
from django.core.cache import CacheHandler

import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.utils import (
    capture_internal_exceptions,
    ensure_integration_enabled,
)


if TYPE_CHECKING:
    from typing import Any
    from typing import Callable
    from typing import Optional


METHODS_TO_INSTRUMENT = [
    "set",
    "set_many",
    "get",
    "get_many",
]


def _get_span_description(method_name, args, kwargs):
    # type: (str, tuple[Any], dict[str, Any]) -> str
    return _key_as_string(_get_safe_key(method_name, args, kwargs))


def _patch_cache_method(cache, method_name, address, port):
    # type: (CacheHandler, str, Optional[str], Optional[int]) -> None
    from sentry_sdk.integrations.django import DjangoIntegration

    original_method = getattr(cache, method_name)

    @ensure_integration_enabled(DjangoIntegration, original_method)
    def _instrument_call(
        cache, method_name, original_method, args, kwargs, address, port
    ):
        # type: (CacheHandler, str, Callable[..., Any], tuple[Any, ...], dict[str, Any], Optional[str], Optional[int]) -> Any
        is_set_operation = method_name.startswith("set")
        is_get_operation = not is_set_operation

        op = OP.CACHE_PUT if is_set_operation else OP.CACHE_GET
        description = _get_span_description(method_name, args, kwargs)

        with sentry_sdk.start_span(
            op=op,
            name=description,
            origin=DjangoIntegration.origin,
        ) as span:
            value = original_method(*args, **kwargs)

            with capture_internal_exceptions():
                if address is not None:
                    span.set_data(SPANDATA.NETWORK_PEER_ADDRESS, address)

                if port is not None:
                    span.set_data(SPANDATA.NETWORK_PEER_PORT, port)

                key = _get_safe_key(method_name, args, kwargs)
                if key is not None:
                    span.set_data(SPANDATA.CACHE_KEY, key)

                item_size = None
                if is_get_operation:
                    if value:
                        item_size = len(str(value))
                        span.set_data(SPANDATA.CACHE_HIT, True)
                    else:
                        span.set_data(SPANDATA.CACHE_HIT, False)
                else:
                    try:
                        # 'set' command
                        item_size = len(str(args[1]))
                    except IndexError:
                        # 'set_many' command
                        item_size = len(str(args[0]))

                if item_size is not None:
                    span.set_data(SPANDATA.CACHE_ITEM_SIZE, item_size)

            return value

    @functools.wraps(original_method)
    def sentry_method(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        return _instrument_call(
            cache, method_name, original_method, args, kwargs, address, port
        )

    setattr(cache, method_name, sentry_method)


def _patch_cache(cache, address=None, port=None):
    # type: (CacheHandler, Optional[str], Optional[int]) -> None
    if not hasattr(cache, "_sentry_patched"):
        for method_name in METHODS_TO_INSTRUMENT:
            _patch_cache_method(cache, method_name, address, port)
        cache._sentry_patched = True


def _get_address_port(settings):
    # type: (dict[str, Any]) -> tuple[Optional[str], Optional[int]]
    location = settings.get("LOCATION")

    # TODO: location can also be an array of locations
    #       see: https://docs.djangoproject.com/en/5.0/topics/cache/#redis
    #       GitHub issue: https://github.com/getsentry/sentry-python/issues/3062
    if not isinstance(location, str):
        return None, None

    if "://" in location:
        parsed_url = urlparse(location)
        # remove the username and password from URL to not leak sensitive data.
        address = "{}://{}{}".format(
            parsed_url.scheme or "",
            parsed_url.hostname or "",
            parsed_url.path or "",
        )
        port = parsed_url.port
    else:
        address = location
        port = None

    return address, int(port) if port is not None else None


def patch_caching():
    # type: () -> None
    from sentry_sdk.integrations.django import DjangoIntegration

    if not hasattr(CacheHandler, "_sentry_patched"):
        if DJANGO_VERSION < (3, 2):
            original_get_item = CacheHandler.__getitem__

            @functools.wraps(original_get_item)
            def sentry_get_item(self, alias):
                # type: (CacheHandler, str) -> Any
                cache = original_get_item(self, alias)

                integration = sentry_sdk.get_client().get_integration(DjangoIntegration)
                if integration is not None and integration.cache_spans:
                    from django.conf import settings

                    address, port = _get_address_port(
                        settings.CACHES[alias or "default"]
                    )

                    _patch_cache(cache, address, port)

                return cache

            CacheHandler.__getitem__ = sentry_get_item
            CacheHandler._sentry_patched = True

        else:
            original_create_connection = CacheHandler.create_connection

            @functools.wraps(original_create_connection)
            def sentry_create_connection(self, alias):
                # type: (CacheHandler, str) -> Any
                cache = original_create_connection(self, alias)

                integration = sentry_sdk.get_client().get_integration(DjangoIntegration)
                if integration is not None and integration.cache_spans:
                    address, port = _get_address_port(self.settings[alias or "default"])

                    _patch_cache(cache, address, port)

                return cache

            CacheHandler.create_connection = sentry_create_connection
            CacheHandler._sentry_patched = True

Youez - 2016 - github.com/yon3zu
LinuXploit