Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.139.235.99
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/clquota/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/clquota/__init__.py
#!/opt/cloudlinux/venv/bin/python3 -bb
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import configparser as ConfigParser
import csv
import fcntl
import os
import pwd
import re
import tempfile
from collections import defaultdict
from stat import S_IRGRP, S_IROTH, S_IRUSR, S_IWUSR, ST_DEV
from typing import Dict, List, Optional, Tuple

import clcontrollib
import cldetectlib
from clcommon import FormattedException
# pylint: enable=E0611
from clcommon.clpwd import ClPwd
from clcommon.clquota import check_quota_enabled
from clcommon.cpapi import admin_packages, list_users, resellers_packages
from clcommon.cpapi.cpapiexceptions import CPAPIExternalProgramFailed, EncodingError
from clcommon.utils import (
    ExternalProgramFailed,
    get_file_lines,
    get_filesystem_type,
    run_command,
    write_file_lines,
)


# Result of clcontrollib.detect.is_da(), computed once when the module is imported;
# it gates the DirectAdmin-specific package lookup in QuotaWrapper._combine_package_limits
IS_DA = clcontrollib.detect.is_da()
DEFAULT_PACKAGE = 'default'
VE_DEFAULT_PACKAGE = 'VE_DEFAULT'  # virtual package, alias for uid=0


class NoSuchPackageException(Exception):
    """Raised when the requested package is unknown"""

    def __init__(self, package):
        text = "No such package (%s)" % (package,)
        super().__init__(text)


class NoSuchUserException(Exception):
    """Raised when the requested user is unknown"""

    def __init__(self, user):
        text = "No such user (%s)" % (user,)
        super().__init__(text)


class InsufficientPrivilegesException(Exception):
    """Raised when an operation is attempted without the required privileges"""

    def __init__(self):
        super().__init__("Insufficient privileges")


class IncorrectLimitFormatException(Exception):
    """Raised when a limit value cannot be interpreted"""

    def __init__(self, limit):
        text = "Incorrect limit format (%s)" % (limit,)
        super().__init__(text)


class MalformedConfigException(FormattedException):
    """
    Signals that a config file could not be parsed,
    so cl-quota is unable to operate on it
    """
    def __init__(self, error: ConfigParser.ParsingError):
        # The parser error text is substituted into the message template
        # through the 'context' mapping
        details = {
            'message':
                "cl-quota can't work because for malformed config. "
                "Please, contact CloudLinux support if you "
                "need help with resolving this issue. "
                "Details: %(error_message)s",
            'context': {'error_message': str(error)},
        }
        super().__init__(details)


class GeneralException(Exception):
    """Generic error carrying an arbitrary message"""

    def __init__(self, message):
        super().__init__(message)


class QuotaDisabledException(Exception):
    """Raised when quota is disabled for all users on the server"""

    def __init__(self):
        super().__init__('Quota disabled for all users on server')


class UserQuotaDisabledException(QuotaDisabledException):
    """
    Signals that quota is disabled for one specific user
    """
    def __init__(self, uid=None, homedir=None, message=None):
        fragments = ['Quota disabled']
        if uid:
            fragments.append(' for user id %s' % uid)
        if homedir:
            fragments.append(' (home directory %s)' % homedir)
        if message:
            fragments.append('; %s' % message)
        # The parent initialiser takes no arguments and sets a fixed message,
        # so the base Exception is initialised directly with the composed text
        Exception.__init__(self, ''.join(fragments))


def _is_sys_path(path):
    """
    >>> _is_sys_path('/home/username')
    False
    >>> _is_sys_path('/var/davecot')
    True
    """
    if path[-1] != '/':
        path += '/'
    sys_path_ = ('/root/', '/usr/', '/var/', '/sbin/', '/dev/', '/bin/', '/srv/', '/sys/', '/etc/ntp/')
    if path == '/':
        return True
    for path_ in sys_path_:
        if path.startswith(path_):
            return True


def _get_users_list():
    """
    Collect the uids of all users whose home directory is not a system path
    """
    accounts = ClPwd().get_user_dict()
    non_system_uids = []
    for name in accounts:
        if _is_sys_path(accounts[name].pw_dir):
            continue
        non_system_uids.append(accounts[name].pw_uid)
    return non_system_uids


def is_quota_inheritance_enabled() -> bool:
    """
    Read the `cl_quota_inodes_inheritance` boolean parameter from the config file
    (False is used as the default value)
    """
    return cldetectlib.get_boolean_param(
        cldetectlib.CL_CONFIG_FILE,
        'cl_quota_inodes_inheritance',
        default_val=False,
    )


