Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.143.7.53
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/clselectctlpython.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import getopt
import glob
import os
import re
import shutil
import sys
import pwd
import fnmatch
import secureio
import traceback
import requests
import requests.packages.urllib3.exceptions as urllib_exceptions                        #pylint: disable=E0401

from requests.exceptions import ConnectionError
from future.utils import iteritems

from . import clpassenger
from . import clselectctl
from . import utils
import cldetectlib as detect

from clcommon.cpapi import userdomains, docroot, cpusers, CP_NAME
from clcommon.cpapi.cpapiexceptions import NoDomain, NotSupported
from clcommon.utils import mod_makedirs

from .clselectpython.apps_manager import (
    ApplicationsManager,
    PythonAppFormatVersion,
    get_venv_rel_path
)
from .clselectexcept import ClSelectExcept, BaseClSelectException
from .clselectprint import clprint
from .clselectpythonuser import extensions, environments, interpreters
from .baseclselect import APP_STARTED_CONST, APP_STOPPED_CONST


# Startup file name used by create() when the caller supplies none
DEFAULT_STARTUP_FILE = 'passenger_wsgi.py'
# Application status written to the config by create()
DEFAULT_APP_STATE = u'started'
# Name of the WSGI callable used when no entry point is supplied
DEFAULT_APP_ENTRYPOINT = 'application'

# Wrapper script written by setup_wsgi(); the two placeholders are
# (path to the user's WSGI script, name of the callable inside it)
WSGI_TEMPLATE = r'''import imp
import os
import sys


sys.path.insert(0, os.path.dirname(__file__))

wsgi = imp.load_source('wsgi', '%s')
application = wsgi.%s
'''

# Stub startup file written by create() when an entry point is supplied;
# the placeholder is the name of the generated WSGI callable
_CONFIGURABLE_WSGI_TEMPLATE = r'''import os
import sys


sys.path.insert(0, os.path.dirname(__file__))


def %s(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    message = 'It works!\n'
    version = 'Python v' + sys.version.split()[0] + '\n'
    response = '\n'.join([message, version])
    return [response.encode()]
'''


# Extracts the script path and the callable name back out of a file rendered
# from WSGI_TEMPLATE (used by summary()).
# Raw strings are required here: "\.", "\(", "\)" and "\s" are not valid
# string escapes and trigger Deprecation/SyntaxWarning in non-raw literals.
WSGI_PATTERN = re.compile(r"imp\.load_source\(.+, '(?P<script>.+)'\)\s+"
                          r"application.+\.(?P<callable>.+)")


def usage():
    """
    Print the deprecation warning followed by the command line reference
    :return: None
    """
    # Each tuple item is printed with its own print() call, in order
    help_lines = (
        'Warning: selectorctl utility for --interpreter python is deprecated. '
        'Only update-interpreter option is still possible to use. '
        '\nPlease, use cloudlinux-selector utility to manage your python applications',
        ' -v | --version VERSION                  : Specify alternative version',
        ' -u | --user USERNAME                    : Username',
        '      --domain DOMAIN                    : Domain (or subdomain); users main domain as default',
        ' -l | --list                             : List alternatives for interpreter',
        ' -G | --list-extensions                  : List global set of packages',
        ' -K | --list-extensions-version          : List version set of packages',
        ' -s | --user-summary                     : List user summary of webapps',
        ' -y | --create-webapp                    : Create user webapp',
        '    | --update-interpreter               : Update binary python in virtual environment; you can use with ',
        '                                           --user=* for all users; application name as default * (all app)',
        ' -n | --destroy-webapp                   : Destroy user webapp',
        ' -f | --relocate-webapp                  : Change webapp directory (files must be moved manually)',
        ' -F | --transit-webapp                   : Change webapp domain/alias',
        ' -Z | --restart-webapp                   : Restart webapp',
        ' -t | --stop-webapp                      : Stop webapp',
        ' -c | --user-current                     : Show currently selected alternative for user webapp',
        ' -b | --set-user-current                 : Set alternative as user webapp default',
        ' -e | --enable-user-extensions PACKAGES  : Install comma-separated list of packages for user webapp',
        ' -d | --disable-user-extensions PACKAGES : Uninstall comma-separated list of packages for user webapp',
        "                                         : Use '-' (minus) for all packages",
        ' -r | --replace-user-extensions PACKAGES : Update comma-separated list of packages for user webapp',
        "                                         : Use '-' (minus) for all packages",
        '      --recreate-virtualenv              : Recreate python virtual environment',
        '      --freeze-requirements              : Freeze virtual environment installed packages',
        ' -g | --list-user-extensions             : List installed packages for user webapp',
        ' -o | --setup-wsgi WSGI[:CALLABLE]       : Setup wsgi',
        ' -p | --print-summary                    : If specified along with setting an alternative ',
        '                                           prints user summary',
        ' -j | --json                             : Print data as JSON',
        '    | --verbose                          : show detail information about action',
    )
    for help_line in help_lines:
        print(help_line)


def _create_environment(user, directory, version, env_name=None, use_existing=True):
    """
    Return the virtual environment of an application, creating it if absent
    :param user: unix user name that app belongs to
    :param directory: app root
    :param version: python version; also used as environment name
           when env_name is not given
    :param env_name: DEPRECATED
    :param use_existing: when False, an already existing environment
           is treated as an error
    :raises ClSelectExcept.EnvironmentAlreadyExists: environment exists
            and use_existing is False
    :raises ClSelectExcept.NoSuchAlternativeVersion: no interpreter is
            registered for the requested version
    :return: environment object
    """
    _, venv_prefix = get_venv_rel_path(user, directory)
    venv = environments.Environment(env_name or version, user, venv_prefix)

    if venv.exists() and not use_existing:
        raise ClSelectExcept.EnvironmentAlreadyExists(venv.path)
    if venv.exists():
        # nothing to build, hand back what is already there
        return venv

    try:
        chosen_interpreter = interpreters.interpreters(key='version')[version]
    except KeyError:
        raise ClSelectExcept.NoSuchAlternativeVersion(version)
    venv.create(chosen_interpreter)
    return venv


