Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.147.78.249
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages//lveapi.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import contextlib
import os
import errno
import syslog
import time
import pwd
from typing import Optional  # NOQA

from clcommon.clfunc import uid_max
from clcommon.clproc import ProcLve
from clcommon import cpapi, ClPwd
from clcommon.cpapi.cpapiexceptions import NotSupported
try:
    import pylve
except ImportError:
    pylve = None

from clveconfig import ve_config


UID_MAX = uid_max()
# Kernel developers say that the lve_id type is uint32_t, so the maximum id
# should be 0xFFFF_FFFF, but in fact the usable limit is the one below:
MAX_LVE_ID = 0x7FFFFFFF - 1
# Default XML tag name NameMap uses to find reseller entries in ve.cfg
LVP_XML_TAG_NAME = "reseller"
# Single-bit flag values; presumably meant for the lve enter calls
# (e.g. lve_enter_pid_flags) -- TODO confirm against the pylve binding
LVE_NO_UBC = 1 << 1
LVE_NO_MAXENTER = 1 << 2




class NameMapError(Exception):
    """Base class for every error raised by NameMap."""


class NameMapConfigError(NameMapError):
    """Raised when the reseller data cannot be loaded from ve.cfg."""


class NameMapNotInitialized(NameMapError):
    """Raised when NameMap data is requested before link_xml_node() was called."""


class NameMap:
    """
    Container for the reseller name <=> reseller id map.
    The backing store is ve.cfg.

    Usage:
    >>> name_map = NameMap()
    >>> name_map.link_xml_node()

    >>> name_map.id_list()
    [1001]
    """
    def __init__(self, xml_tag_name=LVP_XML_TAG_NAME):
        """
        :param xml_tag_name: name of the XML tag that describes one reseller
        """
        self._xml_tag_name = xml_tag_name
        # Legacy mode only: XML node supplied by the caller
        self._xml_node = None
        # New mode only: list of (id, name) tuples; None until loaded
        self._reseller_id_name_map = None

    def get_id(self, name):
        """
        Return the id stored for reseller ``name`` or None if it is unknown.
        :raises NameMapNotInitialized: if link_xml_node() was not called
        """
        for name_, id_ in self.load_from_node():
            if name_ == name:
                return id_
        return None

    def get_name(self, id_):
        """
        Return the reseller name stored for ``id_`` or None if it is unknown.
        :raises NameMapNotInitialized: if link_xml_node() was not called
        """
        for name_, _id in self.load_from_node():
            if id_ == _id:
                return name_
        return None

    def id_list(self):
        """
        Return the list of all reseller ids known to the map.
        :raises NameMapNotInitialized: if link_xml_node() was not called
        """
        return [id_ for _, id_ in self.load_from_node()]

    def link_xml_node(self, xml_node=None, use_cache=True):
        """
        Initialize NameMap. If xml_node is None,
        the config will be loaded automatically
        :param use_cache: passed through to ve_config.get_xml_config();
                          controls whether the ve.cfg xml cache is used
        :param xml_node: !! DEPRECATED PARAM !!
                         this param is left only for
                         compatibility with our old code
        :raises NameMapConfigError: if ve.cfg cannot be loaded (new mode only)
        """
        if xml_node is None:
            # New mode: load (reseller_id, reseller_name) pairs from ve.cfg
            self._xml_node = None
            self._load_resellers_map_from_ve_cfg(use_cache)
        else:
            # For compatibility with our old code
            self._xml_node = xml_node
            self._reseller_id_name_map = None

    def _load_resellers_map_from_ve_cfg(self, use_cache):
        """
        Fill self._reseller_id_name_map from the ve.cfg file.
        Entries with an empty name, an id of 0 or an already seen id are skipped.
        :param use_cache: passed through to ve_config.get_xml_config()
        :raises NameMapConfigError: if ve.cfg cannot be loaded
        """
        self._reseller_id_name_map = []
        # The map holds (id, name) tuples, so a bare id can never be found in
        # it with ``in``; track the ids separately to filter duplicates.
        seen_ids = set()
        ve_cfg, xml_node = self._try_get_xml_node(use_cache=use_cache)
        for el_ in xml_node:
            name = el_.getAttribute('user')
            id_ = int(el_.getAttribute('id'))
            if name and id_ and id_ not in seen_ids:
                seen_ids.add(id_)
                self._reseller_id_name_map.append((id_, name))
        # Force delete XML object to avoid high memory load
        del xml_node
        del ve_cfg

    def _try_get_xml_node(self, use_cache=True):
        """
        Load ve.cfg and return (config object, list of reseller elements).
        The map is reset to the "not initialized" state when loading fails.
        :raises NameMapConfigError: on ve_config.BadVeConfigException
        """
        try:
            ve_cfg, xml_node = ve_config.get_xml_config(use_cache=use_cache)
        except ve_config.BadVeConfigException as e:
            self._reseller_id_name_map = None
            raise NameMapConfigError("Error happened while loading data from ve.cfg") from e
        return ve_cfg, xml_node.getElementsByTagName(self._xml_tag_name)

    def load_from_node(self):
        """
        Yield the stored data as (name, id_) pairs.

        This is a generator, so NameMapNotInitialized is raised on the first
        iteration step rather than at call time.
        :raises NameMapNotInitialized: if link_xml_node() was not called
        """
        if self._xml_node is None and self._reseller_id_name_map is None:
            raise NameMapNotInitialized('Name map is not initialized. '
                                        'Use obj.link_xml_node() to get data from config')
        if self._xml_node:
            # For compatibility with our old code
            for el_ in self._xml_node.getElementsByTagName(self._xml_tag_name):
                name = el_.getAttribute('user')
                id_ = int(el_.getAttribute('id'))
                if name and id_:
                    yield name, id_

        if self._reseller_id_name_map:
            # New mode, use resellers map
            for id_, name in self._reseller_id_name_map:
                yield name, id_


