Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 13.58.252.71
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/plugins/generic/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/plugins/generic/cm_collector.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
import pwd
import json
import time

from clcommon import clpwd
from clcommon.lib.jwt_token import jwt_token_check
from clcommon.lib.cmt_utils import is_cmt_disabled, is_client_enabled
from clcommon.clproc import ProcLve, LIMIT_LVP_ID
from clcommon.lib import MySQLGovernor, MySQLGovException, MySQLGovernorAbsent

from lvestat import LVEStat
from lvestats.core.plugin import LveStatsPlugin
from lvestats.lib.commons.sizeutil import mempages_to_bytes
from lvestats.lib.commons.func import get_domains
from lvestats.plugins.generic.analyzers import LVEUsage
from secureio import write_file_via_tempfile

GOVERNOR_ENABLED_MSG = (
    "There was detected MySQL Governor and information about limits will be collected starting from now"
)
GOVERNOR_BASIC_ERROR_MSG = 'Error while getting Mysql Governor limits: %s. '
GOVERNOR_ABSENT_ERROR_MSG = (
    GOVERNOR_BASIC_ERROR_MSG + "Since the governor doesn't seem to be well "
    "configured, the limits statistics won't be "
    "collected. This warning won't be repeated to "
    "prevent annoying spam in logs."
)
CM_DISABLED_MSG = (
    "Centralized Monitoring isn't enabled. "
    "CMCollector plugin won't collect data. It's expected "
    "behaviour for non CL+ clients"
)
CM_ENABLED_MSG = (
    "Centralized Monitoring has been detected! From now, "
    "CMCollector will gather limits statistics for it gently "
    "and affectionate."
)