def _get_environment(user, directory, app_summary=None, apps_manager=None):
    """
    Build the environment object that belongs to an existing application
    :param user: unix user name that app belongs to
    :param directory: app root
    :param app_summary: optional summary dict of the application; when given,
           its 'binary' item is used instead of asking the apps manager
    :param apps_manager: Application Manager; a new one is created if None
    :raises ClSelectExcept.WebAppError: config is missing and passenger
            summary has no record for the directory
    :return: environment object
    """
    manager = ApplicationsManager() if apps_manager is None else apps_manager
    try:
        if app_summary:
            python_binary = app_summary['binary']
        else:
            home_dir = pwd.getpwnam(user).pw_dir
            python_binary = manager.get_binary_path(user, directory, home_dir)
    except ClSelectExcept.ConfigMissingError:
        # the config may still be absent or empty at this point,
        # so fall back to what passenger knows about the application
        app_summary = clpassenger.summary(user).get(directory)
        if app_summary is None:
            raise ClSelectExcept.WebAppError('No such application (or application not configured) "%s"' % directory)
        python_binary = app_summary['binary']

    # environment name is the directory two levels above the binary
    two_levels_up = os.path.dirname(os.path.dirname(python_binary))
    _, venv_prefix = get_venv_rel_path(user, directory)
    return environments.Environment(os.path.basename(two_levels_up), user, venv_prefix)


def create(user, directory, alias, version=None, doc_root=None,
           env_vars=None, startup_file=None, domain_name=None, entry_point=None,
           apps_manager=None, passenger_log_file=None):
    """
    Create new python application: virtual environment, passenger
    configuration, startup file and a record in the user's config
    :param user: unix user name
    :param directory: application path in user's home (app-root)
    :param alias: alias (app-uri)
    :param version: version of interpreter (required)
    :param doc_root: doc_root
    :param env_vars: dict with environment variables
    :param startup_file: main application file
    :param domain_name: domain name; taken from passenger summary if empty
    :param entry_point: application entry point
    :param apps_manager: Application Manager. Class that responsible
           for gathering and writing information about applications
    :param passenger_log_file: Passenger log filename to write to app's .htaccess
    :raises ClSelectExcept.WrongData: version was not passed
    :raises ClSelectExcept.AppRootBusy: an application is already
            configured in the given directory
    :return: None
    """

    manager = apps_manager if apps_manager is not None else ApplicationsManager()

    if version is None:
        raise ClSelectExcept.WrongData('Not passed version as argument')

    alias = clselectctl.get_alias(alias)

    # The app root must not be occupied by another configured application
    existing_apps = clpassenger.summary(user)
    try:
        occupied_by = utils.get_using_realpath_keys(user, directory, existing_apps)
    except KeyError:
        pass
    else:
        raise ClSelectExcept.AppRootBusy(occupied_by['directory'])

    venv = _create_environment(user, directory, version, use_existing=False)
    python_binary = venv.interpreter().binary

    clpassenger.configure(user, directory, alias, manager.INTERPRETER, python_binary,
                          doc_root=doc_root, passenger_log_file=passenger_log_file)
    clpassenger.restart(user, directory)

    # Content of the startup file depends on whether an entry point was given
    if entry_point:
        startup_content = _CONFIGURABLE_WSGI_TEMPLATE % entry_point
    else:
        startup_content = clpassenger.WSGI_TEMPLATE

    is_custom_startup = bool(startup_file) and startup_file != DEFAULT_STARTUP_FILE
    if not is_custom_startup:
        # No startup file provided, use default
        startup_file = DEFAULT_STARTUP_FILE
    # full path is what setup_wsgi function requires
    startup_path = _get_full_path_to_startup_file(user, directory, startup_file)
    if is_custom_startup:
        # allow a custom startup file to live in a not yet existing directory
        startup_dir = os.path.dirname(startup_path)
        if not os.path.exists(startup_dir):
            mod_makedirs(startup_dir, 0o755)
    utils.file_write(startup_path, startup_content)
    if entry_point:
        setup_wsgi(user, directory, startup_path, entry_point)

    env_vars = env_vars or {}
    # Without an explicit entry point the default one is stored in the config;
    # nothing is set in the application itself in that case
    stored_entry_point = entry_point or DEFAULT_APP_ENTRYPOINT

    if not domain_name:
        # domain was not passed - take it from the passenger summary
        fresh_summary = clpassenger.summary(user)
        domain_name = utils.get_using_realpath_keys(user, directory, fresh_summary)['domain']

    # TODO: use namedtuple or just class instead of plain dicts
    app_record = {
        u'python_version': version,
        u'app_version': PythonAppFormatVersion.STABLE,
        u'domain': domain_name,
        u'app_uri': alias,
        u'app_status': DEFAULT_APP_STATE,
        u'startup_file': startup_file,
        u'config_files': [],
        u'env_vars': env_vars,
        u'entry_point': stored_entry_point,
    }
    if passenger_log_file:
        app_record[u'passenger_log_file'] = passenger_log_file
    manager.add_app_to_config(user, directory, app_record)

    # Failure to write env vars to .htaccess is reported but is not fatal
    try:
        manager.add_env_vars_for_htaccess(user, directory, env_vars, doc_root)
    except Exception as err:
        clprint.print_diag('text', {'status': 'ERROR', 'message': str(err)})


def _ensure_version_enabled(version, user):
    """
    Make sure that particular interpreter version is enabled
    :param version: interpreter version to check
    :param user: user to include in exception
    :raises ClSelectExcept.UnableToSetAlternative: version is not enabled
    :return: None
    """
    from .clselectpython import python_manager
    manager = python_manager.PythonManager()
    if manager.is_version_enabled(version):
        return
    raise ClSelectExcept.UnableToSetAlternative(
        user, version, 'version is not enabled')


