Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.119.113.73
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/eventloop/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/eventloop/plugin_executors.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import cProfile
import logging
import multiprocessing
import os
import pstats
import signal
import sys
import threading
import time
import traceback

from sqlalchemy.exc import (
    NoReferenceError,
    NoSuchColumnError,
    NoSuchTableError,
    SQLAlchemyError,
)

from lvestats.core.plugin import LveStatsPluginTerminated
from lvestats.core.plugin_context import PluginContext
from lvestats.lib.config import ConfigError

__author__ = 'shaman'


class PluginExecutionException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class DbRecoveredException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class DbRecoverFailedException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class PluginTimeoutException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class PluginExecutor(object):
    initial_lve_data = {'stats': {}}

    def __init__(self, config):
        self.config = config
        self.log = logging.getLogger('main_loop.plugin_executor')

        from lvestats.lib.commons.func import (  # pylint: disable=import-outside-toplevel
            get_lve_version,
        )
        PluginExecutor.initial_lve_data = {'stats': {}, 'LVE_VERSION': get_lve_version()}

    def execute(self, plugin_class, now, timeout):
        """
        Executes given plugin with provided lve_data.
        Does not modify lve_data.
        Returns updated lve_data or throws PluginExecutionException or PluginTimeoutException
        :param plugin_class: object:
        :return:
        """
        raise NotImplementedError()

    def terminate(self):
        raise NotImplementedError()


KILL_PROCESS_TIMEOUT = 20


class SameProcessPluginExecutor(PluginExecutor):
    def __init__(self, config):
        super().__init__(config)
        self.plugin_context = PluginContext(config)
        self.last_execution_time = {}
        self.lve_data = PluginExecutor.initial_lve_data.copy()

    def execute(self, plugin_class, now, timeout):
        log = logging.getLogger('main_loop.plugin_executor.same_process')
        plugin_instance = self.plugin_context.get_instance(plugin_class)
        plugin_instance.now = now

        time_passed = None

        period = getattr(plugin_instance, 'period', getattr(plugin_class, 'period', None))

        if period:
            time_passed = now - self.last_execution_time.get(plugin_class, 0)

        if period is None or period <= time_passed:
            try:
                plugin_instance.execute(self.lve_data)
                self.last_execution_time[plugin_class] = now
                return
            except (NoSuchColumnError, NoSuchTableError, NoReferenceError) as ex:
                log.exception('Database error during executing plugin that leads to need of db recreation db')

                try:
                    self.plugin_context.recover_db()
                except BaseException as exception_during_recover:
                    raise DbRecoverFailedException(exception_during_recover) from exception_during_recover

                raise DbRecoveredException() from ex
            except SQLAlchemyError as ex:
                log.exception('Database error during executing plugin that may indicate the need to recreate db. '
                              'Please check it manually.')
                raise PluginExecutionException(ex) from ex
            except BaseException as ex:
                raise PluginExecutionException(ex) from ex

    def terminate(self):
        pass