class LvpMap:
    """
    Container for the lve:lvp mapping, i.e. the information about
    which reseller container (lvp) each lve belongs to.
    """
    def __init__(self):
        self.name_map = NameMap()
        # In-memory caches for ids resolved outside of name_map
        self._id_name_map = {}
        self._name_id_map = {}
        # Lazily loaded result of cpapi.get_reseller_id_pairs()
        self._reseller_id_map_panel = None
        self._pwd = ClPwd()

    def _add_map(self, name, id_):
        """Remember the name <=> id pair in both in-memory caches."""
        self._id_name_map[id_] = name
        self._name_id_map[name] = id_

    def pw_uid(self, name, default=None):
        """
        Return the system uid of user ``name``
        or ``default`` when there is no such user.
        """
        try:
            return self._pwd.get_pw_by_name(name).pw_uid
        except ClPwd.NoSuchUserException:
            return default

    def _get_panel_reseller_id(self, reseller):
        # type: (str) -> Optional[int]
        """
        Return the reseller id: the system uid when the reseller has
        an account, otherwise the id reported by the control panel (or None).
        """
        uid = self.pw_uid(reseller)
        if uid is not None:
            return uid
        # in case when we cannot find reseller in passwd file
        # let's ask control panel for reseller's id
        if self._reseller_id_map_panel is None:
            self._reseller_id_map_panel = cpapi.get_reseller_id_pairs()
        return self._reseller_id_map_panel.get(reseller)

    def get_reseller_id(self, name):
        # type: (str) -> Optional[int]
        """
        Convert reseller name to an LVE id.
        It supports resellers without a system account (for Plesk compatibility).
        Returns None when the id cannot be resolved.
        """
        uid = self.name_map.get_id(name) or self._name_id_map.get(name)
        if uid is not None:
            return uid
        try:
            uid = self._get_panel_reseller_id(name)
        except NotSupported:
            uid = None
        if uid is not None:
            self._add_map(name, uid)
        return uid

    def get_reseller_name(self, id_):
        """
        Convert reseller id to reseller name.
        It supports resellers without a system account (for Plesk compatibility).
        Returns None when the id does not belong to a reseller.
        """
        # name_map first, then the in-memory cache
        name = self.name_map.get_name(id_) or self._id_name_map.get(id_)
        if name is not None:
            return name
        try:
            name = pwd.getpwuid(id_).pw_name
            if cpapi.is_reseller(name):
                self._add_map(name, id_)
            else:
                name = None
        except KeyError:
            name = None
        return name

    def _resolve_reseller_uids(self, resellers):
        """Map every resolvable reseller name to its id; log and skip the rest."""
        reseller_uids = {}
        for reseller in resellers:
            try:
                uid = self.get_reseller_id(reseller)
            except NotSupported:
                uid = None
            if uid is None:
                # get_reseller_id() reports an unresolved reseller with None;
                # storing it would leak None as lvp_id instead of the default 0
                syslog.syslog(
                    syslog.LOG_WARNING, f"Reseller {reseller} still exists in control panel, "
                                        "but absent in /etc/passwd file")
                continue
            reseller_uids[reseller] = uid
        return reseller_uids

    def lve_lvp_pairs(self):
        """
        This method loops over all user:reseller pairs in control panel
        and yields appropriate (lve_id, lvp_id) pairs; lvp_id is 0 when
        the reseller id of the user cannot be resolved.
        THIS METHOD WON'T CHECK IF 'RESELLER LIMITS' IS ENABLED IN ve.cfg
        """
        reseller_uids = self._resolve_reseller_uids(set(cpapi.resellers()))
        for cplogin, reseller in cpapi.cpinfo(keyls=('cplogin', 'reseller')):
            lve_id = self.pw_uid(cplogin)
            # for some reasons (process of destroying user died
            # or admin called 'pure' userdel), user may still exist in control panel
            # but absent in /etc/passwd file; we can do nothing with that,
            # so just skip and write a warning to syslog
            if lve_id is None:
                syslog.syslog(
                    syslog.LOG_WARNING, f"user {cplogin} still exists in control panel, "
                                        "but absent in /etc/passwd file")
                continue
            lvp_id = reseller_uids.get(reseller, 0)
            yield lve_id, lvp_id

    @staticmethod
    def resellers():
        """Yield the reseller names reported by the control panel."""
        yield from cpapi.resellers()

    @staticmethod
    def reseller_uids(name):
        """
        Return the list of system uids of the users that the control panel
        reports for reseller ``name``; users missing from passwd are
        logged and skipped.
        """
        uids = []
        reseller_users = cpapi.reseller_users(name)
        for user in reseller_users:
            try:
                id_ = pwd.getpwnam(user).pw_uid
                uids.append(id_)
            except KeyError:
                syslog.syslog(
                    syslog.LOG_WARNING, f"user {user} still exists in control panel, "
                                        "but absent in /etc/passwd file")
        return uids

    def lvp_lve_id_list(self, lvp_id):
        """Return the uids of the users that belong to reseller container ``lvp_id``."""
        # NOTE(review): get_reseller_name() may return None for an unknown id;
        # what cpapi.reseller_users(None) does is not visible here -- confirm
        reseller_name = self.get_reseller_name(lvp_id)
        return self.reseller_uids(reseller_name)


