Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.145.76.12
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/sentry_sdk/integrations/spark/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/sentry_sdk/integrations/spark/spark_worker.py
from __future__ import absolute_import

import sys

from sentry_sdk import configure_scope
from sentry_sdk.hub import Hub
from sentry_sdk.integrations import Integration
from sentry_sdk.utils import (
    capture_internal_exceptions,
    exc_info_from_error,
    single_exception_from_error_tuple,
    walk_exception_chain,
    event_hint_with_exc_info,
)

from sentry_sdk._types import MYPY

if MYPY:
    from typing import Any
    from typing import Optional

    from sentry_sdk._types import ExcInfo, Event, Hint


class SparkWorkerIntegration(Integration):
    """Sentry integration that hooks the PySpark daemon's worker entry point.

    Once set up, ``pyspark.daemon.worker_main`` points at
    ``_sentry_worker_main`` instead of the stock implementation.
    """

    identifier = "spark_worker"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Replace ``pyspark.daemon.worker_main`` with the Sentry wrapper."""
        # Imported lazily so that importing this module does not itself
        # require pyspark.
        import pyspark.daemon as spark_daemon

        spark_daemon.worker_main = _sentry_worker_main


def _capture_exception(exc_info, hub):
    # type: (ExcInfo, Hub) -> None
    client = hub.client

    client_options = client.options  # type: ignore

    mechanism = {"type": "spark", "handled": False}

    exc_info = exc_info_from_error(exc_info)

    exc_type, exc_value, tb = exc_info
    rv = []

    # On Exception worker will call sys.exit(-1), so we can ignore SystemExit and similar errors
    for exc_type, exc_value, tb in walk_exception_chain(exc_info):
        if exc_type not in (SystemExit, EOFError, ConnectionResetError):
            rv.append(
                single_exception_from_error_tuple(
                    exc_type, exc_value, tb, client_options, mechanism
                )
            )

    if rv:
        rv.reverse()
        hint = event_hint_with_exc_info(exc_info)
        event = {"level": "error", "exception": {"values": rv}}

        _tag_task_context()

        hub.capture_event(event, hint=hint)


def _tag_task_context():
    # type: () -> None
    """Register an event processor that tags events with Spark task details.

    The processor is added to the scope yielded by ``configure_scope()``.
    When the worker integration is active and ``TaskContext.get()`` returns a
    context, it fills in (without overwriting existing values) the tags
    ``stageId``, ``partitionId``, ``attemptNumber`` and ``taskAttemptId``.
    From the task's local properties it also adds the ``app_name`` and
    ``application_id`` tags and the ``callSite`` extra when available.
    """
    from pyspark.taskcontext import TaskContext

    with configure_scope() as scope:

        @scope.add_event_processor
        def process_event(event, hint):
            # type: (Event, Hint) -> Optional[Event]
            # Any failure while reading the task context is swallowed so the
            # event is still returned.
            with capture_internal_exceptions():
                integration = Hub.current.get_integration(SparkWorkerIntegration)
                task_context = TaskContext.get()

                if integration is None or task_context is None:
                    return event

                tags = event.setdefault("tags", {})
                tags.setdefault("stageId", str(task_context.stageId()))
                tags.setdefault("partitionId", str(task_context.partitionId()))
                tags.setdefault("attemptNumber", str(task_context.attemptNumber()))
                tags.setdefault("taskAttemptId", str(task_context.taskAttemptId()))

                local_properties = task_context._localProperties
                if local_properties:
                    if "sentry_app_name" in local_properties:
                        tags.setdefault(
                            "app_name", local_properties["sentry_app_name"]
                        )
                        # Checked separately: a missing application id used
                        # to raise KeyError here, which aborted the rest of
                        # the processor and dropped the callSite extra.
                        if "sentry_application_id" in local_properties:
                            tags.setdefault(
                                "application_id",
                                local_properties["sentry_application_id"],
                            )

                    if "callSite.short" in local_properties:
                        event.setdefault("extra", {}).setdefault(
                            "callSite", local_properties["callSite.short"]
                        )

            return event


def _sentry_worker_main(*args, **kwargs):
    # type: (*Optional[Any], **Optional[Any]) -> None
    """Run ``pyspark.worker.main`` and report a ``SystemExit`` to Sentry.

    All arguments are forwarded unchanged. When the worker raises
    ``SystemExit`` and the worker integration is active on the current hub,
    the failure is handed to ``_capture_exception``.
    """
    import pyspark.worker as original_worker

    try:
        original_worker.main(*args, **kwargs)
    except SystemExit:
        # NOTE(review): the SystemExit is not re-raised, so the caller never
        # sees it; confirm that is intended.
        if Hub.current.get_integration(SparkWorkerIntegration) is None:
            return

        current_hub = Hub.current
        failure = sys.exc_info()
        # Reporting must never break the worker, so internal errors are
        # contained here.
        with capture_internal_exceptions():
            _capture_exception(failure, current_hub)

Youez - 2016 - github.com/yon3zu
LinuXploit