Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.148.105.152
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/sentry_sdk/integrations/grpc/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/sentry_sdk/integrations/grpc/server.py
from sentry_sdk import Hub
from sentry_sdk._types import MYPY
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM

if MYPY:
    from typing import Callable, Optional
    from google.protobuf.message import Message  # type: ignore

try:
    import grpc
    from grpc import ServicerContext, HandlerCallDetails, RpcMethodHandler
except ImportError:
    raise DidNotEnable("grpcio is not installed")


class ServerInterceptor(grpc.ServerInterceptor):  # type: ignore
    """gRPC server interceptor that wraps unary-unary calls in a Sentry transaction.

    For every intercepted unary-unary RPC a transaction name is resolved via
    the configured name finder.  When a name is available, the call runs
    inside a transaction continued from the call's invocation metadata; when
    no name is available, the original handler is called directly.  Handlers
    that are missing or not unary-unary are passed through unchanged.
    """

    def __init__(self, find_name=None):
        # type: (ServerInterceptor, Optional[Callable[[ServicerContext], str]]) -> None
        """Create the interceptor.

        :param find_name: Optional callable that receives the
            ``ServicerContext`` of a call and returns the transaction name.
            When omitted (or falsy), ``ServerInterceptor._find_name`` is used.
        """
        self._find_method_name = find_name or ServerInterceptor._find_name

        super(ServerInterceptor, self).__init__()

    def intercept_service(self, continuation, handler_call_details):
        # type: (ServerInterceptor, Callable[[HandlerCallDetails], RpcMethodHandler], HandlerCallDetails) -> RpcMethodHandler
        """Wrap the next handler so that unary-unary calls are traced.

        :param continuation: Callable that returns the next handler in the
            chain for ``handler_call_details``.
        :param handler_call_details: Details of the call being intercepted.
        :returns: The handler produced by ``continuation`` unchanged when it
            is falsy or has no ``unary_unary`` behavior; otherwise a new
            unary-unary handler that reuses the original (de)serializers and
            runs the original behavior inside a transaction.
        """
        handler = continuation(handler_call_details)
        if not handler or not handler.unary_unary:
            # Only unary-unary handlers are instrumented; pass the rest along.
            return handler

        def behavior(request, context):
            # type: (Message, ServicerContext) -> Message
            hub = Hub(Hub.current)

            name = self._find_method_name(context)
            if not name:
                # Without a name there is nothing to label the transaction
                # with, so call the original handler untraced.
                return handler.unary_unary(request, context)

            # Invocation metadata is handed to continue_from_headers as the
            # header mapping, so trace data sent by the caller is picked up.
            metadata = dict(context.invocation_metadata())

            transaction = Transaction.continue_from_headers(
                metadata,
                op=OP.GRPC_SERVER,
                name=name,
                source=TRANSACTION_SOURCE_CUSTOM,
            )

            # Exceptions raised by the handler propagate through the context
            # manager on their own; no explicit catch-and-re-raise is needed.
            with hub.start_transaction(transaction=transaction):
                return handler.unary_unary(request, context)

        return grpc.unary_unary_rpc_method_handler(
            behavior,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

    @staticmethod
    def _find_name(context):
        # type: (ServicerContext) -> str
        """Return the decoded method of the call behind ``context``.

        NOTE(review): this reads the private ``_rpc_event`` attribute of the
        context object, so it depends on grpc internals -- confirm it still
        exists when upgrading grpcio.
        """
        return context._rpc_event.call_details.method.decode()

Youez - 2016 - github.com/yon3zu
LinuXploit