Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.119.116.77
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/cpapi/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/cpapi/plugins/ispmanager.py
# -*- coding: utf-8 -*-

import re
import os
import grp
import subprocess
from typing import Dict  # NOQA

from clcommon import mysql_lib
from clcommon.cpapi.cpapiexceptions import NotSupported, NoDBAccessData
from clcommon.cpapi.GeneralPanel import GeneralPanelPluginV1
from clcommon.features import Feature

# Control panel name reported by this plugin; returned by PanelPlugin.getCPName()
# and used as the 'name' field of the ISPmanager 4 panel description.
__cpname__ = 'ISPManager'


def _is_5_version():
    """
    Check for ISPmanager 5 by looking for its ``mgrctl`` binary
    :return: True if /usr/local/mgr5/sbin/mgrctl is a regular file, False otherwise
    """
    mgrctl_binary = '/usr/local/mgr5/sbin/mgrctl'
    return os.path.isfile(mgrctl_binary)


# WARN: Probably will be deprecated for our "official" plugins.
# See pluginlib.detect_panel_fast()
def detect():
    """
    Detect whether ISPmanager (version 4 or version 5) is installed
    :return: True if the ISPmanager 4 binary exists or the version 5 check succeeds
    """
    # The version 4 binary is checked first; version 5 is only probed if it is absent.
    if os.path.isfile('/usr/local/ispmgr/bin/ispmgr'):
        return True
    return _is_5_version()


# Main config file of ISPmanager 4; default path for the parsers in this module.
ISP_DB_CONF = '/usr/local/ispmgr/etc/ispmgr.conf'
# Database settings file of ISPmanager 5.
ISP5_DB_CONF = '/usr/local/mgr5/etc/ispmgr.conf.d/db.conf'

# Matches a config section of the form: <type> "<name>" { <body> }
SECTION_PATTERN = r'(\S+) "([^"]+)" {([^}]+)}'
# Matches a "<key> <value>" pair of whitespace-separated tokens
KEYWORDS_PATTERN = r'(\S+)\s+(\S+)'


def conf_pars(sectype, secname=None, seckeys=None, path=ISP_DB_CONF):
    """
    /usr/local/ispmgr/etc/ispmgr.conf parser

    Sections have the form ``<type> "<name>" { <key> <value> ... }``.

    :param sectype: Type sector for example: Service or DbServer or Account
    :param secname: Name sector to select; None selects sections with any name
    :param seckeys: Iterable of strings used for retrieving and filtering.
        An item 'Key' extracts that key; an item 'Key value' extracts the key
        and additionally keeps only sections where the key has that value.
        Items with any other number of words are ignored.
        None (or an empty iterable) extracts every key without filtering.
    :param path: path to config file default /usr/local/ispmgr/etc/ispmgr.conf
    :return: list of dicts, one per matching section, mapping key to value
    """
    required_pairs = {}
    wanted_keys = None
    if seckeys:
        wanted_keys = []
        for key_val in seckeys:
            parts = key_val.split()
            if len(parts) == 2:
                required_pairs[parts[0]] = parts[1]
                wanted_keys.append(parts[0])
            elif len(parts) == 1:
                wanted_keys.append(parts[0])

    with open(path, encoding='utf-8') as f:
        content = f.read()

    result_list = []
    for stype, sname, sbody in re.findall(SECTION_PATTERN, content):
        if stype != sectype:
            continue
        # Compare the requested name with the name parsed from the file
        # (comparing secname with itself would accept every section).
        if secname is not None and sname != secname:
            continue
        result = {
            key: value
            for key, value in re.findall(KEYWORDS_PATTERN, sbody)
            if wanted_keys is None or key in wanted_keys
        }
        # Keep the section only if every required "key value" pair is present in it.
        if all(result.get(key) == value for key, value in required_pairs.items()):
            result_list.append(result)

    return result_list


