Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.116.85.96
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/clcommon/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/clcommon/lib/mysql_governor_lib.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

# mysql_governor.py - module for interfacing with dbctl utility for get/set MySQL limits and user's status

import os
import xml.dom.minidom as xml
import subprocess
import json
from xml.parsers.expat import ExpatError
from typing import Tuple, Optional, Dict  # NOQA

from packaging.version import Version

from clcommon.clexception import FormattedException
from clcommon.utils_cmd import run_command, ExternalProgramFailed


class GovernorStatus:
    """String constants used as MySQL Governor status values."""
    ENABLED, ERROR, DISABLED = 'enabled', 'error', 'disabled'


class MySQLGovException(FormattedException):
    """Base class of the MySQL Governor errors raised by this module."""


class MySQLGovernorDisabled(MySQLGovException):
    """
    Raised when the dbgovernor daemon is offline.
    """
    def __init__(self):
        # Values substituted into the message placeholders
        context = {
            'util': 'MySQL governor',
            'command': 'service db_governor restart'
        }
        message = ("%(util)s is disabled in the system. "
                   "Please, run \"%(command)s\" to start the service")
        super().__init__({'message': message, 'context': context})


class MySQLGovernorAbsent(MySQLGovException):
    """
    Raised when dbgovernor is not installed.
    """
    def __init__(self):
        details = {}
        details['message'] = "%(util)s not present in system"
        details['context'] = {'util': 'Governor'}
        super().__init__(details)


def _get_exc_message(param):
    return {
        'message': "Invalid %(param)s parameter",
        'context': {'param': param}
    }