class PyLveError(Exception):
    """Error raised by the PyLve wrapper, e.g. when a pylve call returns an error code."""


class PyLve:
    """
    Wrapper around the pylve binding: error return codes of the wrapped
    calls are converted to PyLveError with a readable call description.
    """
    @staticmethod
    def _code_is_error(code):
        """Return True for an integer code other than 0 and -ENOSYS."""
        return isinstance(code, int) and code != -errno.ENOSYS and code != 0

    def _arg_to_str(self, arg_var):
        """Return a printable form of a call argument."""
        if isinstance(arg_var, self._pylve.liblve_settings):  # for pretty print liblve_settings object
            liblve_settings_attr = ', '.join(
                [f"{attr}={getattr(arg_var, attr)}" for attr in dir(arg_var) if not attr.startswith('_')]
            )
            arg_var_str = f'<liblve_settings object {liblve_settings_attr}>'
        else:
            arg_var_str = str(arg_var)
        return arg_var_str

    def _wrapped_fun(self, call, *args, **kwargs):
        """
        Run ``call(*args, **kwargs)`` and check its return code.

        Two keyword arguments are consumed here and not passed to ``call``:
        :param err_msg: message template used instead of default_msg_template
        :param ignore_error: overrides self.ignore_error for this call
        :return: the code returned by ``call``
        :raises PyLveError: on an error code, unless errors are ignored
        """
        msg_template = kwargs.pop('err_msg', self.default_msg_template)
        ignore_error = kwargs.pop('ignore_error', self.ignore_error)
        code = call(*args, **kwargs)
        is_error = self._code_is_error(code)
        if is_error and self._retry:  # wait and try again run function
            time.sleep(self._retry_time)
            code = call(*args, **kwargs)
            is_error = self._code_is_error(code)

        format_args = {
            'code': code,
            'fun_name': call.__name__,
            'module': call.__module__,
            'args_': ', '.join(
                list(map(self._arg_to_str, args)) +
                [f"{k}={self._arg_to_str(v)}" for k, v in kwargs.items()]
            )
        }
        if self.debug >= 1:
            print(self.debug_msg_template.format(**format_args))
        if self.debug >= 2:
            # Imported here because self.traceback exists only when debug >= 2
            # was already set at construction time; debug may be raised later.
            import traceback
            traceback.print_stack()
        if not ignore_error and is_error:
            msg = msg_template.format(**format_args)
            raise PyLveError(msg)
        return code

    def _wrap_code(self, call):
        """Return a function that runs ``call`` through _wrapped_fun."""
        def fun(*args, **kwargs):
            return self._wrapped_fun(call, *args, **kwargs)
        return fun

    def __init__(self, pylve=pylve, retry=True, retry_tyme=0.1, debug=0):
        """
        :param pylve: binding module to wrap (module-level pylve by default)
        :param retry: retry a failed call once after a pause
        :param retry_tyme: pause before the retry, in seconds (passed to time.sleep)
        :param debug: 1 - print every wrapped call, 2 - also print the stack
        """
        self.debug = debug
        if self.debug >= 2:
            # Attribute kept for backward compatibility
            self.traceback = __import__('traceback')
        self.default_msg_template = 'Error code {code}; {module}.{fun_name}({args_})'
        self.debug_msg_template = "DEBUG [lvectl]: call {module}.{fun_name}({args_}) with code {code}"
        self.ignore_error = False
        self._pylve = pylve
        self._retry = retry
        self._proc = ProcLve()
        self._retry_time = retry_tyme
        self.api_version = self._pylve.lve_get_api_version()

        # initialize, liblve_settings and lve_info are exposed as is,
        # the other calls get their return code checked
        self.initialize = self._pylve.initialize
        self.lve_start = self._wrap_code(self._pylve.lve_start)
        self.liblve_settings = self._pylve.liblve_settings
        self.lve_create = self._wrap_code(self._pylve.lve_create)
        self.lve_destroy = self._wrap_code(self._pylve.lve_destroy)
        self.lve_info = self._pylve.lve_info
        self.lve_set_default = self._wrap_code(self._pylve.lve_set_default)
        self.lve_setup = self._wrap_code(self._pylve.lve_setup)
        self.lve_enter_pid = self._wrap_code(self._pylve.lve_enter_pid)
        self.lve_enter_pid_flags = self._wrap_code(self._pylve.lve_enter_pid_flags)
        self.lve_leave_pid = self._wrap_code(self._pylve.lve_leave_pid)
        if self.resellers_supported():
            self.lve_lvp_create = self._wrap_code(self._pylve.lve_lvp_create)
            self.lve_lvp_destroy = self._wrap_code(self._pylve.lve_lvp_destroy)
            self.lve_lvp_map = self._wrap_code(self._pylve.lve_lvp_map)
            self.lve_lvp_move = self._wrap_code(self._pylve.lve_lvp_move)

            # Not taken from pylve directly: the lve_lvp_setup method of this
            # class is wrapped, and the instance attribute hides the method
            self.lve_lvp_setup = self._wrap_code(self.lve_lvp_setup)

    def resellers_supported(self):
        """
        Check if the pylve binding supports reseller limits
        :rtype: bool
        """
        return hasattr(self._pylve, 'lve_lvp_create')

    def lve_exists(self, lve_id):
        """
        Check if lve exists in kernel
        (lve_info raising OSError is treated as "does not exist")
        :rtype: bool
        """
        try:
            self.lve_info(lve_id)
            return True
        except OSError:
            return False

    # TODO: remove this wrapper when kernel logic is ready
    # upd: kernel logic is still not ready yet (KMODLVE-79)
    def lve_lvp_setup(self, lvp_id, settings):  # pylint: disable=method-hidden
        """
        Wrapper for lve_lvp_setup.
        When reseller's limits are changed,
        we should iterate over his users' limits
        and set them again;

        This behaviour is needed because the kernel
        does not update users' limits after
        changing the reseller's ones

        Adjust parameters for top level container.
        :param int lvp_id: top level container ID, 0 by default;
        :param settings: liblve_settings instance.
        :return: 0 or errno value
        """
        # is it real situation when lvp_id is 0?
        if lvp_id == 0:
            return self._pylve.lve_lvp_setup(lvp_id, settings)
        # reduce lve limits
        real_lve_settings = {}  # type: dict[int, pylve.liblve_settings]
        for lve_id in self._proc.lve_id_list(lvp_id):
            # save real settings to restore them later
            try:
                real_settings = self.lve_info(lve_id)
                real_lve_settings[lve_id] = real_settings

                if real_settings.ls_cpu > settings.ls_cpu:
                    # get current settings and reduce cpu
                    temp_settings = self.lve_info(lve_id)
                    temp_settings.ls_cpu = min(temp_settings.ls_cpu, settings.ls_cpu)

                    self.lve_setup(lve_id, temp_settings)
            except OSError:
                # lve was destroyed, ignore that
                pass

        # set new reseller settings
        _lve_lvp_setup = self._wrap_code(self._pylve.lve_lvp_setup)
        # ignore errors here and raise errors after operations
        result = _lve_lvp_setup(lvp_id, settings, ignore_error=True)

        # pass lve's limits again, so kernel will apply them
        for lve_id in self._proc.lve_id_list(lvp_id):
            if lve_id in real_lve_settings:
                self.lve_setup(lve_id, real_lve_settings[lve_id])
            else:
                # some lve was created during operations above
                # nothing bad, but we can do nothing with that
                # as long as this method has no locking
                self.lve_setup(lve_id, self.lve_info(lve_id))

        return result

    def get_available_lve_id(self, start=UID_MAX, stop=MAX_LVE_ID):
        """
        Iterate over lve ids and find an available one
        (the first id for which lve_info raises OSError).
        :param int start: search starts from start + 1; UID_MAX by default
        :param int stop: exclusive upper bound of the search
        :return int: lve_id
        :raises PyLveError: if every id in the range is taken
        """
        for lve_id in range(start + 1, stop):
            try:
                self.lve_info(lve_id)
            except OSError:
                return lve_id
        raise PyLveError(f"Unable to find free lve in range ({start}, {stop})")

    @contextlib.contextmanager
    def context_ignore_error(self, ignore_error):
        """
        Context manager: set self.ignore_error to ``ignore_error``
        inside the block and restore the previous value on exit.
        """
        self.ignore_error, saved_ignore_error = ignore_error, self.ignore_error
        try:
            yield
        finally:
            self.ignore_error = saved_ignore_error


