Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.147.56.125
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/clselectpythonuser/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/clselectpythonuser/environments.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import os
import sys
import pwd
import subprocess
import shutil
import filecmp

from datetime import datetime
from clselect.clselectctl import get_directory
from clselect.utils import check_call, check_output, list_dirs, run_command
from .extensions import EXTENSION_PATTERN, ExtensionInfo
from .interpreters import Interpreter, interpreters
import simplejson as json
from simplejson import JSONDecodeError
from clselect.clselectexcept import ClSelectExcept
DEFAULT_PREFIX = 'virtualenv'
BACKUP_PREFIX = '.virtualenv.backup'
# only this binary it brought by alt-python-virtualenv package
VIRTUALENV_BIN = '/opt/cloudlinux/venv/bin/virtualenv'
VERSION_DELIMITER = '#'
WRAPPERS_PATH = '/usr/share/l.v.e-manager/utils'
PYTHON_WRAPPER = 'python_wrapper'
SET_ENV_VARS_SCRIPT = 'set_env_vars.py'


class Environment(object):

    def __init__(self, name, user=None, prefix=None):
        self.name = name
        if user:
            self.user = user
        else:
            self.user = pwd.getpwuid(os.getuid()).pw_name
        self.homepath = pwd.getpwnam(self.user).pw_dir
        self.pip_logfile = os.path.join(self.homepath, '.pip/pip.log')
        if prefix is None:
            self.prefix = DEFAULT_PREFIX
        else:
            self.prefix = prefix
        self.path = os.path.join(_abs_prefix(self.user, self.prefix), name)
        self.backup_path = os.path.join(
            _abs_prefix(self.user, BACKUP_PREFIX), self.name)
        self._requirements = None
        self._interpreter = None
        self._pip = None
        self.interpreter_name = 'python' + name
        # Create extenstion remap table
        self._extension_remap = {'MySQLdb': 'MySQL-python'}

    def __repr__(self):
        return ("%s.%s(name='%s', user='%s', prefix='%s')" % (
            self.__class__.__module__, self.__class__.__name__,
            self.name, self.user, self.prefix))

    def _demote(self):
        user_pwd = pwd.getpwnam(self.user)

        def func():
            os.setgid(user_pwd.pw_gid)
            os.setuid(user_pwd.pw_uid)
            os.environ['USER'] = self.user
            os.environ['HOME'] = user_pwd.pw_dir

        return func

    def as_dict(self, key=None):
        e = {
            'name': self.name,
            'interpreter': self.interpreter(),
            'extensions': self.extensions(),
            }
        if key:
            del e[key]
            return {getattr(self, key): e}
        return e

    def as_deepdict(self, key=None, with_extensions=True):
        e = {
            'name': self.name,
            'interpreter': self.interpreter().as_dict(),
        }
        if with_extensions:
            e.update({
                'extensions': self.extensions(),
            })
        if key:
            del e[key]
            return {getattr(self, key): e}
        return e

    def create(self, interpreter=None, version=None, wait=None):
        if not interpreter:
            interpreter = Interpreter(target_user=self.user)
        path = self.path
        if version:
            path = os.path.join(path, version)
        prompt = "({}:{})".format(
            get_directory(os.path.basename(self.prefix)), self.name,
        )
        args = [
            VIRTUALENV_BIN,
            '--prompt', prompt,
            '--python', interpreter.binary,
            path,
        ]
        kwargs = {"preexec_fn": self._demote(), "cwd": self.homepath, "wait": wait}
        try:
            check_call(*args, **kwargs)
        except ClSelectExcept.ExternalProgramFailed as err:
            err = str(err)
            err_trace = None
            # Change error text and add help if disk quota exceeded
            if "Disk quota exceeded" in err:
                err_text = "Disk quota exceeded.\n " \
                           "Contact system administrator to increase disk quota."
            elif "Traceback" in err:
                # Find second ":" character. First is "Traceback :"
                err_char = err.find(":", err.find(":")+1)
                # Find last row of trace
                err_trace_end = err[:err_char].rfind('\n')  # pylint: disable=indexing-exception
                if err_trace_end == -1 or err_char == -1:
                    err_text = err
                else:
                    # Trace row without error
                    err_trace = err[:err_trace_end]  # pylint: disable=indexing-exception
                    # Only error without first trace
                    err_text = err[err_trace_end+1:]  # pylint: disable=indexing-exception
            else:
                err_text = err
            raise ClSelectExcept.ExternalProgramFailed(
                message=err_text,
                details=err_trace,
            )

        self.configure_environment()

    def detect_python_binary(self, bin_path):
        files_to_check = [
            'python',
            self.interpreter_name.split('.')[0],
            self.interpreter_name,
        ]
        for file in files_to_check:
            path = os.path.join(bin_path, file)
            if not os.path.islink(path) or os.readlink(path).startswith('/opt/alt/python'):
                return path
        return None

    def configure_environment(self, auto_restore=False):
        """
        Configures environment:
        1. Rename binary to pythonX.Y_bin
        2. Makes symlink from python binary to python_wrapper
        """
        bin_path = os.path.join(self.path, 'bin')
        new_interpreter_path = os.path.join(bin_path, self.interpreter_name) + '_bin'
        interpreter_path = self.detect_python_binary(bin_path)
        if interpreter_path is None:
            return

        if os.path.exists(new_interpreter_path):
            os.remove(new_interpreter_path)

        os.rename(interpreter_path, new_interpreter_path)
        try:
            if not os.path.exists(interpreter_path):
                os.symlink(os.path.join(WRAPPERS_PATH, PYTHON_WRAPPER), interpreter_path)
        except (IOError, OSError):
            if auto_restore:
                os.rename(new_interpreter_path, interpreter_path)
            raise

        if not os.path.exists(os.path.join(bin_path, SET_ENV_VARS_SCRIPT)):
            os.symlink(os.path.join(WRAPPERS_PATH, SET_ENV_VARS_SCRIPT),
                       os.path.join(bin_path, SET_ENV_VARS_SCRIPT))

    def destroy(self, version=None):
        path = self.path
        if version:
            path = os.path.join(path, version)
        if os.path.exists(path):
            check_call('/bin/rm', '-r', '--interactive=never', path,
                       preexec_fn=self._demote())

    def _get_extension_name(self, extension_name):
        """
        Returns extensions name considering extension remap table
        :param extension_name: Input extension name
        :return: Result extension name
        """
        if extension_name in self._extension_remap:
            return self._extension_remap[extension_name]
        else:
            return extension_name

    def _recreate(self, version):
        """
        Recreate python virtual environment with requirements
        :return:
        """
        # if virtual environment does not exists, just create it
        # unfortunately, we don't have requirements
        env_path = os.path.join(self.path, version)
        if not os.path.exists(self.pip(version=version)):
            return
        print('Re-create python virtualenv:', env_path)
        # pip freeze, save last requirements into the file
        self._pip_freeze(version)
        # remember the requirements in the memory
        requirements_path = self.pip_requirements(version)
        requirements = []
        if os.path.exists(requirements_path):
            reqs_file = open(requirements_path, 'r')
            requirements = reqs_file.readlines()
            reqs_file.close()
        # destroy python virtual environment
        self.destroy(version=version)
        # create python virtual environment
        self.create(version=version, wait=True)
        # put remembered requirements into the file
        reqs_file = open(requirements_path, 'w')
        reqs_file.writelines(requirements)
        reqs_file.close()
        # pip install -r requirements, install requirements
        check_call(
            self.pip(version=version), 'install', '-r',
            self.pip_requirements(version))

    def recreate(self):
        for version in interpreters(key='version').keys():
            self._recreate(version)

    def exists(self):
        return os.path.exists(self.path)

    def interpreter(self):
        if not self._interpreter:
            self._interpreter = Interpreter(prefix=self.path, target_user=self.user)
        return self._interpreter

    def extension_install(self, extension_name):
        extension_name = self._get_extension_name(extension_name)
        locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
        t = extension_name.split(VERSION_DELIMITER)
        extension, version = t[0], t[1:] or ''
        command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile +
                   ' install ' + extension_name)
        if version:
            version = version[0]
            command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile +
                       ' install ' + extension + '==' + version)
        if ExtensionInfo.is_extensions_locked(locked_extensions, extension_name, version):
            raise ValueError("Extension '%s' install is prohibited. System extension" % extension_name)
        check_call(args=command, preexec_fn=self._demote(), cwd=self.homepath)
        self._pip_freeze()

    def extension_install_requirements(self, requirements_path):
        command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile +
                   ' install -r {}'.format(requirements_path))
        check_call(args=command, preexec_fn=self._demote(), cwd=self.homepath)
        self._pip_freeze()

    def extension_update(self, extension):
        check_call(self.pip(), '--log-file='+self.pip_logfile,
                   'install', '--upgrade', extension,
                   preexec_fn=self._demote(), cwd=self.homepath)
        self._pip_freeze()

    def extension_uninstall(self, extension):
        locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
        t = extension.split(VERSION_DELIMITER)
        extension, version = t[0], t[1:] or ''
        if version:
            version = version[0]
        if ExtensionInfo.is_extensions_locked(locked_extensions, extension, version):
            raise ValueError("Extension '%s' removal is prohibited" % extension)
        p = subprocess.Popen(
            (self.pip(), '--log-file='+self.pip_logfile, 'uninstall', extension), preexec_fn=self._demote(),
            stdin=subprocess.PIPE, stderr=subprocess.PIPE,
            stdout=subprocess.PIPE, cwd=self.homepath, text=True)
        stdout, stderr = p.communicate('y')
        if p.returncode:
            raise Exception(stderr or stdout)
        self._pip_freeze()

    def extensions(self):
        result = {}
        locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name)
        try:
            output = check_output(self.pip(), 'list', '--log-file='+self.pip_logfile, '--format=json', preexec_fn=self._demote(), cwd=self.homepath)
            extensions = [(x['name'], x['version']) for x in json.loads(output)]
        except (JSONDecodeError, KeyError, ValueError, ClSelectExcept.FileProcessError, ClSelectExcept.ExternalProgramFailed):
            output = check_output(self.pip(), 'list', '--log-file='+self.pip_logfile, preexec_fn=self._demote(), cwd=self.homepath)
            extensions = EXTENSION_PATTERN.findall(output)
        docs = (ExtensionInfo().extension_doc(extension)
                for extension, _ in extensions)
        for (name, version), doc in zip(extensions, docs):
            if ExtensionInfo.is_extensions_locked(locked_extensions, name, version):
                version_diff = list(set([v.strip() for v in version.split(',')])
                                    - set(locked_extensions.get(name)))
                if version_diff and len(locked_extensions.get(name)) != 0:
                    result[name] = {'doc': doc, 'version': ', '.join(version_diff)}
            else:
                result[name] = {'doc': doc, 'version': version}

        return result

    def pip(self, version=None):
        if version is not None:
            return os.path.join(self.path, version, 'bin', 'pip')
        if not self._pip:
            self._pip = os.path.join(self.path, 'bin', 'pip')
        return self._pip

    def pip_requirements(self, version=None):
        if version is not None:
            return os.path.join(self.path, version, 'requirement.pip')
        return os.path.join(self.path, 'requirement.pip')

    def update_python_interpreter(self, backup=False, force=False, verbose=True, _alt_interpreters_dict=None):
        """
        copy binary python from /opt/alt/pythonXY/bin/pythonX.Y to virtualenvdir/bin/pythonX.Y
        :param backup: make backup old python interpreter
        :param force: force rewrite python interpreter without check
        :param verbose: print actions details to stdout
        :return: True - updating success; False - updating fail
        """
        update_result = False
        interpreter = self.interpreter()
        if _alt_interpreters_dict:
            main_interpreter = _alt_interpreters_dict[interpreter.version]
        else:
            main_interpreter = interpreters(key='version')[interpreter.version]  # path to original /opt/alt/pythonXY/bin/pythonX.Y

        updated_list = list()  # list updated interpreters

        if os.path.islink(interpreter.python_bin) and os.readlink(interpreter.python_bin).startswith('/opt/alt/python'):
            if verbose:
                print('Nothing to do, binary in your virtual environment is already symlink to global python!')
            return False

        # make backup and delete old python binary
        python_backup = interpreter.python_bin + '.orig_%s' % datetime.now().strftime("%Y-%m-%d_%H-%M")
        stat_ = os.stat(interpreter.python_bin)
        shutil.copy(interpreter.python_bin, python_backup)
        os.chown(python_backup, stat_.st_uid, stat_.st_gid)  # preserving owner

        try:
            for virtualenv_python_bin in interpreter.binary_list:
                if filecmp.cmp(main_interpreter.binary, interpreter.python_bin) and not force:
                    update_result = False
                    if verbose:
                        print("    not need updating; skip '%s'" % virtualenv_python_bin)
                    continue

                if verbose:
                    sys.stdout.write("    copy '%s' -> '%s'..." % (main_interpreter.binary, virtualenv_python_bin))
                run_command(cmd=('/bin/cp', '--force', main_interpreter.binary, virtualenv_python_bin))
                updated_list.append(virtualenv_python_bin)
                print("Done")
            update_result = True
        except (OSError, IOError) as e:
            # rollback binaries python if something is wrong
            print("Fail %s" % str(e))
            for updated_python in updated_list:
                shutil.copyfile(python_backup, updated_python)  # safe copy with preserve owner and mode
            os.unlink(python_backup)
        if not backup:  # delete backup if not need
            os.unlink(python_backup)
        return update_result

    def _pip_freeze(self, version=None):
        """
        Output installed packages in requirements format
        :return: None
        """
        if not os.path.exists(self.pip(version)):
            return
        command = (self.pip(version), 'freeze', '-l')
        f = open(self.pip_requirements(version), 'w')
        check_call(args=command, preexec_fn=self._demote(),
                   cwd=self.homepath, output=f)

    def pip_freeze(self):
        """
        Output installed packages in requirements format
        :return: None
        """
        for version in interpreters(key='version').keys():
            self._pip_freeze(version=version)


def _abs_prefix(user=None, prefix=None):
    if not prefix:
        prefix = DEFAULT_PREFIX
    if user:
        return os.path.join(pwd.getpwnam(user).pw_dir, prefix)
    else:
        return os.path.join(pwd.getpwuid(os.getuid()).pw_dir, prefix)


def environments(user=None, prefix=None):
    venv_path = _abs_prefix(user, prefix)
    try:
        env_list = list_dirs(venv_path)
    except OSError:
        return []
    envs = []
    for env_name in env_list:
        envs.append(Environment(env_name, user, prefix))
    return envs


def environments_dict(key, user=None, prefix=None):
    return dict(list(e.as_dict(key=key).items()) for e in environments(user, prefix))


def environments_deepdict(key, user=None, prefix=None):
    return dict(list(e.as_deepdict(key=key).items())
                for e in environments(user, prefix))

Youez - 2016 - github.com/yon3zu
LinuXploit