def check_response_from_webapp(domain, alias, action=None):
    """
    Check response from user's webapp before and after calling action.
    Also compare both responses
    :param domain: domain associated with webapp
    :param alias: URI associated with webapp
    :param action: called action, that make something with webapp: install modules, transit it, etc
    :raises ClSelectExcept.WrongData: action is not callable
    :raises ClSelectExcept.WebAppError: webapp is unreachable before the action
            (action is not performed then), unreachable after the action,
            or its response changed in a way that is treated as broken
    :return: None
    """

    app_is_inaccessible_before = 'Web application is inaccessible by its address "%s". The operation wasn\'t performed.'
    app_is_inaccessible_after = 'The operation was performed, but check availability of application has failed. ' \
                                'Web application is inaccessible by its address "%s" after the operation.'
    app_is_broken = 'The operation was performed, but check availability of application has failed. ' \
                    'Web application responds, but its return code "%s" or ' \
                    'content type before operation "%s" doesn\'t equal to contet type after operation "%s".'

    requests.packages.urllib3.disable_warnings(urllib_exceptions.InsecureRequestWarning)        #pylint: disable=E1101

    if not callable(action):
        raise ClSelectExcept.WrongData('Wrong action for calling in checking webapp')
    webapp_url = 'https://{domain}/{alias}'.format(
        domain=domain,
        alias=alias,
    )
    try:
        request = requests.get(webapp_url, verify=False)
    except ConnectionError:
        # site is not available by https protocol
        webapp_url = webapp_url.replace('https://', 'http://')
        try:
            request = requests.get(webapp_url, verify=False)
        except ConnectionError:
            raise ClSelectExcept.WebAppError(app_is_inaccessible_before % webapp_url)
    before_mime_type = request.headers.get('Content-Type')
    before_status = request.headers.get('Status')
    action()
    try:
        request = requests.get(webapp_url, verify=False)
    except ConnectionError:
        raise ClSelectExcept.WebAppError(app_is_inaccessible_after % webapp_url)
    after_mime_type = request.headers.get('Content-Type')
    after_status = request.headers.get('Status')

    # A response is not obliged to carry Content-Type, in which case
    # headers.get() gives None; normalize it to an empty string so that the
    # case-insensitive comparison below cannot fail with AttributeError
    mime_type_changed = (before_mime_type or '').lower() != (after_mime_type or '').lower()
    # a changed Status counts only if the first one was present and wasn't 500
    status_changed = (before_status != after_status
                      and before_status is not None
                      and before_status[:3] != '500')
    # last Status code is 500 (internal error)
    failed_after_action = after_status is not None and after_status[:3] == '500'

    if mime_type_changed or status_changed or failed_after_action:
        raise ClSelectExcept.WebAppError(app_is_broken % (after_status, before_mime_type, after_mime_type))


def _get_info_about_webapp(app_summary=None, user=None):
    """
    Find alias and domain of user's web application
    :param app_summary: dict -> summary info about user's web application
    :param user: str -> name of unix user
    :raises ClSelectExcept.WrongData: summary or user is missing, or none of
            the user's domains has a docroot equal to the application's one
    :return: tuple -> (alias, domain)
    """

    if app_summary is None:
        raise ClSelectExcept.WrongData('Was passed incorrect summary info about application')
    if user is None:
        raise ClSelectExcept.WrongData('Was passed incorrect name of user')

    alias = app_summary['alias']

    # collect every domain whose docroot resolves to the application's docroot
    matching_domains = []
    for domain_name, domain_docroot in userdomains(user):
        if utils.realpaths_are_equal(user, domain_docroot, app_summary['docroot']):
            matching_domains.append(domain_name)

    if not matching_domains:
        raise ClSelectExcept.WrongData('Can not found suitable domain for application')

    # the first match wins
    return alias, matching_domains[0]


def destroy(user, app_directory, doc_root, apps_manager=None):
    """
    Destroy web app with specified directory for specified user.

    A stopped application is cleaned up directly (virtualenv, passenger lines
    in .htaccess, config record). For any other application the virtualenv
    and config record are removed first, then the app is unconfigured in
    passenger if passenger still knows about it.
    :param user: username
    :param app_directory: application directory
    :param doc_root: Document root for selected domain; None when the domain
           is already removed, in which case .htaccess is not touched
    :param apps_manager: Application Manager. Class that responsible
           for gathering and writing information about applications
    :raises ClSelectExcept.WrongData: application is absent both in the
            config and in the passenger summary
    :return: None
    """

    if apps_manager is None:
        apps_manager = ApplicationsManager()

    # get app state
    app_config = apps_manager.get_app_config(user, app_directory)
    # remove env from htaccess (best effort: an error is printed, not raised)
    try:
        # if domain is already removed we shouldn't do anything
        if doc_root is not None:
            apps_manager.add_env_vars_for_htaccess(user, app_directory, None, doc_root)
    except Exception as err:
        clprint.print_diag('text', {'status': 'ERROR', 'message': str(err)})

    if app_config is not None and app_config.get('app_status') == APP_STOPPED_CONST:
        # stopped app
        user_home = pwd.getpwnam(user).pw_dir
        # Remove app's virtualenv, e.g. /home/cltest1/virtualenv/test_app
        # TODO: Account for possible symlinks in LU-832
        virtual_env_path = os.path.join(
            user_home, get_venv_rel_path(user, app_directory)[1])
        try:
            shutil.rmtree(virtual_env_path)
        except OSError:
            # virtualenv removal is best effort
            pass
        # if domain is already removed we shouldn't do anything
        if doc_root is not None:
            # Remove passenger lines from .htaccess
            htaccess_filename = apps_manager.get_htaccess_by_appdir(user, app_directory, doc_root)
            clpassenger.remove_passenger_lines_from_htaccess(htaccess_filename)
        # remove app from python-selector.json file
        apps_manager.remove_app_from_config(user, app_directory)
        # Update application status in passenger
        clpassenger.restart(user, app_directory)
        return
    # The virtualenv is removed before the app is removed from the config:
    # if app_root is absent we still want to remove venv dir of an application.
    # Otherwise the early exits below would skip the removal of a venv dir
    _, prefix = get_venv_rel_path(user, app_directory)
    abs_dir, _ = utils.get_abs_rel(user, prefix)
    try:
        # Remove app's virtualenv, e.g. /home/cltest1/virtualenv/test_app
        shutil.rmtree(abs_dir)
    except OSError:
        # virtualenv removal is best effort
        pass
    user_summary = clpassenger.summary(user)
    # remove app from python-selector.json file
    app_in_config = apps_manager.remove_app_from_config(user, app_directory)
    try:
        # used only as an existence check; KeyError means passenger
        # has no record for this directory
        utils.get_using_realpath_keys(user, app_directory, user_summary)
    except KeyError:
        # if the app was in the config but is unknown to passenger,
        # we skip all further actions
        if app_in_config:
            return None
        else:
            raise ClSelectExcept.WrongData("No such application (or application not configured) \"%s\""
                                           % app_directory)
    # remove app from passenger
    clpassenger.unconfigure(user, app_directory)        # Clear .htaccess

    try:
        clpassenger.restart(user, app_directory)
    except ClSelectExcept.MissingApprootDirectory:
        # app root is already gone, nothing to restart
        pass


