Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.145.89.181
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/clselectpython/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/clselectpython/apps_manager.py
# coding=utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import logging
import os
import pwd
import json

from typing import AnyStr, Optional  # NOQA
from future.utils import iteritems

import secureio

from clselect.utils import get_abs_rel, run_process_in_cagefs
from clcommon.clpwd import drop_privileges
from clselect.baseclselect import (
    BaseSelectorError,
    MissingVirtualenvError,
    MissingAppRootError,
)

from clselect.clpassenger import WSGI_PATH
from clselect import ClSelectExcept, clpassenger, clselectctl
from clselect.baseclselect.apps_manager import BaseApplicationsManager
from clselect.clselectpython.python_manager import PythonManager


class PythonAppFormatVersion(object):
    """
    Class represents possible python application versions.
    """
    # application that was created in old selector
    # does not support version change and env variables
    LEGACY = 1
    # new application, supports all selector features
    STABLE = 2


class PythonVenvFormatVersion(object):
    """
    Class represents possible python application venv path versions.
    """
    # venv that was created in old selector
    # and was transformed like 'app/test_1' -> 'app_test__1'
    LEGACY = 1
    # any application (old & new) that does not contain
    # replacements in venv path
    STABLE = 2


class ApplicationsManager(BaseApplicationsManager):
    _USER_CONFIG = '.cl.selector/python-selector.json'
    _IMPORT_STATUS_ADMIN = '/var/lve/.py.migration.status'
    _IMPORT_STATUS_USER = '.cl.selector/.py.migration.status'
    _IMPORT_LOG_FILE = '/var/log/cloudlinux/py-selector/app_imports.log'
    _IMPORT_REQUIREMENTS_FILE = 'imported_pip_requirements.txt'
    INTERPRETER = 'python'
    VENV_DIR = 'virtualenv'
    BINARY_NAME = 'python'

    def __init__(self):
        super(ApplicationsManager, self).__init__(PythonManager())

    # TODO: replace with real implementation (LVEMAN-1324)
    def _find_config_files(self, user_name, app_directory, patterns=()):

        return super(ApplicationsManager, self)._find_config_files(user_name, app_directory, patterns=patterns)

    @staticmethod
    def get_interpreter_specific_passenger_lines(binary_path, app_config):
        """
        Return lines for htaccess that are specific to the python interpreter
        :param binary_path: path to the environment's python binary
        :param app_config: application's config dictionary
        """
        specific_lines = ['PassengerPython "{}"\n'.format(binary_path)]
        return specific_lines

    def _get_legacy_applications(self, user, logger):
        """
        Search server for old applications (created in old python selector)
        and add them to new config.
        :param user: user to search applications
        :param logger: logger to write messages to
        :return: tuple(list[applications], list[errors])
        """
        from clselect.clselectctlpython import WSGI_PATTERN
        applications = {}
        errors = {}

        user_config_data = self.get_user_config_data(user)
        for directory, data in iteritems(clpassenger.summary(user)):
            if data['interpreter'] != self.INTERPRETER:
                continue
            # noinspection PyBroadException
            try:
                app_wsgi = os.path.join(data['directory'], clpassenger.WSGI_PATH)
                # skip app if wsgi file was not found
                if not os.path.isfile(app_wsgi):
                    logger.warning('Application %s was skipped during import, because wsgi file was not found',
                                   directory)
                    continue
                with open(app_wsgi) as f:
                    wsgi_conf = f.read()
                match = WSGI_PATTERN.search(wsgi_conf)
                if match:
                    groupdict = match.groupdict()
                    wsgi_path = groupdict['script']
                    callable_object = groupdict['callable']
                else:
                    wsgi_path = callable_object = ''

                py_version = os.path.basename(os.path.dirname(
                    os.path.dirname(data['binary'])))
                domain = data['domains'][0]

                # LVEMAN-1502. if application already present in apps list use its app_version
                try:
                    app_ver = user_config_data[directory][u'app_version']
                except KeyError:
                    app_ver = PythonAppFormatVersion.LEGACY

                app_info = {
                    u'python_version': py_version,
                    u'app_version': app_ver,
                    u'domain': domain,
                    u'app_uri': data['alias'],
                    u'app_status': 'started',
                    u'startup_file': wsgi_path or WSGI_PATH,
                    u'config_files': [],
                    u'env_vars': {},
                    u'entry_point': callable_object or 'application'
                }

                applications[directory] = app_info
            except Exception:
                logger.exception('Unable to import application %s', directory)
                errors.setdefault(directory, []).append(
                    'Unexpected issue during application "%s" import. '
                    'Your python app will work as before, but you wo\'t be able '
                    'to control it from Python Selector UI. Please, ask your system '
                    'administrator to contact CloudLinux support '
                    'to resolve this issue.' % directory
                )
                continue

        return applications, errors

    def _setup_import_logger(self):
        """
        Setup logger for application import.
        """
        app_logger = logging.getLogger('import_apps')
        app_logger.setLevel(logging.DEBUG)
        fh = logging.FileHandler(self._IMPORT_LOG_FILE)
        fh.formatter = logging.Formatter(
            '[%(levelname)s | %(asctime)s]: %(message)s')
        cl = logging.StreamHandler()
        cl.setLevel(logging.INFO)
        app_logger.addHandler(fh)
        # app_logger.addHandler(cl)
        return app_logger

    def import_legacy_applications_to_config(self, user=None):
        """
        Scan users for legacy applications (created by old python selector)
        and import them into config file. Done automatically by spec file.
        """
        app_logger = self._setup_import_logger()

        users_info = self.get_users_dict(user)
        logging.debug('Start applications import for users %s', list(users_info.keys()))
        failed_users = []
        skipped_users_count = 0
        apps_errors = None

        for user, pw in iteritems(users_info):
            # Skip user if '.cl.selector' directory absent in his homedir
            cl_selector_dir = os.path.join(pw.pw_dir, '.cl.selector')
            if not os.path.isdir(cl_selector_dir):
                skipped_users_count += 1
                app_logger.warning('User %s is skipped due to %s directory absent' % (user, cl_selector_dir))
                continue
            app_logger.info('Importing user %s', user)

            try:
                # disable quota here, because clpassanger.summary creates htaccess_cache file
                # see clpassenger._summary for details. Will be fixed in LVEMAN-1524
                with drop_privileges(user), secureio.disable_quota():
                    try:
                        config = self.read_user_selector_config_json(
                            pw.pw_dir, pw.pw_uid, pw.pw_gid)
                    except BaseSelectorError:
                        config = {}

                    # take applications from clpassanger.summary and import them
                    apps, apps_errors = self._get_legacy_applications(user, app_logger)
                    if not apps:
                        continue

                for app_root, app_config in iteritems(apps):
                    if app_root in config:
                        # save fields that were probably set before
                        app_config['config_files'] = config[app_root]['config_files']
                        app_config['env_vars'] = config[app_root]['env_vars']

                    # generate pip requirements file for each legacy app
                    self.generate_pip_requirements(user, app_root, app_logger)
                    requirements_file = os.path.join(pw.pw_dir, app_root, self._IMPORT_REQUIREMENTS_FILE)
                    if os.path.isfile(requirements_file) and \
                            self._IMPORT_REQUIREMENTS_FILE not in app_config['config_files']:
                        # add this newly generated pip requirements file to config files
                        app_config['config_files'].append(self._IMPORT_REQUIREMENTS_FILE)

                    config[app_root] = app_config

                # case when hoster downgraded package for some time
                # and user destroyed some of applications
                # we should remove them from config file
                destroyed_apps = set(config.keys()) - set(apps.keys())
                for app_root in destroyed_apps:
                    config.pop(app_root)

                with drop_privileges(user), secureio.disable_quota():
                    self.write_full_user_config_data(user, config)
                    self._set_user_legacy_import_status(pw, is_import_failed=False, apps_errors=apps_errors)
            except Exception as e:
                import_error = 'Unable to import user `%s` to new python selector. ' + \
                               'Already created applications work as before, ' + \
                               'but user won\'t be able to manage them. Error is: `%s`'
                app_logger.exception(import_error, user, e)
                with drop_privileges(user), secureio.disable_quota():
                    self._set_user_legacy_import_status(pw, is_import_failed=True, apps_errors=apps_errors)
                failed_users.append(user)

        self._set_admin_legacy_import_status(failed_users)
        if skipped_users_count != 0:
            app_logger.warning('Some users skipped... see import log')

    def _get_legacy_import_status(self, user=None):
        """
        Read import log which contains information about
        failed users or apps depending on current user.
        """
        if user is None:
            if not os.path.exists(self._IMPORT_STATUS_ADMIN):
                return None

            with open(self._IMPORT_STATUS_ADMIN) as f:
                return json.load(f)
        else:
            user_pw = pwd.getpwnam(user)
            marker = os.path.join(user_pw.pw_dir, self._IMPORT_STATUS_USER)
            if not os.path.isfile(marker):
                return

            with open(marker) as f:
                return json.load(f)

    def _set_user_legacy_import_status(self, user_pw, is_import_failed, apps_errors):
        """
        Save information that some applications were not imported automatically
        to show warning for user in the future.
        """
        if not os.path.exists(os.path.join(user_pw.pw_dir, '.cl.selector')):
            return
        try:
            marker = os.path.join(user_pw.pw_dir, self._IMPORT_STATUS_USER)
            secureio.write_file_via_tempfile(json.dumps({
                'is_import_failed': is_import_failed,
                'apps_lookup_failed': apps_errors
            }), marker, 0o600)
        except (IOError, OSError):
            # sad, but not critical, go further
            logging.exception('Unable to save migration status file')

    def _set_admin_legacy_import_status(self, failed_users):
        """
        Save information that some users were not imported automatically
        to show warning for admin in the future.
        """
        packed = json.dumps({'failed_users': failed_users})
        secureio.write_file_via_tempfile(packed, self._IMPORT_STATUS_ADMIN, 0o600)

    def _get_admin_legacy_import_warning(self):
        config = self._get_legacy_import_status()

        if config is None or not config['failed_users']:
            return None
        warning_msg = 'Unexpected issue(s) happened during importing python ' \
                      'applications for the following users: "{users}". ' \
                      'Everything will work as before, but listed users wo\'t be able ' \
                      'to control applications from Python Selector UI. ' \
                      'Please, contact CloudLinux support and send them log file located at ' \
                      '`{log_file_path}` to investigate and ' \
                      'resolve this issue. Also you can hide this warning ' \
                      'by deleting `{import_warning_marker}` file' \
                      ''.format(users=','.join(config['failed_users']),
                                log_file_path=self._IMPORT_LOG_FILE,
                                import_warning_marker=self._IMPORT_STATUS_ADMIN)
        return warning_msg

    def _get_user_legacy_import_warning(self, username):
        config = self._get_legacy_import_status(user=username)
        if config is None:
            return None

        what_to_do_msg = \
            'Everything will work as before, but you won\'t be able to control ' \
            'listed applications from Python Selector UI. ' \
            'Please, ask you hoster to contact CloudLinux support ' \
            'to investigate and ' \
            'resolve this issue.\nAlso you can hide this warning ' \
            'by deleting `~/{import_warning_marker}` file.' \
            ''.format(import_warning_marker=self._IMPORT_STATUS_USER)

        if config['is_import_failed']:
            return 'Unexpected issue(s) happened during importing python ' \
                   'applications. ' \
                   '%s' % what_to_do_msg
        elif config['apps_lookup_failed']:
            return 'Unexpected issue(s) happened during importing following python ' \
                   'applications: "%s". ' \
                   '%s' % (','.join(config['apps_lookup_failed']), what_to_do_msg)
        return None

    def _get_legacy_import_warning_or_none(self, user=None):
        if user is None:
            return self._get_admin_legacy_import_warning()
        else:
            return self._get_user_legacy_import_warning(user)

    def migrate_application(self, user, app_root):
        """
        Update environment of specific application to support
        features of new python selector
        """
        from clselect.clselectctlpython import _get_environment
        application = self.get_app_config(user, app_root)
        if application is None:
            raise ClSelectExcept.WrongData("Application %s does not exist" % app_root)

        if application['app_version'] == PythonAppFormatVersion.STABLE:
            raise ClSelectExcept.WrongData(
                "Application %s is already new version "
                "and does not need any updates" % app_root)

        environment = _get_environment(user, app_root, apps_manager=self)
        try:
            environment.configure_environment(auto_restore=True)
        except Exception as e:
            raise ClSelectExcept.WrongData(
                "Unable to migrate application %s. "
                "Error is '%s', everything restored to work as before migration. "
                "Try again later or ask your hoster "
                "to contact CloudLinux support if the issue persists."
                "" % (app_root, e))

        application['app_version'] = PythonAppFormatVersion.STABLE
        self.add_app_to_config(user, app_root, application)

    def get_applications_users_info(self, user=None):
        result = super(ApplicationsManager, self). \
            get_applications_users_info(user)
        warning = self._get_legacy_import_warning_or_none(user)
        if warning is not None:
            result['warning'] = warning
        return result

    def generate_pip_requirements(self, user, app_root, app_logger):
        # type: (AnyStr, AnyStr, logging.Logger) -> Optional[AnyStr]
        """
        Generates requirements file from python apps
        :param user: username
        :param app_root: app root
        :param app_logger: app logger
        """
        app_path, _ = get_abs_rel(user, app_root)
        req_path = os.path.join(app_path, self._IMPORT_REQUIREMENTS_FILE)
        if os.path.exists(req_path):
            return

        from clselect.clselectctlpython import _get_environment
        with drop_privileges(user):
            environment = _get_environment(
                user, app_root, apps_manager=self)

            pip_path = environment.pip()
        user_home = pwd.getpwnam(user).pw_dir
        env_vars = {'HOME': user_home}

        modules = ''
        # need to pass users home as env var directly during running as user
        result = run_process_in_cagefs(user, pip_path, ['freeze', '-l'], env_vars=env_vars)
        if result['returncode'] != 0:
            app_logger.warning('Error during generation pip requirements file. ' + str(result['output']))
            # like in `check_output`
            raise ClSelectExcept.ExternalProgramFailed(result['output'])
        elif not result['failed']:
            modules = result['output']

        # write stdout to file with disabled quota
        with drop_privileges(user), secureio.disable_quota():
            f = open(req_path, 'w')
            f.write(modules)
            f.close()

    def get_app_folders(self, username, app_root, chk_env=True, chk_app_root=True):
        """
        Calculate, check exists and return application folders
        This method does not check that application exists in config.
        :raises: NoSuchUserException, MissingVirtualenvError, MissingAppRootError
        :return: tuple(app_root, app_venv) with absolute paths
        """
        user_home = self._pwd.get_pw_by_name(username).pw_dir
        _, rel_path = get_venv_rel_path(username, app_root)
        app_venv = os.path.join(user_home, rel_path)
        if chk_env and not os.path.exists(app_venv):
            raise MissingVirtualenvError(app_venv)

        app_root = os.path.join(user_home, app_root)
        if chk_app_root and not os.path.exists(app_root):
            raise MissingAppRootError(app_root)

        return app_root, app_venv

    def get_binary_path(self, user, app_root, user_dir, binary_name=None):
        """
        Return a path to the environment's interpreter binary
        Get interpreter path for application
        :param user: owner of the application
        :param app_root: app path relative to user home (app-root)
        :param user_dir: User's home directory
        :param binary_name: name of binary in virtual environemnt (python, npm, node)
        :return: path to interpreter binary in virtual environment
        """
        version = self.get_interpreter_version_for_app(user, app_root)
        if binary_name is None:
            binary_name = self.BINARY_NAME
        _, rel_path = get_venv_rel_path(user, app_root)
        return os.path.join(user_dir, rel_path, version, 'bin', binary_name)


