Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.129.70.138
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/cluserselect.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import json
import os
import sys
import uuid
import signal
import secureio
from future.moves import configparser as ConfigParser
from stat import S_IRUSR, S_IWUSR, S_IRGRP, S_IROTH
from future.utils import iteritems
from pathlib import Path
import psutil

from .clselect import ClSelect
from .clselectexcept import ClSelectExcept
from clcommon import ClPwd, clcaptain
from .clselectprint import clprint
from . import utils
from clcommon.utils import ExternalProgramFailed
try:
    from clcagefslib.const import BASEDIR
    from clcagefslib.fs import get_user_prefix
    from clcagefslib.selector.configure import is_ea4_enabled, read_cpanel_ea4_php_conf, configure_alt_php
    from clcagefslib.selector.panel.da import da_change_user_php_ini
    from clcagefslib.selector.panel.isp import ispmanager_create_user_wrapper
except ImportError:
    pass


class ClUserSelect(ClSelect):
    CAGEFS_PATH = '/var/cagefs'
    SELECTOR_PATH = '/usr/selector'
    NATIVE_PATH = SELECTOR_PATH if utils.in_cagefs() else '/usr/share/cagefs-skeleton/usr/selector'
    CAGEFS_EXCLUDE = '/etc/cagefs/exclude'
    SELECTOR2_DIR = '.cl.selector/selector.path'


    def clean_crui_images(self, users=None):
        """
        Creates the "mod_lsapi_reset_me" flag file in users' home directories
        in order to recreate CRIU images when php version/extensions/options
        have changed. For details see LVEMAN-1210

        :param users: iterable of usernames (strings); ``None`` or an empty
                      collection turns the call into a no-op
        :raises ClSelectExcept.UnableToSaveData: when a flag file cannot be written
        """
        # The default value is None, which must not be iterated.
        if not users:
            return

        # There is no reliable way to check if CRIU is enabled inside CageFS,
        # so inside CageFS the flag is always created. Outside CageFS the
        # flag is created only when the CRIU marker file exists.
        if not utils.in_cagefs() and not os.path.isfile('/var/run/mod_lsapi/criu.enabled'):
            return
        for user in users:
            pw = self._clpwd.get_pw_by_name(user)
            path = os.path.join(pw.pw_dir, 'mod_lsapi_reset_me')
            if os.path.isfile(path):
                # flag is already in place
                continue
            # write the flag with the user's effective uid/gid
            previous_user_data = self._change_uid(user)
            try:
                clcaptain.write(path)
            except (OSError, ExternalProgramFailed) as e:
                raise ClSelectExcept.UnableToSaveData(path, e)
            finally:
                ClUserSelect._restore_uid(previous_user_data)


    @staticmethod
    def switch_symlink_for_alt_php(version, pw, exit_on_error=True, configure_multiphp = True):
        """
        Switch symlink for alt php.
        Creates the .cagefs directory if it is not created yet.

        With ``exit_on_error`` set, every failure terminates the process
        with exit code 1; otherwise the failure is reported to the caller.

        @return: True if an error has occurred, otherwise the result of
                 configure_alt_php()
        """
        if not (os.path.isdir(BASEDIR) or utils.in_cagefs()):
            print('ERROR: CageFS not installed.')
            if exit_on_error:
                sys.exit(1)
            return True

        ea4_conf = None
        if configure_multiphp and is_ea4_enabled():
            ea4_conf = read_cpanel_ea4_php_conf()

        if ea4_conf:
            try:
                # default system php version selected via MultiPHP Manager in cPanel WHM
                system_default = ea4_conf['default']
            except KeyError:
                # no default recorded - nothing to verify
                pass
            else:
                # LVEMAN-1170: do not configure PHP Selector when system default version is alt-php
                if not system_default.startswith('ea-php'):
                    print('ERROR: system default PHP version is alt-php. '
                          'PHP Selector is disabled. Use cPanel MultiPHP manager instead.')
                    if exit_on_error:
                        sys.exit(1)
                    return True

        # configure alt php - create .cagefs dir and create symlink
        running_as_root = os.geteuid() == 0
        failed = configure_alt_php(
            pw, version,
            write_log=False,
            drop_perm=running_as_root,
            configure_multiphp=configure_multiphp)
        if failed and exit_on_error:
            sys.exit(1)
        return failed

    def apply_symlinks_rules(self):
        """
        Re-applies the alt-php symlink for every user in the user/version map.

        Exits with code 1 in "single user" mode (CageFS disabled) or when
        not running with root privileges.
        """
        # Preconditions are evaluated lazily and in order; the first one that
        # holds prints its message and terminates the process.
        blockers = (
            (lambda: self.without_cagefs,
             'ERROR: this option does not work in "single user" mode (when CageFS is disabled)'),
            (lambda: os.geteuid() != 0,
             'ERROR: root privileges required'),
        )
        for is_blocked, message in blockers:
            if is_blocked():
                print(message)
                sys.exit(1)

        for user, version in iteritems(self.get_user_version_map()):
            print('Processing user', user)
            ClUserSelect.switch_symlink_for_alt_php(
                version,
                self._clpwd.get_pw_by_name(user),
                exit_on_error=False,
                configure_multiphp=False)

    def __init__(self, item='php', exclude_pid_list=None):
        """
        @param item: string, passed through to ClSelect.__init__
        @param exclude_pid_list: optional list of pids that must not be
                                 signalled by _reload_processes
        """
        ClSelect.__init__(self, item)
        self._clpwd = ClPwd()
        # cache filled by _get_user_excludes()
        self._user_excludes = set()
        # any falsy value (None, empty list) is replaced with a fresh list
        self.exclude_pid_list = exclude_pid_list or []

    def get_version(self, user, show_native_version=False):
        """
        Returns alternative version selected for a user
        @param user: string
        @param show_native_version: bool, forwarded to _compose_native_info
        @return: the native info when no alternative is selected, otherwise
                 tuple (version key, full version, path of the item)
        """
        self._check_user_in_cagefs(user)
        user_alt_dir = self._compose_user_alt_path(user)
        native_info = self._compose_native_info(show_native_version)
        if not os.path.isdir(user_alt_dir):
            return native_info

        known = self.get_all_alternatives_data()
        link_path = os.path.join(user_alt_dir, self._item)
        if not os.path.islink(link_path):
            return native_info

        target = os.readlink(link_path)
        if self.without_cagefs:
            if not self._native_contents:
                self._load_native_contents(self._item)
            if target == self._native_contents[self._item]:
                return native_info
        # a link into the selector directory itself means "native"
        if os.path.dirname(target) == self.SELECTOR_PATH:
            return native_info

        try:
            # every alternative is inspected before the first match is taken
            matches = [key for key in known
                       if known[key]['data'][self._item] == target]
            chosen = matches[0]
            return (chosen,
                    known[chosen]['version'],
                    known[chosen]['data'][self._item])
        except (IndexError, KeyError):
            return native_info


    def create_dir(self, path, user):
        """
        Creates directory `path` with the effective uid/gid of `user`.
        Does nothing when the directory already exists.
        @param path: string
        @param user: string
        """
        if os.path.isdir(path):
            return
        saved_ids = self._change_uid(user)
        try:
            clcaptain.mkdir(path)
        except (OSError, ExternalProgramFailed) as err:
            raise ClSelectExcept.UnableToSaveData(path, err)
        finally:
            # always switch back, whether mkdir succeeded or not
            ClUserSelect._restore_uid(saved_ids)


    def create_selector_symlinks(self, user):
        """
        Creates additional directory and symlinks for use in "without CageFS" mode

        The directory <homedir>/.cl.selector/selector.path gets two links:
        "php" -> "../php-cli" and "php-cgi" -> "../php".

        @param user: string
        """
        homedir = self._clpwd.get_homedir(user)
        path_in_home = os.path.join(homedir, self.SELECTOR2_DIR)
        cur_user = self._change_uid(user)
        try:
            self.create_dir(path_in_home, user)
            self._create_symlink('../php-cli', path_in_home + '/php', check_existence=True)
            self._create_symlink('../php', path_in_home + '/php-cgi', check_existence=True)
        finally:
            # Switch back even when directory/symlink creation raises, so the
            # process is never left running with the user's effective uid.
            self._restore_uid(cur_user)


    def get_default_version(self):
        """
        Returns the default version stored in the defaults file, or 'native'
        when the file is absent or the value cannot be read
        @return: string
        """
        if not os.path.isfile(ClSelect.DEFAULTS_PATH):
            return 'native'
        try:
            return self._dh.get('versions', self._item)
        except (ConfigParser.Error, IOError, KeyError):
            return 'native'


    def set_version_from_backup(self, user):
        """
        Sets the version stored in <homedir>/.cl.selector/defaults.cfg for
        a user; falls back to the default version when there is no backup.
        Exits with code 1 when the backup cannot be read.
        @param user: string
        """
        backup_cfg = os.path.join(
            self._clpwd.get_homedir(user), '.cl.selector', 'defaults.cfg')
        if not os.path.isfile(backup_cfg):
            self.set_version(user, self.get_default_version())
            return
        try:
            handler = self._get_default_config_handler(backup_cfg)
            self.set_version(user, handler.get('versions', self._item))
        except (ConfigParser.Error, IOError, KeyError) as err:
            print('Error while restoring settings from backup', str(err))
            sys.exit(1)

    def set_version(self, user, version, return_summary=False, show_native_version=False, exit_on_error=True):
        """
        Sets alternative version for all users sharing the uid of `user`
        @param user: string
        @param version: string
        @return: summary data when return_summary is set, otherwise None
        """
        # non-root callers are refused when a "selection disabled" message
        # is configured for the user
        if os.geteuid() != 0:
            message = self.get_version_selection_disabled_msg(user)
            if message:
                raise ClSelectExcept.VersionModificationBlocked(message)

        same_uid_names = self._clpwd.get_names(self._clpwd.get_uid(user))
        data = utils.apply_for_at_least_one_user(
            self._set_version,
            same_uid_names,
            ClSelectExcept.NoUserSelector,
            version, return_summary, show_native_version, exit_on_error
        )
        return data if return_summary else None

    def _set_version(self, user, version, return_summary=False, show_native_version=False, exit_on_error=True):
        """
        Sets alternative version for a user
        @param user: string
        @return: None
        """
        if self.without_cagefs:
            previous_user_data = self._change_uid(user)
        self._check_user_in_cagefs(user)
        alt_path = self._compose_user_alt_path(user)
        if not os.path.isdir(alt_path):
            if self.without_cagefs:
                self.create_dir(alt_path, user)
            else:
                raise ClSelectExcept.NoUserSelector(user)
        alternatives = self.get_all_alternatives_data()
        if version not in alternatives and version != 'native':
            raise ClSelectExcept.NoSuchAlternativeVersion(version)
        self._remove_alternatives_links(alt_path)
        pw = self._clpwd.get_pw_by_name(user)
        if version == 'native':
            if self.without_cagefs:
                if not self._native_contents:
                    self._load_native_contents(self._item)
                for item, native_path in iteritems(self._native_contents):
                    self._create_symlink(native_path, alt_path+'/'+item, user, version)
            else:
                ini = 'php.ini'
                new_ini_created = False
                new_ini_path = os.path.join("%s.etc" % (self.NATIVE_PATH,), ini)
                if os.path.exists(new_ini_path):
                    src = os.path.join("%s.etc" % self.SELECTOR_PATH, ini)
                    dst = os.path.join(alt_path, ini)
                    self._create_symlink(src, dst, user, version)
                    new_ini_created = True
                for filename in os.listdir(self.NATIVE_PATH):
                    if self._item not in filename:
                        continue
                    if filename.endswith('.ini') and new_ini_created:
                        continue
                    dst = os.path.join(alt_path, filename)
                    src = os.path.join(self.SELECTOR_PATH, filename)
                    self._create_symlink(src, dst, user, version)
        else:
            for item, path in iteritems(alternatives[version]['data']):
                self._create_symlink(path, os.path.join(alt_path, item), user, version)

        if self.without_cagefs:
            ClUserSelect._restore_uid(previous_user_data)
        else:
            ClUserSelect.switch_symlink_for_alt_php(version, pw, exit_on_error = exit_on_error)
            self._switch_php_da_isp(user, version)
        self._reload_processes(user)
        self._backup_settings(user)

        if return_summary:
            return self.get_summary(user, show_native_version)

    def get_version_selection_disabled_msg(self, username: str) -> str:
        """
        Returns the message explaining that PHP version selection is
        disabled for the user, as stored in the per-uid configuration file
        /var/cloudlinux/cl.selector/uids/<uid>/version_selection_conf.json.

        Args:
            username (str): The username for which to check the configuration.

        Returns:
            str: The value of "version_selection_disabled_msg" from the file,
                 or an empty string if the file does not exist, cannot be
                 read or parsed, or does not contain the message.
        """
        uid = self._clpwd.get_uid(username)
        config_file = Path(
            '/var/cloudlinux/cl.selector/uids/{}/version_selection_conf.json'.format(uid))
        if not config_file.exists():
            return ''

        try:
            settings = json.loads(config_file.read_text(encoding='utf-8'))
            return settings.get('version_selection_disabled_msg', '')
        except (OSError, ValueError):
            # unreadable file or malformed JSON/encoding
            return ''

    def get_summary(self, user, show_native_version=False):
        """
        Returns state of alternatives
        @param user: string
        @param show_native_version: bool, forwarded to _compose_native_info
        @return: tuple of (version, (enabled, default, selected)) entries,
                 alternatives in sorted order followed by the native entry
        """
        self._check_user_in_cagefs(user)
        alternatives = self.get_all_alternatives_data()
        native_info = self._compose_native_info(show_native_version)

        ordered = sorted(alternatives.keys())
        ordered.append('native')
        selected_version = self.get_version(user)[0]

        # a version counts as enabled when its section has no 'state' option
        summary = {}
        for ver in ordered:
            summary[ver] = {
                'enabled': not self._dh.has_option(
                    "%s%s" % (self._item, ver), 'state'),
                'default': False,
                'selected': False,
            }

        try:
            default_version = self._dh.get('versions', self._item)
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            default_version = 'native'
        try:
            summary[default_version]['default'] = True
            summary[selected_version]['selected'] = True
        except KeyError:
            raise ClSelectExcept.NoSuchAlternativeVersion(default_version)

        # the native entry is reported under the name from the native info
        native_label = native_info[0]
        summary[native_label] = summary.pop('native')
        ordered.remove('native')
        ordered.append(native_label)

        return tuple(
            (ver, (summary[ver]['enabled'],
                   summary[ver]['default'],
                   summary[ver]['selected']))
            for ver in ordered)

    def change_to_version(self, new_version, current_version):
        """
        Moves every user of current_version to new_version
        @param new_version: string
        @param current_version: string
        """
        affected = self.list_users(current_version)
        for name in affected:
            try:
                self.set_version(name, new_version, exit_on_error = False)
            except Exception as err:
                # report any error and carry on with the next user
                clprint.print_diag('text', {'status': 'ERROR', 'message': str(err)})
        self.clean_crui_images(affected)

    def list_users(self, version):
        """
        Returns users of a certain alternative
        @param version: string
        @return: list of usernames, empty when nobody uses the version
        """
        return self.get_version_user_map().get(version, [])

    def list_all_users(self):
        """
        Returns all valid system users: system users minus the excluded
        ones, or just the cPanel user in "without CageFS" mode
        @return: list
        """
        if not self.without_cagefs:
            system_users = self._get_system_users()
            return list(system_users.difference(self._get_user_excludes()))
        from .clselectctlphp import get_cpanel_user
        return [get_cpanel_user()]

    def cagefs_copy_etc(self, user):
        """
        Calls cagefsctl.cpetc_for_user() for a user when 'cl.selector' or
        'cl.php.d' is missing in the user's CageFS etc directory.
        Exits with code 1 when the cagefsctl module cannot be imported.
        @param user: string
        """
        config = {'init': 0, 'reinit': 0, 'verbose': 0}
        LIBDIR = '/usr/share/cagefs'
        # Register the library directory only once; an unconditional append
        # makes sys.path grow by one duplicate entry on every call.
        if LIBDIR not in sys.path:
            sys.path.append(LIBDIR)

        try:
            import cagefsctl
        except ImportError:
            print('ERROR: CageFS not installed.')
            sys.exit(1)

        cagefs_etc_path = os.path.join(BASEDIR, get_user_prefix(user), user, 'etc')
        if not os.path.exists(cagefs_etc_path + '/cl.selector') or \
                not os.path.exists(cagefs_etc_path + '/cl.php.d'):
            cagefsctl.cpetc_for_user(user, config)

    def get_user_version_map(self):
        """
        Returns user version map as dict (username -> selected version).
        Users that are not CageFS users are left out.
        @return: dict
        """
        mapping = {}
        for name in self.list_all_users():
            try:
                selected = self.get_version(name, False)
            except ClSelectExcept.NotCageFSUser:
                continue
            mapping[name] = selected[0]
        return mapping

    def get_version_user_map(self, user_names = None):
        """
        Returns users grouped by version (version -> list of usernames).
        Users that are not CageFS users are left out.
        @param user_names: optional list of usernames; all users when empty
        @return: dict
        """
        grouped = {}
        for name in (user_names or self.list_all_users()):
            try:
                selected = self.get_version(name, False)[0]
                grouped.setdefault(selected, []).append(name)
            except ClSelectExcept.NotCageFSUser:
                continue
        return grouped

    def _create_symlink(src, dst, user=None, version=None, check_existence=False):
        """
        Creates symlink from src to dst
        @param src: string
        @param dst: string
        @param user: string
        @param version: string
        @param check_existence: bool
        @return: None
        """
        try:
            if check_existence:
                if os.path.islink(dst):
                    if os.readlink(dst) != src:
                        os.unlink(dst)
                    else:
                        return
                else:
                    utils.remove_file_or_dir(dst)
            clcaptain.symlink(src, dst)
        except Exception as e:
            if user is not None and version is not None:
                raise ClSelectExcept.UnableToSetAlternative(user, version, e)
            raise ClSelectExcept.SelectorException(
                "Cannot create symlink from %s to %s (%s)" % (src, dst, e))
    _create_symlink = staticmethod(_create_symlink)

    def _get_user_excludes(self):
        """
        Returns list of user excludes
        @return: list
        """
        if self._user_excludes:
            return self._user_excludes
        if not os.path.isdir(self.CAGEFS_EXCLUDE):
            return set()
        for item in os.listdir(self.CAGEFS_EXCLUDE):
            full_item_path = os.path.join(self.CAGEFS_EXCLUDE, item)
            self._user_excludes.update(
                set(
                    map((lambda x: x.strip()),
                        utils.read_file_as_string(full_item_path).splitlines())))
        return self._user_excludes

    def _check_user_in_cagefs(self, user):
        """
        Check that cagefs enabled for user
        """
        if self.without_cagefs:
            return

        if utils.in_cagefs():
            return

        LIBDIR = '/usr/share/cagefs'
        sys.path.append(LIBDIR)
        try:
            import cagefsctl
        except ImportError:
            print('ERROR: CageFS not installed.')
            sys.exit(1)
        try:
            if not cagefsctl.is_user_enabled(user):
                raise ClSelectExcept.NotCageFSUser(user)
        except AttributeError:
            print('ERROR: CageFS version is unsupported. Please update CageFS.')
            sys.exit(1)

    def _remove_alternatives_links(self, path):
        """
        Removes all symlinks from directory
        @param path: string
        @return: None
        """
        for filename in os.listdir(path):
            if self._item not in filename:
                continue
            full_path = os.path.join(path, filename)
            if not os.path.islink(full_path):
                continue
            os.unlink(full_path)

    def _compose_user_alt_path(self, user):
        """
        Composes and returns user alternative directory path
        @param user: string
        @return: string
        """
        if self.without_cagefs:
            homedir = self._clpwd.get_homedir(user)
            return homedir + '/.cl.selector'
        uid = str(self._clpwd.get_uid(user))
        return (
            '/etc/cl.selector'
            if utils.in_cagefs() else
            os.path.join(self.CAGEFS_PATH, uid[-2:], user, 'etc', 'cl.selector')
        )

    def _get_system_users(self):
        """
        Returns set of system users
        @return: set
        """
        users_dict = self._clpwd.get_user_dict()
        return set(users_dict.keys())

    def _delete_if_symlink(file_path):
        """
        Deletes file to be written if it is a symlink
        """
        if os.path.islink(file_path):
            try:
                os.unlink(file_path)
            except OSError:
                raise ClSelectExcept.UnableToSaveData(
                    file_path,
                    "Cannot delete symlink while saving data")
    _delete_if_symlink = staticmethod(_delete_if_symlink)

    def _change_uid(self, user):
        """
        Changes to another uid and returns tuple of previous euid and egid
        @param user: string
        @return: tuple
        """
        entry = self._clpwd.get_pw_by_name(user)
        new_uid = entry.pw_uid
        new_gid = entry.pw_gid
        cur_euid = os.geteuid()
        cur_egid = os.getegid()
        if cur_euid == new_uid:
            return cur_euid, cur_egid
        try:
            os.setegid(new_gid)
            os.seteuid(new_uid)
            secureio.set_capability()
            return cur_euid, cur_egid
        except OSError as e:
            raise ClSelectExcept.UnableToChangeToAnotherUser(user, e)

    def _restore_uid(uid_and_gid):
        """
        Restores changed uid and gid to original ones
        @param uid_and_gid: tuple
        @return: None
        """
        if uid_and_gid[0] != os.geteuid():
            secureio.set_capability(clear=True)
            try:
                os.setegid(uid_and_gid[1])
                os.seteuid(uid_and_gid[0])
            except OSError as e:
                raise ClSelectExcept.UnableToChangeToAnotherUser(str(uid_and_gid[0]), e)
    _restore_uid = staticmethod(_restore_uid)

    def _write_to_file(self, user, file_contents, file_path, create=True):
        """
        Saves data to file
        @param user: string
        @param file_contents: string
        @param file_path: string
        @return: None
        """
        if not create and not os.path.exists(file_path):
            return
        self._delete_if_symlink(file_path)
        previous_user_data = self._change_uid(user)
        file_directory = os.path.dirname(file_path)
        try:
            # Replace tempfile.mkstemp with str(uuid.uuid4())
            dirname = 'clseltmp_%s' % str(uuid.uuid4())
            temp_path = os.path.join(file_directory, dirname)
            clcaptain.write(temp_path, "%s\n" % (file_contents,))
        except (IOError, OSError, ExternalProgramFailed) as e:
            try:
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
            except:
                pass
            ClUserSelect._restore_uid(previous_user_data)
            raise ClSelectExcept.UnableToSaveData(file_path, e)
        else:
            try:
                mask = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH
                os.rename(temp_path, file_path)
                os.chmod(file_path, mask)
            except OSError:
                pass
        ClUserSelect._restore_uid(previous_user_data)

    def _reload_processes(self, user):
        """
        Sends SIGHUP to processes of a user whose name contains the item
        name. The parent and grandparent of the current process (when they
        exist) are added to the exclude list first and are never signalled.
        @param user: string
        """
        try:
            ancestor = psutil.Process()
            for _ in range(2):
                ancestor = ancestor.parent()
                if ancestor is None:
                    break
                self.exclude_pid_list.append(ancestor.pid)
        except psutil.NoSuchProcess:
            pass

        try:
            uid = ClPwd().get_uid(user)
        except (ClPwd.NoSuchUserException, ):
            # no such user
            return

        try:
            for proc in psutil.process_iter():
                try:
                    owner_ids = [proc.uids().real, proc.uids().effective]
                    if uid not in owner_ids or self._item not in proc.name():
                        continue
                    pid = proc.pid
                except psutil.NoSuchProcess:
                    # process is gone already
                    continue
                if pid in self.exclude_pid_list:
                    continue
                try:
                    os.kill(pid, signal.SIGHUP)
                except (OSError, ):
                    continue
        except (OSError, IOError):
            # psutil reads /proc FS as usual FS, skip read errors
            pass

    def _skim_over_extensions(path):
        """
        Get extension names from user extensions file comments
        """
        extensions = []
        try:
            ini = open(path)
            for line in ini:
                if line.startswith(';---'):
                    ext = line[4:line.rfind('---')]
                    extensions.append(ext)
            ini.close()
        except (OSError, IOError):
            pass
        return extensions
    _skim_over_extensions = staticmethod(_skim_over_extensions)

    def _backup_settings(self, user):
        """
        Scans all user settings and backups'em in homedir as INI file
        @param user: string
        """
        self._check_user_in_cagefs(user)
        backup_contents = []
        user_alt_path = self._compose_user_alt_path(user)
        user_ext_path = os.path.join(os.path.dirname(user_alt_path), 'cl.php.d')
        alternatives = self.get_all_alternatives_data()
        user_backup_path = os.path.join(
            self._clpwd.get_homedir(user), '.cl.selector')
        if not os.path.isdir(user_backup_path):
            previous_user_data = self._change_uid(user)
            try:
                clcaptain.mkdir(user_backup_path)
            except (OSError, ExternalProgramFailed) as e:
                ClUserSelect._restore_uid(previous_user_data)
                raise ClSelectExcept.UnableToSaveData(user_backup_path, e)
            ClUserSelect._restore_uid(previous_user_data)
        user_backup_file = os.path.join(
            user_backup_path, 'defaults.cfg')
        if os.path.isdir(user_alt_path):
            version = '[versions]\n%s = %s\n' % (
                self._item, self.get_version(user)[0])
        else:
            version = '[versions]\n%s = native\n' % (self._item,)
        backup_contents.append(version)
        for alt in sorted(alternatives.keys()):
            if self.without_cagefs:
                curr_ext_path = user_alt_path + "/alt_php" + alt.replace('.', '') + '.ini'
            else:
                curr_ext_path = os.path.join(
                    user_ext_path, "alt-php%s" % (
                        (alt.replace('.', ''),)), 'alt_php.ini')
            extensions = self._skim_over_extensions(curr_ext_path)
            backup_contents.append(
                "[%s%s]\nmodules = %s\n" % (
                    self._item, alt, ','.join(sorted(extensions))))
        self._write_to_file(
            user, '\n'.join(backup_contents), user_backup_file)

    def _switch_php_da_isp(self, user, version):
        if self.without_cagefs:
            return
        da_change_user_php_ini(user, version)
        ispmanager_create_user_wrapper(user, version)

Youez - 2016 - github.com/yon3zu
LinuXploit