def _db_access_5():
    """
    Read MySQL access data from the ISPmanager 5 database config file (ISP5_DB_CONF)

    The file is scanned for "<key> <value>" pairs; DBPassword, DBUser and DBHost are required.

    :return: dict with keys 'pass', 'login', 'host' and 'db' ('db' is always 'mysql')
    :rtype: dict
    :raises: NoDBAccessData if the file can not be opened/read or a required key is missing
    """
    try:
        with open(ISP5_DB_CONF, encoding='utf-8') as db_conf:
            cnf = dict(re.findall(KEYWORDS_PATTERN, db_conf.read()))
    except OSError as e:
        # Report the file path, not the parsing regex.
        raise NoDBAccessData(f'Can not open config file {ISP5_DB_CONF}') from e

    try:
        return {'pass': cnf['DBPassword'], 'login': cnf['DBUser'], 'host': cnf['DBHost'], 'db': 'mysql'}
    except KeyError as e:
        # A missing setting surfaces as KeyError from the dict lookup.
        raise NoDBAccessData(
            f'Can not find database access data in config file {ISP5_DB_CONF}'
        ) from e


def db_access(_conf_path=ISP_DB_CONF):
    """
    Get MySQL access data for the installed ISPmanager version

    For ISPmanager 5 the data comes from _db_access_5(); otherwise the
    'DbServer' sections of type mysql in the ISPmanager 4 config are used.

    :param _conf_path: path to the ISPmanager 4 config file
    :return: dict with keys 'pass', 'login', 'host' and 'db'
    :rtype: dict
    :raises: NoDBAccessData if no section provides all required keys
    """
    if _is_5_version():
        # ISP Manager 5
        return _db_access_5()

    # ISP Manager 4
    wanted = ('Hostname', 'Password', 'Type mysql', 'User')
    for section in conf_pars(sectype='DbServer', seckeys=wanted, path=_conf_path):
        try:
            # The first section that has every required key wins.
            return {
                'pass': section['Password'],
                'login': section['User'],
                'host': section['Hostname'],
                'db': 'mysql',
            }
        except KeyError:
            # Incomplete section: try the next one.
            continue

    raise NoDBAccessData(
        f'Can not find database access data for localhost in config file {_conf_path}'
    )


def _dbname_dblogin_pairs(access):
    """
    Query the ``db`` table of the ``mysql`` database for (database name, database user) rows
    :param dict access: access data; 'login' and 'pass' are required,
        'host' is optional and defaults to 'localhost'
    :return: whatever the connection's execute_query() returns for the query
    """
    connector = mysql_lib.MySQLConnector(
        host=access.get('host', 'localhost'),
        user=access['login'],
        passwd=access['pass'],
        db='mysql',
    )
    with connector.connect() as connection:
        return connection.execute_query('SELECT db.Db, db.User FROM db GROUP BY db.User, db.Db')


def cpusers():
    """
    Listing control panel users is not implemented for ISPmanager
    :raises: NotSupported always
    """
    details = {
        'message': '%(action)s is not currently supported.',
        'context': {'action': 'Getting all users registered in the Control Panel'},
    }
    raise NotSupported(details)


def _dbname_cplogin_pairs_iter(cplogin_lst=None, _conf_path=ISP_DB_CONF):
    """
    Extract (database name <=> control panel login) pairs from ISPmanager config file

    Only lines starting with 'DbAssign ' are used. The third whitespace-separated
    field is taken as the database name and the fourth as a numeric group id,
    which is translated to a group name through the system group database.
    Lines with fewer than four fields are skipped; a warning is printed for
    lines whose id is not an integer or does not match any group.

    :param cplogin_lst: logins to keep; None means no filtering
    :param _conf_path: path to the ISPmanager config file
    :return: generator of (dbname, cplogin) tuples
    """
    # Later entries win on duplicate group ids, as with dict() over a list of pairs.
    grpid_login_dict = {entry.gr_gid: entry.gr_name for entry in grp.getgrall()}
    with open(_conf_path, encoding='utf-8') as f:
        for line_numb, line in enumerate(f):
            if not line.startswith('DbAssign '):
                continue
            line_splited = line.split()
            # Four fields are needed to unpack dbname and user_uid safely.
            if len(line_splited) < 4:
                continue
            dbname, user_uid = line_splited[2:4]
            try:
                cplogin = grpid_login_dict.get(int(user_uid))
            except ValueError:  # if can not convert user_uid to int
                print(f'WARNING: can not pars line {line_numb} in file {_conf_path}')
                continue
            if cplogin is None:
                print(f'WARNING: can not find group name with id {user_uid}; '
                      f'line {line_numb} in file {_conf_path}')
                continue
            # No filter given -> yield everything; otherwise only the requested logins.
            if cplogin and (cplogin_lst is None or cplogin in cplogin_lst):
                yield dbname, cplogin