def fix_homedir(user):
    """
    Rewrite environment paths inside scripts of user's virtual environments.

    For every non-symlink file in <prefix>/*/*/bin: if it mentions
    VIRTUAL_ENV, the quoted path assigned to it is replaced with the
    environment directory; otherwise, if it starts with a shebang, the first
    line is replaced with a shebang to the environment's bin/python.
    :param user: unix user name
    :return: None
    """
    user = clselectctl.get_user(user)

    envs_root, _ = utils.get_abs_rel(user, environments.DEFAULT_PREFIX)

    for env_dir in glob.glob(os.path.join(envs_root, '*', '*')):
        for script_path in glob.glob(os.path.join(env_dir, 'bin', '*')):
            # we should not even try to read symlinks
            if os.path.islink(script_path):
                continue
            content = utils.file_read(script_path, 'rb')
            env_dir_bytes = env_dir.encode()

            def _repoint(match):
                # swap only the captured path, keep the rest of the match
                return match.group(0).replace(match.group('old_path'), env_dir_bytes)

            if b'VIRTUAL_ENV' in content:
                # in fact, this is only for activate scripts;
                # the pattern matches both single and double quoted values,
                # example: VIRTUAL_ENV="<some_path>" VIRTUAL_ENV='<some_path>'
                patched_lines = [
                    re.sub(rb'VIRTUAL_ENV[^\\]([\"\'])(?P<old_path>.+)\1$', _repoint, row)
                    if b'VIRTUAL_ENV' in row else row
                    for row in content.splitlines()
                ]
            elif content.startswith(b'#!'):
                # change all shebangs to python
                # TODO: what about shebangs to global python?
                patched_lines = content.splitlines()
                patched_lines[0] = b'#!%s/bin/python' % env_dir_bytes
            else:
                patched_lines = []

            if patched_lines:
                utils.file_writelines(script_path, (b'%s\n' % row for row in patched_lines), 'wb')


def install(user, directory, extension=None, requirements_path=None, skip_web_check=False, apps_manager=None):
    """
    Install a module (or modules from a requirements file) into the
    virtual environment of python web application
    :param user: name of owner of web application
    :param directory: application's root
    :param extension: name of installed module; has priority over requirements_path
    :param requirements_path: path (relative to directory) to the text file
           with list of python modules for installation
    :param skip_web_check: skip check web application after change it's properties;
           the check is also skipped when the application is not started
    :param apps_manager: Application Manager. Class that responsible
           for gathering and writing information about applications
    :raises ClSelectExcept.WrongData: application is absent in user's config
    :raises ClSelectExcept.WebAppError: web check of the application failed
    :return: None
    """

    manager = ApplicationsManager() if apps_manager is None else apps_manager

    user = clselectctl.get_user(user)
    config_data = manager.get_user_config_data(user)
    try:
        app_data = utils.get_using_realpath_keys(user, directory, config_data)
    except KeyError:
        raise ClSelectExcept.WrongData('Record about application {} is absent'.format(directory))

    app_is_started = app_data['app_status'] == APP_STARTED_CONST
    web_check_wanted = app_is_started and not skip_web_check

    environment = _get_environment(user, directory, apps_manager=manager)

    def _install_modules():
        if extension is not None:
            environment.extension_install(extension)
        elif requirements_path is not None:
            environment.extension_install_requirements(os.path.join(directory, requirements_path))

    if not web_check_wanted:
        _install_modules()
        return

    try:
        check_response_from_webapp(
            domain=app_data['domain'],
            alias=app_data['app_uri'],
            action=_install_modules,
        )
    except ClSelectExcept.WebAppError as err:
        raise ClSelectExcept.WebAppError('An error occured during installation of modules. %s' % err)


def list_extensions(user, directory):
    """
    Return extensions of the environment that belongs to the application
    :param user: unix user name
    :param directory: application's root
    """
    resolved_user = clselectctl.get_user(user)
    return _get_environment(resolved_user, directory).extensions()