class QuotaWrapper(object):
    """
    Base quota class for inode quotas handling

    * Update system quotas via setquota
    * Retrieves system quotas via repquota
    * Stores current quotas in /etc/container/cl-quotas.dat file
    * Maintains /etc/container/cl-quotas.cache file with resolved quotas. That file can be read by non-privileged users
    """
    PROC_MOUNTS = '/proc/mounts'
    QUOTASYNC  = '/usr/bin/quotasync'
    SETQUOTA  = '/usr/sbin/setquota'
    REPQUOTA  = '/usr/sbin/repquota'
    GETPACKS  = '/usr/bin/getcontrolpaneluserspackages'
    DATAFILE  = '/etc/container/cl-quotas.dat'
    CACHEFILE = '/etc/container/cl-quotas.cache'
    # File lock variables
    LOCK_FD = None
    LOCK_FILE = DATAFILE + '.lock'
    LOCK_WRITE = False

    def __init__(self):
        """
        Check that the required system files are present and set up the instance state

        :raises RuntimeError: if /proc/mounts, repquota or setquota does not exist
        :raises MalformedConfigException: if cl-quotas.dat cannot be parsed
        """
        self._assert_file_exists(QuotaWrapper.PROC_MOUNTS)
        self._assert_file_exists(QuotaWrapper.REPQUOTA)
        self._assert_file_exists(QuotaWrapper.SETQUOTA)

        self._quota_enabled_list = list()
        # Detected lazily by _check_present_panel()
        self._panel_present = None
        self._grace = {}
        # Filled on first request by _get_current_quotas() when running as root
        self._quota = {}
        # Pending `setquota -bu` input keyed by device, sent by _flush_device_quota()
        self._device_quota = {}
        self._package_to_uids_map = {}  # type: Dict[str, List[str]]
        self._uid_to_packages_map = {}  # type: Dict[str, List[str]]
        self._uid_to_homedir_map = {}  # type: Dict[str, str]
        # ConfigParser handler with the content of cl-quotas.dat
        self._dh = self._get_saved_data_handler()
        # Per-user fields, in the order they are written to the cache file
        self._fields = ['bytes_used', 'bytes_soft', 'bytes_hard', 'inodes_used', 'inodes_soft', 'inodes_hard']
        self._euid = os.geteuid()
        self._devices = self._load_quota_devices()
        self._mountpoint_device_mapped = self._get_mountpoint_device_map(self._devices)
        # Built lazily by _get_device_user_map()
        self._device_user_map = None
        # List of all packages (all admin's packages + all reseller packages)
        self._all_package_list = None

    @staticmethod
    def _assert_file_exists(path):
        """
        Checks if command is present and exits if no
        """
        if not os.path.exists(path):
            raise RuntimeError('No such command (%s)' % (path,))

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _traceback):
        if self.LOCK_FD is not None:
            self.LOCK_FD.close()

    def get_user_limits(self, uid):
        '''
        Returns user limits converted to tuples
        '''
        return self._convert_data_to_tuples(self._get_current_quotas(uid))

    def get_all_users_limits(self):
        '''
        Returns all user limits converted to tuples
        '''
        return self._convert_data_to_tuples(self._get_current_quotas())

    def get_package_limits(self, package):
        """
        :param packname: Package name for get limits. If None, returns all packages,
                          else - only supplied package
        Returns package limits converted to tuples (called only from main)
        """
        return self._convert_data_to_tuples(self._get_package_quotas(packname=package))

    def get_all_packages_limits(self, package=None):
        """
        Returns all packages limits converted to tuples (called only from main)
        """
        return self._convert_data_to_tuples(self._get_package_quotas(packname=package, all_packages=True))

    def _preprocess_limit(self, limit):
        """
        Preprocessed passed limit: 'default' --> '0', 'unlimited' --> -1, else calls _check_limit
        :param limit:
        :return:
        """
        if limit == 'default':
            return '0'
        if limit in ('unlimited', '-1'):
            return '-1'
        return self._check_limit(limit)

    def _get_package_from_dh(self, package):
        return self._dh.get('packages', package).split(':')

    def _get_all_packages_with_limits(self, clean_dead_packages=False):
        """
        Retrive all available packages with their limits
        :param clean_dead_packages: if True - remove all nonexistent packages from cl-quotas.dat
        :return: Dictionary: { 'package_name': (soft_limit, hard_limit) }
        """

        # result dictionary
        package_limits_dict = {}
        # Load packages limits from cl-quota.dat
        db_packages = {}
        if self._dh.has_section('packages') and len(self._dh.items('packages')) > 0:
            list_of_packages = self._get_all_package_list()
            for package in self._dh.options('packages'):
                if clean_dead_packages and package not in list_of_packages:
                    self._dh.remove_option('packages', package)
                    continue
                package_limits = self._get_package_from_dh(package)
                # Pass package, if limits not well-formed
                if len(package_limits) != 2:
                    continue
                db_packages[package] = package_limits[0], package_limits[1]
            if clean_dead_packages:
                self._write_data()
        # Put all panel packages to result dictionary
        self._get_package_to_users_map()
        for package in self._package_to_uids_map.keys():
            if package in db_packages:
                # if package present in cl-quota.dat, take limits
                package_limits_dict[package] = db_packages[package]
            else:
                package_limits_dict[package] = ('0', '0')
        return package_limits_dict

    def set_user_limit(
            self,
            uid: str,
            soft: Optional[str] = None,
            hard: Optional[str] = None,
            save: bool = True,
            force_save: bool = False,
            only_store: bool = False,
    ):
        """
        Set limits for users

        * Resolve limits according to those saved in cl-quota.dat
          Limits are resolved in the following order:
            user limits --> package limits --> root user (uid=0) limits
        * Apply new limits when resolved limits differ from those in cache or when `force_save` is True
        * Write updated values to cl-quota.dat if `save` is True
          Always update cl-quota.dat in case both zeroes or both unlimited are provided
        * When uid is '0', re-apply the limits of all users afterwards

        :param uid: user id
        :param soft: soft limit value
        :param hard: hard limit value
        :param save: save limits to cl-quota.dat
        :param force_save: save limits to cl-quota.dat even if they are not changed
        :param only_store: store limits in memory, but do not apply them

        :raises InsufficientPrivilegesException: when the effective uid is not 0
        :raises IncorrectLimitFormatException: when a limit contains no digits
        :return: None
        """
        self._check_admin()

        # Validate limits value, convert 'unlimited' --> '-1', 'default' --> '0'
        soft_validated, hard_validated = self._preprocess_limit(soft), self._preprocess_limit(hard)

        # Derive limits from cl-quota.dat according to the limits inheritance rules
        soft_resolved, hard_resolved = self._combine_user_limits(uid=uid, soft=soft_validated, hard=hard_validated)

        # Convert limit to format acceptable by setquota utility
        soft_converted, hard_converted = self._convert_for_sys_utility(soft=soft_resolved, hard=hard_resolved)

        # Get data from repquota utility
        cached = self._get_current_quotas(uid)[uid]

        # Run cmd only if quota changed or force_save is True
        # If force_save is True it equals to --save-all-parameters in cloudlinux-limits
        if (soft_converted, hard_converted) != (cached["inodes_soft"], cached["inodes_hard"]) or force_save:
            # Don't apply limits to root user
            if uid != '0':
                device = self._get_home_device(self._fetch_homedir(uid))
                if only_store:
                    # Queue a line for the batched `setquota -bu` run done by _flush_device_quota();
                    # the byte limits are passed through unchanged
                    stdin = f'{uid} {cached["bytes_soft"]} {cached["bytes_hard"]} {soft_converted} {hard_converted}\n'
                    self._device_quota[device] = self._device_quota.get(device, '') + stdin
                else:
                    cmd = [
                        QuotaWrapper.SETQUOTA,
                        '-u', uid, cached['bytes_soft'], cached['bytes_hard'], soft_converted, hard_converted, device,
                    ]
                    run_command(cmd)
                    self._sync_quota_files(device)

            if save:
                soft_user_dat, hard_user_dat = self._get_user_limits_to_save(
                    uid, soft_validated, hard_validated, force_save=force_save)
                self._save_user_limits(uid, soft_user_dat, hard_user_dat)

        # Always update cl-quota.dat in case both zeroes or both unlimited are provided
        if (soft_validated == '0' and hard_validated == '0') or (soft_validated == '-1' and hard_validated == '-1'):
            self._save_user_limits(uid, soft_validated, hard_validated)

        # Limits of uid 0 are the last step of the inheritance chain, so every user is re-resolved
        if uid == '0':
            self._apply_all_limits()

    def set_package_limit(
            self,
            package: str,
            soft: Optional[str] = None,
            hard: Optional[str] = None,
            save: bool = True,
            only_store: bool = False
    ) -> None:
        """
        Sets limits for package

        Resolved limits of the package's users are queued per device in
        self._device_quota and flushed at the end unless `only_store` is True.
        Nothing is done when the package does not exist.

        :param package: package name
        :param soft: soft limit value
        :param hard: hard limit value
        :param save: save limits to cl-quota.dat
        :param only_store: store limits in memory, but do not apply them

        :raises InsufficientPrivilegesException: when the effective uid is not 0
        :raises IncorrectLimitFormatException: when a limit contains no digits
        :return: None
        """

        self._check_admin()

        # Validate limits value, convert 'unlimited' --> '-1', 'default' --> '0'
        soft_validated, hard_validated = self._preprocess_limit(soft), self._preprocess_limit(hard)

        # Set limits for empty reseller package
        if save is True \
                and package in self._get_package_quotas(all_packages=True) \
                and package not in self._get_package_to_users_map():
            # Derive limits from cl-quota.dat according to the limits inheritance rules
            soft_resolved, hard_resolved = self._get_saved_package_limits_if_none(
                package, soft_validated, hard_validated)
            self._save_package_limits(package, soft_resolved, hard_resolved)
            # The package has no users, so there are no system quotas to update
            return

        if not self._check_package_exists(package):
            return

        # Example: {'/dev/sda1': ['502', '504', '515', '521', '501']}
        device_user_map = self._get_device_user_map()
        cached_quotas = self._get_current_quotas()
        for device in device_user_map.keys():
            # Lines for `setquota -bu` collected for this device
            std_in = []
            for uid in self._get_package_to_users_map(package):
                if uid not in device_user_map[device]:
                    continue

                soft_resolved, hard_resolved = self._combine_package_limits(
                    uid=uid, package=package, soft=soft_validated, hard=hard_validated)

                soft_converted, hard_converted = self._convert_for_sys_utility(soft=soft_resolved, hard=hard_resolved)
                try:
                    soft_cached, hard_cached = cached_quotas[uid]['inodes_soft'], cached_quotas[uid]['inodes_hard']
                    # Only users whose inode limits actually change are queued
                    if (soft_converted, hard_converted) != (soft_cached, hard_cached):
                        std_in.append(
                            f'{uid} '
                            f'{cached_quotas[uid]["bytes_soft"]} {cached_quotas[uid]["bytes_hard"]} '
                            f'{soft_converted} {hard_converted}'
                        )
                except KeyError:
                    pass  # skip error when quota is on but not configured

            if len(std_in) != 0:
                std_in = ('\n'.join(std_in) + '\n')
                self._device_quota[device] = self._device_quota.get(device, '') + std_in

        if save:
            soft_data, hard_data = self._get_package_limits_to_save(package, soft_validated, hard_validated)
            self._save_package_limits(package, soft_data, hard_data)

        if not only_store:
            self._flush_device_quota()

    def synchronize(self):
        """
        Read limits from file and applies them to packages and users
        """
        self._check_admin()

        package_limits = self._get_all_packages_with_limits(clean_dead_packages=True)
        for package, (soft, hard) in package_limits.items():
            self.set_package_limit(package, soft, hard, save=False, only_store=True)

        self._remove_unexisting_users()
        self._flush_device_quota()

    def save_user_cache(self):
        """
        Caches the limits to non-privileged user to see them
        """
        self._check_admin()

        # get data from repquota utility
        current_quotas = self._get_current_quotas()

        # form 2d array for writing to file
        cache_content = [
            [k] + [current_quotas[k][field] for field in self._fields]
            for k in sorted(current_quotas.keys(), key=int)
        ]

        self._get_global_lock(True)
        file_handler = self._prepare_writer(QuotaWrapper.CACHEFILE)
        csv_out = csv.writer(file_handler, quoting=csv.QUOTE_MINIMAL)
        csv_out.writerows(cache_content)
        self._end_writer(QuotaWrapper.CACHEFILE)
        self._release_lock()

    def _apply_all_limits(self, skip_root: bool = True):
        """Set limits for all users. Skip root user if skip_root is True"""
        for uid in self._get_uid_to_packages_map().keys():
            if uid == '0' and skip_root:
                continue
            self.set_user_limit(uid, soft=None, hard=None, save=False, only_store=True)
        self._flush_device_quota()

    def _flush_device_quota(self):
        """Write all device quotas to disk"""
        quotas_written: bool = False

        for device in self._device_quota.keys():
            cmd = [QuotaWrapper.SETQUOTA, '-bu', device]
            run_command(cmd, std_in=self._device_quota[device])
            quotas_written = True

        if quotas_written:
            self._sync_quota_files()

        self._device_quota = {}

    def _sync_quota_files(self, device: str | None = None):
        """
        Run `quotasync` so that inode limits in the kernel match the limits in the user.quota file.
        Without it `repquota` called right after `setquota` may still report the limits
        which were in effect before the `setquota` call.

        With a device given only that device is synced, and nothing is done when its
        filesystem is XFS: XFS handles these operations in real-time and does not implement
        the functionality `quotasync` relies on, so the call would end with an error.
        Without a device `quotasync -a` is run.
        """
        if device is None:
            sync_cmd = [QuotaWrapper.QUOTASYNC, '-a']
        else:
            if get_filesystem_type(device).lower() == 'xfs':
                return
            sync_cmd = [QuotaWrapper.QUOTASYNC, device]
        run_command(sync_cmd)

    def _remove_unexisting_users(self):
        """Remove all records from cl-quota.dat for users which do not exist"""
        if self._dh.has_section('users'):
            for uid in self._dh.options('users'):
                try:
                    # Check user presence
                    self._fetch_homedir(uid)
                except NoSuchUserException:
                    self._dh.remove_option('users', uid)
            self._write_data()

    def _check_package_exists(self, package):
        """Check whether package exists"""
        try:
            self._get_package_to_users_map(package)
        except NoSuchPackageException:
            return False
        else:
            return True

    def _get_user_limits_to_save(
            self,
            uid: str,
            soft_validated: Optional[str],
            hard_validated: Optional[str],
            force_save: bool = False,
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Derive package limit values to save to cl-quota.dat
        If None passed as limit to method, then replace it by the user's value from cl-quota.dat
        Update cl-quota.dat only if the derivation result changes
        """
        soft_user_dat, hard_user_dat = self._get_user_limits(uid=uid)

        soft_resolved, hard_resolved = self._combine_user_limits(uid=uid, soft=soft_validated, hard=hard_validated)
        soft_none_resolved, hard_none_resolved = self._combine_user_limits(uid=uid, soft=None, hard=None)

        if soft_resolved != soft_none_resolved or (force_save and soft_validated is not None):
            soft_user_dat = soft_validated
        if hard_resolved != hard_none_resolved or (force_save and soft_validated is not None):
            hard_user_dat = hard_validated

        return soft_user_dat, hard_user_dat

    def _get_package_limits_to_save(
            self, package: str, soft_validated: Optional[str], hard_validated: Optional[str]) -> Tuple[str, str]:
        """Derive package limit values to save to cl-quota.dat"""
        p_soft, p_hard = self._get_package_limits(package=package)

        # If new value is provided, then update package limit
        p_soft = soft_validated if soft_validated is not None else p_soft
        p_hard = hard_validated if hard_validated is not None else p_hard
        return p_soft, p_hard

    def _check_present_panel(self):
        """
        Return True if control panel present
        """
        if self._panel_present is None:
            self._panel_present = 'Unknown' != run_command(['/usr/bin/cldetect', '--detect-cp-nameonly']).rstrip()
        return self._panel_present

    def _check_admin(self):
        '''
        Raise exception if no admin user
        '''
        if self._euid != 0:
            raise InsufficientPrivilegesException()

    def _get_saved_data_handler(self) -> ConfigParser.ConfigParser:
        '''
        Gets ConfigParser handler for future use
        Loads saved quotas from /etc/container/cl-quotas.dat file
        '''
        self._get_global_lock(True)
        dh = ConfigParser.ConfigParser(interpolation=None, strict=False)
        dh.optionxform = str
        try:
            dh.read(QuotaWrapper.DATAFILE)
        except ConfigParser.ParsingError as e:
            raise MalformedConfigException(e)
        finally:
            self._release_lock()
        return dh

    def _get_device_user_map(self):
        """
        Returns dictionary mapping devices to lists of users
        """
        if self._device_user_map is not None:
            return self._device_user_map
        devices_map = {}
        device_user_pairs = []
        for uid in self._get_list_of_uids():
            try:
                device_user_pairs.append((self._get_home_device(self._fetch_homedir(uid)), uid))
            except KeyError:
                continue
        for pair in device_user_pairs:
            if pair[0] not in devices_map:
                devices_map[pair[0]] = []
            devices_map[pair[0]].append(pair[1])
        self._device_user_map = devices_map
        return self._device_user_map

    def _check_limit(self, limit: Optional[str]) -> Optional[str]:
        if limit is None or limit == '-1':
            return limit
        limit_pattern = re.compile(r'(\d+)')
        pattern_match = limit_pattern.search(limit)
        if not pattern_match:
            raise IncorrectLimitFormatException(limit)
        return pattern_match.group(1)

    def _combine_user_limits(self, uid: str, soft: Optional[str] = None, hard: Optional[str] = None) -> Tuple[str, str]:
        """
        Determines user limits by resolving them according to the limits inheritance rule:
            user limits ---(overridden by provided as method param if provided not None)--->
                ---> package limits ---> root user (uid=0) limits

        uid: user id
        soft: Optional[str] = None: limit value, can be:
            * None -- value not passed
            * "-1" -- unlimited
            * "0" -- default (next value from hierarchy should be considered)
            * "1", "2", ... -- precice limit values
        hard: soft: Optional[str] = None: limit value, values range the same as for soft

        return: Tuple[str, str]: (soft_limit, hard_limit), they can be:
            * "-1" -- unlimited
            * "1", "2", ... -- precice limit values
        """
        if uid == '0':
            soft, hard = self._get_user_limits_override_none(uid=uid, soft=soft, hard=hard)
            soft, hard = (soft if soft != '0' else '-1', hard if hard != '0' else '-1')
            return soft, hard

        soft, hard = self._get_user_limits_override_none(uid=uid, soft=soft, hard=hard)

        for package in self._get_uid_to_packages_map(uid):
            soft, hard = self._get_package_limits_override_default(package=package, soft=soft, hard=hard)

        soft, hard = self._get_user_limits_override_default(uid='0', soft=soft, hard=hard)
        soft, hard = (soft if soft != '0' else '-1', hard if hard != '0' else '-1')
        return soft, hard

    def _get_user_limits_override_none(self, uid: str, soft: Optional[str] = None, hard: Optional[str] = None) -> Tuple[str, str]:
        """Get user limits from cl-quota.dat. If limit is None, then override it by user's limit

        :param str uid: user id
        :param Optional[str] soft: limit value, can be:
            * None -- value not passed, should be overridden by user's limit
            * "-1" -- unlimited
            * "0" -- default (next value from hierarchy should be taken)
            * "1", "2", ... -- values
        :param Optional[str] hard: limit values same as for soft
        :return Tuple[str, str]: derived limits
        """
        user_soft, user_hard = self._get_user_limits(uid=uid)
        # Override by passed limits values
        soft = self._check_limit(limit=soft if soft is not None else user_soft)
        hard = self._check_limit(limit=hard if hard is not None else user_hard)
        return soft, hard

    def _get_user_limits_override_default(self, uid: str, soft: str, hard: str) -> Tuple[str, str]:
        """Get user limits from cl-quota.dat. If limit is default, then override it by user's limit

        :param str uid: user id
        :param Optional[str] soft: limit value, can be:
            * "-1" -- unlimited
            * "0" -- default (next value from hierarchy should be taken)
            * "1", "2", ... -- values
        :param Optional[str] hard: limit values same as for soft
        :return Tuple[str, str]: derived limits
        """
        user_soft, user_hard = self._get_user_limits(uid=uid)
        # Override by passed limits values
        soft = self._check_limit(limit=soft if soft != '0' else user_soft)
        hard = self._check_limit(limit=hard if hard != '0' else user_hard)
        return soft, hard

    def _get_user_limits(self, uid: str) -> Tuple[str, str]:
        """Try to get user's limits from cl-quota.dat"""
        try:
            user_soft, user_hard = self._dh.get('users', uid).split(':')
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            user_soft, user_hard = '0', '0'
        return user_soft, user_hard

    def _get_package_limits_override_default(self, package: str, soft: str, hard: str) -> Tuple[str, str]:
        """Get package limits from cl-quota.dat. If passed limit is default, then override it by package's limit"""
        pack_soft, pack_hard = self._get_package_limits(package=package)
        soft = self._check_limit(limit=pack_soft if soft == '0' else soft)
        hard = self._check_limit(limit=pack_hard if hard == '0' else hard)
        return soft, hard

    def _get_package_limits(self, package: str) -> Tuple[str, str]:
        """Try to get package's limits from cl-quota.dat"""
        try:
            soft, hard = self._get_package_from_dh(package)
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            soft, hard = '0', '0'
        return soft, hard

    @staticmethod
    def _convert_for_sys_utility(soft: Optional[str], hard: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
        """Converts limits for setquota utility which threats 0 as unlimited"""
        if soft == '-1':
            soft = '0'
        if hard == '-1':
            hard = '0'
        return soft, hard

    def _combine_package_limits(
            self, package: str, uid: str, soft: Optional[str], hard: Optional[str]) -> Tuple[str, str]:
        """
        Resolve the limits of a package user, taking the saved user limits
        and the default (uid=0) limits into account

        :param package: package name
        :param uid: id of a user of the package
        :param soft: new soft limit of the package or None if not passed
        :param hard: new hard limit of the package or None if not passed
        :return: (soft, hard) where "-1" means unlimited
        """
        user_soft, user_hard = self._get_user_limits(uid=uid)

        # DirectAdmin only: when the user really belongs to another package, the quotas
        # of that package are used (instead of the ones of the package passed in)
        use_real_package = False
        if IS_DA and is_quota_inheritance_enabled():
            real_package = self._get_da_real_package(uid=uid)
            use_real_package = real_package != package

        if use_real_package:
            real_quotas = self._get_package_quotas(packname=real_package, all_packages=True)
            soft = pack_soft = real_quotas[real_package]['inodes_soft']
            hard = pack_hard = real_quotas[real_package]['inodes_hard']
        else:
            pack_soft, pack_hard = self._get_package_limits(package=package)

        # A non-default user limit wins; otherwise the new package limit is taken
        # if it was passed, and the saved package limit if it was not
        if user_soft != '0':
            soft = user_soft
        elif soft is None:
            soft = pack_soft
        if user_hard != '0':
            hard = user_hard
        elif hard is None:
            hard = pack_hard

        # Limits which are still default fall back to the uid=0 limits and then to unlimited
        soft, hard = self._get_user_limits_override_default(uid='0', soft=soft, hard=hard)
        return ('-1' if soft == '0' else soft, '-1' if hard == '0' else hard)

    def _get_saved_package_limits_if_none(self, package, soft=None, hard=None):
        """
        Applies saved package limits if none has been passed
        """
        try:
            pack_soft, pack_hard = self._get_package_from_dh(package)
            if soft is None and pack_soft != '0':
                soft = pack_soft
            if hard is None and pack_hard != '0':
                hard = pack_hard
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            pass
        soft = self._check_limit(soft)
        hard = self._check_limit(hard)
        return soft, hard

    def _get_da_real_package(self, uid: str) -> str:
        """
        Look up the package DirectAdmin reports for the user

        :param str uid: user id
        :return str: package name of the first user name found for the uid
        """
        user_names = ClPwd().get_names(int(uid))
        primary_name = user_names[0]
        return clcontrollib.DirectAdmin()._get_user_package(primary_name)

    def _get_current_quotas(self, uid=None):
        """
        Retrieves current quotas.
        If euid == 0, use data from repquota utility, else from /etc/container/cl-quotas.cache file
        """
        if self._euid != 0:
            return self._load_user_cache()
        if not self._quota:
            # Retrieves quotas from repquota utility
            self._quota = self._load_current_quotas()
        if uid:
            try:
                return {uid: self._quota[uid]}
            except KeyError:
                self._check_if_quota_enabled(uid)
                raise NoSuchUserException(uid)
        return self._quota

    def _get_package_quotas(self, packname=None, all_packages=False):
        """
        Prepares package limits data for outputting
        (call only from get_package_limits/get_all_packages_limits - main)

        :param packname: Package name for get limits. If present, function returns
                            limits only for this package, else - all packages
        :param all_packages: If False reads only used and admin's packages, True - all packages
                                (including reseller packages without users)
        :return Dictionary of package limits:
            {package_name: {'inodes_used': 'xxx', 'inodes_soft': 'yyy', 'inodes_hard': 'zzz'}
        """
        q = {}
        if all_packages:
            # Get list of all packages
            list_of_packages = self._get_all_package_list()
        else:
            # Get list of used packages + all admin's packages
            list_of_packages = self._get_list_of_packages()
        for package in list_of_packages:
            values = ['-']
            try:
                if package == VE_DEFAULT_PACKAGE:
                    # Because "VE_DEFAULT" package is not a real package and just
                    # uses limits from LVE == 0 we should read it's limits
                    # from there
                    soft, hard = self._dh.get('users', '0').split(':')
                else:
                    soft, hard = self._dh.get('packages', package).split(':')
                soft = self._check_limit(soft)
                hard = self._check_limit(hard)
                if soft == '-1':
                    soft = '-'
                if hard == '-1':
                    hard = '-'
                values.extend([soft, hard])
            except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
                values.extend(['0', '0'])
            q.update(self._populate(package, values))
        if packname:
            try:
                return {packname: q[packname]}
            except KeyError:
                raise NoSuchPackageException(packname)
        return q

    def _populate(self, item, data):
        return {item: dict(list(map((lambda x: (x[1], data[x[0]])), enumerate(self._fields[3:]))))}

    def _get_list_of_packages(self):
        return list(self._get_package_to_users_map().keys())

    def _get_list_of_uids(self):
        return list(self._get_uid_to_packages_map().keys())

    def _get_package_to_users_map(self, package=None):
        if not self._package_to_uids_map:
            self._package_to_uids_map = self._load_package_uids_data()
        if package:
            try:
                return self._package_to_uids_map[package]
            except KeyError:
                raise NoSuchPackageException(package)
        return self._package_to_uids_map

    def _check_if_quota_enabled(self, uid):
        if uid in self._quota_enabled_list:
            return
        home_dir = self._fetch_homedir(uid)
        quota_disabled_message = check_quota_enabled(path=home_dir)
        if quota_disabled_message:
            raise UserQuotaDisabledException(uid=uid, homedir=home_dir, message=quota_disabled_message)
        else:
            self._quota_enabled_list.append(uid)

    def _get_uid_to_packages_map(self, uid=None):
        if not self._uid_to_packages_map:
            self._package_to_uids_map = self._load_package_uids_data()
        if uid:
            try:
                return self._uid_to_packages_map[uid]
            except KeyError:
                raise NoSuchUserException(uid)
        return self._uid_to_packages_map

    def _get_packages_uids_from_cpapi(self) -> Dict[str, List[str]]:
        """
        Build package-uids map from cpapi data. Only for custom panels. See LU-610 for details.
        Users whose package comes from cpapi as None are put into VE_DEFAULT_PACKAGE.
        As a side effect self._uid_to_packages_map is rebuilt from the same data.

        NOTE(review): both maps are defaultdict(list), so looking a missing key up
        in them creates an empty entry instead of raising KeyError - confirm
        whether callers rely on that.

        :return: Dictionary with data.
        Example response:
            {'default': ['1038', '1043', '1046'], 'res1_pack1': ['1044'], 'pack1': ['1042']}
        Corresponding self._uid_to_packages_map value:
            {'1038': ['default'], '1042': ['pack1'], '1043': ['default'], '1044': ['res1_pack1'], '1046': ['default']}
        :raises ExternalProgramFailed: users or admin packages can not be retrieved
        """
        try:
            users_packages = list_users()
        except (OSError, CPAPIExternalProgramFailed, EncodingError) as e:
            raise ExternalProgramFailed('%s. Can not get users' % (str(e)))

        # Example of users_packages:
        #  {1000: {'reseller': 'root', 'package': 'Package1'},
        #   1001: {'reseller': 'res1', 'package': 'BusinessPackage'},
        #   1002: {'reseller': 'root', 'package': None}}

        package_to_uids = defaultdict(list)  # type: Dict[str, List[str]]
        uid_to_packages = defaultdict(list)  # type: Dict[str, List[str]]
        self._uid_to_packages_map = uid_to_packages

        for uid, uid_data in users_packages.items():
            package = uid_data['package']
            if package is None:
                package = VE_DEFAULT_PACKAGE
            uid_as_str = str(uid)
            package_to_uids[package].append(uid_as_str)
            uid_to_packages[uid_as_str].append(package)

        try:
            admin_pkgs = admin_packages(raise_exc=True)
        except (OSError, CPAPIExternalProgramFailed) as e:
            raise ExternalProgramFailed('%s. Can not get admin packages' % (str(e)))

        # Admin packages must be present even when nobody uses them
        for package in admin_pkgs:
            if package is None:
                package = VE_DEFAULT_PACKAGE
            package_to_uids.setdefault(package, [])

        package_to_uids.setdefault(VE_DEFAULT_PACKAGE, [])

        return package_to_uids

    def _load_package_uids_data(self) -> Dict[str, List[str]]:
        """
        Gets map of packages and users
        :rtype dict
        :return Dictionary with data. Example:
            {'default': ['1038', '1043', '1046'], 'res1_pack1': ['1044'], 'pack1': ['1042']}
        """
        packages = {}
        if self._euid != 0:
            return packages
        # if packages not supported all user has 'VE_DEFAULT' package
        if not self._check_present_panel():
            packages[VE_DEFAULT_PACKAGE] = list(map(str, _get_users_list()))
            self._uid_to_packages_map = {i: [VE_DEFAULT_PACKAGE] for i in packages[VE_DEFAULT_PACKAGE]}
            return packages

        return self._get_packages_uids_from_cpapi()

    def _get_all_package_list(self):
        """
        Retrives all (root and resellers) panel package list
        :return: List of package names
        """
        # If list already loaded - do nothing
        if self._all_package_list:
            return self._all_package_list
        try:
            self._all_package_list = []
            list_admin_packages = admin_packages(raise_exc=True)
            for package in list_admin_packages:
                self._all_package_list.append(package)
        except (OSError, CPAPIExternalProgramFailed) as e:
            raise ExternalProgramFailed('%s. Can not get admin packages' % (str(e)))
        try:
            dict_resellers_packages = resellers_packages(raise_exc=True)
            for packages_list in dict_resellers_packages.values():
                for package in packages_list:
                    self._all_package_list.append(package)
        except (OSError, CPAPIExternalProgramFailed) as e:
            raise ExternalProgramFailed('%s. Can not get reseller packages' % (str(e)))
        # Add 'VE_DEFAULT' package to list
        if VE_DEFAULT_PACKAGE not in self._all_package_list:
            self._all_package_list.append(VE_DEFAULT_PACKAGE)
        return self._all_package_list

    def _convert_data_to_tuples(self, data):
        '''
        Convert dict to tuples for passing to printing routines
        '''
        for key in data.keys():
            try:
                entry = tuple(map((lambda x: (x, data[key][x])), self._fields[3:]))
                data[key] = entry
            except KeyError:
                continue
        return data

    def _load_current_quotas(self):
        """
        Gets current quota settings from repquota utility for further processing

        Runs QuotaWrapper.REPQUOTA with '-una' and parses its output line by line:
          - a line starting with '***' sets the device the following lines belong to;
          - a line starting with '#' is a per-user row, the uid follows the '#';
          - a line containing 'grace' updates self._grace.

        :return: dict {uid: {field name: value}} with the entry produced by
            self._add_default() merged in
        """
        q = {}
        # Device taken from the latest '***' line (None until the first one is met)
        device = None
        devices = self._devices
        cmd = [QuotaWrapper.REPQUOTA, '-una']
        data = run_command(cmd)
        grace_regex_pattern = re.compile(r'(block|inode)\sgrace\stime:?\s(\d[\w:]+)(?:;|$|\s)', re.IGNORECASE)
        for line in data.splitlines():
            if line.startswith('#'):
                # User rows are ignored while no device is known
                if not device:
                    continue
                parts = line.split()
                # A row is expected to have 8 fields; otherwise non-numeric
                # fields after the first two are dropped
                if len(parts) != 8:
                    parts = self._remove_redundant_fields_from_input(parts)
                uid = parts[0][1:]
                if uid == '0':  # We do not want to limit root :)
                    continue
                try:
                    # A device missing from self._devices is resolved through
                    # _find_unknown_device; the result replaces the current
                    # device for the following rows too (it may be None)
                    if device not in devices:
                        device = self._find_unknown_device(device)
                    # Only the row of the device holding the user's home directory is kept
                    if device in devices and self._is_home_device(self._fetch_homedir(uid), device):
                        # Fields after the first two are named by position with self._fields
                        q[uid] = dict(list(map((lambda x: (self._fields[x[0]], x[1])), enumerate(parts[2:]))))
                except (KeyError, IndexError, NoSuchUserException):
                    continue
            elif line.startswith('***'):
                # Device name is everything starting from '/dev'
                # NOTE(review): if '/dev' is absent, find() returns -1 and only
                # the last character of the line is taken - confirm it can not happen
                device = line[line.find('/dev'):].strip()
            elif 'grace' in line:
                found = grace_regex_pattern.findall(line)
                if found:
                    # Keys are 'block'/'inode' in lower case, values are the matched grace times
                    self._grace.update(dict(list(map((lambda x: (x[0].lower(), x[1])), found))))
        q.update(self._add_default())
        return q

    def _remove_redundant_fields_from_input(self, parts):
        stripped_parts = parts[:2]
        is_digit_pattern = re.compile(r'^\d+$')
        stripped_parts.extend(
            [field for field in parts[2:] if is_digit_pattern.search(field)])
        return stripped_parts

    def _fetch_homedir(self, uid):
        if len(self._uid_to_homedir_map) == 0:
            self._uid_to_homedir_map.update({str(entry.pw_uid): entry.pw_dir for entry in pwd.getpwall()})
        try:
            return self._uid_to_homedir_map[uid]
        except KeyError:
            raise NoSuchUserException(uid)

    def _load_quota_devices(self):
        """
        Gets mounted filesystems list and picks ones with quota on

        Example of returned data structure:
            {'/dev/mapper/VolGroup-lv_root': [
                {'mountpoint': '/', 'quota_file': 'quota.user', 'quota_type': 'vfsv0'},
                {'mountpoint': '/var', 'quota_file': 'quota.user', 'quota_type': 'vfsv0'}
                ],
            '/dev/mapper/VolGroup-lv_root2': [
                {'mountpoint': '/', 'quota_file': 'quota.user', 'quota_type': 'vfsv0'},
                {'mountpoint': '/var', 'quota_file': 'quota.user', 'quota_type': 'vfsv0'}
            ]
            }
        """
        devices = {}  # type: Dict[str, List[Dict[str, str]]]
        proc_mounts_stream = open(QuotaWrapper.PROC_MOUNTS)
        split_patt = re.compile(r' |,')
        for line in proc_mounts_stream:
            if line.startswith('rootfs /'):
                continue
            line_splited = split_patt.split(line)
            device = line_splited[0]
            mountpoint_data = {'mountpoint':  line_splited[1]}
            for line_splited_element in line_splited:
                if line_splited_element.startswith('usrquota=') or line_splited_element.startswith('usruota='):
                    mountpoint_data['quota_file'] = line_splited_element.split('=')[1]
                elif line_splited_element.startswith('jqfmt='):
                    mountpoint_data['quota_type'] = line_splited_element.split('=')[1]
            if device in devices:
                devices[device].append(mountpoint_data)
            else:
                devices[device] = [mountpoint_data]
        proc_mounts_stream.close()
        if len(devices) == 0:
            # TODO: this only can happen when system HAS NO MOUNTS AT ALL
            raise QuotaDisabledException()
        return devices

    def _load_user_cache(self):
        '''
        For non-privileged user we outputting data from the file
        '''
        q = {}
        try:
            self._get_global_lock()
            fo = open(QuotaWrapper.CACHEFILE)
            cvs_in = csv.reader(fo, delimiter=',')
        except (OSError, IOError):
            # We don't want to confuse a panel with error messages.
            # Let the data be zeroes until they arrive
            return {str(self._euid): dict.fromkeys(self._fields, '0')}
        finally:
            self._release_lock()
        uid = str(self._euid)
        for row in cvs_in:
            if row[0] == uid:
                q.update({row[0]: dict(list(map(
                    (lambda x: (self._fields[x],
                                row[x+1])),
                    range(len(self._fields)))))})  # pylint: disable=range-builtin-not-iterating
                break
        # We want to prevent crazy cases like misedited cache file
        if not q:
            return {str(self._euid): dict.fromkeys(self._fields, '0')}
        return q

    def _get_mountpoint_device_map(self, devices) -> List[Tuple[str, str]]:
        """
        return list tuple ('mountpoin tpath', 'device') reverse sorted by deep mountpoint path
        [('/mountpoint_path/path', '/device'), ('/mountpoint_path', '/device')]
        """
        def sort_by_deep_path(device_mountpoint):
            if device_mountpoint[0] == '/':
                deep_path = 0
            else:
                deep_path = device_mountpoint[0].count('/')
            return deep_path
        mountpoint_device_map = []
        for device, mountpoint_data_list in devices.items():
            for mountpoint_data in mountpoint_data_list:
                mountpoint_path = mountpoint_data['mountpoint']
                mountpoint_device_map.append((mountpoint_path, device))
        mountpoint_device_map.sort(key=sort_by_deep_path, reverse=True)
        return mountpoint_device_map

    def _get_home_device(self, home):
        """
        Returns device user homedir is on
        """
        def _add_slash(path):
            if path and path[-1] != '/':
                path += '/'
            return path
        dirname = _add_slash(os.path.dirname(home))
        for mounpoint_path, device in self._mountpoint_device_mapped:
            if dirname.startswith(_add_slash(mounpoint_path)):
                return device

    def _is_home_device(self, home, device):
        """
        Checks if a device is user homedir device
        """
        return self._get_home_device(home) == device

    def _find_unknown_device(self, device):
        try:
            dev = os.stat(device)[ST_DEV]
            dev_to_find = (os.major(dev), os.minor(dev))
            for current_device in self._devices.keys():
                dev = os.stat(current_device)[ST_DEV]
                if dev_to_find == (os.major(dev), os.minor(dev)):
                    return current_device
        except OSError:
            return device

    def _add_default(self):
        """
        Insert 'default' quota.
        Calls only from _load_current_quotas, after parsing repquota's output
        """
        values = ['-', '0', '0', '-']
        try:
            user_soft, user_hard = self._dh.get('users', '0').split(':')
            # Replace -1 to 0 for set unlimited limit
            if user_soft == '-1':
                user_soft = '0'
            if user_hard == '-1':
                user_hard = '0'
            values.extend([user_soft, user_hard])
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            values.extend(['0', '0'])
        return {'0': dict(list(map((lambda x: (x[1], values[x[0]])), enumerate(self._fields))))}

    def _save_user_limits(self, uid, soft, hard):
        """
        Saves user limits
        """
        if soft is None:
            soft = '0'
        if hard is None:
            hard = '0'

        # remove user limits if they are equal to default
        if soft == '0' and hard == '0' and self._dh.has_section('users'):
            self._dh.remove_option('users', uid)
        else:
            if not self._dh.has_section('users'):
                self._dh.add_section('users')
            self._dh.set('users', uid, '%s:%s' % (soft, hard))
        self._write_data()

    def _save_package_limits(self, package, soft, hard):
        """
        Saves package limits
        """
        if soft is None:
            soft = '0'
        if hard is None:
            hard = '0'

        if soft == '0' and hard == '0' and self._dh.has_section('packages'):
            self._dh.remove_option('packages', package)
        else:
            if not self._dh.has_section('packages'):
                self._dh.add_section('packages')
            self._dh.set('packages', package, '%s:%s' % (soft, hard))
        self._write_data()
        self._copy_package_limits_to_cpanel(package)

    def _copy_package_limits_to_cpanel(self, package):
        """
        Copy package quota limits from cl-quotas.dat to cpanel packages data

        Nothing is done if the panel is not cPanel, the package file has no
        lines, the package data is rejected by _parse_cpanel_package_data
        or the limits in the file already match the current ones.
        """
        if not cldetectlib.is_cpanel():
            return  # panel is not cPanel
        package_path = f'/var/cpanel/packages/{package}'
        package_lines = get_file_lines(package_path)
        if len(package_lines) == 0:
            return  # no cPanel package found
        old_limits, new_lines = self._parse_cpanel_package_data(package_lines)
        if old_limits is None and new_lines is None:
            return  # no lve extension in package
        quotas = self._get_package_quotas(package, all_packages=True)[package]
        # unlimited quotas for package are indicated as '-',
        # but in package we want to write '-1'
        for limit_name in list(quotas):
            if quotas[limit_name] == '-':
                quotas[limit_name] = '-1'
        inode_limits = ('inodes_soft', 'inodes_hard')
        old_cpanel_limits = tuple(old_limits.get(name, '0') for name in inode_limits)
        current_quota_limits = tuple(quotas[name] for name in inode_limits)
        # don't rewrite cpanel package file if new quotas for package are the same
        if old_cpanel_limits == current_quota_limits:
            return
        new_lines.extend(f'lve_{name!s}={quotas[name]!s}\n' for name in inode_limits)
        write_file_lines(package_path, new_lines, 'w')

    @staticmethod
    def _parse_cpanel_package_data(cpanel_package_lines):
        """
        Process cpanel_package_lines - get values of all old lve_ limits
        and remove lines with limits that would be changed
        """
        cpanel_package_lines_modified = cpanel_package_lines[:]
        old_cpanel_data = {}
        for line in cpanel_package_lines:
            if line.startswith('lve_'):
                line_parts = line.strip().split('=')
                limit_name = line_parts[0].replace('lve_', '').strip()
                if line_parts[1] != 'DEFAULT':
                    old_cpanel_data[limit_name] = line_parts[1]
                if limit_name in ('inodes_soft', 'inodes_hard'):
                    cpanel_package_lines_modified.remove(line)
            if line.startswith('_PACKAGE_EXTENSIONS') and 'lve' not in line:
                return None, None
        return old_cpanel_data, cpanel_package_lines_modified

    def _save_data(self, soft, hard, item, item_type):
        '''
        Saves data to a file
        '''
        if soft == '0' and hard == '0':
            try:
                self._dh.remove_option(item_type, item)
            except ConfigParser.NoSectionError:
                pass
        else:
            if not self._dh.has_section(item_type):
                self._dh.add_section(item_type)
            self._dh.set(item_type, item, '%s:%s' % (soft, hard))
        self._write_data()

    def _prepare_writer(self, filepath):
        """
        Open temporary file for writing and return file object
        """
        path = os.path.dirname(filepath)
        try:
            fd, temp_path = tempfile.mkstemp(prefix='lvetmp_', dir=path)
            file_handler = os.fdopen(fd, 'w')
            self._tmp = temp_path
            return file_handler
        except (IOError, OSError):
            if os.path.exists(temp_path):
                os.unlink(temp_path)
            raise GeneralException("Could not save data")

    def _end_writer(self, path):
        '''
        Routines after writing to file
        '''
        try:
            mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
            os.rename(self._tmp, path)
            os.chmod(path, mask)
        except OSError:
            pass

    def _write_data(self):
        '''
        Actual place of saving data to a file
        '''
        self._get_global_lock(True)
        file_handler = self._prepare_writer(QuotaWrapper.DATAFILE)
        self._dh.write(file_handler)
        self._end_writer(QuotaWrapper.DATAFILE)
        self._release_lock()

    ##########################
    # File lock functions
    def _get_global_lock(self, write=False):
        if write:
            QuotaWrapper.LOCK_WRITE = True
        if QuotaWrapper.LOCK_FD is None:
            try:
                QuotaWrapper.LOCK_FD = open(QuotaWrapper.LOCK_FILE, 'r')
            except (IOError, OSError):
                raise GeneralException("Can't open lock file for reading")
            try:
                fcntl.flock(QuotaWrapper.LOCK_FD.fileno(), fcntl.LOCK_EX)
            except IOError:
                raise GeneralException("Can't get lock")

    def _release_lock(self):
        if (not QuotaWrapper.LOCK_WRITE) and (QuotaWrapper.LOCK_FD is not None):
            QuotaWrapper.LOCK_FD.close()
            QuotaWrapper.LOCK_FD = None

Youez - 2016 - github.com/yon3zu
LinuXploit