def get_user_login_url(domain):
    """
    Build the panel login url for a domain
    :param domain: domain name
    :return: https url of the domain on port 1500
    """
    return 'https://{}:1500'.format(domain)


class PanelPlugin(GeneralPanelPluginV1):
    """
    ISPmanager implementation of the control panel plugin interface.
    Most methods delegate to the module-level functions of the same name.
    """

    def __init__(self):
        super().__init__()

    def getCPName(self):
        """
        Get the control panel name
        :return: value of the module-level __cpname__
        """
        return __cpname__

    def get_cp_description(self):
        """
        Get the panel name and version
        :return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'}
            or None if the information can not be obtained
        """
        # ISPmanager 5 comes in several flavours that are told apart by installed rpm package.
        # ISP Manager support recommendation:
        # Node has package ispmanager-node-common-5.56.0-3.el6.x86_64
        # Master has packages ispmanager-business-common-5.56.0-3.el6.x86_64 and
        #  ispmanager-business-5.56.0-3.el6.x86_64, may have ispmanager-node-common-5.56.0-3.el6.x86_64

        def _rpm_version(package_name):
            """
            Ask rpm for the version of a package; None if rpm fails or prints nothing.
            """
            query = ['/bin/rpm', '-q', '--queryformat', '[%{VERSION}]', package_name]
            with subprocess.Popen(
                query,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            ) as proc:
                raw_version, _ = proc.communicate()
                unavailable = proc.returncode != 0 or not raw_version
            if unavailable:
                return None
            return raw_version.strip()

        def _describe_v5():
            """
            Describe ISPmanager 5 using the first known package that is installed.
            """
            # Order matters: a Master may also have the node package installed.
            flavours = (
                ('ispmanager-business-common', 'Master'),
                ('ispmanager-node-common', 'Node'),
                ('ispmanager-lite-common', 'Lite'),
            )
            for package, node_type in flavours:
                found = _rpm_version(package)
                if found is None:
                    continue
                return {
                    'name': 'ISP Manager',
                    'version': found,
                    'additional_info': node_type,
                }
            raise RuntimeError('Failed to detect version of ISP manager')

        def _describe_v4():
            """
            Describe ISPmanager 4 using the output of ``ispmgr -v``.
            """
            with subprocess.Popen(
                ['/usr/local/ispmgr/bin/ispmgr', '-v'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            ) as proc:
                out, _ = proc.communicate()

            # The version is the second whitespace-separated token of the output.
            tokens = out.replace('\n', '').split()
            return {'name': __cpname__, 'version': tokens[1], 'additional_info': None}

        try:
            if os.path.isfile('/usr/local/mgr5/sbin/mgrctl'):
                # ISP Manager v5
                return _describe_v5()
            # ISPmanager v4
            return _describe_v4()
        except Exception:
            # Best effort: any failure means the description is unavailable.
            return None

    def db_access(self):
        """
        Get access data for the mysql database.
        For example {'login': 'root', 'db': 'mysql', 'host': 'localhost', 'pass': '9pJUv38sAqqW'}

        :return: access data as returned by the module-level db_access()
        :rtype: dict
        :raises: NoDBAccessData
        """
        return db_access()

    @staticmethod
    def useraliases(cpuser, domain):
        """
        Get aliases of a user domain; this plugin always reports none
        :param str|unicode cpuser: user login
        :param str|unicode domain:
        :return: empty list
        """
        return []

    def cpusers(self):
        """
        List users registered in the control panel.
        Delegates to the module-level cpusers(), which raises NotSupported.

        :return: list of cpusers registered in the control panel
        :rtype: tuple
        """
        return cpusers()

    def get_user_login_url(self, domain):
        """
        Get login url for the current panel
        :type domain: str
        :rtype: str
        """
        return get_user_login_url(domain)

    def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
        """
        List the features this plugin reports as unsupported
        :return: tuple of Feature members
        """
        unsupported = (
            Feature.RUBY_SELECTOR,
            Feature.PYTHON_SELECTOR,
            Feature.NODEJS_SELECTOR,
            Feature.RESELLER_LIMITS,
            Feature.WPOS,
        )
        return unsupported

Youez - 2016 - github.com/yon3zu
LinuXploit