def restart(user, app_directory, doc_root=None, apps_manager=None):
    """
    Restart web app with specified directory for specified user.
    A stopped application is started first (when doc_root is known)
    :param user: username
    :param app_directory: application directory
    :param doc_root: Document root for selected domain
    :param apps_manager: Application Manager. Class that responsible
           for gathering and writing information about applications
    :raises ClSelectExcept.WrongData: application is not stopped and
            is absent in the passenger summary
    :return: None
    """

    manager = ApplicationsManager() if apps_manager is None else apps_manager

    app_config = manager.get_app_config(user, app_directory)
    app_is_stopped = (app_config is not None
                      and app_config.get('app_status') == APP_STOPPED_CONST)

    if app_is_stopped:
        # doc_root is None during 'update-interpreter' command,
        # the application stays stopped in the config then
        if doc_root is not None:
            start(user, app_directory, doc_root)
    elif app_directory not in clpassenger.summary(user):
        raise ClSelectExcept.WrongData(
            'No such application (or application not configured) "%s"' % app_directory)

    clpassenger.restart(user, app_directory)


def stop(user, app_directory, doc_root, apps_manager=None):
    """
    Stop web app with specified directory for specified user:
    drop passenger lines from .htaccess and mark the app as stopped
    :param user: username
    :param app_directory: application directory
    :param doc_root: Document root for selected domain
    :param apps_manager: Application Manager. Class that responsible
           for gathering and writing information about applications
    :raises ClSelectExcept.WrongData: application has no config record
    :return: None
    """

    manager = ApplicationsManager() if apps_manager is None else apps_manager

    app_config = manager.get_app_config(user, app_directory)
    if app_config is None:
        raise ClSelectExcept.WrongData("No such application (or application not configured) \"%s\""
                                       % app_directory)

    if app_config.get('app_status') != APP_STOPPED_CONST:
        # 1. passenger lines leave .htaccess
        htaccess_path = manager.get_htaccess_by_appdir(user, app_directory, doc_root, app_config)
        clpassenger.remove_passenger_lines_from_htaccess(htaccess_path)
        # 2. new status goes to user's config
        manager.set_app_status(user, app_directory, APP_STOPPED_CONST)
        # 3. passenger picks up the change
        clpassenger.restart(user, app_directory)
    # an already stopped application is left as is


def start(user, app_directory, doc_root, apps_manager=None):
    """
    Start web app with specified directory for specified user:
    recreate .htaccess, mark the app as started and refresh its wsgi file
    :param user: username
    :param app_directory: application directory
    :param doc_root: Document root for selected domain
    :param apps_manager: Application Manager. Class that responsible
           for gathering and writing information about applications
    :raises ClSelectExcept.WrongData: application has no config record
    :return: None
    """

    manager = ApplicationsManager() if apps_manager is None else apps_manager

    app_config = manager.get_app_config(user, app_directory)
    if app_config is None:
        raise ClSelectExcept.WrongData("No such application (or application not configured) \"%s\""
                                       % app_directory)
    if app_config.get('app_status') == APP_STARTED_CONST:
        # already running, nothing to do
        return

    # .htaccess of the application is created again
    manager.update_htaccess_file(user, app_directory, doc_root)
    # new status goes to user's config
    manager.set_app_status(user, app_directory, APP_STARTED_CONST)
    # passenger picks up the change
    clpassenger.restart(user, app_directory)

    # The wsgi file could be changed while the app was stopped, so it is
    # written again for any non-default startup file
    configured_startup = app_config.get('startup_file')
    if configured_startup == DEFAULT_STARTUP_FILE:
        return
    startup_path = _get_full_path_to_startup_file(user, app_directory, configured_startup)
    setup_wsgi(user, app_directory, startup_path, app_config.get('entry_point'))



def setup_wsgi(user, directory, wsgi_path, callable_object=None):
    """
    Write the passenger wsgi wrapper that loads given script and callable,
    then restart the application
    :param user: unix user name
    :param directory: application's root
    :param wsgi_path: absolute path to an existing WSGI script
           inside the application directory
    :param callable_object: name of the callable in the script;
           DEFAULT_APP_ENTRYPOINT is used when empty
    :raises ClSelectExcept.WrongData: application is not configured, or the
            path is relative, missing or equal to the application directory
    :return: None
    """
    user = clselectctl.get_user(user)
    app_root, _ = utils.get_abs_rel(user, directory)

    if directory not in clpassenger.summary(user):
        raise ClSelectExcept.WrongData("No such application (or application not configured) \"%s\"" % directory)
    if not os.path.isabs(wsgi_path):
        raise ClSelectExcept.WrongData('Path to WSGI script must be absolute')
    if not os.path.isfile(wsgi_path):
        raise ClSelectExcept.WrongData('No such WSGI script \"%s\"' % wsgi_path)

    # keep only the part of the path that follows the application root
    script_in_app = utils.s_partition(wsgi_path, app_root)[2].strip(os.path.sep)
    if not script_in_app:
        raise ClSelectExcept.WrongData('WSGI script is not in webapp directory')

    wrapper_path = os.path.join(app_root, clpassenger.WSGI_PATH)
    entry_name = callable_object or DEFAULT_APP_ENTRYPOINT
    utils.file_write(wrapper_path, WSGI_TEMPLATE % (script_in_app, entry_name), 'w')
    # Restart application
    clpassenger.restart(user, directory)


def has_virtenv_dir(user):
    """
    Tell whether 'virtualenv' directory exists in user's home
    :param user: unix user name (must exist in the passwd database)
    :return: bool
    """
    return os.path.isdir(os.path.join(pwd.getpwnam(user).pw_dir, 'virtualenv'))