class ProcessContext(object):
    def __init__(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.last_execution_time = {}
        self.plugin_ready_event = self.mgr.Event()
        self.plugin_execution_finished_event = self.mgr.Event()
        self.plugin_execution_success_event = self.mgr.Event()
        self.db_error_event = self.mgr.Event()

        self.have_to_exit = self.mgr.Event()
        self.exited = self.mgr.Event()

    def terminate(self):
        self.mgr.shutdown()


class SeparateProcessPluginExecutor(PluginExecutor):
    def __init__(self, config, profiling_log=None):
        super().__init__(config)
        self.profiling_log = profiling_log
        self.process_context = None
        self.process = None

    @staticmethod
    def _in_separate_process(config, process_context, profiler=None, profiling_log=None):
        os.setpgrp()
        signal.signal(signal.SIGTERM, SeparateProcessPluginExecutor._child_process_sigterm_handler)
        signal.signal(signal.SIGUSR1, SeparateProcessPluginExecutor._sigusr1_handler)
        log = logging.getLogger('main_loop.plugin_executor.process')
        try:
            plugin_context = PluginContext(config)
            lve_data = PluginExecutor.initial_lve_data.copy()

            while True:
                while not process_context.plugin_ready_event.wait(timeout=1):
                    if process_context.have_to_exit.is_set():
                        process_context.plugin_execution_finished_event.set()

                        if profiler:
                            with open(profiling_log, 'a+', encoding='utf-8') as f:
                                f.write('Plugin process profile, internal time:\n')
                                stats = pstats.Stats(profiler, stream=f)
                                stats.sort_stats('time').print_stats(20)

                                f.write('Plugin process profile, cumulative time:\n')
                                stats.sort_stats('cumulative').print_stats(20)
                        process_context.exited.set()
                        return

                process_context.plugin_ready_event.clear()

                plugin_class = process_context.ns.plugin_class
                now = process_context.ns.now

                try:
                    plugin_instance = plugin_context.get_instance(plugin_class)
                except ConfigError as ce:
                    log.exception(str(ce))
                    continue
                except Exception as ex:
                    log.exception("Error during instantiating plugin")
                    process_context.ns.exception = ex
                    process_context.plugin_execution_finished_event.set()
                    continue

                plugin_instance.now = now

                period = getattr(plugin_instance, 'period', getattr(plugin_class, 'period', None))
                if period is not None:
                    period = float(period)

                time_passed = None

                last_execution_time = process_context.ns.last_execution_time

                if period is not None:
                    time_passed = now - last_execution_time.get(plugin_class, 0)

                if period is None or period <= time_passed:
                    try:
                        signal.signal(signal.SIGUSR2, SeparateProcessPluginExecutor._stop_plugin)
                        log.debug("Executing plugin %s", plugin_class)

                        t0 = time.time()
                        plugin_instance.execute(lve_data)
                        log.debug("Executing plugin %s took %f sec", plugin_class, time.time() - t0)

                        last_execution_time[plugin_class] = now
                        process_context.ns.last_execution_time = last_execution_time

                        process_context.plugin_execution_success_event.set()
                        process_context.plugin_execution_finished_event.set()
                        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
                    except (NoSuchColumnError, NoSuchTableError, NoReferenceError) as ex:
                        log.exception('Database error during executing plugin that leads to need of db recreation db')
                        process_context.ns.exception = ex
                        process_context.db_error_event.set()
                        process_context.plugin_execution_finished_event.set()
                        continue
                    except SQLAlchemyError as ex:
                        log.exception('Database error during executing %s plugin that may indicate the need to '
                                      'recreate db. '
                                      'Please check it manually.', plugin_class)
                        process_context.ns.exception = ex
                        process_context.plugin_execution_finished_event.set()
                        continue
                    except LveStatsPluginTerminated:
                        log.info("Plugin %s was terminated.", plugin_class)
                        sys.exit(0)
                    except IOError as ioe:
                        log.exception("IO Error: %s", str(ioe))
                        continue
                    except Exception as ex:
                        log.exception("Other exception during execution of plugin %s", plugin_class)
                        process_context.ns.exception = ex
                        process_context.plugin_execution_finished_event.set()
                        continue
                else:
                    log.debug("plugin %s will be launched in %f sec", plugin_class, period - time_passed)
                    process_context.plugin_execution_success_event.set()
                    process_context.plugin_execution_finished_event.set()
        except Exception as ex:
            try:
                process_context.ns.exception = ex
            except IOError as e:
                # Hide "IOError: [Errno 32] Broken pipe" on KeyboardInterrupt
                # Probably there is a better way to do this
                if e.errno == 32 and isinstance(ex, EOFError):
                    return
                raise e     # don't hide any other error
            log.exception("Exception during execution in separate process")
            process_context.plugin_execution_finished_event.set()

    @staticmethod
    def _do_profile_in_separate_process(config, process_context, profiling_log):
        profiler = cProfile.Profile()
        profiler.runcall(SeparateProcessPluginExecutor._in_separate_process,
                         config,
                         process_context,
                         profiler,
                         profiling_log)

    def _make_sub_process(self, ):
        self.process_context = ProcessContext()
        target = SeparateProcessPluginExecutor._do_profile_in_separate_process if self.profiling_log else \
            SeparateProcessPluginExecutor._in_separate_process

        self.process = multiprocessing.Process(target=target,
                                               name='plugin_process',
                                               args=(self.config, self.process_context, self.profiling_log))
        self.process.start()

    @staticmethod
    def _stop_plugin(signum, frame):
        log = logging.getLogger('plugin_sigusr2_handler')
        log.info('Shutting down plugin')
        raise LveStatsPluginTerminated()

    @staticmethod
    def _child_process_sigterm_handler(signum, frame):
        log = logging.getLogger('subprocess_sigterm_handler')
        log.info('Shutting down child process')
        sys.exit(0)

    def _kill_subprocess(self, pid):
        if not pid:
            return

        os.kill(pid, signal.SIGKILL)
        self.log.debug("subprocess killed")

    def _terminate_sub_process(self):
        if not self.process or not self.process_context:
            return

        self.log.debug('Terminating subprocess')
        self.process_context.have_to_exit.set()

        if self.process_context.exited.wait(KILL_PROCESS_TIMEOUT):
            self.process = None
            self.process_context.terminate()
            self.process_context = None
            return

        self.log.debug('Killing subprocess')
        pid = self.process.pid

        try:
            self.process.terminate()
            self.process.join(KILL_PROCESS_TIMEOUT)
            if self.process.is_alive():
                self._kill_subprocess(pid)
        except Exception as ex:
            self.log.exception("Wasn't able to kill subprocess because of the error: %s", str(ex))
            self._kill_subprocess(pid)
        finally:
            self.process = None
            self.process_context.terminate()
            self.process_context = None

    @staticmethod
    def _sigusr1_handler(signum, frame):
        log = logging.getLogger('stack tracer')
        log.info("--- Threads stack traces, while plugin is considered stuck ---")
        id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
        code = []
        for thread_id, stack in list(sys._current_frames().items()):  # pylint: disable=protected-access
            code.append(f"\n# Thread: {id2name.get(thread_id, '')}({thread_id})")
            for filename, lineno, name, line in traceback.extract_stack(stack):
                code.append(f'File: "{filename}", line {lineno}, in {name}')
                if line:
                    code.append(f"  {line.strip()}")
        log.info("\n".join(code))

    def terminate(self):
        self._terminate_sub_process()

    def _restart_sub_process(self):
        self._terminate_sub_process()
        self._make_sub_process()

    def _get_process_and_context(self):
        if not self.process or not self.process_context:
            self._make_sub_process()
        return self.process, self.process_context

    def execute(self, plugin_class, now, timeout):
        if self.profiling_log:
            timeout = None

        _, context = self._get_process_and_context()

        t0 = time.time()
        context.ns.plugin_class = plugin_class
        context.ns.now = now

        context.plugin_execution_finished_event.clear()
        context.plugin_execution_success_event.clear()

        context.plugin_ready_event.set()
        self.log.debug('Executor (main process) data exchange with plugin took %f sec', time.time() - t0)
        context.plugin_execution_finished_event.wait(timeout)

        db_corrupted = context.db_error_event.is_set()

        if db_corrupted:
            self._terminate_sub_process()
            try:
                recreator = PluginContext(self.config)
                recreator.recover_db()
            except BaseException as exception_during_recover:
                raise DbRecoverFailedException(exception_during_recover) from exception_during_recover
            self._make_sub_process()
            raise DbRecoveredException()

        in_time = context.plugin_execution_finished_event.is_set()
        if not in_time:
            os.kill(self.process.pid, signal.SIGUSR1)
            time.sleep(timeout / 2.0)
            self._restart_sub_process()
            raise PluginTimeoutException()

        success = context.plugin_execution_success_event.is_set()
        if not success:
            ex = context.ns.exception
            self._terminate_sub_process()
            if ex:
                raise PluginExecutionException(ex)
            else:
                raise PluginExecutionException()


__all__ = ['PluginExecutor', 'SameProcessPluginExecutor', 'SeparateProcessPluginExecutor',
           'PluginExecutionException', 'PluginTimeoutException', 'DbRecoveredException',
           'DbRecoverFailedException']

Youez - 2016 - github.com/yon3zu
LinuXploit