# TODO: we definitely should refactor this one time
def get_venv_rel_path(user, directory, version=None):
    """
    Get path to users VENV relative to home dir.
    Old python selector transforms app_root using get_prefix() method
    before creating venv directory. We should handle both cases:
     - when app is from old selector
     - when app is from new selector
    and return right env directory.
    If both old and new vevns exist we use old one.
    Return tuple with two values:
    - detected version of venv path (LEGACY for
      path with replacements "/" -> "_" & "_" => "__"
      and STABLE in other case)
    - path to venv relative to user's home dir
    """
    _, new_rel_dir = get_abs_rel(user, os.path.join(ApplicationsManager.VENV_DIR, directory))
    old_abs_dir, old_rel_dir = get_abs_rel(
        user, os.path.join(ApplicationsManager.VENV_DIR, clselectctl.get_prefix(directory)))
    if version is None:
        if os.path.exists(old_abs_dir) and directory != clselectctl.get_prefix(directory):
            return PythonVenvFormatVersion.LEGACY, old_rel_dir
        return PythonVenvFormatVersion.STABLE, new_rel_dir
    elif version == PythonVenvFormatVersion.LEGACY:
        return PythonVenvFormatVersion.LEGACY, old_rel_dir
    elif version == PythonVenvFormatVersion.STABLE:
        return PythonVenvFormatVersion.STABLE, new_rel_dir
    else:
        raise ValueError("unknown version `%s`" % version)

Youez - 2016 - github.com/yon3zu
LinuXploit