def summary(user):
    """
    Collect information about python applications of the user
    :param user: unix user name
    :return: dict -> application directory: info about the application
             (or {"error": message} if gathering the info failed);
             empty dict when user has no virtualenv directory
    """
    user = clselectctl.get_user(user)
    if not has_virtenv_dir(user):
        return {}

    apps_info = {}
    for directory, data in iteritems(clpassenger.summary(user)):
        # applications of other interpreters are not our business
        if data['interpreter'] != ApplicationsManager.INTERPRETER:
            continue
        try:
            env_dict = _get_environment(user, directory, data).as_deepdict()
            wrapper_path = os.path.join(data['directory'], clpassenger.WSGI_PATH)
            wrapper_text = utils.file_read(wrapper_path)

            # script and callable are parsed back from the wsgi wrapper
            parsed = WSGI_PATTERN.search(wrapper_text)
            if parsed:
                script_name = parsed.group('script')
                callable_name = parsed.group('callable')
            else:
                script_name = callable_name = ''

            app_info = {
                'alias': data['alias'],
                'domain': data['domain'],
                'environment': env_dict['name'],
                'interpreter': env_dict['interpreter'],
                'extensions': env_dict['extensions'],
                'wsgi': {'script': script_name, 'callable': callable_name},
            }

            # list of domains is added only if there are additional ones
            if "domains" in data and len(data["domains"]) > 1:
                app_info["domains"] = data["domains"]
        except Exception as e:
            app_info = {"error": str(e)}

        apps_info[directory] = app_info

    return apps_info


def _get_full_path_to_startup_file(user, directory, startup_file):
    # type: (str, str, str) -> str
    """
    Get full path to startup file of python application
    :param user: name of system user
    :param directory: name of dir with python application
    :param startup_file: name of startup file of python application
    :return: startup file name joined to the absolute application directory
    """

    app_root, _ = utils.get_abs_rel(user, directory)
    return os.path.join(app_root, startup_file)


def uninstall(user, directory, extension):
    """
    Uninstall extension from the environment of the application
    :param user: unix user name
    :param directory: application's root
    :param extension: name of the module to uninstall
    :return: None
    """
    resolved_user = clselectctl.get_user(user)
    _get_environment(resolved_user, directory).extension_uninstall(extension)


def update(user, directory, extension):
    """
    Update extension in the environment of the application
    :param user: unix user name
    :param directory: application's root
    :param extension: name of the module to update
    :return: None
    """
    resolved_user = clselectctl.get_user(user)
    _get_environment(resolved_user, directory).extension_update(extension)


