Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 52.15.113.71
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/cllimits/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/cllimits/lvectl_lib.py
# -*- coding: utf-8 -*-

# lvectl.py - module for interfacing with lvectl utility for get/set LVE limits
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import copy
import json

import lvectllib
from clcommon import cpapi
from clcommon.clexception import FormattedException
from cllimits.lib import exec_utility
from cllimits.lib.utils import _convert_memory_value_to_adaptive_format


class LvectlException(FormattedException):
    pass


class LveCtl:
    """
    Library for work with LVE limits
    """
    MEGAHERZ = 1000000
    GIGAHERZ = 1000000000
    lve_version = None
    _UTILITY_PATH = "/usr/sbin/lvectl"
    _packages_limits = None  # Panel packages limits
    _package_data = None  # Panel LVE limits
    _resellers_data = None  # Panel LVP limits

    def __init__(self):
        """
        Object constructor. Get LVE version
        """
        _, s_lve_version = exec_utility(self._UTILITY_PATH, ["lve-version"])
        self.lve_version = int(s_lve_version)
        self._package_data = None  # type: dict
        self._resellers_data = None  # type: dict

    @staticmethod
    def get_panel_users_uid_list():
        """
        Get panel users uid list
        :return: List of uids
        """
        user_packages = cpapi.list_users()
        uid_list = list(user_packages.keys())
        uid_list.append(0)
        return uid_list

    def get_limits_by_user_id(self, user_id, human_readable_numbers=False):
        """
        Reads limits by user id
        :param user_id: user/lve id
        :param human_readable_numbers: if True PMEM and VMEM limits in KBytes, MBytes or GBytes
            False - in bytes
        :return: dictionary
        """
        # Load LVE limits data if need
        self._load_info(human_readable_numbers, userid=user_id)
        if user_id in self._package_data:
            user_limits = copy.deepcopy(self._package_data[user_id])
            del user_limits["PACKAGE"]
            return user_limits

        # No limits, use default
        # user_limits = self._package_data[0]
        user_limits = copy.deepcopy(self._package_data[0])
        del user_limits["PACKAGE"]
        return user_limits

    def get_limits_by_reseller_name(self, reseller_name, human_readable_numbers=False):
        """
        Reads limits by reseller name
        :param reseller_name:
        :param: id - user/lve id
        :param human_readable_numbers: if True PMEM and VMEM limits in KBytes, MBytes or GBytes
            False - in bytes
        :rtype: dict
        """
        # lvectl list-reseller <reseller_name> --json --with-name
        self._load_resellers_info(human_readable_numbers, reseller_name)
        reseller_limits = copy.deepcopy(self._resellers_data.get(reseller_name))
        return reseller_limits

    def get_default_limits_by_reseller_name(self, reseller_name, human_readable_numbers=False):
        """
        Reads limits by reseller name
        :param reseller_name: reseller's name
        :param human_readable_numbers: if True PMEM and VMEM limits in KBytes, MBytes or GBytes
            False - in bytes
        :rtype: dict[str, str | dict]
        """
        if not lvectllib.lve.reseller_limit_supported():
            return

        lvectl_args = ['list-reseller', reseller_name, '--json', '--with-name']
        if not human_readable_numbers:
            lvectl_args.append('--bytes')
        ret_code, std_out = exec_utility(self._UTILITY_PATH, lvectl_args)
        loaded_json = json.loads(std_out)
        if ret_code != 0:
            raise LvectlException(loaded_json['msg'])

        default_info = loaded_json.get('data', [])
        if default_info:
            default_limits = {
                'PACKAGE': 'DEFAULT',
                'cpu': {'all': default_info[0]['SPEED']},
                'ep': default_info[0]['EP'],
                'io': {'all': default_info[0]['IO']},
                'iops': default_info[0]['IOPS'],
                'nproc': default_info[0]['NPROC'],
                'pmem': _convert_memory_value_to_adaptive_format(default_info[0]['PMEM'],
                                                                 human_readable_numbers),
                'vmem': _convert_memory_value_to_adaptive_format(default_info[0]['VMEM'],
                                                                 human_readable_numbers)
            }
            return default_limits

    def get_package_name_by_user_id(self, user_id):
        """
        Get package name by user id.
        None package returns as str(None) for user checker compatibility:
          'not str(None)' is True
        :param: id - user/lve id
        :return: Package name
        """
        if self._package_data is None:
            from clcommon.cpapi import reseller_package_by_uid  # pylint: disable=import-outside-toplevel
            try:
                package_name = str(reseller_package_by_uid(user_id)[1])
            except IndexError:
                # No user with such uid, use empty package name
                package_name = ''
        else:
            if user_id in self._package_data:
                package_name = str(self._package_data[user_id]["PACKAGE"])
            else:
                # default
                package_name = str(self._package_data[0]["PACKAGE"])
        return package_name

    def __set_error(self, param, lve_id, err):
        return {'message': "%(what)s set error for uid=%(uid)s%(error)s",
                'context': {'what': param, 'uid': lve_id,
                            'error': f" [{err}]" if err else ""}}

    def _set_container_limits_by_id(self, command, container_id, limits, reseller_name=None):
        """
        Set limits for given container id
        :param: str command: 'set' | 'set-reseller', based on container type
        :param: int | str container_id: LVE | LVP id for set limits
        :param: dict limits: new LVE limits. Available keys: speed, vmem, pmem, mep, io, nproc, iops
                    and 'save-all-parameters'.
                    All other keys are ignoring. If some parameter absent on current LVE version
                    (for example pmem on LVE4), it will be ignored too.
        :param reseller_name: Reseller name
        """
        lvectl_args = [command, str(container_id)]
        # 1. Check arguments and form lvectl call arguments
        if "mep" in limits or "ep" in limits:
            limits["maxEntryProcs"] = limits.get("mep", limits.get("ep"))
        for k in ("speed", "vmem", "pmem", "maxEntryProcs", "io", "nproc", "iops"):
            v = limits.get(k)
            if v is None:
                continue
            v = str(v).strip()
            if k in ["pmem", "nproc", "iops"] and self.lve_version == 4:
                continue
            if k in ["iops"] and self.lve_version == 6:
                continue
            if k == "speed" and v.isdigit():
                v = f"{v}%"
            lvectl_args.append(f"--{k}={v}")
        if len(lvectl_args) <= 2:
            return 0
        if limits.get("save-all-parameters"):
            lvectl_args.append("--save-all-parameters")
        # Add reseller name if need
        if reseller_name:
            lvectl_args.append(f"--reseller={reseller_name}")
        # 2. call utility to set limits
        ret_code, out, err = exec_utility(self._UTILITY_PATH, lvectl_args, stderr=True)
        if ret_code != 0:
            # Set limits error
            raise LvectlException(self.__set_error('Limits', container_id, err))

    def _set_container_limits_by_id_or_name(self, reseller_id):
        """
        Set limits for given container id
        :param reseller_id: LVP id or reseller's name or '--all'
        :type reseller_id: int | str
        """
        lvectl_args = ['set-reseller', reseller_id]
        ret_code, out, err = exec_utility(self._UTILITY_PATH, lvectl_args, stderr=True)
        if ret_code != 0:
            # Set limits error
            raise LvectlException(self.__set_error('Limits', reseller_id, err))

    def set_lve_limits_by_user_id(self, lve_id, limits, reseller_name=None):
        """
        Wrapper for _set_container_limits_by_id, set limits for lve_id;
        :param int lve_id: user's container id
        :param limits: dict with limits to set
        :param reseller_name: Reseller name
        :return: int
        """
        if bool(lve_id) and not self.get_package_name_by_user_id(lve_id):
            return 1
        self._set_container_limits_by_id('set', lve_id, limits, reseller_name=reseller_name)
        return 0

    def set_lvp_limits_by_reseller_id(self, lvp_id, limits):
        """
        Wrapper for _set_container_limits_by_id, set limits for lvp_id;
        :type lvp_id: int
        :type limits: dict
        :return: int
        """
        self._set_container_limits_by_id('set-reseller', lvp_id, limits)
        return 0

    def set_lvp_defaults_by_reseller_id(self, lvp_id, limits):
        """
        Wrapper for _set_container_limits_by_id, set limits for lvp_id;
        :type lvp_id: int
        :type limits: dict
        :return: int
        """
        self._set_container_limits_by_id('set-reseller-default', lvp_id, limits)
        return 0

    def set_lve_unlimited(self, lve_id, reseller_name=None):
        """
        Set unlimited LVE for lve_id
        :param: lve_id `int`: LVE id
        :param reseller_name: Reseller name
        :return: 0
        """
        args = ["set", str(lve_id), "--unlimited"]
        if reseller_name is not None:
            args.extend(['--reseller', reseller_name])
        ret_code, err = exec_utility(self._UTILITY_PATH, args)
        if ret_code != 0:
            raise LvectlException(self.__set_error('Unlimited', lve_id, err))
        return 0

    def set_lvp_unlimited(self, lvp_id):
        """
        Set unlimited LVP for reseller;
        Accepts name or id;
        :type lvp_id: str | int
        :return: 0
        """
        args = ["set-reseller", str(lvp_id), "--unlimited"]
        ret_code, err = exec_utility(self._UTILITY_PATH, args)
        if ret_code != 0:
            raise LvectlException(self.__set_error('Unlimited', lvp_id, err))
        return 0

    def reset_lve_limits(self, lve_id, limits):
        """
        Reset LVE limits for lve_id. Set default limits for LVE package or
        system default LVE
        :param: lve_id `int: LVE id
        :return: 0
        """
        args = ["set", str(lve_id), f"--default={','.join(limits)}"]
        ret_code, err = exec_utility(self._UTILITY_PATH, args)
        if ret_code != 0:
            raise LvectlException(self.__set_error('Default', lve_id, err))
        return 0

    def reset_reseller_limits(self, reseller_name, limits):
        """
        Reset LVP limits for reseller_name.
        :param: reseller_name str:
        :return: 0
        """
        args = ["set-reseller", str(reseller_name), f"--default={','.join(limits)}"]
        ret_code, err = exec_utility(self._UTILITY_PATH, args)
        if ret_code != 0:
            raise LvectlException(self.__set_error('Default', reseller_name, err))
        return 0

    def apply_all_limits(self):
        """
        Apply all already configured limits
        :return: ret code
        """
        ret_code, err = exec_utility(self._UTILITY_PATH, ["apply", "all"])
        return ret_code

    def disable_reseller_limits(self, reseller_name):
        """
        Disable reseller limits for given name;
        Equivalent to lvectl remove-reseller <str>
        :type reseller_name: str
        :return: 0
        """
        args = ["remove-reseller", str(reseller_name), '--json']
        ret_code, err = exec_utility(self._UTILITY_PATH, args)
        if ret_code != 0:
            raise LvectlException(self.__set_error('Disable reseller limits', reseller_name, err))
        return 0

    def _load_resellers_info(self, human_readable_numbers, reseller_name):
        """
        Load information about resellers;
        :type human_readable_numbers: bool
        :type reseller_name: str | None
        :return:
        """
        self._resellers_data = {}
        if not lvectllib.lve.reseller_limit_supported():
            return

        lvectl_args = ['list-reseller', '--json', '--with-name']
        if not human_readable_numbers:
            lvectl_args.append('--bytes')
        ret_code, std_out = exec_utility(self._UTILITY_PATH, lvectl_args)
        loaded_json = json.loads(std_out)
        if ret_code != 0:
            raise LvectlException(loaded_json['msg'])

        # To edit reseller limits in UI, admin should have ability to see
        # all resellers, even those for which limits have not yet been enabled.
        # So we just init dict with all resellers and frontend will rely
        # on empty "limits" key to determine whether reseller limits has been
        # already set for particular reseller or not.
        for reseller in cpapi.resellers():
            if reseller_name and reseller_name != reseller:
                continue
            self._resellers_data[reseller] = {'name': reseller, 'limits': {}}

        # Refresh settings with utility's data
        for reseller in loaded_json.get('data', []):
            id_, name = reseller['ID'].split(':')
            if reseller_name and reseller_name != name:
                continue
            reseller_info = {
                'id': id_,
                'name': name,
                'limits': {
                    'cpu': {'all': reseller['SPEED']},
                    'ep': reseller['EP'],
                    'io': {'all': reseller['IO']},
                    'iops': reseller['IOPS'],
                    'nproc': reseller['NPROC'],
                    'pmem': _convert_memory_value_to_adaptive_format(reseller['PMEM'],
                                                                     human_readable_numbers),
                }
            }
            self._resellers_data[reseller_info['name']] = reseller_info

    def _load_info(self, human_readable_numbers, userid=None, reseller=None):
        """
        Loads all package info from lvectl
        :param human_readable_numbers: if True PMEM and VMEM limits in KBytes, MBytes or GBytes
            False - in bytes
        :return: None
        """
        if self._resellers_data is None:
            self._load_resellers_info(human_readable_numbers, reseller)

        if self._package_data is not None:
            return

        # Get panel packages data from lvectl utility
        # Create Panel packages data
        self._package_data = {}
        if userid is not None:
            lvectl_args = ['paneluserlimits', str(userid)]
        elif reseller is not None:
            lvectl_args = ['paneluserslimits', str(reseller)]
        else:
            lvectl_args = ['paneluserslimits']
        lvectl_args.append('--json')
        if not human_readable_numbers:
            lvectl_args.append('--bytes')
        ret_code, std_out = exec_utility(self._UTILITY_PATH, lvectl_args)
        loaded_json = json.loads(std_out)
        if ret_code != 0:
            raise LvectlException(loaded_json['msg'])
        json_data = loaded_json['data']
        for pkg_data in json_data:
            pkg_limits = {}
            pkg_name = pkg_data['PACKAGE']
            if pkg_name == 'VE_DEFAULT':
                pkg_name = 'DEFAULT'
            # self._package_data[pkg_data['ID']] = pkg_name
            pkg_limits['PACKAGE'] = pkg_name
            # All LVE
            pkg_limits['cpu'] = {'all': pkg_data['SPEED']}
            pkg_limits['io'] = {'all': pkg_data['IO']}
            pkg_limits['vmem'] = _convert_memory_value_to_adaptive_format(pkg_data['VMEM'],
                                                                          human_readable_numbers)
            pkg_limits['ep'] = pkg_data['EP']
            if self.lve_version >= 6:
                # LVE 6, 8
                pkg_limits['pmem'] = _convert_memory_value_to_adaptive_format(pkg_data['PMEM'],
                                                                              human_readable_numbers)
                pkg_limits['nproc'] = pkg_data['NPROC']
            if self.lve_version >= 8:
                # LVE 8
                pkg_limits['iops'] = pkg_data['IOPS']
            self._package_data[int(pkg_data['ID'])] = pkg_limits

        if reseller is not None:
            reseller_defaults = self.get_default_limits_by_reseller_name(reseller, human_readable_numbers)
            if reseller_defaults:
                self._package_data[0] = reseller_defaults
                return

        if 0 not in self._package_data:
            # Defaults limits not found, set them manually
            pkg_limits = {}
            pkg_limits['PACKAGE'] = 'DEFAULT'
            # All LVE
            pkg_limits['cpu'] = {'all': '0'}
            pkg_limits['io'] = {'all': '0'}
            pkg_limits['vmem'] = '0K'
            pkg_limits['ep'] = '0'
            if self.lve_version >= 6:
                # LVE 6, 8
                pkg_limits['pmem'] = '0K'
                pkg_limits['nproc'] = '0'
            if self.lve_version >= 8:
                # LVE 8
                pkg_limits['iops'] = '0'
            self._package_data[0] = pkg_limits

Youez - 2016 - github.com/yon3zu
LinuXploit