class Lve:
    """
    Facade that combines the /proc/lve reader (proc), the pylve
    wrapper (py) and the lve:lvp mapping (map).
    """
    def __init__(self, proc=None, py=None, map=None):
        # A default collaborator is created for every falsy argument
        self.proc = proc if proc else ProcLve()
        self.py = py if py else PyLve()
        self.map = map if map else LvpMap()

    def lve_id_lvp_id_pairs(self):
        """
        Iterate over (lve id, lvp id) pairs with respect to ve.cfg
        (only enabled reseller containers are reported).

        Unlike LvpMap.lve_lvp_pairs, the lvp id is replaced by 0
        for users whose reseller id is not listed in the name map.
        """
        active_lvp_ids = set(self.map.name_map.id_list())
        for user_id, reseller_id in self.map.lve_lvp_pairs():
            # users of resellers without enabled limits fall back to lvp 0
            yield user_id, (reseller_id if reseller_id in active_lvp_ids else 0)

    def lve2lvp(self, lve_id):
        """
        Return the lvp id of ``lve_id`` with respect to ve.cfg
        (0 if the lve is not found or its reseller is not enabled).
        """
        matches = (
            found_lvp
            for candidate, found_lvp in self.lve_id_lvp_id_pairs()
            if lve_id == candidate
        )
        return next(matches, 0)

    def lve_destroy(self, lve_id, *args, **kwargs):
        """
        Destroy the lve container safely, preserving its lvp mapping.
        """
        lvp_id = 0
        if os.path.exists(self.proc.proc_lve_map()):
            lvp_id = self.proc.map().get(lve_id, 0)
        self.py.lve_destroy(lve_id, *args, **kwargs)
        if lvp_id == 0:
            return
        # the mapping is restored only if a passwd entry exists for lve_id
        with contextlib.suppress(KeyError):
            pwd.getpwuid(lve_id)
            self.py.lve_lvp_map(lvp_id, lve_id)

    def _sync_map(self):
        """
        Push the lve_id:lvp_id map to kmod-lve.
        """
        # mapping currently known to the kernel (/proc/lve/map)
        kernel_map = self.proc.map()
        # lve_id_lvp_id_pairs covers all control panel users and honours
        # the resellers enabled in ve.cfg, so a user of a reseller without
        # reseller limits shows up here as (user_id, 0)
        for user_id, reseller_id in self.lve_id_lvp_id_pairs():
            if kernel_map.get(user_id, 0) == reseller_id:
                # already mapped as required
                continue
            if not self.proc.exist_lvp(lvp_id=reseller_id):
                self.py.lve_lvp_create(reseller_id)
            self.py.lve_lvp_move(reseller_id, user_id)
            kernel_map[user_id] = reseller_id

    def sync_map(self):
        """
        Run _sync_map, silently doing nothing more when
        some cpapi call is not supported.
        """
        try:
            self._sync_map()
        except NotSupported:
            return

    def is_panel_supported(self):
        """
        Check if the current panel supports reseller limits;
        :rtype: bool
        """
        try:
            supported = cpapi.is_reseller_limits_supported()
        except NotSupported:
            supported = False
        return supported

    def reseller_limit_supported(self):
        """
        Check that everything needed to manipulate reseller limits
        (kmod-lve, liblve, /proc/lve, panel) is present.
        """
        # all three checks are always evaluated
        binding_ok = self.py.resellers_supported()
        proc_ok = self.proc.resellers_supported()
        panel_ok = self.is_panel_supported()
        return all((binding_ok, proc_ok, panel_ok))

    def is_lve10(self):
        """
        Same check as reseller_limit_supported, but without the panel
        (kmod-lve, liblve, /proc/lve only).
        """
        binding_ok = self.py.resellers_supported()
        proc_ok = self.proc.resellers_supported()
        return all((binding_ok, proc_ok))

Youez - 2016 - github.com/yon3zu
LinuXploit