# TODO: We should remove this function completely, because this script is no longer called directly
def main():
    """
    Command-line entry point of the old python selector.

    Parses ``sys.argv[1:]`` with getopt, resolves the target user and document
    root, dispatches to the handler of the selected action and reports the
    outcome through ``clprint`` in the requested format (text or json).

    Only invocations that contain ``--update-interpreter`` get past the option
    gate; anything else is rejected with an ERROR diagnostic. When several
    action options are given, the last one processed wins.

    Exit codes used by this function:
        0 - success (also after printing usage for -h/--help)
        1 - bad options, unknown domain/user, unsupported panel or failed action
        2 - some extensions were processed and some failed (PARTIAL)
        3 - the action needs a webapp argument and none was given
        4 - '-' (all extensions) was passed to enable-user-extensions
        5 - every processed extension failed
    """
    try:
        opts, args = getopt.getopt(
            sys.argv[1:],
            # 'S' added: the loop below handles '-S' but getopt used to reject it
            'hi:v:u:lGsynfFZStcbe:d:r:go:pjK:', [
                'help',
                'interpreter=',
                'version=',
                'user=',
                'domain=',
                'list',
                'list-extensions',
                'user-summary',
                'create-webapp',
                'update-interpreter',
                'destroy-webapp',
                'relocate-webapp',
                'transit-webapp',
                'restart-webapp',
                'start-webapp',
                'stop-webapp',
                'user-current',
                'set-user-current',
                'enable-user-extensions=',
                'disable-user-extensions=',
                'replace-user-extensions=',
                'recreate-virtualenv',
                'freeze-requirements',
                'list-user-extensions',
                'setup-wsgi=',
                'print-summary',
                'json',
                'verbose',
                'list-extensions-version='
                ])
    except getopt.GetoptError as err:
        sys.stderr.write(str(err))
        usage()
        sys.exit(1)

    # actions that operate on a single webapp passed as positional argument
    webapp_actions = (
        'create-webapp',
        'destroy-webapp',
        'relocate-webapp',
        'transit-webapp',
        'restart-webapp',
        'start-webapp',
        'stop-webapp',
        'enable-user-extensions',
        'list-user-extensions',
        'replace-user-extensions',
        'disable-user-extensions',
        'setup-wsgi',
        'user-current',
        'set-user-current',
    )
    # actions that need a resolved user name
    user_actions = webapp_actions + ('user-summary', 'update-interpreter')

    ext_list = ''
    # bound up front: it is read later even when ext_list is empty or '-'
    _exts = []
    fmt = 'text'
    verbose = False
    print_summary = False
    user = None
    domain = None
    action = ''
    version = None

    if not opts:
        usage()
        sys.exit(1)

    allowed_opts = ('--update-interpreter', )
    if not any(item[0] in allowed_opts for item in opts):
        clprint.print_diag(fmt, {'status': 'ERROR',
                                 'message': 'Unsupported option was passed. Currently, selectorctl utility '
                                            'supports only update-interpreter option. '
                                            'Please, use cloudlinux-selector utility for python '
                                            'interpreter'})
        sys.exit(1)

    for o, a in opts:
        if o in ('-h', '--help'):
            # declared to getopt, so answer it instead of "unhandled option"
            usage()
            sys.exit(0)
        elif o in ('-i', '--interpreter'):
            pass
        elif o in ('-l', '--list'):
            action = 'list'
        elif o in ('-y', '--create-webapp'):
            action = 'create-webapp'
        elif o == '--update-interpreter':
            action = 'update-interpreter'
        elif o in ('-n', '--destroy-webapp'):
            action = 'destroy-webapp'
        elif o in ('-f', '--relocate-webapp'):
            action = 'relocate-webapp'
        elif o in ('-F', '--transit-webapp'):
            action = 'transit-webapp'
        elif o in ('-Z', '--restart-webapp'):
            action = 'restart-webapp'
        elif o in ('-S', '--start-webapp'):
            action = 'start-webapp'
        elif o in ('-t', '--stop-webapp'):
            action = 'stop-webapp'
        elif o in ('-c', '--user-current'):
            action = 'user-current'
        elif o in ('-b', '--set-user-current'):
            action = 'set-user-current'
        elif o in ('-g', '--list-user-extensions'):
            action = 'list-user-extensions'
        elif o in ('-e', '--enable-user-extensions'):
            action = 'enable-user-extensions'
            ext_list = a
        elif o in ('--recreate-virtualenv',):
            action = 'recreate-virtualenv'
            ext_list = a
        elif o in ('--freeze-requirements',):
            action = 'freeze-requirements'
            ext_list = a
        elif o in ('-s', '--user-summary'):
            action = 'user-summary'
        elif o in ('-j', '--json'):
            fmt = 'json'
        elif o == '--verbose':
            verbose = True
        elif o in ('-r', '--replace-user-extensions'):
            action = 'replace-user-extensions'
            ext_list = a
        elif o in ('-d', '--disable-user-extensions'):
            action = 'disable-user-extensions'
            ext_list = a
        elif o in ('-v', '--version'):
            version = a
        elif o in ('-G', '--list-extensions'):
            action = 'list-extensions'
        elif o in ('-u', '--user'):
            user = a.strip("'\"")  # for deleting \' and \"
        elif o == '--domain':
            domain = a
        elif o in ('-o', '--setup-wsgi'):
            action = 'setup-wsgi'
            # option value has the form <file_path:callable>
            wsgi_path, _, callable_object = utils.s_partition(a, ':')
        elif o in ('-p', '--print-summary'):
            print_summary = True
        elif o in ('-K', '--list-extensions-version'):
            action = 'list-extensions-version'
            ext_list = a
        else:
            sys.stderr.write('unhandled option')
            sys.exit(1)

    if action == '':
        sys.stderr.write('ERROR:you must provide option for interpreter python')
        sys.exit(1)

    if action in webapp_actions and not args:
        sys.stderr.write('webapp must be specified')
        sys.exit(3)

    if ext_list == '-':
        if action == 'enable-user-extensions':
            sys.stderr.write('installation of all extensions is not possible')
            sys.exit(4)
    elif ext_list:
        # comma separated list, empty items are dropped
        _exts = [_f for _f in ext_list.split(',') if _f]

    if domain:
        try:
            doc_root, user_ = docroot(domain)
        except NoDomain:
            clprint.print_diag(fmt, {'status': 'ERROR', 'message': 'No such domain: "%s"' % domain})
            sys.exit(1)
        except NotSupported:
            clprint.print_diag(fmt, {'status': 'ERROR', 'message': 'Python selector not supported for %s' % CP_NAME})
            sys.exit(1)

        if not user:  # we can obtain user name for domain
            user = user_
        elif user != user_:
            clprint.print_diag(fmt, {'status': 'ERROR', 'message': 'domain "%s" is not owned by the user "%s"' % (domain, user)})
            sys.exit(1)
    else:
        # commands that need user specified
        if action in user_actions:
            try:
                user = clselectctl.get_user(user)
            except ValueError as e:
                clprint.print_diag(fmt, {'status': 'ERROR', 'message': str(e)})
                sys.exit(1)
        if user and action != 'update-interpreter':
            try:
                domain_list = userdomains(user)
            except NotSupported:
                clprint.print_diag(fmt, {'status': 'ERROR', 'message': 'Python selector not supported for %s' % CP_NAME})
                sys.exit(1)

            if not domain_list:
                # report it instead of dying with a bare IndexError below
                clprint.print_diag(fmt, {'status': 'ERROR', 'message': 'User "%s" has no domains' % user})
                sys.exit(1)

            _, doc_root = domain_list[0]  # get document root for main domain

    try:
        error = 0
        ok = 0
        result = {}
        if action == 'list-extensions':
            raise DeprecationWarning('%s is deprecated in old python selector' % action)
        elif action == 'list-extensions-version':
            result = extensions.ExtensionInfo().list_extensions_version(_exts)
            ext_list = ''
        elif action == 'list':
            result = interpreters.interpreters_dict('version')
        elif action == 'user-summary':
            result = summary(user)
        elif action == 'create-webapp':
            create(user, args[0], args[1], version, doc_root=doc_root)
        elif action == 'update-interpreter':
            app_name = '*'  # all applications as default
            if len(args) == 1:
                app_name = args[0].strip("'\"")
            update_interpreter(user=user, app_name=app_name, version=(version,), verbose=verbose)
        elif action == 'destroy-webapp':
            destroy(user, args[0], doc_root)
        elif action == 'relocate-webapp':
            raise DeprecationWarning('%s is deprecated in old python selector' % action)
        elif action == 'transit-webapp':
            raise DeprecationWarning('%s is deprecated in old python selector' % action)
        elif action == 'restart-webapp':
            # username, app-dir
            restart(user, args[0], doc_root)
        elif action == 'start-webapp':
            # username, app-dir, doc_root
            start(user, args[0], doc_root)
        elif action == 'stop-webapp':
            # username, app-dir, doc_root
            stop(user, args[0], doc_root)
        elif action == 'user-current':
            raise DeprecationWarning('%s is deprecated in old python selector' % action)
        elif action == 'set-user-current':
            raise DeprecationWarning('%s is deprecated in old python selector' % action)
        elif action == 'list-user-extensions':
            result = list_extensions(user, args[0])
        elif action == 'setup-wsgi':
            # --setup-wsgi=<file_path:callable>
            # args[0] -- app_root
            #  wsgi_path -- file_path
            #  callable_object -- callable
            setup_wsgi(user, args[0], wsgi_path, callable_object)
        elif action == 'recreate-virtualenv':
            for env in environments.environments():
                env.recreate()
        elif action == 'freeze-requirements':
            for env in environments.environments():
                env.pip_freeze()
        else:
            # enable / disable / replace user extensions
            alias = args[0]
            if ext_list == '-':
                # '-' means every extension currently listed for the app
                _exts = list_extensions(user, alias)

            for extension in _exts:
                try:
                    if action == 'enable-user-extensions':
                        install(user, alias, extension)
                    elif action == 'disable-user-extensions':
                        uninstall(user, alias, extension)
                    elif action == 'replace-user-extensions':
                        update(user, alias, extension)
                    result[extension] = {'status': 'OK'}
                    ok += 1
                except (ValueError, ClSelectExcept.ExternalProgramFailed) as err:
                    # keep going: one failing extension must not stop the rest
                    result[extension] = {'status': 'ERROR', 'message': str(err)}
                    error += 1
    except (BaseClSelectException, extensions.PyPIRpcApiError) as err:
        clprint.print_diag(fmt, {'status': 'ERROR', 'message': str(err)})
        sys.exit(1)
    except NotSupported:
        clprint.print_diag(fmt, {'status': 'ERROR', 'message': 'Python selector not supported for %s' % CP_NAME})
        sys.exit(1)
    except Exception:
        # top-level boundary: report the full traceback through the diag channel
        msg = traceback.format_exc()
        clprint.print_diag(fmt, {'status': 'ERROR', 'message': msg})
        sys.exit(1)

    if not result and print_summary:
        result = summary(user)

    if error and ok:
        status = 'PARTIAL'
        exit_status = 2
    elif error:
        status = 'ERROR'
        exit_status = 5
    else:
        if ext_list and ok < 2:
            if print_summary:
                clprint.print_data(fmt, summary(user))
            else:
                clprint.print_data(fmt, {})
        else:
            clprint.print_data(fmt, result)
        sys.exit(0)
    # at least one extension failed: one "name: message-or-status" line per extension
    message = '\n'.join(
        '%s: %s' % (k, v.get('message', v.get('status', '')))
        for k, v in iteritems(result))
    clprint.print_diag(fmt, {'status': status, 'message': message})
    sys.exit(exit_status)


