Server IP : 66.29.132.124 / Your IP : 18.225.92.25 Web Server : LiteSpeed System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64 User : wavevlvu ( 1524) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselector/ |
Upload File : |
#!/opt/cloudlinux/venv/bin/python3 -bb # coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import base64 import collections import glob import json import os import pathlib import pwd import re import shutil import subprocess import sys import traceback from typing import Set # NOQA from future.utils import iteritems import cldetectlib as detect import clselect.clselectctlnodejsuser import clselect.clselectexcept import clselect.clselectctlpython import clselect.clselectctlruby import clselect.clselectpythonuser import clselect.clselectnodejsuser from clcommon.clexception import FormattedException from clcommon.clpwd import resolve_username_and_doc_root from clcommon.cpapi import CP_NAME, docroot from clcommon.cpapi.cpapiexceptions import NoDomain, NotSupported, IncorrectData from cllimits.lib import exec_utility from clselect import clselectctl from clselect import clpassenger from clselect import ClUserSelect, ClSelect, ClExtSelect from clselect.baseclselect import APP_STARTED_CONST, ENABLED_STATUS, DISABLED_STATUS, BaseSelectorError from clselect.clselectctlnodejsuser import validate_env_vars from clselect.clselectctlphp import format_summary, API_1 from clselect.clselectexcept import ClSelectExcept as ClSelectExceptions from clselect.clselectnodejs import NodeJSConfigError from clselect.clselectnodejs.node_manager import NodeManager from clselect.clselectpython.apps_manager import ( PythonAppFormatVersion, get_venv_rel_path ) from clselect.clselectpython.python_manager import PythonManager from clselect.clselectpythonuser.environments import Environment as PythonEnvironment # NOQA from clselect.clselectnodejsuser.environments import Environment as NodeJsEnvironment # NOQA from clselect.utils import ( mkdir_p, file_read, file_write, get_using_realpath_keys, get_abs_rel, delete_using_realpath_keys, ) from secureio import get_perm from clconfig.ui_config_lib import _set_ui_config, UIConfigException class ClSelectExcept(FormattedException): pass class ClSelectDomainNotFound(ClSelectExcept): """ Custom exception in case if user doesn't have the specific domain """ pass OK_RES_DICT = {"status": "ok"} class CloudlinuxSelectorLib(object): def __init__(self, interpreter): self.interpreter = interpreter self._SELECTORCTL_UTILITY = "/usr/bin/selectorctl" self.DYNAMIC_UI_CTL_CMD = '/usr/share/l.v.e-manager/utils/dynamicui.py' self.CLOUDLINUX_SELECTOR_UTILITY = '/usr/sbin/cloudlinux-selector' self.NODEJS_INTERPRETER = "nodejs" self.PYTHON_INTERPRETER = "python" self.RUBY_INTERPRETER = "ruby" self.PHP_INTERPRETER = "php" # self.selector_manager - responsible for actual selector high-level API # self.apps_manager - responsible for gathering and set information about applications # self.selector_user_lib - set of modules which responsible user's part of work with virtual envs and etc # self.selector_old_lib - old libraries for work with applications self.selector_manager = None self.apps_manager = None self.selector_user_lib = None self.selector_old_lib = None if self.interpreter == self.NODEJS_INTERPRETER: from clselect.clselectnodejs.apps_manager import ApplicationsManager self.apps_manager = ApplicationsManager() self.selector_manager = NodeManager() self.selector_user_lib = clselect.clselectnodejsuser self.selector_old_lib = clselect.clselectctlnodejsuser elif self.interpreter == self.PYTHON_INTERPRETER: from clselect.clselectpython.apps_manager import ApplicationsManager self.apps_manager = ApplicationsManager() self.selector_manager = PythonManager() self.selector_user_lib = clselect.clselectpythonuser self.selector_old_lib = clselect.clselectctlpython elif self.interpreter == self.PHP_INTERPRETER: from clselect.clselectphp.php_manager import PhpManager self.selector_manager = PhpManager() def check_selector_is_available(self): """ Checks that selector is able to work on current os environment :return: """ if self.interpreter != self.PHP_INTERPRETER: return # check that native version is installed for php self.selector_manager.cl_select_lib.check_requirements() def safely_resolve_doc_root_for_app(self, username, app_root): """Get doc_root from application config or raise exception""" domain = self.apps_manager.get_app_domain(username, app_root) _, doc_root = self.safely_resolve_username_and_doc_root(username, domain) return doc_root @staticmethod def safely_resolve_username_and_doc_root(user=None, domain=None): """ Safely resolve username and doc_root by domain, or resolve document root by username, or resolve document root and username by effective uid :param user: str -> name of unix user :param domain: str -> domain of panel user :return: tuple -> user, doc_root """ try: result_user, result_doc_root = resolve_username_and_doc_root( user=user, domain=domain, ) except NoDomain: raise ClSelectDomainNotFound( { 'message': 'No such domain: %(domain)s' if domain is not None else 'No such user: %(user)s', 'context': { 'domain': domain, 'user': user, }, } ) except NotSupported: raise ClSelectExcept( { 'message': 'Nodejs selector not supported for %(panel)s', 'context': { 'panel': CP_NAME }, } ) except IncorrectData: raise ClSelectExcept( { 'message': 'Domain %(domain)s is not owned by the user %(user)s', 'context': { 'domain': domain, 'user': user }, } ) return result_user, result_doc_root @staticmethod def should_be_runned_as_user(opts): """ Check whether selector should be run through "su - user" :param opts: dict of parsed cli params :return: True if should be run through su or False if not """ result = True euid, egid = get_perm() if opts is None: result = False elif not isinstance(opts, dict): result = False elif opts['--user'] is None and opts['--domain'] is None: # if --user and --domain are absent - root result = False elif euid != 0 and egid != 0: result = False elif opts['--interpreter'] == 'php': result = False return result @staticmethod def should_run_user_without_cagefs(opts): return opts['--interpreter'] == 'php' and\ opts['--user'] is not None and\ os.geteuid() == 0 @staticmethod def _get_user_pwd_data(user=None): """ Resolves user eigher with passed username or with getting current user ID :param user: str or None -> username to be resolved :return: obj -> pwd user object """ userdata = pwd.getpwnam(user) return userdata @staticmethod def user_and_domain_checker(user=None, domain=None): """ Check if user and domain are None :param user: name of unix user :param domain: domain of panel user :return: None """ if user is None and domain is None: raise ClSelectExcept( { 'message': 'User or domain parameter must be specified if current user is root', 'context': {} } ) @staticmethod def _return_error(result, **kwargs): err = {"status": "ERROR: %s" % result} if len(kwargs) > 0: err.update(kwargs) return err @staticmethod def _return_with_status_error(result, details=None): """ Construct error dict in one place :param result: error string :return: dict with 'status':'error' and error message """ err = { 'status': 'error', 'result': result } if details: err.update({'details': details}) return err def _change_selector_status(self, status): """ Set CL selector status in it's config :param status: set status of selector :return: error or ok message """ if status not in (ENABLED_STATUS, DISABLED_STATUS,): return self._return_error( 'Unknown selector status provided: "{}". ' "Can be only 'enabled' or 'disabled'".format(status)) try: self.selector_manager.selector_enabled = status == ENABLED_STATUS except BaseSelectorError as e: return self._return_error(e) # Backward compatibility with cloudlinux-config on python if self.interpreter == self.PYTHON_INTERPRETER: # Create hidePythonApp dictionary config_dict = {'uiSettings': {'hidePythonApp': status != ENABLED_STATUS}} try: # _set_ui_config writes changes and updates UI _set_ui_config(config_dict) # We dont need to call self.update_ui after return OK_RES_DICT except UIConfigException: pass # Backward compatibility with cloudlinux-config on nodejs if self.interpreter == self.NODEJS_INTERPRETER: config_dict = {'uiSettings': {'hideNodeJsApp': status != ENABLED_STATUS}} try: _set_ui_config(config_dict) return OK_RES_DICT except UIConfigException: pass self.update_ui() return OK_RES_DICT @staticmethod def get_nodejs_selector_status(): res = {'selector_enabled': False} if NodeManager().selector_enabled: res['selector_enabled'] = True return res def get_php_selector_status(self): return {'PHPSelector': ENABLED_STATUS if self.selector_manager.selector_enabled else DISABLED_STATUS} def get_summary(self): try: res = self.selector_manager.get_summary() res.update(self.get_selector_status()) return res except BaseSelectorError as e: return self._return_error(e) @staticmethod def get_python_selector_status(): res = {'selector_enabled': False} if PythonManager().selector_enabled: res['selector_enabled'] = True return res @staticmethod def get_user_home(user): try: return pwd.getpwnam(user).pw_dir except KeyError: raise ClSelectExcept({ 'message': 'No such user: `%(user)s`' }) def get_nodejs_summary(self): res = NodeManager().get_summary() res.update(self.get_selector_status()) return res def run_script(self, user, app_root, script_name, script_args=None): """ Runs script to execute other script inside user app environment :param user: str -> owner of application :param app_root: str -> application directory :param script_name: str -> name of script :param script_args: list of str -> arguments for the script :return: dict """ user_home = self.get_user_home(user) if self.interpreter == self.NODEJS_INTERPRETER: interpreter_path = self.apps_manager.get_binary_path( user, app_root, user_home, 'npm') elif self.interpreter == self.PYTHON_INTERPRETER: interpreter_path = self.apps_manager.get_binary_path( user, app_root, user_home, 'python') else: raise NotImplementedError() cmd = [interpreter_path] if self.interpreter == self.NODEJS_INTERPRETER: cmd.append('run-script') cmd.append(script_name) if script_args is not None: if self.interpreter == self.NODEJS_INTERPRETER: cmd.append('--') cmd.extend(script_args) result = self._run_interpreter(cmd, user_home, app_root) return result @staticmethod def _run_interpreter(cmd, user_home, app_root): """ Run interpreter in users environment :param cmd: list -> command to execute :param user_home: -> user home directory :param app_root: -> app path :return: dict """ p = subprocess.Popen( args=cmd, cwd=os.path.join(user_home, app_root), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) try: stdout, stderr = p.communicate() except OSError as e: raise ClSelectExcept({ 'message': ('run-script call: `%(args)s` failed ' 'with error: %(err)s'), 'context': {'args': cmd, 'err': e} }) output_string = ( 'returncode: {}\n' 'stdout:\n{}\n' 'stderr:\n{}\n' ).format( p.returncode, stdout.strip(), stderr.strip()) if any('out of memory' in output.lower() for output in (stdout, stderr)): pid = os.getpid() proc_limits_path = f'/proc/{pid}/limits' process_limits = pathlib.Path(proc_limits_path).read_text() output_string += ( '\nOut of memory error may be caused by hitting LVE limits\n' 'or "Max data size", "Max address space" or "Max resident set" process limits\n' 'Please check LVE limits and process limits. Readjust them if necessary\n' 'More info: https://docs.cloudlinux.com/shared/cloudlinux_os_components/#known-restrictions-and-issues' f'\n\nprocess limits "{proc_limits_path}":\n{process_limits}\n' ) result = {'data': base64.b64encode(output_string.encode()).decode()} result.update({'status': 'success'}) if p.returncode != 0: result['warning'] = f'Script exit code: {p.returncode}' return result def get_full(self): if self.interpreter in (self.NODEJS_INTERPRETER, self.PHP_INTERPRETER): return self.selector_manager.get_summary() @staticmethod def _add_statistics_field(versions_list): """ Add selector usage statistics (amount of domains that use some version, etc) Fist parameter is an array with such format: [{'version': '5.6'}, {'version': '7.6'}] Output is an array with such format: [{'version': '5.6', 'users': 10}, ...] :type versions_list: list :rtype: list """ user_version_map = ClUserSelect().get_user_version_map() # count users per php version version_user_map = collections.Counter() for user, version in iteritems(user_version_map): version_user_map[version] += 1 # and add additional field to the output for item in versions_list: item['total_users'] = version_user_map[item['version']] return versions_list def get_supported_versions(self): """ Retrieves supported versions list and default version """ if self.interpreter == self.PHP_INTERPRETER: # ToDo: make get_supported_versions do not request all information (like it return PHP) data = ClSelect(self.interpreter).get_summary(False) json_data = format_summary(data, format='json', api_version=API_1) selectorctl_result = json.loads(json_data) # Our Interpreter-specific selectorctl commands should support API >= 1 elif self.selector_manager is not None: selectorctl_result = self.selector_manager.get_summary() else: selectorctl_result = {} if selectorctl_result.get('status') == 'ERROR': # e.g. {"status": "ERROR", "message": "alt-php packages not found"} result = {"status": selectorctl_result['message']} else: result = selectorctl_result return result def get_current_version(self, user=None): """ Retrives current version for user """ user = [user] if user else None if self.interpreter == self.PHP_INTERPRETER: return self.selector_manager.get_current_version(user) else: return self._return_error('Supported only by php selector') def get_default_version(self): json_dict = self.get_supported_versions() try: default_version = json_dict['default_version'] except KeyError: if 'message' in json_dict: return self._return_error(json_dict['message']) else: return {"status": "ERROR", "data": json_dict} return {'default_version': default_version} def get_selector_status(self): if self.interpreter == self.NODEJS_INTERPRETER: return self.get_nodejs_selector_status() elif self.interpreter == self.PYTHON_INTERPRETER: return self.get_python_selector_status() elif self.interpreter == self.PHP_INTERPRETER: return self.get_php_selector_status() def php_selector_is_disabled(self): return not self.selector_manager.selector_enabled def php_selector_is_enabled(self): return self.selector_manager.selector_enabled def check_multiphp_system_default(self): """ Returns True when MultiPHP system default PHP version is alt-php and PHP Selector is NOT disabled For details please see LVEMAN-1170 """ if self.interpreter != self.PHP_INTERPRETER: return False try: from clcagefslib.selector.configure import is_ea4_enabled, read_cpanel_ea4_php_conf except ImportError: return False if is_ea4_enabled() and not self.php_selector_is_disabled(): conf = read_cpanel_ea4_php_conf() if conf: try: # get default system php version selected via MultiPHP Manager in cPanel WHM default_php = conf['default'] if not default_php.startswith('ea-php'): return True except KeyError: pass return False def set_supported_versions(self, versions): if self.interpreter in (self.NODEJS_INTERPRETER, self.PYTHON_INTERPRETER,): for version, enable_disable in iteritems(versions): try: result = self.set_version_status(enable_disable, version) except BaseSelectorError as e: return self._return_error(e) if result != OK_RES_DICT: return result return OK_RES_DICT json_dict = self.get_supported_versions() try: alternatives_list = json_dict["available_versions"] except KeyError: return self._return_error("corrupted answer from %s --json --summary --interpreter %s" % ( self._SELECTORCTL_UTILITY, self.interpreter)) alternatives_versions = {x["version"] for x in alternatives_list} if set(versions.keys()) - alternatives_versions: return self._return_error( "invalid alternative versions (%s), see %s --summary --interpreter %s for valid versions" % ( ', '.join(set(versions.keys()) - alternatives_versions), self._SELECTORCTL_UTILITY, self.interpreter)) # TODO: change error message success = 0 errors_list = [] for version, to_enable in iteritems(versions): if to_enable: ClSelect(self.interpreter).enable_version(str(version)) else: ClSelect(self.interpreter).disable_version(str(version)) success += 1 if success == len(versions): return OK_RES_DICT elif success > 0: return { "status": "WARNING: only {} of {} commands was successful. " "Errors was: {}".format(success, len(versions), errors_list) } else: return self._return_error("All commands were failed" "Errors was: {}".format(errors_list)) def set_default_version(self, version): try: if self.selector_manager is not None: self.selector_manager.switch_default_version(version) except clselect.clselectexcept.ClSelectExcept.NoSuchAlternativeVersion: # No such alt-php version raise ClSelectExcept({'message': "No such php version: %(ver)s", 'context': {'ver': version}}) except (clselect.clselectexcept.BaseClSelectException, NodeJSConfigError) as e: return self._return_with_status_error(str(e)) return OK_RES_DICT def set_version_status(self, target_version_status, version): """ Disable or enable version of selector :param target_version_status: disable or enable version of interpreter :param version: version of interpreter :return: OK_RES_DICT """ if target_version_status: self.selector_manager.set_version_status(version, ENABLED_STATUS) else: self.selector_manager.set_version_status(version, DISABLED_STATUS) return OK_RES_DICT def set_current_version(self, version): try: if self.interpreter == self.PHP_INTERPRETER: self.selector_manager.switch_current_version(version) except clselect.clselectexcept.ClSelectExcept.NoSuchAlternativeVersion: # No such alt-php version raise ClSelectExcept({'message': "No such php version: %(ver)s", 'context': {'ver': version}}) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) return OK_RES_DICT def reset_extensions(self, version): extensions = {} try: if self.interpreter == self.PHP_INTERPRETER: extensions = self.selector_manager.reset_extensions(version) except clselect.clselectexcept.ClSelectExcept.NoSuchAlternativeVersion: # No such alt-php version raise ClSelectExcept({'message': "No such php version: %(ver)s", 'context': {'ver': version}}) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) return extensions def set_selector_status(self, status): if self.interpreter in (self.NODEJS_INTERPRETER, self.PYTHON_INTERPRETER, self.PHP_INTERPRETER): return self._change_selector_status(status) def update_ui(self): if detect.is_cpanel() or detect.is_plesk() or detect.is_da(): retcode, out, err = exec_utility( self.DYNAMIC_UI_CTL_CMD, ['--sync-conf=all'], stderr=True) if retcode != 0: return self._return_error( "Can not sync UI with reason: {} {}".format(out, err)) return None def set_extensions(self, extensions, version): result = OK_RES_DICT.copy() if self.interpreter == self.PHP_INTERPRETER: try: data = self.selector_manager.set_extensions(version, extensions) if data: result.update(data) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) return result def set_options(self, options, version): if self.interpreter == self.PHP_INTERPRETER: try: self.selector_manager.set_options(version, options) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) return OK_RES_DICT def resolve_version(self, version): """ Attempts to get or verify version to be passed to external program Currently supported version is one digit (6 or 8). If version is None, return a default version :param version: str or None -> version to be verified or found :return: str -> digit as string """ if version is None: default_version = self.get_default_version().get('default_version', '') if default_version is None or default_version == '': # Interpreter default version not defined raise ClSelectExcept("{} default version not defined".format(self.interpreter)) if self.interpreter == self.NODEJS_INTERPRETER: m = re.match(r'(?P<version>\d+)', default_version) if not m: raise ClSelectExcept({'message': "Incorrect selector version: %(ver)s", 'context': {'ver': default_version}}) return m.group('version') else: return default_version if isinstance(version, (int, float)): if self.interpreter == self.NODEJS_INTERPRETER: # For NodeJS use only major version return str(int(version)) # TODO: check among supported versions else: # For Python use full version return str(version) if version == 'native': raise ClSelectExcept({'message': "Unsupported version: %(ver)s", 'context': {'ver': version}}) return version # TODO: do check among supported versions def create_app(self, app_root, app_uri, version, user=None, domain=None, app_mode=None, startup_file=None, env_vars=None, entry_point=None, passenger_log_file=None): """ Creates application for specified user, interpreter and version If user is None we hope that the external application resolves a user Currently NodeJS supported only :param domain: str -> domain of the application :param app_root: str -> app path relative to user home :param app_uri: str -> URI path of the application :param version: str or None -> version of the interpreter :param user: str or None -> username of user who owns the app :param app_mode: str or None -> application mode (development or production) :param startup_file: str or None -> main application file :param env_vars: json_string or None -> enviroment variables for application :param entry_point: Application entry point (used only for python interpreter). :param passenger_log_file: Passenger log filename :return: dict """ self.user_and_domain_checker(user, domain) version = self.resolve_version(version) if env_vars is not None: env_vars = validate_env_vars(json.loads(env_vars)) user, doc_root = self.safely_resolve_username_and_doc_root(user, domain) try: if self.interpreter == self.PYTHON_INTERPRETER: self.selector_old_lib.create(user, app_root, app_uri, version, doc_root=doc_root, env_vars=env_vars, startup_file=startup_file, domain_name=domain, entry_point=entry_point, apps_manager=self.apps_manager, passenger_log_file=passenger_log_file) elif self.interpreter == self.RUBY_INTERPRETER: clselect.clselectctlruby.create(user, app_root, app_uri, version, doc_root=doc_root) # ruby elif self.interpreter == self.NODEJS_INTERPRETER: # args[0] - directory, args[1] - alias (app-uri) # nodejs self.selector_old_lib.create( user, app_root, app_uri, version=version, doc_root=doc_root, app_mode=app_mode, env_vars=env_vars, startup_file=startup_file, domain_name=domain, apps_manager=self.apps_manager, passenger_log_file=passenger_log_file ) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(result=str(e)) # TODO: catch ClSelectExcept.BusyApplicationRoot return OK_RES_DICT @staticmethod def _get_application_path(app_root, userdata): """ Resolve app root to absolute path and checks if it exists :param app_root: str -> relative a user homedir app path :param userdata: obj -> pwd user object :return: str -> absolute path to app """ app_path = os.path.join(userdata.pw_dir, app_root) if not os.path.isdir(app_path): raise ClSelectExcept({'message': "No such application: %(app)s", 'context': {'app': app_root}}) return app_path def read_app_config(self, app_root, config_path, user=None): """ Reads file and returns its content as Base64 string :param app_root: str -> path to app relative to user home :param user: str -> username to resolve app path :param config_path: str -> file to be read (relative to app path) :return: dict """ result = OK_RES_DICT.copy() self.user_and_domain_checker(user, None) userdata = self._get_user_pwd_data(user) app_path = self._get_application_path(app_root, userdata) full_config_path = os.path.join(app_path, config_path) if not os.path.exists(full_config_path): raise ClSelectExcept( { 'message': "Configuration file not found: %(path)s", 'context': {'path': full_config_path} } ) try: with open(full_config_path, 'rb') as f: data = f.read() result.update({'data': base64.b64encode(data).decode()}) return result except Exception as e: return self._return_with_status_error(getattr(e, 'strerror', '')) def save_app_config(self, app_root, config_path, content, user=None): """ Saves data passed as Base64 string to specified file :param content: data for saving in app's config :param app_root: str -> path to app relative to user home :param user: str -> username to resolve app path :param config_path: str -> file to be read (relative to app path) :param content: str -> Base64-encoded string :return: dict """ self.user_and_domain_checker(user, None) userdata = self._get_user_pwd_data(user) app_path = self._get_application_path(app_root, userdata) full_config_path = os.path.join(app_path, config_path) try: with open(full_config_path, 'wb') as f: f.write(base64.b64decode(content)) return OK_RES_DICT except Exception as e: return self._return_with_status_error(getattr(e, 'strerror', '')) @staticmethod def _add_user_or_domain(user, domain, args): result_args = list(args) if domain is not None: result_args.extend(['--domain', domain]) elif user is not None: result_args.extend(['--user', user]) else: raise ClSelectExcept('User or domain parameter must be specified if current user is root') return result_args def uninstall_modules(self, app_root, modules, user=None, domain=None, skip_web_check=False): """ Uninstall described modules for user's webapp :param app_root: directory with webapp :param modules: comma-separated list of modules to uninstall :param user: name of unix user :param domain: domain of user :param skip_web_check: skip check web application after changing its properties :return: None """ self.user_and_domain_checker(user, domain) if self.interpreter != self.PYTHON_INTERPRETER: raise ClSelectExcept({ 'message': 'Uninstall command is available only for python interpreter, not %(interp)s', 'context': { 'interp': self.interpreter, }, }) try: for module in modules: self.selector_old_lib.uninstall(user, app_root, module) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) else: return OK_RES_DICT def install_modules(self, app_root, user=None, domain=None, skip_web_check=False, spec_file=None, modules=()): """ Install described modules for user's webapp :type domain: domain of user :param user: name of unix user :param app_root: directory with webapp :param skip_web_check: skip check web application after change it's properties :param spec_file: file containing modules and their versions to install :param modules: list of installed modules :return: None """ self.user_and_domain_checker(user, domain) if self.interpreter == self.NODEJS_INTERPRETER: try: self.selector_old_lib.install(user, app_root, skip_web_check=skip_web_check, apps_manager=self.apps_manager) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) else: return OK_RES_DICT elif self.interpreter == self.PYTHON_INTERPRETER: try: if modules: for module in modules: self.selector_old_lib.install(user, app_root, module, None, skip_web_check=skip_web_check, apps_manager=self.apps_manager) elif spec_file: self.selector_old_lib.install(user, app_root, None, spec_file, skip_web_check=skip_web_check, apps_manager=self.apps_manager) else: err = "Please, specify modules or requirements file with modules" return self._return_with_status_error(err) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) else: return OK_RES_DICT else: raise ClSelectExcept({ 'message': 'Unknown interpreter: %(interp)s', 'context': { 'interp': self.interpreter, }, }) def destroy_app(self, app_root, user): """ Destroy specified application root directory and user name :param app_root: Application directory :param user: name of unix user :return: dict """ self.user_and_domain_checker(user, None) try: if self.interpreter == self.RUBY_INTERPRETER: clselect.clselectctlruby.destroy(user, app_root) else: try: doc_root = self.safely_resolve_doc_root_for_app(user, app_root) except TypeError: raise ClSelectExceptions.WrongData( message='No such application or it\'s broken. ' 'Unable to find app-root folder by this path %(app_root)s', context={ 'app_root': app_root } ) except ClSelectDomainNotFound: # We still want to clean up an application doc_root = None if self.interpreter in (self.NODEJS_INTERPRETER, self.PYTHON_INTERPRETER): self.selector_old_lib.destroy(user, app_root, doc_root=doc_root, apps_manager=self.apps_manager) else: raise ClSelectExceptions.InterpreterError( message='Unknown interpreter: %(interp)s', context={'interp': self.interpreter}) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) return OK_RES_DICT def start_app(self, app_root, username): """ Start specified application root directory and user name :param app_root: Application directory :param username: name of unix user :return: dict """ self.user_and_domain_checker(username, None) try: doc_root = self.safely_resolve_doc_root_for_app(username, app_root) if self.interpreter in (self.NODEJS_INTERPRETER, self.PYTHON_INTERPRETER): self.selector_old_lib.start(username, app_root, doc_root, self.apps_manager) else: raise ClSelectExceptions.InterpreterError( message='Unknown interpreter: %(interp)s', context={'interp': self.interpreter}) return OK_RES_DICT except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) def restart_app(self, app_root, username): """ Destroy specified application root directory and user name :param app_root: Application directory :param username: name of unix user :return: dict """ self.user_and_domain_checker(username, None) try: doc_root = self.safely_resolve_doc_root_for_app(username, app_root) if self.interpreter == self.NODEJS_INTERPRETER: self.selector_old_lib.restart(username, app_root, doc_root, self.apps_manager) elif self.interpreter == self.PYTHON_INTERPRETER: self.selector_old_lib.restart(username, app_root, doc_root, self.apps_manager) else: raise ClSelectExceptions.InterpreterError( message='Unknown interpreter: %(interp)s', context={'interp': self.interpreter}) return OK_RES_DICT except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) def stop_app(self, app_root, user): """ Start specified application root directory and user name :param app_root: Application directory :param user: name of unix user :return: dict """ self.user_and_domain_checker(user, None) try: doc_root = self.safely_resolve_doc_root_for_app(user, app_root) if self.interpreter == self.NODEJS_INTERPRETER: self.selector_old_lib.stop(user, app_root, doc_root, self.apps_manager) elif self.interpreter == self.PYTHON_INTERPRETER: self.selector_old_lib.stop(user, app_root, doc_root, self.apps_manager) else: raise ClSelectExceptions.InterpreterError( message='Unknown interpreter: %(interp)s', context={'interp': self.interpreter}) return OK_RES_DICT except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) @staticmethod def _replace_old_env_and_prompt_in_binaries_in_environment(new_env, old_env, new_rel, old_rel): # type: (str, str, str, str) -> None """ Replace old prompt and env_var in binaries in new environment Working with bytes here, because of python binary """ old_prompt = ('(' + old_rel + ':').encode() new_prompt = ('(' + new_rel + ':').encode() for venv_bin_file in glob.glob(os.path.join(new_env, '*', 'bin', '*')): if not os.path.isdir(venv_bin_file): try: old_activate = file_read(venv_bin_file, mode='rb') if old_env.encode() in old_activate or old_prompt in old_activate: _new_activate = old_activate.replace(old_env.encode(), new_env.encode()) new_activate = _new_activate.replace(old_prompt, new_prompt) file_write(venv_bin_file, new_activate, 'wb') except IOError: _, _, traceback_ = sys.exc_info() sys.stderr.write(str(traceback.print_tb(traceback_))) def get_app_summary(self, username, app_config_dict_full, app_root): """ Retrieve application info from user's applications config. Analog of function clpassenger.summary :param username: User name :param app_config_dict_full: Full user's application config. :param app_root: Application root :return: Dictionary with application info Example: { 'binary': '/home/cltest1/virtualenv/new_app_root/2.7/bin/python', # + 'domain': 'cltest1.com', # + 'alias': 'app1', # + 'htaccess': '/home/cltest1/public_html/app1/.htaccess', # + 'interpreter': 'python', # + 'directory': '/home/cltest1/new_app_root', # + 'docroot': '/home/cltest1/public_html', # + 'domains': ['cltest1.com'] } """ try: app_config_dict = get_using_realpath_keys(username, app_root, app_config_dict_full) user_home = pwd.getpwnam(username).pw_dir doc_root = docroot(app_config_dict['domain'])[0] app_info_dict = { app_root: { 'interpreter': self.apps_manager.INTERPRETER, 'binary': self.apps_manager.get_binary_path(username, app_root, user_home), 'alias': app_config_dict['app_uri'], 'domain': app_config_dict['domain'], 'docroot': doc_root, 'htaccess': '/'.join([doc_root, app_config_dict['app_uri'], '.htaccess']), 'directory': '/'.join([user_home, app_root]), 'domains': [app_config_dict['domain']] } } return app_info_dict except KeyError: # we return empty dict because app doesn't exist return {} @staticmethod def _move_app_from_old_dir_to_new(old_directory, new_directory): # type: (str, str) -> None """ Move all items from old directory of application to new directory :param old_directory: full real path to old directory of applicaton :param new_directory: full real path to new directory of applicaton """ if not os.path.exists(new_directory): mkdir_p(new_directory) os.rename(old_directory, new_directory) else: # move all items from old directory to new for item in os.listdir(old_directory): shutil.move(os.path.join(old_directory, item), new_directory) os.rmdir(old_directory) def _relocate(self, user, old_directory, new_directory): """ Move user's application from directory to new_directory :param user: application owner. unix like user name :param old_directory: current directory with application :param new_directory: new directory for application :return: None """ full_config = self.apps_manager.get_user_config_data(user) try: app_config = get_using_realpath_keys(user, old_directory, full_config) except KeyError: raise ClSelectExceptions.NoSuchApplication( 'No such application (or application not configured) "%s"' % old_directory) old_abs, old_rel = get_abs_rel(user, old_directory) new_abs, new_rel = get_abs_rel(user, new_directory) try: # Directory name must not be one of the reserved names and # should not contain invalid symbols. clselectctl.check_directory(new_rel) except ValueError as e: raise ClSelectExceptions.WrongData(str(e)) # Get application summary for the application # Application summary example # {'new_app_root': # {'binary': '/home/cltest1/virtualenv/new_app_root/2.7/bin/python', # 'domain': 'cltest1.com', # 'alias': 'app1', # 'htaccess': '/home/cltest1/public_html/app1/.htaccess', # 'interpreter': 'python', # 'directory': '/home/cltest1/new_app_root', # 'docroot': '/home/cltest1/public_html', # 'domains': ['cltest1.com']} # } # TODO: why do we check only for applications of same type and not other (node/ruby/python)? new_user_summary = self.get_app_summary(user, full_config, new_rel) try: get_using_realpath_keys(user, new_rel, new_user_summary) except KeyError: pass else: raise ClSelectExceptions.AppRootBusy(new_abs) old_user_summary = self.get_app_summary(user, full_config, old_rel) try: old_user_app_summary = get_using_realpath_keys(user, old_rel, old_user_summary) except KeyError: raise ClSelectExceptions.WrongData("No such application (or application not configured) \"%s\"" % old_directory) doc_root = old_user_app_summary['docroot'] alias = old_user_app_summary['alias'] env_name = self.selector_old_lib._get_environment( user, old_directory, app_summary=old_user_summary[old_directory]).name if self.interpreter == self.PYTHON_INTERPRETER: ver, rel_venv = get_venv_rel_path(user, old_directory) _old_env, _ = get_abs_rel( user, rel_venv) _new_env, _ = get_abs_rel( user, get_venv_rel_path(user, new_directory, version=ver)[1]) elif self.interpreter == self.NODEJS_INTERPRETER: _old_env, _ = get_abs_rel(user, os.path.join( self.selector_user_lib.environments.DEFAULT_PREFIX, old_rel)) _new_env, _ = get_abs_rel(user, os.path.join( self.selector_user_lib.environments.DEFAULT_PREFIX, new_rel)) else: raise NotImplementedError() old_env = os.path.join(_old_env, '') new_env = os.path.join(_new_env, '') new_env_dir = os.path.dirname(_new_env) # needed to avoid copy in shutil.move if not os.path.exists(new_env_dir): os.makedirs(new_env_dir) shutil.move(old_env, new_env) self._move_app_from_old_dir_to_new(old_abs, new_abs) if self.interpreter == self.PYTHON_INTERPRETER: self._replace_old_env_and_prompt_in_binaries_in_environment(new_env, old_env, new_rel, old_rel) if self.interpreter == self.PYTHON_INTERPRETER: _, prefix = get_venv_rel_path(user, new_directory) elif self.interpreter == self.NODEJS_INTERPRETER: prefix = self.selector_old_lib._get_prefix(user, new_directory) else: raise NotImplementedError() environment = self.selector_user_lib.environments.Environment(env_name, user, prefix) binary = environment.interpreter().binary app_status = get_using_realpath_keys(user, old_directory, full_config)['app_status'] if app_status == APP_STARTED_CONST: # Clear .htaccess from CL's directives clpassenger.unconfigure(user, old_directory) passenger_log_file_to_set = full_config[old_directory].get('passenger_log_file', None) clpassenger.configure(user, new_directory, alias, self.interpreter, binary, doc_root=doc_root, startup_file=app_config['startup_file'], passenger_log_file=passenger_log_file_to_set) clpassenger.restart(user, new_directory) # update config delete_using_realpath_keys(user, old_directory, full_config) full_config[new_directory] = app_config self.apps_manager.write_full_user_config_data(user, full_config) def relocate(self, user, old_app_root, new_app_root): """ Call selectorctl to relocate application from old_app_root to new_app_root :param user: application owner :param old_app_root: current application directory (current application name) :param new_app_root: new application directory (new application name) :return: json """ try: if self.interpreter in (self.PYTHON_INTERPRETER, self.NODEJS_INTERPRETER): self._relocate(user, old_app_root, new_app_root) elif self.interpreter == self.RUBY_INTERPRETER: # last param in relocate not used clselect.clselectctlruby.relocate(user, old_app_root, new_app_root, None) except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) return OK_RES_DICT # TODO: we have something similar in clpassenger.move # one day we should remove one of that methods @staticmethod def _transit_htaccess_file(old_doc_root, old_alias, new_doc_root, new_alias): # type: (str, str, str, str) -> None """ :param old_doc_root: path to old doc root of application :param old_alias: old alias (uri) of application :param new_doc_root: path to new doc root of application :param new_alias: new alias (uri) of application :return: None """ # Copy existing .htaccess to new location htaccess = '.htaccess' # Get path to old .htaccess old_htaccess_file = os.path.join(old_doc_root, old_alias, htaccess) # Create path for new .htaccess new_htaccess_file = os.path.join(new_doc_root, new_alias, htaccess) if os.path.realpath(old_htaccess_file) \ == os.path.realpath(new_htaccess_file): return new_htaccess_path = os.path.dirname(new_htaccess_file) if not os.path.isdir(new_htaccess_path): os.makedirs(new_htaccess_path) shutil.copy(old_htaccess_file, new_htaccess_file) def _transit(self, user, directory, new_doc_root, new_domain, alias=None): """ Change application URI :param user: application owner. unix like user name :param directory: directory with application. (app-root) :param alias: new alias (app-uri) for application or None if change only the domain :param new_doc_root: NEW doc_root to transit application to :param new_domain: NEW domain to transit application to :return: None """ full_config = self.apps_manager.get_user_config_data(user) try: app_config = get_using_realpath_keys(user, directory, full_config) except KeyError: raise ClSelectExceptions.NoSuchApplication( 'No such application (or application not configured) "{}"'.format( directory)) apps_summary = self.get_app_summary(user, full_config, directory) try: old_app_summary = get_using_realpath_keys(user, directory, apps_summary) except KeyError: raise ClSelectExceptions.WrongData( 'No such application ' '(or application not configured) "{}"'.format(directory)) old_alias = old_app_summary['alias'] old_doc_root = old_app_summary['docroot'] new_alias = old_alias if alias is None else clselectctl.get_alias(alias) environment = self.selector_old_lib._get_environment( user, directory, app_summary=apps_summary[directory]) binary = environment.interpreter().binary if full_config[directory]['app_status'] == APP_STARTED_CONST: passenger_log_file_to_set = full_config[directory].get('passenger_log_file', None) clpassenger.configure(user, directory, new_alias, self.interpreter, binary, True, 'transit', doc_root=new_doc_root, startup_file=app_config['startup_file'], passenger_log_file=passenger_log_file_to_set) clpassenger.move(user, directory, old_alias, new_alias, old_doc_root=old_doc_root, new_doc_root=new_doc_root) clpassenger.restart(user, directory) else: # New doc root should be equal to old doc root # if we don't want to change domain for application. if new_doc_root is None: new_doc_root = old_doc_root self._transit_htaccess_file(old_doc_root, old_alias, new_doc_root, new_alias) app_config['app_uri'] = new_alias if new_domain is not None: app_config['domain'] = new_domain self.apps_manager.write_full_user_config_data(user, full_config) def transit(self, user, app_root, new_app_uri=None, new_domain=None): """ Call selectorctl to transit application to new_app_uri :param user: application owner :param app_root: application directory (application name) :param new_app_uri: new uri or None if change only the domain :param new_domain: new domain or None if change only the app_uri :return: json """ try: if new_domain is None: new_doc_root = None else: _, new_doc_root = self.safely_resolve_username_and_doc_root(user, new_domain) if self.interpreter in (self.PYTHON_INTERPRETER, self.NODEJS_INTERPRETER): self._transit(user, app_root, new_doc_root, new_domain, new_app_uri) elif self.interpreter == self.RUBY_INTERPRETER: clselect.clselectctlruby.transit(user, app_root, new_app_uri, doc_root=new_doc_root) else: raise NotImplementedError() except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) return OK_RES_DICT def _relocate_nodejs_extensions(self, user, app_root, new_env, old_extensions): # type: (str, str, NodeJsEnvironment, Set) -> None """ Install nodejs extensions to new nodejs environment and change symlink <user_homedir>/<app-root>/node_modules to new environment. Raise exception `WebAppError` if npm will return non-zero code """ npm_ret_code = 0 if old_extensions: try: npm_ret_code = new_env.extension_install_single_call(old_extensions) except ClSelectExceptions.FileProcessError: pass # Change symlink <user_homedir>/<app-root>/node_modules to new environment self.selector_old_lib._create_symlink_to_node_modules(user, new_env.path, app_root) if npm_ret_code != 0: raise ClSelectExceptions.WebAppError("Module installation has been failed. Please, check npm logs.") @staticmethod def _relocate_python_extensions(new_env, old_extensions): # type: (PythonEnvironment, Set) -> None """ Install python extensions to new python environment. They are equivalent to extensions from old environment. Remove python extensions which not existing in old environment from new environment """ new_extensions = set(new_env.extensions()) for extension in new_extensions - old_extensions: try: new_env.extension_uninstall(extension) except ClSelectExceptions.ExternalProgramFailed: # TODO: logging # https://cloudlinux.atlassian.net/browse/LVEMAN-1465 pass for extension in old_extensions - new_extensions: try: new_env.extension_install(extension) except ClSelectExceptions.ExternalProgramFailed: # TODO: logging # https://cloudlinux.atlassian.net/browse/LVEMAN-1465 pass def _change_version(self, user, directory, version=None, skip_web_check=False): """ Set current interpreter version for the application :param user: application owner. unix like user name :param directory: app_root - main directory with user application :param version: new version of python interpreter or None if we get current :param skip_web_check: skip check web application after change it's properties :return: None """ full_config = self.apps_manager.get_user_config_data(user) try: app_config = get_using_realpath_keys(user, directory, full_config) except KeyError: raise ClSelectExceptions.NoSuchApplication( 'No such application (or application not configured) "%s"' % directory) old_environment = self.selector_old_lib._get_environment(user, directory) # reads .htaccess if not version: return {old_environment.name: old_environment.interpreter().as_dict()} # SET new interpreter: new_environment = self.selector_old_lib._create_environment(user, directory, version) self._ensure_version_enabled(version, user) # Get extensions list for old environment installed_extensions = set(old_environment.extensions()) if self.interpreter == self.PYTHON_INTERPRETER: self._relocate_python_extensions(new_environment, installed_extensions) elif self.interpreter == self.NODEJS_INTERPRETER: self._relocate_nodejs_extensions(user, directory, new_environment, installed_extensions) # Reconfigure clpassenger user_summary_data = clpassenger.summary(user) app_summary = get_using_realpath_keys(user, directory, user_summary_data) doc_root = app_summary['docroot'] binary = new_environment.interpreter().binary alias, app_domain = self.selector_old_lib._get_info_about_webapp(app_summary, user) def action(): # Clear .htaccess clpassenger.unconfigure(user, directory) passenger_log_file_to_set = full_config[directory].get('passenger_log_file', None) clpassenger.configure(user, directory, alias, self.interpreter, binary, doc_root=doc_root, startup_file=app_config['startup_file'], passenger_log_file=passenger_log_file_to_set) # Create restart.txt file in tmp directory clpassenger.restart(user, directory) full_config[directory]['%s_version' % self.interpreter] = version self.apps_manager.write_full_user_config_data(user, full_config) if not skip_web_check: try: self.selector_old_lib.check_response_from_webapp( domain=app_domain, alias=alias, action=action, ) except ClSelectExceptions.WebAppError as err: raise ClSelectExceptions.WebAppError('An error occured during changing version. %s' % err) else: action() def change_version(self, user, app_root, new_version, skip_web_check): """ Call selectorctl to change current interpreter version to new_version for application :param user: application owner :param app_root: application directory (application name) :param new_version: new nodejs interpreter version :param skip_web_check: skip check web application after change it's properties :return: json """ try: if self.interpreter not in (self.NODEJS_INTERPRETER, self.PYTHON_INTERPRETER): raise NotImplementedError # forbid user to change python version unless he migrates application # to new python selector version if self.interpreter == self.PYTHON_INTERPRETER: app_version = self.apps_manager.get_app_config(user, app_root).get( 'app_version', PythonAppFormatVersion.LEGACY) if app_version == PythonAppFormatVersion.LEGACY: return { 'status': 'This application was created by too old version ' 'of python selector and we cannot change version ' 'without migration to the new application format. ' 'To do that you can use `cloudlinux-selector migrate` ' 'command or just click button in web UI.', } if self.apps_manager.get_app_status(user, app_root) == APP_STARTED_CONST: self._change_version(user, app_root, new_version, skip_web_check=skip_web_check) else: # Supplied application is stopper - run special function for change interpreter version for it self._change_version_for_stopped_app(user, app_root, new_version) return OK_RES_DICT except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) def set_variables_for_litespeed(self, user, app_root, env_vars): if self.interpreter == self.PYTHON_INTERPRETER or self.interpreter == self.NODEJS_INTERPRETER: doc_root = self.safely_resolve_doc_root_for_app(user, app_root) # need for add_env_vars_for_htaccess try: self.apps_manager.add_env_vars_for_htaccess(user, app_root, env_vars, doc_root) except Exception as err: raise ClSelectExceptions.WebAppError( "Unable to set environment variables in htaccess file for the application." "Error: {}".format(err) ) def _set_variables(self, user, directory, app_mode, env_vars, startup_file, entry_point, config_files, passenger_log_file): """ Set application mode, environment variables and startup_file for application :param config_files: names of config files (such as requirements.txt or etc) :param entry_point: the specified entrypoint for application :param user: application owner. unix like user name :param directory: directory with application :param app_mode: expected application mode :param env_vars: dict with environment variables for application :param startup_file: main file for application :param passenger_log_file: Passenger log filename :return: None """ full_config = self.apps_manager.get_user_config_data(user) try: app_config_data = get_using_realpath_keys(user, directory, full_config) except KeyError: raise ClSelectExceptions.NoSuchApplication( 'No such application (or application not configured) "%s"' % directory) if self.interpreter == self.PYTHON_INTERPRETER \ and env_vars is not None \ and app_config_data['env_vars'] != env_vars \ and app_config_data['app_version'] == PythonAppFormatVersion.LEGACY: raise ClSelectExceptions.WebAppError( "Unable to set environment variables. " "Application was created too long time ago. " "Please, migrate your application to newer version " "before changing interpreter version" ) # Update user's config if app_mode is not None: app_config_data['app_mode'] = app_mode if env_vars is not None: # Python env vars task # TODO: LVEMAN-1466 app_config_data['env_vars'] = env_vars if entry_point is not None: app_config_data['entry_point'] = entry_point if config_files is not None: app_config_data['config_files'] = config_files if startup_file is not None and startup_file != app_config_data.get('startup_file') or \ passenger_log_file is not None: if startup_file is not None and startup_file != app_config_data.get('startup_file'): # Startup file changing app_config_data['startup_file'] = startup_file if self.interpreter == self.PYTHON_INTERPRETER: startup_file_full_path = self.selector_old_lib._get_full_path_to_startup_file( user, directory, startup_file ) # We are using the main startup file with name `passenger_wsgi.py` in case custom name of # startup file, because passenger doesn't support directive for set entry_point. # In tje file `passenger_wsgi.py` we can set entry_point and custom name of startup file. self.selector_old_lib.setup_wsgi(user, directory, startup_file_full_path, entry_point) if passenger_log_file is not None: # Set/remove PassengerAppLogFile # Remove passenger log from app config if passenger_log_file path is empty app_config_data['passenger_log_file'] = None if passenger_log_file == '' else passenger_log_file env = self.selector_old_lib._get_environment(user, directory) user_summary = clpassenger.summary(user) user_app_summary = get_using_realpath_keys(user, directory, user_summary) alias = user_app_summary['alias'] binary = env.interpreter().binary doc_root = user_app_summary['docroot'] htaccess_path = user_app_summary['htaccess'] clpassenger._unconfigure(htaccess=htaccess_path) # If main file is not configured, use default value if startup_file is None and app_config_data.get('startup_file') is None: clpassenger.configure(user, directory, alias, self.interpreter, binary, doc_root=doc_root, passenger_log_file=passenger_log_file) else: clpassenger.configure(user, directory, alias, self.interpreter, binary, doc_root=doc_root, startup_file=app_config_data.get('startup_file'), passenger_log_file=passenger_log_file) clpassenger.restart(user, directory) self.apps_manager.write_full_user_config_data(user, full_config) self.set_variables_for_litespeed(user, directory, env_vars) def set_variables(self, user, app_root, app_mode, env_vars, startup_file, entry_point, config_files, passenger_log_file): """ Call selectorctl to set variables for application :param config_files: names of config files (such as requirements.txt or etc) (only for python) :param entry_point: the specified entrypoint for application (only for python) :param user: application owner :param app_root: application directory (application name) :param app_mode: application mode :param env_vars: json_string with environment variables for application :param startup_file: main file for application :param passenger_log_file: Passenger log filename :return: json """ if env_vars is not None: try: env_dict = validate_env_vars(json.loads(env_vars)) except (TypeError, ValueError): return self._return_with_status_error('wrong json format for environment variable list') else: env_dict = None try: if self.interpreter in (self.NODEJS_INTERPRETER, self.PYTHON_INTERPRETER) and \ self.apps_manager.get_app_status(user, app_root) == APP_STARTED_CONST: self._set_variables(user, app_root, app_mode, env_dict, startup_file, entry_point, config_files, passenger_log_file) else: # Supplied application is stopped - run special function for change a few variables of application self._set_variables_for_stopped_app(user, app_root, app_mode, env_dict, startup_file, entry_point, config_files, passenger_log_file) return OK_RES_DICT except clselect.clselectexcept.BaseClSelectException as e: return self._return_with_status_error(str(e)) def get_apps_users_info(self, user=None): """ Retrieves info about all installed interpreters and user(s) applictions :param user: User name for read applictions. If None all users will be processed :return: Dict with info """ try: result_dict = self.apps_manager.get_applications_users_info(user) return result_dict except BaseSelectorError as e: return {'result': e.message, 'context': e.context} # pylint: disable=exception-message-attribute def _ensure_version_enabled(self, new_version, username): """ Check whether particular interpreter version is enabled and raises exception if not :param username: user to include in exception :param new_version: new interpreter version """ if not self.selector_manager.is_version_enabled(new_version): raise clselect.ClSelectExcept.UnableToSetAlternative(username, new_version, 'version is not enabled') def _change_version_for_stopped_app(self, username, app_root, new_version): """ Changes version for stopped application :param username: application owner :param app_root: application directory (application name) :param new_version: new nodejs interpreter version :return: None """ self._ensure_version_enabled(new_version, username) # Get extensions list fom old environment old_version = self.apps_manager.get_interpreter_version_for_app(username, app_root) old_environment = self.selector_old_lib._create_environment(username, app_root, old_version, None) old_extensions = set(old_environment.extensions()) # Create new environment new_environment = self.selector_old_lib._create_environment(username, app_root, new_version, None) # install extension to new app for extension in old_extensions: try: new_environment.extension_install(extension) except clselect.clselectexcept.ClSelectExcept.ExternalProgramFailed: pass # Update user's app config app_config = self.apps_manager.get_user_config_data(username) app_config[app_root]['%s_version' % self.interpreter] = new_version self.apps_manager.write_full_user_config_data(username, app_config) def _set_variables_for_stopped_app(self, username, app_root, app_mode, env_vars_dict, startup_file, entry_point, config_files, passenger_log_file): """ Sets new app_mode, environment variables and startup file for stopped NodeJS application :param config_files: names of config files (such as requirements.txt or etc) (only for python) :param entry_point: the specified entrypoint for application (only for python) :param str username: application owner :param str app_root: application directory (application name) :param str app_mode: New application mode, can be None :param dict env_vars_dict: New environment variables, can be None :param startup_file: New startup file, can be None :param passenger_log_file: Passenger log filename :return: None """ # Update user's app config app_config = self.apps_manager.get_user_config_data(username) if app_mode: app_config[app_root]['app_mode'] = app_mode if env_vars_dict: app_config[app_root]['env_vars'] = env_vars_dict if startup_file and startup_file != app_config[app_root].get('startup_file'): app_config[app_root]['startup_file'] = startup_file if entry_point: app_config[app_root]['entry_point'] = entry_point if config_files: app_config[app_root]['config_files'] = config_files if passenger_log_file is not None and passenger_log_file != '': app_config[app_root]['passenger_log_file'] = passenger_log_file else: app_config[app_root]['passenger_log_file'] = None self.apps_manager.write_full_user_config_data(username, app_config) self.set_variables_for_litespeed(username, app_root, env_vars_dict) @staticmethod def get_major_version_from_short(version): """ Retrieves major version from full. If already short, return it with no difference :param version: Full/short :return: Short version as string """ return str(int(version.split('.')[0])) @staticmethod def replace_mysqli(): """ Replace mysqli extension to nd_mysqli for defaults. Warning: only for PHP. See LVEMAN-1399 for details :return: """ # Get available alt-php versions list. # For example: ['5.1', '5.2', '5.3', '5.4', '5.5', '5.6', '4.4', '7.2', '7.0', '7.1'] alt_php_versions_list = list(ClSelect('php').get_all_alternatives_data().keys()) cl_ext_select = ClExtSelect() # php by default for alt_php_ver in alt_php_versions_list: # Replace mysqli -> nd_mysqli for the version for new installations according to LVEMAN-1399 cl_ext_select.list_extensions(alt_php_ver) @classmethod def setup_selector(cls): """ Setup php selector for work (suggested to use after native php is installed) """ subprocess.check_output(['cagefsctl', '--force-update']) subprocess.check_output(['cagefsctl', '--remount-all']) subprocess.check_output(['/usr/sbin/cloudlinux-selector', 'make-defaults-config', '--json', '--interpreter', 'php']) subprocess.check_output(['/usr/share/l.v.e-manager/utils/cache_phpdata.py']) cls.replace_mysqli() def run_import_applications(self): """ Scan users home dirs for .htaccess files and import applications to new config file. """ if self.interpreter != self.PYTHON_INTERPRETER: raise NotImplementedError # We don't need to import apps for DA elif detect.is_da(): return OK_RES_DICT try: self.apps_manager.import_legacy_applications_to_config() return OK_RES_DICT except Exception as e: return self._return_with_status_error(str(e)) def run_migrate_application(self, user, app_root): """ Convert applications created in older selector versions to new format """ if self.interpreter != self.PYTHON_INTERPRETER: raise NotImplementedError("Migration is only available " "for python selector") self.apps_manager.migrate_application(user, app_root) return OK_RES_DICT