class CMCollector(LveStatsPlugin):
    """
    Collector plugin for Cloudlinux+ Centralized Monitoring
    """

    class DomainsCache:
        """
        Sub-class for keeping username:domain pairs
        """

        def __init__(self, logger):
            self.log = logger
            self.cache = {}
            self.cache_creation_time = 0
            self.cache_expiration_interval = 60 * 60 * 24  # 1 day

        def get(self, username):
            if self._is_expired():
                self.log.info('Domains cache is expired, going to regenerate user-domains cache')
                self.cache = {}
            if username in self.cache:
                return self.cache[username]
            domain = self._get_real_value(username)
            if domain:
                self.set(username, domain)
            return domain

        def set(self, username, domain):
            if not self.cache:
                self.cache_creation_time = int(time.time())
                self.log.info('Cache creation timestamp: %s', str(self.cache_creation_time))
            self.cache[username] = domain

        def _is_expired(self):
            return int(time.time()) - self.cache_creation_time > self.cache_expiration_interval

        def _get_real_value(self, username):
            try:
                return get_domains(username, raise_exception=False)[0]
            except Exception:
                # skip unavailable user or domain
                # We ignore all exceptions to reduce failure risks on custom panels
                return None

    def __init__(self):
        self.log = logging.getLogger('CL+CM_Collector')
        self.now = 0  # This changes in MainLoop
        self.period = 60  # Run this plugin every minute
        self._filename_to_write = '/var/lve/cm_lve.json'
        self._is_write_error = False
        self._is_jwt_token_error = False
        self._is_governor_absent_error = False
        self._is_cm_enabled_error = False
        self.log.info("CM Collector plugin init")
        self.proc_lve = ProcLve()
        self.governor = MySQLGovernor()
        self.min_uid = clpwd.ClPwd().get_sys_min_uid()
        self.domains_cache = self.DomainsCache(self.log)

    def _prepare_dbgov_limits(self, username):
        """
        Gets Mysql Governor limits and returns ready-to-send
        dict with limits and zero-usage
        """
        try:
            cpu, io = self.governor.get_limits_by_user(username)
            if self._is_governor_absent_error:
                self.log.info(GOVERNOR_ENABLED_MSG)
                self._is_governor_absent_error = False
        # Catch specific governor absence error to prevent logs spam
        except MySQLGovernorAbsent as e:
            if not self._is_governor_absent_error:
                self.log.warning(GOVERNOR_ABSENT_ERROR_MSG, str(e))
                self._is_governor_absent_error = True
            return None
        except MySQLGovException as e:
            self.log.warning(GOVERNOR_BASIC_ERROR_MSG, str(e))
            return None
        return {
            'mysql_cpu': {'l': cpu},
            'mysql_io': {
                'l': io * 1024  # convert values (in KB) from governor to bytes
            },
        }

    def _parse_limits(self, procs, lvp_id=0):
        """
        Parse lve limits from /proc/lve via ProcLve instance
        """
        limits = {}
        for line in self.proc_lve.lines(lvp_id, without_limits=False):
            version = self.proc_lve.version()
            stat = LVEStat(line, version)
            if stat.id == 0 or stat.id == LIMIT_LVP_ID:
                continue
            limits[stat.id] = self._lveusage_to_dict(
                LVEUsage(version).init_limits_from_lvestat(stat, procs), only_limits=True
            )
        return limits

    def _get_lve_users(self, procs):
        """
        Returns dict with lve_id and it`s limits from /proc/lve
        """
        users = {}
        if self.proc_lve.resellers_supported():
            for lvp_id in self.proc_lve.lvp_id_list():
                users.update(self._parse_limits(procs, lvp_id=lvp_id))
        users.update(self._parse_limits(procs))
        return users

    @staticmethod
    def _lveusage_to_dict(usage: LVEUsage, only_limits: bool = False):
        """
        Return ready-to-send dict with metrics by passed lve usage
        """
        if only_limits:
            return {
                'cpu': {'l': usage.lcpu / 100},
                'ep': {'l': usage.lep},
                'pmem': {'l': mempages_to_bytes(usage.lmemphy)},
                'nproc': {'l': usage.lnproc},
                'io': {'l': usage.io},
                'iops': {'l': usage.liops},
            }
        return {
            'cpu': {
                'a': int(round(usage.cpu_usage)) / 100,
                'l': usage.lcpu / 100,
                'f': usage.cpu_fault,
            },
            'ep': {'a': usage.mep, 'l': usage.lep, 'f': usage.mep_fault},
            'pmem': {
                'a': mempages_to_bytes(int(round(usage.memphy))),
                'l': mempages_to_bytes(usage.lmemphy),
                'f': usage.memphy_fault,
            },
            'nproc': {
                'a': int(round(usage.nproc)),
                'l': usage.lnproc,
                'f': usage.nproc_fault,
            },
            'io': {
                'a': int(round(usage.io_usage)),
                'l': usage.io,
                'f': usage.io_fault,
            },
            'iops': {
                'a': int(round(usage.iops)),
                'l': usage.liops,
                'f': usage.iops_fault,
            },
        }

    def _prepare_metrics_to_send(self, lve_data):
        """
        Returns list with metrics (lve&mysql) by each lve_id
        In case there is some usage for lve_id -> it will be used
        otherwise -> zero-usage values with actual lve/mysql limits are used
        """
        lve_data_list = []
        dbgov_data_for_cm = lve_data.pop("dbgov_data_for_cm", {})
        users = self._get_lve_users(lve_data.get('procs', 1))
        lve_usage = lve_data.get('lve_usage', {})
        for lve_id, limits_data in users.items():
            try:
                username = pwd.getpwuid(lve_id).pw_name
            except Exception:
                # skip unavailable user
                continue
            domain = self.domains_cache.get(username)
            if not domain:
                continue
            lve_data_dict = {"lve_id": lve_id, "username": username, "domain": domain}
            if lve_id in lve_usage:
                v = lve_usage[lve_id]
                lve_data_dict.update(self._lveusage_to_dict(v))
            else:
                lve_data_dict.update(limits_data)
            # process MySQL governor data
            # dbgov_data_for_cm example
            # {1001: {'cpu_limit': 100, 'io_limit': 562036736, 'cpu_usage': 99.498342, 'io_usage': 2055473}}
            if lve_id in dbgov_data_for_cm:
                lve_data_dict["mysql_cpu"] = {
                    "a": dbgov_data_for_cm[lve_id]['cpu_usage'],
                    "l": dbgov_data_for_cm[lve_id]['cpu_limit'],
                }
                lve_data_dict["mysql_io"] = {
                    "a": dbgov_data_for_cm[lve_id]['io_usage'],
                    "l": dbgov_data_for_cm[lve_id]['io_limit'],
                }
            else:
                mysql_limits = self._prepare_dbgov_limits(username)
                # append mysql limits only if governor present
                if mysql_limits is not None:
                    lve_data_dict.update(mysql_limits)
            lve_data_list.append(lve_data_dict)
        return lve_data_list

    def execute(self, lve_data):
        """
        Main plugin entrypoint
        """
        # Check and read JWT token
        token_is_valid, token_error_msg, _ = jwt_token_check()
        if not token_is_valid:
            # Token absent or invalid
            if not self._is_jwt_token_error:
                self.log.info(
                    "JWT token error: '%s'. CM Collector plugin will not work, "
                    "it is expected behaviour for non CL+ clients",
                    token_error_msg,
                )
                self._is_jwt_token_error = True
            return
        # CMCollector should be executed only if CM for client is enabled
        if is_cmt_disabled() or not is_client_enabled():
            # CM isn't enabled
            if not self._is_cm_enabled_error:
                self.log.info(CM_DISABLED_MSG)
                self._is_cm_enabled_error = True
            return
        self._is_jwt_token_error = False
        if self._is_cm_enabled_error:
            # Welcome message after plugin start
            self.log.info(CM_ENABLED_MSG)
            self._is_cm_enabled_error = False
        json_result = json.dumps({'lve_stats': self._prepare_metrics_to_send(lve_data)})
        try:
            write_file_via_tempfile(json_result, self._filename_to_write, 0o600)
            self._is_write_error = False
        except (IOError, OSError) as e:
            if not self._is_write_error:
                self.log.error(str(e))
                self._is_write_error = True

Youez - 2016 - github.com/yon3zu
LinuXploit