def _glob_users(user='*'):
    """
    Select control panel users by shell-style pattern.

    :param user: fnmatch pattern applied to every name returned by ``cpusers()``
    :return: list of the user names that match the pattern
    """
    panel_users = cpusers()
    return fnmatch.filter(panel_users, user)


def update_interpreter(user='*', version=('2.7', '3.3', '3.4'), app_name='*', verbose=True):
    """
    Update the python interpreter of matching applications of matching users.

    Does nothing except writing a warning to stderr when ``detect.is_plesk()``
    is true. Otherwise, for every panel user matching ``user`` that has a
    virtual environment directory, the real uid/gid are switched to that user,
    the user's applications are read from the selector config and each
    application whose name matches ``app_name`` and whose environment python
    version is in ``version`` gets ``update_python_interpreter`` called on its
    environment. An application is restarted only when its update reports
    success.

    :param user: fnmatch pattern for user names
    :param version: container of python versions whose environments are updated
    :param app_name: fnmatch pattern for application names
    :param verbose: print progress and failure messages to stdout
    """
    if detect.is_plesk():
        sys.stderr.write("WARNING: Update interpreter with Python selector not supported on %s, skipped\n" % CP_NAME)
        return

    alt_interpreters_dict = interpreters.interpreters(key='version')  # obtain data about installed alt-python
    is_da_panel = detect.detect_DA_admin()
    apps_manager = ApplicationsManager()
    for user_name in _glob_users(user):
        if is_da_panel and utils.user_should_be_skipped(user_name):
            continue
        if not has_virtenv_dir(user_name):
            continue

        # change real uid/gid to user_name, keep the original ids as effective
        # ones so they can be restored afterwards
        user_pwd = pwd.getpwnam(user_name)
        real_uid = os.getuid()
        real_gid = os.getgid()
        # the second arguments used to be crossed (gid call got the uid and
        # vice versa), which only worked when both happened to be equal
        os.setregid(user_pwd.pw_gid, real_gid)
        os.setreuid(user_pwd.pw_uid, real_uid)
        try:
            # turns off quota
            secureio.set_capability()

            # get users apps from selector json config
            for app_name_, data in iteritems(apps_manager.get_user_config_data(user_name)):
                if not fnmatch.fnmatch(app_name_, app_name):  # filtering application name
                    continue
                try:
                    environment = _get_environment(user_name, app_name_, apps_manager=apps_manager)
                    env_python_ver = environment.interpreter().version
                    if env_python_ver not in version:  # filtering virtual environment python version
                        continue
                    if verbose:
                        print('Updating user: "%s"; application: "%s"...' % (user_name, app_name_))
                    update_result = environment.update_python_interpreter(backup=True, verbose=verbose, force=False,
                                                                          _alt_interpreters_dict=alt_interpreters_dict)
                    if update_result:  # restart application only if updating success
                        try:
                            # do not worry, stopped apps will not be started
                            # see restart method for details
                            restart(user_name, app_name_)
                        except (OSError, IOError) as e:
                            if verbose:
                                print('Failed to restart application "%s"; %s' % (app_name_, str(e)))
                except ClSelectExcept.InterpreterError as e:
                    if verbose:
                        print('Failed updating user: "%s"; application: "%s"; %s' % (user_name, app_name_, str(e)))
        finally:
            # restore original uid/gid even when an unexpected error escapes,
            # so the process never keeps running with the user's ids
            os.setuid(real_uid)
            os.setgid(real_gid)

Youez - 2016 - github.com/yon3zu
LinuXploit