class MySQLGovernor:
    """
    MysqlGovernor library
    """
    # Constants to calculate individual limits for cpu (current, short, middle, long periods)
    #  and read/write (current, short, middle, long periods)
    # cpu:
    # c,  s = c*95%, m = c*87%, l = c*75%
    # io (read == write):
    # c = io / 2,  s = c*83%, m = c*76%, l = c*59%
    # refer to LU-167
    IO_PERCENTS = (0.83, 0.76, 0.59)  # percents for calculate read/write limits
    CPU_PERCENTS = (0.95, 0.87, 0.75)  # percents for calculate cpu limits
    _UTILITY_PATH = "/usr/sbin/dbctl"
    _PACKAGE_UTILITY_PATH = "/usr/share/lve/dbgovernor/governor_package_limitting.py"
    _GOVERNOR_BINARY_PATH = "/usr/sbin/db_governor"
    _CONTAINER_PATH = "/etc/container/mysql-governor.xml"  # available for root only
    _CONTAINER_PATH_V2 = "/var/run/mysql-governor-config.xml"  # available for everyone; governor-mysql >= 1.1-14
    _S_DEFAULT = 'DEFAULT'

    def __init__(self):
        # List of ignored users in governor
        self._governor_ignored_users = None
        # MySQL Governor limits cache
        # Dictionary format: user_name -> (cpu_limit: int, io_limit: int, cpu_limit_marked: str, io_limit_marked: str)
        # cpu_limit: int, io_limit: int - "old" limits, without package differs marks. For backward compatibility
        #   Both are integers always, IO limit - KB/s
        # cpu_limit_marked: str, io_limit_marked: str - "new" limits, with package differs marks.
        #   Both are strings always, IO limit - KB/s
        # Examples: (150, 3072, '150', '3072'), (200, 3072, '*200', '3072'), etc
        self._governor_limits = None
        # Error flag
        self._is_governor_error = False
        # MySQL Governor presence flag
        self._is_governor_present = os.path.isfile(self._UTILITY_PATH)
        self._governor_mode = self._detect_governor_mode()
        self._governor_version = self.get_governor_version()

    def get_governor_mode(self):
        return self._governor_mode

    def _detect_governor_mode(self):
        if self._is_governor_present:
            try:
                governor_cfg = self._get_xml_config()
                return governor_cfg.getElementsByTagName('lve')[0].getAttribute('use')
            except (MySQLGovException, IndexError):
                return None
        else:
            return None

    def _run_dbctl_with_args(self, args, check_exit_code=False):
        """
        Run dbctl utility with given arguments and handle common errors:
        - governor is down: MySQLGovernorDisabled
        :param args: iterable of command line arguments passed to dbctl
        :param check_exit_code: whether we should raise exception
                                when dbctl returns code != 0 or writes to stderr
        :return: Tuple (ret_code, std_out, std_err) as returned by run_command
        :raises MySQLGovernorDisabled: dbctl reported "can't connect to socket"
        :raises MySQLGovException: check_exit_code is set and dbctl failed
        """
        ret_code, std_out, std_err = run_command(
            [self._UTILITY_PATH, *args],
            return_full_output=True)

        if "can't connect to socket" in std_out:
            # Governor not started: flag the error and drop cached data
            self._is_governor_error = True
            self._governor_limits = None
            self._governor_ignored_users = None
            raise MySQLGovernorDisabled()
        if check_exit_code and (ret_code or std_err):
            # Fall back to stdout so a non-zero exit code with empty stderr
            # still produces a meaningful message (same as the set_* methods)
            raise MySQLGovException({'message': "dbctl error: %(output)s",
                                     'context': {'output': std_err or std_out}})
        return ret_code, std_out, std_err

    def get_governor_status(self):
        # type: () -> Tuple[str, Optional[MySQLGovException]]
        """
        Get governor status together with the error that caused it, if any
        :return: Tuple (status, exception or None), see _detect_governor_status
        """
        status_and_error = self._detect_governor_status()
        return status_and_error

    def _detect_governor_status(self):
        # type: () -> Tuple[str, Optional[MySQLGovException]]
        """
        Determine governor status by trying to load its data
        :return: (GovernorStatus.DISABLED, None) when governor is not present,
                 (GovernorStatus.ERROR, exception) when loading failed,
                 (GovernorStatus.ENABLED, None) otherwise
        """
        if not self.is_governor_present():
            return GovernorStatus.DISABLED, None
        try:
            self._load_info()
        except MySQLGovException as e:
            return GovernorStatus.ERROR, e
        except ExternalProgramFailed as e:
            # Wrap the external failure so callers always get MySQLGovException
            wrapped = MySQLGovException({
                'message': str(e)
            })
            return GovernorStatus.ERROR, wrapped
        return GovernorStatus.ENABLED, None

    def _get_xml_config(self):
        # type: () -> xml.Document
        config_path = self._get_config_path()
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return xml.parseString(f.read())
        except (IOError, OSError) as e:
            self._is_governor_error = True
            raise MySQLGovException(
                {'message': "An error occured while loading governor "
                            "config from %(path)s. Error: %(error)s",
                 'context': {'path': config_path, 'error': str(e)}}
            ) from e
        except ExpatError as e:
            self._is_governor_error = True
            raise MySQLGovException(
                {'message': "An error occured while parsing governor "
                            "config from %(path)s. Error: %(error)s",
                 'context': {'path': config_path, 'error': str(e)}}
            ) from e

    @staticmethod
    def _load_ignore_users_from_xml(governor_cfg):
        """
        Loads information about igrored users
        :type governor_cfg: xml.Document
        :return: list of ignore users
        """
        ignore_users_list = []
        try:
            gov_data = governor_cfg.getElementsByTagName("governor")[0]
        except IndexError as e:
            raise MySQLGovException({
                'message': 'Malformed mysql-governor config. '
                           'Unable to find element \'%(element)s\'.',
                'context': {'element': 'governor'}
            }) from e
        users_data_list = gov_data.getElementsByTagName("user")
        for user_data in users_data_list:
            user_mode = user_data.getAttribute("mode")
            if user_mode == "ignore":
                # ignore_users_list.append(user_data.getAttribute("mysql_name"))
                # ignore only system users
                if user_data.getAttribute("name"):
                    ignore_users_list.append(user_data.getAttribute("name"))
        return ignore_users_list

    def _calc_rw_io_limits(self, io):
        """
        Calculate db R/W IO limits based on governor version
        :param io: requested limits in MB/s
        :return: string values suitable to pass to "dbctl set ..."
        """
        # Only Governor >= 1.2-18 has support for limits in bytes
        # This check should be removed when new Governor will be in stable repo
        if self._is_governor_newer_then('1.2-17'):
            io_limits = io * 2 ** 20  # MB to Bytes
            limits_tmpl = "%sb,%sb,%sb,%sb"
        else:
            io_limits = io  # Use MBytes as is
            if io_limits == 1:
                # This should prevent dropping to defaults even for old
                # Governor, and just set 1MB instead
                io_limits = 2
            limits_tmpl = "%s,%s,%s,%s"

        read = limits_tmpl % self._percentage(int(io_limits // 2),
                                              self.IO_PERCENTS)
        write = limits_tmpl % self._percentage(int(io_limits // 2),
                                               self.IO_PERCENTS)
        return read, write

    def get_governor_version(self):
        if not self.is_governor_present():
            return None
        try:
            res = subprocess.check_output([self._GOVERNOR_BINARY_PATH, '--version'], text=True)
            # example of valid output
            # res = 'governor-mysql version 1.2-36'
            version = res.strip().split(' ')[2]
            return version
        except (subprocess.CalledProcessError, OSError):
            return None

    def _is_governor_newer_then(self, version):
        current = self._governor_version
        if not current:
            return False    # assume "No" if we can't determine version
        return Version(version) < Version(current)

    @staticmethod
    def _parse_line(line):
        """
        Convert data line from dbctl to list
        :param line:
            Data line could be like:
                "default\t400/380/350/300\t953/791/724/562\t953/791/724/562"
            or:
                "default          400/380/350/300            1000/830/760/590                  1000/830/760/590"
            depending on --kb/mb/bb option passed to dbctl
        :return: list: ['default', '400/380/350/300', '953/791/724/562', '953/791/724/562']
        """
        return [part for part in line.split() if part]

    @staticmethod
    def _percentage(value, percents):
        """
        Calculate full list of governor limits by one value and percents koeff
        """
        res = [value]
        for k in percents:
            res.append(int(value*k))
        return tuple(res)

    def is_governor_present(self):
        """
        Get governor presence flag
        :return:
        """
        return self._is_governor_present

    def get_governor_status_by_username(self, username):
        """
        Get MySQL governor status for supplied user
        :param username: Username for get status
        :return: Governor status: "watched"/"ignored" or None if error
        """
        # Load Governor data
        self._load_info()
        if username in self._governor_ignored_users:
            return 'ignored'
        return 'watched'

    def get_limits_by_user(self, username, with_package_mark: bool = False) -> Tuple[int, int]:
        """
        Get MySQL governor limits for supplied user
        :param username: Username for read limits
        :param with_package_mark: False - without package limits difference mark (for compatibility with non-package
                                            governor, used in cloudlinux-top/cloudlinux-statistics),
                                  True - with package limits difference mark
        :return: Tuple (CPU limit, IO limit).
            Examples:
                (150, 3072) - with_package_mark == False
                ('*150', '3072') - with_package_mark == True
                ('*150', '*4096') - with_package_mark == True
            * - user has individual limits, differ from package limit
        MySQLGovException will be thrown if governor not present or error
        """
        # Load Governor data if need
        self._load_info()
        if username in self._governor_limits:
            user_cpu_limit, user_io_limit_kb, user_cpu_limit_marked, user_io_limit_marked =\
                self._governor_limits[username]
        else:
            user_cpu_limit, user_io_limit_kb, user_cpu_limit_marked, user_io_limit_marked = \
                self._governor_limits['default']
        if with_package_mark:
            limits_for_return = (user_cpu_limit_marked, user_io_limit_marked)
        else:
            limits_for_return = (user_cpu_limit, user_io_limit_kb)
        return limits_for_return

    def set_governor_status_for_user(self, username, status):
        """
        Set MySQLGovernor status for single user
        :param: `str` username: Username for set status
        :param: `bool` status: True for "monitor", False for "ignore"
        :return: 0 on success
        :raises MySQLGovException: dbctl returned non-zero code or wrote to stderr
        """
        self._load_info()
        dbctl_action = "monitor" if status else "ignore"
        ret, std_out, std_err = self._run_dbctl_with_args([dbctl_action, username])
        if not (std_err or ret):
            return 0
        raise MySQLGovException({
            'message': "Set governor status error(%(ret)s): %(output)s",
            'context': {'ret': ret, 'output': std_err or std_out},
        })

    def set_restricted_status_for_user(self, username, status):
        """
        Restrict or unrestrict a user with dbctl utility
        :param: `str` username: Username for set restricted status;
                                "root" and "admin" are mapped to "default"
        :param: `bool` status: True for "restrict", False for "unrestrict"
        :return: 0 on success
        :raises MySQLGovException: dbctl returned non-zero code or wrote to stderr
        """
        self._load_info()
        dbctl_action = "restrict" if status else "unrestrict"
        target_user = "default" if username in ["root", "admin"] else username
        ret, std_out, std_err = self._run_dbctl_with_args([dbctl_action, target_user])
        if not (std_err or ret):
            return 0
        raise MySQLGovException({
            'message': "Set user restrict error(%(ret)s): %(output)s",
            'context': {'ret': ret, 'output': std_err or std_out},
        })

    def set_unrestricted_status_for_all_users(self):
        """
        Unrestrict all users with dbctl utility ("dbctl unrestrict-all")
        :return: 0 on success
        :raises MySQLGovException: dbctl returned non-zero code or wrote to stderr
        """
        self._load_info()
        ret, std_out, std_err = self._run_dbctl_with_args(["unrestrict-all"])
        if not (std_err or ret):
            return 0
        raise MySQLGovException({
            'message': "Set all users unrestrict status error(%(ret)s): %(output)s",
            'context': {'ret': ret, 'output': std_err or std_out},
        })

    def get_restrict_status_by_username(self, username):
        """
        Get MySQL governor status for supplied user
        :param username: Username for get status
        :return: Governor restricted status: "restricted"/"unrestricted"
        """
        # Load Governor data
        self._load_info()
        if username in ["root", "admin"]:
            username = "default"
        if username in self._governor_restricted_users:
            return 'restricted'
        return 'unrestricted'

    def set_limits_for_user(self, username, cpu=None, io=None):
    # this function interface for full edit mode
    # def set_limits_for_user(self, username, cpu=None, io=None, read=None,
    #                         write=None):
        """
        Set MySQLGovernor limits for user
        :param: username `str`: username for set limits
        :param: `int`|`list` cpu: governor cpu limit. when it param int -
                                  calculate by percentage other params
        :param: `int`|`list` io: io value means that read and write limits similar
        :param: `int`|`list` read: read limit
        :param: `int`|`list` write: write limit
        :return: 0
        """
        if cpu is None and io is None:  #  and read is None and write is None:
            return 0

        self._load_info()

        cmd = ["set", username]
        if cpu is not None:
            if isinstance(cpu, int):
                cpu = ",".join(map(str, self._percentage(cpu, self.CPU_PERCENTS)))
            else:
                raise MySQLGovException(_get_exc_message('cpu'))
            # uncomment this lines for add full edit mode
            # elif isinstance(cpu, (list, tuple)) and len(cpu) == 4:
            #     cpu = ",".join(map(str, cpu))
            # else:
            #     raise MySQLGovException(_get_exc_message('cpu'))
            cmd.append(f"--cpu={cpu}")

        if io is not None:
        # uncomment this line for add full edit mode
        # if io is not None or read is not None or write is not None:
            if isinstance(io, int):
                read, write = self._calc_rw_io_limits(io)
            else:
                raise MySQLGovException(_get_exc_message('io'))
            # uncomment this lines for add full edit mode
            # elif isinstance(io, (list, tuple)) and len(io) == 4:
            #     read = write = ",".join(map(str, io))
            # else:
            #     if isinstance(read, int):
            #         read = "%s,%s,%s,%s" % (read, read*0.83, read*0.76, read*0.59)
            #     elif isinstance(read, (list, tuple)) and len(read) == 4:
            #         read = ",".join(map(str, read))
            #     if isinstance(write, int):
            #         write = "%s,%s,%s,%s" % (write, write*0.83, write*0.76, write*0.59)
            #     elif isinstance(write, (list, tuple)) and len(write) == 4:
            #         write = ",".join(map(str, write))
            #     else:
            #         raise MySQLGovException(_get_exc_message('limit'))

            cmd.append(f"--read={read}")
            cmd.append(f"--write={write}")
        try:
            ret, std_out, std_err = self._run_dbctl_with_args(cmd)
        except ExternalProgramFailed as e:
            raise MySQLGovException(str(e)) from e
        if std_err or ret:
            exc_message = {'message': "Set all users unrestrict status error(%(ret)s): %(output)s",
                           'context': {'ret': ret, 'output': std_err or std_out}}
            raise MySQLGovException(exc_message)
        # Reset users limits cache
        self._governor_limits = None
        return 0

    def _get_package_raw_limits_from_utility(self, package_name: Optional[str]) -> dict:
        """
        Run the governor package utility and return its parsed JSON output
        :param package_name: Package name. If None (or empty), limits of all packages are requested
        :return: data decoded from the utility stdout
        :raises MySQLGovException: utility failed (non-zero code or stderr output)
                                   or its stdout is not valid JSON
        """
        cmd_list = [self._PACKAGE_UTILITY_PATH, 'get']
        if package_name:
            decoded_name = package_name.encode().decode('unicode-escape')
            cmd_list.append(f"--package={decoded_name}")
        else:
            cmd_list.append('--all')
        cmd_list.append('--format=kb')

        ret_code, std_out, std_err = run_command(cmd_list, return_full_output=True)
        if ret_code != 0 or std_err:
            raise MySQLGovException({
                'message': "'%(cmd)s' failed, stderr is: %(stderr)s",
                'context': {'cmd': ' '.join(cmd_list), 'stderr': std_err}
            })
        try:
            return json.loads(std_out)
        except json.JSONDecodeError as e:
            raise MySQLGovException({
                'message': "%(util)s output invalid, error is: %(error)s",
                'context': {'util': self._PACKAGE_UTILITY_PATH, 'error': str(e)}
            }) from e

    def _calc_package_limits_from_raw(self, cpu_limits_list: list, read_limits_list: list,
                                      write_limits_list: list) -> Tuple[int, int]:
        """
        Calculate package limits from raw governor limits
        :param cpu_limits_list: CPU limits list
        :param read_limits_list: Read limits list
        :param write_limits_list: Write limits list
        :return: Tuple: (cpu_limit, io_limit)
        """
        return int(cpu_limits_list[0]), self._get_user_io_limit(str(read_limits_list[0]), str(write_limits_list[0]))

    def get_package_limits(self, package_name: Optional[str] = None) -> Optional[Dict]:
        """
        Retrieve MySQL Governor package limits
        :param package_name: Package name. If None, limits of all packages are returned
        :return: Dict with limits. Example:
            {'pack1': {'cpu': 100, 'io': 900}, 'pack2': {'cpu': 100, 'io': 900}}
        MySQLGovException raised by the utility call is propagated unchanged
        """
        raw_packages = self._get_package_raw_limits_from_utility(package_name)
        # Convert raw limit lists from governor to a single cpu/io pair
        return {
            pack_name: dict(zip(
                ('cpu', 'io'),
                self._calc_package_limits_from_raw(raw_limits['cpu'], raw_limits['read'], raw_limits['write'])
            ))
            for pack_name, raw_limits in raw_packages.items()
        }

    def reset_user_limits_to_defaults(self, username: str, limit_names: list):
        """
        Reset user's individual limits with the governor package utility
        :param username: User name to reset limits
        :param limit_names: Limit names list to reset
        :raises MySQLGovException: utility returned non-zero code or wrote to stderr
        """
        # Command example:
        # /usr/share/lve/dbgovernor/governor_package_limitting.py reset_individual \
        #   --user=res1 --limits=mysql-cpu,mysql-io
        cmd_list = [
            self._PACKAGE_UTILITY_PATH,
            'reset_individual',
            f'--user={username}',
            f"--limits={','.join(limit_names)}",
        ]
        ret_code, _, std_err = run_command(cmd_list, return_full_output=True)
        # For reliability we check both retcode and std_err
        if ret_code == 0 and not std_err:
            return
        raise MySQLGovException({
            'message': "'%(cmd)s' is failed, stderr is: %(stderr)s",
            'context': {'cmd': ' '.join(cmd_list), 'stderr': std_err}
        })

    def _set_governor_limits_to_cpanel_package(
        self,
        package_name: str,
        cpu_limit: Optional[int],
        io_limit: Optional[int]
    ):
        """
        Set MySQL Governor to cPanel package file. If limits not changed, package file will not be written.
        Nothing is done when the package file can't be read or is empty.
        :param package_name: Package name
        :param cpu_limit: MySQL CPU limit to set, None - not change, 0 - write 'DEFAULT'
        :param io_limit: MySQL IO limit to set, None - not change, 0 - write 'DEFAULT'
        """
        from clcommon.utils import get_file_lines, write_file_lines  # pylint: disable=import-outside-toplevel
        package_path = f'/var/cpanel/packages/{package_name}'
        try:
            cpanel_package_lines = get_file_lines(package_path)
        except (OSError, IOError, ):
            return
        if len(cpanel_package_lines) == 0:
            return
        lines_to_write, is_change_made = self._rewrite_cpanel_limit_lines(
            cpanel_package_lines, cpu_limit, io_limit)
        if is_change_made:
            write_file_lines(package_path, lines_to_write, 'w')

    def _rewrite_cpanel_limit_lines(self, package_lines: list, cpu_limit: Optional[int],
                                    io_limit: Optional[int]) -> Tuple[list, bool]:
        """
        Produce new content of a cPanel package file with updated governor limits
        :param package_lines: lines of the package file as they were read
        :param cpu_limit: MySQL CPU limit to set, None - not change
        :param io_limit: MySQL IO limit to set, None - not change
        :return: Tuple (lines to write, True if at least one limit line differs)
        """
        # Limit lines look like:
        # lve_mysql_cpu=4000
        # lve_mysql_io=4096
        new_values = {}
        if cpu_limit is not None:
            new_values['lve_mysql_cpu'] = self._S_DEFAULT if cpu_limit == 0 else str(cpu_limit)
        if io_limit is not None:
            new_values['lve_mysql_io'] = self._S_DEFAULT if io_limit == 0 else str(io_limit)

        lines_to_write = []
        is_change_made = False
        for line in package_lines:
            key = next((name for name in new_values if line.startswith(name)), None)
            if key is None:
                # Not a limit line we were asked to change: keep as read
                lines_to_write.append(line)
                continue
            parts = line.strip().split('=')
            if len(parts) != 2:
                # NOTE(review): a malformed limit line is left out of the
                # result, as before - confirm this is intended
                continue
            if parts[1].strip() != new_values[key]:
                lines_to_write.append(f'{key}={new_values[key]}\n')
                is_change_made = True
            else:
                # Limit unchanged: keep the line exactly as read, like every
                # other untouched line (appending '\n' here made its line
                # ending differ from the rest of the file)
                lines_to_write.append(line)
        return lines_to_write, is_change_made

    def _apply_package_limits_to_cpanel(self, package_name: str, cpu_limit: Optional[int], io_limit: Optional[int]):
        """
        Write MySQL Governor package limits to the cPanel package file.
        Does nothing when the control panel is not cPanel
        :param package_name: Package name to update
        :param cpu_limit: MySQL CPU limit to set, None - not change
        :param io_limit: MySQL IO limit to set, None - not change
        """
        from clcommon.cpapi import getCPName  # pylint: disable=import-outside-toplevel
        if getCPName() == 'cPanel':
            self._set_governor_limits_to_cpanel_package(package_name, cpu_limit, io_limit)

    def set_package_limits(self, package_name: str, cpu_limit: Optional[int] = None,
                           io_limit: Optional[int] = None):
        """
        Set limits for Governor package and mirror them to the cPanel package file
        :param package_name: Package name for set
        :param cpu_limit: MySQL CPU limit to set
        :param io_limit: MySQL IO limit to set
        :raises MySQLGovException: wrong arguments or the package utility failed
        """
        # Command example:
        # /usr/share/lve/dbgovernor/governor_package_limitting.py set --package pack2 --cpu=200,201,202,203
        #  --read=500,501,502,503 --write=400,401,402,403
        no_limit_given = cpu_limit is None and io_limit is None
        if package_name is None or no_limit_given:
            raise MySQLGovException("MySQLGovernor.set_package_limits arguments error: "
                                    "Package name and at least one limit "
                                    f"should be provided. Current arguments: package name: {package_name}; "
                                    f"cpu limit is {cpu_limit}; IO limit is {io_limit};")

        cmd_list = [self._PACKAGE_UTILITY_PATH, 'set', '--package', package_name]
        # Validate limits and turn them into command line options
        if cpu_limit is not None:
            if not isinstance(cpu_limit, int):
                raise MySQLGovException(_get_exc_message('cpu_limit'))
            cpu_values = self._percentage(cpu_limit, self.CPU_PERCENTS)
            cmd_list.append("--cpu=" + ",".join(map(str, cpu_values)))

        if io_limit is not None:
            if not isinstance(io_limit, int):
                raise MySQLGovException(_get_exc_message('io_limit'))
            read, write = self._calc_rw_io_limits(io_limit)
            cmd_list.extend([f"--read={read}", f"--write={write}"])

        ret_code, std_out, std_err = run_command(cmd_list, return_full_output=True)
        if ret_code != 0 or std_err:
            raise MySQLGovException({
                'message': "'%(command)s' is failed, stdout is: '%(stdout)s', stderr is: '%(stderr)s'",
                'context': {'command': ' '.join(cmd_list), 'stdout': std_out, 'stderr': std_err}
            })
        # Apply limits to cPanel package
        self._apply_package_limits_to_cpanel(package_name, cpu_limit, io_limit)

    def _get_config_path(self):
        """
        Get config path for mysql-governor;
        :rtype: str|None
        """
        if os.path.isfile(self._CONTAINER_PATH_V2):
            return self._CONTAINER_PATH_V2
        return self._CONTAINER_PATH

    def _read_ignore_users(self):
        """Load ignore users list from container file"""
        try:
            governor_xml = self._get_xml_config()
            self._governor_ignored_users = \
                self._load_ignore_users_from_xml(governor_xml)
        except MySQLGovException:
            self._governor_limits = None
            self._governor_ignored_users = None
            raise

    def _load_info(self):
        """
        Loads users info from MySQL governor
        :return: None
        """
        # Exit if governor data already loaded
        if self._governor_ignored_users is not None and self._governor_limits is not None:
            return

        # Exit if governor not present
        if not self._is_governor_present:
            self._is_governor_error = True
            raise MySQLGovernorAbsent()

        utility_exc_message = {'message': "%(utility)s output is invalid",
                               'context': {'utility': self._UTILITY_PATH}}

        self._read_ignore_users()

        # Load governor limits
        is_kb_limits_ok, gov_data_str = self._run_dbctl_list()
        _, gov_restricted_str, _ = \
            self._run_dbctl_with_args(['list-restricted'], check_exit_code=True)

        self._governor_restricted_users = [
            line.split()[0] for line in gov_restricted_str.strip().split('\n')[1:]
        ]
        # Parse dbctl output
        gov_data_lines = gov_data_str.split('\n')
        self._governor_limits = self._parse_dbctl_data_lines(gov_data_lines, is_kb_limits_ok, utility_exc_message)
        # Check default settings presence
        if 'default' not in self._governor_limits:
            self._is_governor_error = True
            self._governor_limits = None
            self._governor_ignored_users = None
            exc_message = {
                'message': "There is no %(what)s found in %(where)s",
                'context': {'what': 'default settings', 'where': f'{self._UTILITY_PATH} output'}
            }
            raise MySQLGovException(exc_message)

    @staticmethod
    def _get_user_io_limit(read_limit: str, write_limit: str):
        """
            Calculates the io limit.
            Handles the situation when user's write or read io limit is less than 1mb/s (PTCLLIB-85).
        :type write_limit: str
        :type read_limit: str
        :rtype: int
        """
        try:
            user_io_limit = int(read_limit) + int(write_limit)
        except ValueError:
            if read_limit == write_limit == "<1":
                user_io_limit = 1
            elif write_limit == "<1":
                user_io_limit = int(read_limit)
            else:
                user_io_limit = int(write_limit)
        return user_io_limit

    def _run_dbctl_list(self, _is_incorrect_syntax=False):
        """
        Executes "dbctl list-marked --kb"; falls back to "dbctl list" when
        the output contains 'Incorrect syntax'
        :param _is_incorrect_syntax: True is emulate dbctl error. Only for testing!
        :return: Tuple (is_kb_limits_ok, stdout_str), where
            is_kb_limits_ok == True, if dbctl returned limits in KB, else - False
            stdout_str - dbctl stdout string
        """
        _, listing, _ = self._run_dbctl_with_args(
            ['list-marked', '--kb'], check_exit_code=True)
        kb_supported = not (_is_incorrect_syntax or 'Incorrect syntax' in listing)
        if kb_supported:
            return True, listing
        # --kb option not supported, call without it
        _, listing, _ = self._run_dbctl_with_args(
            ['list'], check_exit_code=True)
        return False, listing

    def _parse_dbctl_data_lines(self, data_lines_list, is_kb_limits_ok: bool, utility_exc_message: dict) -> dict:
        """
        Converts data lines from dbctl stdout to dictionary
        :param data_lines_list: List of lines from dbctl stdout
        :param is_kb_limits_ok: Is limits already in KB/s
        :param utility_exc_message: Message dict for exception
        :return: Tuple(dict, dict) dbctl data dictionary. Example:
            {'default': (400, 1953124, '400', '1953124'),
             'cltest1': (350, 2025138, '*350', '2025138')
             }
        """
        governor_limits = {}
        for line in data_lines_list:
            line = line.strip()
            # Pass header line and empty lines
            if not line or 'cpu(%)' in line:
                continue
            # List: [0] - username, [1] - CPU limits, [2] - read limits, [3] - write limits, [4] - package limits marks
            user_limits_list = MySQLGovernor._parse_line(line)
            # Data format verification
            if len(user_limits_list) != 5 or len(user_limits_list[4]) != 2:
                self._is_governor_error = True
                self._governor_limits = None
                self._governor_ignored_users = None
                raise MySQLGovException(utility_exc_message)
            cpu_limits_list = user_limits_list[1].split('/')  # '400/380/350/300'
            read_limits_list = user_limits_list[2].split('/')  # '1000/830/760/590'
            write_limits_list = user_limits_list[3].split('/')  # '1000/830/760/590'
            if len(cpu_limits_list) != 4 or len(read_limits_list) != 4 or len(write_limits_list) != 4:
                self._is_governor_error = True
                self._governor_limits = None
                self._governor_ignored_users = None
                raise MySQLGovException(utility_exc_message)
            user_name = user_limits_list[0]
            # Determine CPU limit as [0] limit
            user_cpu_limit = int(cpu_limits_list[0])
            # Determine IO limit as read_limit[0] + write_limit[0]
            user_io_limit = self._get_user_io_limit(read_limits_list[0], write_limits_list[0])
            # limit if is_kb_limits_ok else limit*1024
            user_io_limit_kb = user_io_limit if is_kb_limits_ok else user_io_limit*1024
            # Process package differ marks
            # Package limits marks are placed in user_limits_list[4] and shows as '+-' (for example)
            # There are 2 marks always, each or '-' or '+'
            # 1st mark - CPU limit
            # 2nd mark - IO limit
            # If mark is '-' - package limit, if '+' - individual (should be marked by '*')
            marks = user_limits_list[4]
            if marks[0] == '+':
                user_cpu_limit_marked = f'*{cpu_limits_list[0]}'
            else:
                user_cpu_limit_marked = cpu_limits_list[0]
            if marks[1] == '+':
                user_io_limit_marked = f'*{user_io_limit_kb}'
            else:
                user_io_limit_marked = str(user_io_limit_kb)
            # Add limits to dictionary
            governor_limits[user_name] = (user_cpu_limit, user_io_limit_kb, user_cpu_limit_marked, user_io_limit_marked)
        return governor_limits

Youez - 2016 - github.com/yon3zu
LinuXploit