Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.225.55.42
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/cllimits_validator/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/cllimits_validator/limits_validator.py
# coding: utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from collections import defaultdict
from itertools import chain
from typing import Any, Dict, List, Optional, Tuple, Union  # NOQA

from cllimits import LveCtl
from cllimitslib_v2 import DEFAULTS, LimitsDataStorage
from clveconfig.ve_config_reader import DEFAULT_PROVIDER

from .base import ENTITY_DEFAULTS, ENTITY_PACKAGE, ENTITY_RESELLER, ENTITY_USER, BaseValidator  # NOQA
from .ep_nproc_validator import EpNprocValidator


class LimitsValidator:
    """
    Run a list of limit validators and turn their results into messages.

    Limits that are about to be set are checked with `validate_input_limits`;
    limits that are already stored are checked with `validate_existing_limits`.
    """

    # Maps the internal entity type names (keys) to the labels (values)
    # shown instead of them, to improve readability of the error message.
    dict_of_replaced_names_of_entities = {
        'user': 'User(s)',
        'package': 'Package(s)',
        'reseller': 'Reseller(s)',
        'defaults': 'Defaults of resellers'
    }
    # Maps the internal entity ids (keys) to the labels (values)
    # shown instead of them, to improve readability of the error message.
    dict_of_replaced_ids_of_entities = {
        'defaults': 'Hoster defaults limits'
    }
    # Entity ids which are dropped from the error message entirely,
    # to improve readability of the error message.
    list_of_deleted_ids_of_entities = [
        DEFAULTS
    ]

    # Maps fragments of validator-specific messages (keys) to the text
    # (values) shown instead of them, to improve readability of the
    # error message.
    dict_of_replaced_specific_messages = {
        'inherited from Global': 'inherited from Hoster'
    }

    def __init__(self, _limits_data_storage=None, validators=None):
        # type: (Optional[LimitsDataStorage], Optional[List[BaseValidator]]) -> None
        """
        :param _limits_data_storage: source of limits data; a new
            LimitsDataStorage is created when a falsy value is passed
        :param validators: validators to run; defaults to a single
            EpNprocValidator bound to the limits provider
        """
        self.limits_provider = _limits_data_storage or LimitsDataStorage()
        self._list_validators = validators or [
            EpNprocValidator(self.limits_provider),
        ]  # type: List[BaseValidator]

        # Template with the keys of a single validator message dict.
        self.message_dict = {
            'common_msg': None,
            'specific_msg': None,
            'affected_entity': None,
            'entity_id': None,
        }

    @staticmethod
    def _format_message_string_from_single_msg_dict(msg_dict):
        # type: (Dict[str, Any]) -> str
        """
        Format message string from single message dict.
        This function is used for formatting the message produced
        while validating input limits.

        :param msg_dict: dict with the keys 'common_msg', 'specific_msg',
            'affected_entity' and 'entity_id'
        :return: '<common_msg> <message about affected entity> <specific_msg>';
            missing parts are rendered as empty strings
        """

        common_msg = msg_dict['common_msg']
        specific_msg = msg_dict['specific_msg']
        affected_entity = msg_dict['affected_entity']
        entity_id = msg_dict['entity_id']

        if affected_entity is None or entity_id is None:
            msg_about_affect = ''
        elif entity_id == DEFAULTS:
            msg_about_affect = 'Hoster defaults limits is affected'
        elif affected_entity == ENTITY_DEFAULTS:
            msg_about_affect = f'Defaults limits of Reseller "{entity_id}" are affected.'
        elif affected_entity == ENTITY_USER:
            msg_about_affect = f'Limits of user with LVE ID "{entity_id}" are affected.'
        elif affected_entity == ENTITY_RESELLER:
            msg_about_affect = f'Limits of Reseller "{entity_id}" are affected.'
        else:
            # Any other entity type is reported as a package.
            msg_about_affect = f'Limits of package "{entity_id}" are affected.'

        if specific_msg is None:
            specific_msg = ''

        return f'{common_msg} {msg_about_affect} {specific_msg}'

    def _call_validators_and_process_result(self, only_input_limits, *args, **kwargs):
        # type: (bool, *Any, **Any) -> Tuple[bool, List]
        """
        Call each validator from the validators list and collect the results.
        args and kwargs are the arguments passed to each validator.
        We validate limits which are already recorded in ve.cfg if only_input_limits is False.
        We validate limits which we want to set if only_input_limits is True.

        :return: aggregated result as a bool value and the list of message
            dicts (one per validator) which explain which limits are wrong
            and who holds those
        """

        msg_list = []
        total_result = True
        for validator in self._list_validators:
            if only_input_limits:
                result, msg_dict = validator.validate_input_limits(*args, **kwargs)
            else:
                result, msg_dict = validator.validate_existing_limits(*args, **kwargs)
            # The total result is True only if every validator passed.
            total_result &= result
            msg_list.append(msg_dict)
        return total_result, msg_list

    def validate_input_limits(self, entity_type, entity_id, input_limits, reseller=DEFAULT_PROVIDER):
        # type: (str, Union[str, int], Dict, str) -> Tuple[bool, List]
        """
        Validate limits which are going to be set for the given entity.

        :param entity_type: one of ENTITY_USER, ENTITY_PACKAGE,
            ENTITY_RESELLER or ENTITY_DEFAULTS; for any other value
            None is passed to the validators as the entity
        :param entity_id: uid for a user, name for the other entity types
        :param input_limits: limits to validate
        :param reseller: owner of the package; used only for ENTITY_PACKAGE
        :return: aggregated result and the list of formatted messages
            (one per validator)
        """

        check_reseller_defaults = False
        if entity_type == ENTITY_USER:
            validated_entity = self.limits_provider.get_user_by_uid(int(entity_id))
        elif entity_type == ENTITY_PACKAGE:
            validated_entity = self.limits_provider.get_package_by_name_and_reseller(str(entity_id), str(reseller))
        elif entity_type == ENTITY_RESELLER:
            validated_entity = self.limits_provider.get_reseller_by_name(str(entity_id))
        elif entity_type == ENTITY_DEFAULTS:
            if entity_id != DEFAULTS:
                validated_entity = self.limits_provider.get_reseller_by_name(str(entity_id))
            else:
                # The DEFAULTS id itself is passed to the validators
                # instead of an entity object.
                validated_entity = entity_id
            check_reseller_defaults = True
        else:
            validated_entity = None

        total_result, msg_dicts = self._call_validators_and_process_result(
            True,
            validated_entity,
            input_limits,
            check_reseller_defaults,
        )
        msg_list = [self._format_message_string_from_single_msg_dict(msg_dict) for msg_dict in msg_dicts]

        return total_result, msg_list

    def _validate_users(self):
        # type: () -> List[Tuple[bool, List[Dict]]]
        """
        Validate existing limits of every user known to the limits provider.
        """

        users = self.limits_provider.users.values()
        return [self._call_validators_and_process_result(False, user) for user in users]

    def _validate_packages(self):
        # type: () -> List[Tuple[bool, List[Dict]]]
        """
        Validate existing limits of every package known to the limits provider.
        """

        packages = self.limits_provider.packages.values()
        return [self._call_validators_and_process_result(False, package) for package in packages]

    def _validate_resellers(self):
        # type: () -> List[Tuple[bool, List[Dict]]]
        """
        Validate existing limits of every activated reseller.
        """

        resellers = self.limits_provider.resellers.values()
        # We should validate only activated resellers,
        # because not activated resellers don't have any limits.
        return [self._call_validators_and_process_result(False, reseller) for reseller in resellers
                if reseller.defaults is not None]

    def _validate_defaults(self):
        # type: () -> List[Tuple[bool, List[Dict]]]
        """
        Validate existing default limits.
        A defaults name without a matching reseller is validated as DEFAULTS.
        """

        result = []
        for default_name in self.limits_provider.defaults.keys():
            default_entity = self.limits_provider.resellers.get(default_name, DEFAULTS)
            # We should validate only defaults which belong to activated resellers,
            # because not activated resellers don't have any limits.
            if default_entity != DEFAULTS and default_entity.defaults is None:
                continue
            result.append(self._call_validators_and_process_result(False, default_entity))
        return result

    def _replace_specific_msgs(self, specific_msg):
        # type: (Optional[str]) -> Optional[str]
        """
        Replace some specific messages to improve readability of the error message.
        None is returned unchanged.
        """
        if specific_msg is None:
            return None
        for old_spec_msg, new_spec_msg in self.dict_of_replaced_specific_messages.items():
            specific_msg = specific_msg.replace(old_spec_msg, new_spec_msg)

        return specific_msg

    def _replace_ids_of_entities(self, entity_id):
        # type: (str) -> str
        """
        Replace some ids of entities to improve readability of the error message.
        Ids without a replacement are returned unchanged.
        """
        if entity_id in self.dict_of_replaced_ids_of_entities:
            return self.dict_of_replaced_ids_of_entities[entity_id]
        return entity_id

    def _replace_names_of_entities(self, entity_name):
        # type: (str) -> str
        """
        Replace some names of entities to improve readability of the error message.
        Names without a replacement are returned unchanged.
        """
        if entity_name in self.dict_of_replaced_names_of_entities:
            return self.dict_of_replaced_names_of_entities[entity_name]
        return entity_name

    def _format_message_string_for_existing_limits(self, total_result_list):
        # type: (List[Dict[str, Any]]) -> Optional[str]
        """
        Format message string from the result of validation of the existing limits.

        :param total_result_list: message dicts of the failed checks
        :return: one line per check followed by one line per affected
            entity type, or None if there is nothing to report
        """

        total_result_dict = defaultdict(lambda: defaultdict(list))

        # We are grouping affected entities (entity_id) by name of check (common_msg)
        # and type of affected entity (affected_entity).
        for msg_dict in total_result_list:
            name_of_check = msg_dict['common_msg']
            type_of_affected_entity = self._replace_names_of_entities(msg_dict['affected_entity'])
            entity_id = self._replace_ids_of_entities(msg_dict['entity_id'])
            # Deleting some ids of entities to improve readability of the error message.
            # NOTE(review): the id is checked after the replacement above, so an id
            # which is present in both tables is never dropped - confirm this is intended.
            if entity_id in self.list_of_deleted_ids_of_entities:
                continue
            total_result_dict[name_of_check][type_of_affected_entity].append({
                'entity_id': entity_id,
                'specific_msg': self._replace_specific_msgs(msg_dict['specific_msg'])
            })

        msg_lines = []
        for common_msg, entities_dict in total_result_dict.items():
            msg_lines.append(f'{common_msg}\n')

            for entity_type, list_entity_dicts in entities_dict.items():
                affected_entities = []

                # make string as `entity_1 (specific_msg_1), entity_2, entity_3, entity_4 (specific_msg)
                for entity_dict in list_entity_dicts:
                    if entity_dict['specific_msg'] is None:
                        affected_entity = entity_dict['entity_id']
                    else:
                        affected_entity = f'{entity_dict["entity_id"]} ({entity_dict["specific_msg"]})'

                    affected_entities.append(affected_entity)
                # dict.fromkeys removes duplicates like a set does, but keeps
                # the order of first appearance, so the message is the same
                # on every run.
                unique_entities = ', '.join(str(item) for item in dict.fromkeys(affected_entities))

                msg_lines.append(f'{entity_type.capitalize()}: {unique_entities}\n')

        if not msg_lines:
            return None
        return ''.join(msg_lines)

    def validate_existing_limits(self):
        # type: () -> Optional[str]
        """
        Validate limits of all users, packages, resellers and defaults.

        :return: message which describes the failed checks,
            or None if there is nothing to report
        """

        # Only message dicts of the failed validations are kept.
        total_result_list = [
            msg_dict for result, msg_dict_list in chain(
                self._validate_users(),
                self._validate_packages(),
                self._validate_resellers(),
                self._validate_defaults(),
            ) for msg_dict in msg_dict_list if not result
        ]

        return self._format_message_string_for_existing_limits(total_result_list)

    @staticmethod
    def is_low_pmem_limit_present() -> bool:
        """
        Check is Low PMEM limit present
        :return True if any panel user has a PMEM limit
            which is not unlimited (0) and is lower than 512 MB, else False
        """
        low_pmem_threshold_bytes = 512 * 1024 * 1024

        _lvectl = LveCtl()
        _panel_uids_list = _lvectl.get_panel_users_uid_list()
        _lvectl._load_info(False)
        for uid in _panel_uids_list:
            if uid == 0:
                # skip defaults
                continue
            limits = _lvectl.get_limits_by_user_id(uid)
            # limits example:
            # {'cpu': {'all': '100'}, 'io': {'all': '1024'}, 'vmem': '0', 'ep': '20', 'pmem': '*268435456',
            # 'nproc': '100', 'iops': '*4096'}
            try:
                # Extract PMEM limit; the value may be prefixed with '*'
                pmem_limit_bytes = int(limits['pmem'].replace('*', ''))
            except ValueError:
                # A non-numeric PMEM value is ignored for this user.
                continue
            if pmem_limit_bytes != 0 and pmem_limit_bytes < low_pmem_threshold_bytes:
                # PMEM limit not unlimited and < 512 MB - error
                return True
        return False

Youez - 2016 - github.com/yon3zu
LinuXploit