Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.135.215.149
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/cpapi/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/cpapi/plugins/plesk.py
# -*- coding: utf-8 -*-

import os
import re
import time
import xml.etree.ElementTree as ETree
from collections import defaultdict
from functools import wraps
from traceback import format_exc
from typing import List, Any, Tuple, Dict, AnyStr, Optional, Union # NOQA
from urllib.parse import urlparse

from clcommon import ClPwd, mysql_lib
from clcommon.features import Feature
from clcommon.cpapi.cpapiexceptions import (
    NotSupported, NoPanelUser, NoPackage, NoDomain, DuplicateData
)
from clcommon.clfunc import uid_max
from clcommon.cpapi.GeneralPanel import GeneralPanelPluginV1, PHPDescription, DomainDescription
from clcommon.cpapi.cpapicustombin import get_domains_via_custom_binary, _docroot_under_user_via_custom_bin
from clcommon.utils import run_command, find_module_param_in_config, ExternalProgramFailed

# Default path of the file db_access() reads the Plesk database password from
PSA_SHADOW_PATH = "/etc/psa/.psa.shadow"
# Keys accepted by cpinfo() in ``keyls``; any other key raises NotSupported
SUPPORTED_CPINFO = {'cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'}
# Offset added to clients.id by get_reseller_id_pairs()
UID_MAX = uid_max()


# Panel name returned by PanelPlugin.getCPName() and get_cp_description()
__cpname__ = 'Plesk'


# WARN: Probably will be deprecated for our "official" plugins.
# See pluginlib.detect_panel_fast()
def detect():
    """
    Tell whether the Plesk version file is present on this host.

    :return: True if /usr/local/psa/version exists and is a regular file
    :rtype: bool
    """
    version_file = '/usr/local/psa/version'
    return os.path.isfile(version_file)


def db_access(_pass_path=PSA_SHADOW_PATH):
    """
    Read the credentials used to connect to the Plesk database.

    :param str _pass_path: file holding the password (whitespace is stripped)
    :return: dict with keys 'login' (always 'admin') and 'pass'
    :rtype: dict
    """
    with open(_pass_path, 'r', encoding='utf-8') as shadow_file:
        password = shadow_file.read().strip()
    return {'login': 'admin', 'pass': password}


def query_sql(query, data=None, _access=None, _dbname='psa', as_dict=False):
    """
    Run a query against the Plesk database and return its rows.

    Example of returned data:
        >>> query_sql('SELECT login from sys_users')
        ((u'cltest',), (u'cltest3',), (u'user2',), (u'user1tst',))
        >>> query_sql('SELECT login from sys_users', as_dict=True)
        ({'login': u'cltest'}, {'login': u'cltest3'},
         {'login': u'user2'}, {'login': u'user1tst'})

    :param str query: SQL text, optionally containing parameter placeholders
    :param data: values for the placeholders
    :param dict _access: credentials ('login', 'pass', optional 'host');
        when falsy they are taken from db_access()
    :param _dbname: database to connect to
    :param bool as_dict: return rows as dicts instead of tuples
    :return: rows produced by the query, in the format selected by as_dict
    :rtype: tuple(tuple) or tuple(dict)
    """
    credentials = _access if _access else db_access()
    connector = mysql_lib.MySQLConnector(
        host=credentials.get('host', 'localhost'),
        user=credentials['login'],
        passwd=credentials['pass'],
        db=_dbname,
        use_unicode=True,
        charset='utf8',
        as_dict=as_dict,
    )
    with connector.connect() as connection:
        rows = connection.execute_query(query, args=data)
        return rows


def cpusers(_access=None, _dbname='psa'):
    """
    List the logins of all system users known to the panel.

    ``_access`` and ``_dbname`` are accepted but not used by this function.

    :return: the 'cplogin' column of cpinfo()
    :rtype: list
    """
    logins = []
    for row in cpinfo(keyls=('cplogin', )):
        logins.append(row[0])
    return logins


def resellers():
    """
    List the logins of all clients whose type is 'reseller'.

    :rtype: list
    """
    sql = "SELECT clients.login FROM clients WHERE clients.type='reseller'"
    logins = []
    for (cplogin, ) in query_sql(sql):
        logins.append(cplogin)
    return logins


def admins():
    """
    Collect the logins of all clients whose type is 'admin'.

    :rtype: set
    """
    sql = "SELECT clients.login FROM clients WHERE clients.type='admin'"
    return {cplogin for (cplogin, ) in query_sql(sql)}


def is_reseller(username):
    """
    Check whether the client with the given login is a reseller.

    :param str username: client login
    :return: True if the client's type is 'reseller'; False otherwise,
        including when no such client exists
    :rtype: bool
    """
    sql = "SELECT clients.type FROM clients WHERE clients.login=%s"
    try:
        client_type = query_sql(sql, (username,))[0][0]
    except IndexError:
        # no row for this login
        return False
    return client_type == 'reseller'


def _sys_users_info(sys_login, keyls):
    """
    Fetch the requested columns for system users (owners of main domains).

    :param list|tuple|None sys_login: logins to restrict the result to;
        any non-list/tuple value (e.g. None) selects all users
    :param keyls: keys to select, in output order; each must be one of
        'cplogin', 'mail', 'reseller', 'dns', 'locale', 'package'
    :return: rows from query_sql(); an empty tuple when sys_login is an
        empty list/tuple
    :raises KeyError: if keyls contains an unknown key
    """
    # Templates.name can be None and it is ok
    mapping = {
        'cplogin': 'sys_users.login AS cplogin',
        'mail': 'clients.email AS email',
        'reseller': 'reseller.login AS reseller',
        'dns': 'domains.name AS dns',
        'locale': 'clients.locale AS local',
        'package': 'Templates.name AS package'
    }

    select_query = ', '.join([mapping[key] for key in keyls])
    sql = rf"""SELECT {select_query}
                FROM sys_users
                JOIN hosting ON hosting.sys_user_id=sys_users.id
                JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
                JOIN clients ON clients.id=domains.cl_id
                JOIN clients reseller ON reseller.id=domains.vendor_id
                LEFT JOIN Subscriptions ON Subscriptions.object_type = "domain" AND domains.id = Subscriptions.object_id
                LEFT JOIN PlansSubscriptions ON PlansSubscriptions.subscription_id = Subscriptions.id
                LEFT JOIN Templates AS Templates ON Templates.id = PlansSubscriptions.plan_id AND "domain" = Templates.type
                """

    if isinstance(sys_login, (list, tuple)):
        if not sys_login:
            # "IN ()" is not valid SQL; an empty filter cannot match any user
            return ()
        # make query like "where x in (%s, %s, %s, ...)"
        placeholders = ','.join(['%s'] * len(sys_login))
        sql += rf" WHERE sys_users.login IN ({placeholders})"

    return query_sql(sql, data=sys_login)


def _resellers_info(sys_login, keyls):
    """
    Fetch the requested columns for clients of type 'reseller' or 'admin'.

    :param list|tuple|None sys_login: logins to restrict the result to;
        any non-list/tuple value (e.g. None) selects all such clients
    :param keyls: keys to select, in output order; 'reseller', 'dns' and
        'package' are always returned as NULL here
    :return: rows from query_sql(); an empty tuple when sys_login is an
        empty list/tuple
    :raises KeyError: if keyls contains an unknown key
    """
    # items with 'NULL' are not available for this panel
    mapping = {
        'cplogin': 'clients.login AS cplogin',
        'mail': 'clients.email AS email',
        'reseller': 'NULL as reseller',
        'dns': 'NULL as dns',
        'locale': 'clients.locale AS local',
        'package': 'NULL as package'
    }

    select_query = ', '.join([mapping[key] for key in keyls])
    sql = f'SELECT {select_query} FROM clients WHERE clients.type IN ("reseller", "admin")'

    if isinstance(sys_login, (list, tuple)):
        if not sys_login:
            # "IN ()" is not valid SQL; an empty filter cannot match any client
            return ()
        # make query like "where x in (%s, %s, %s, ...)"
        placeholders = ','.join(['%s'] * len(sys_login))
        sql += rf" AND clients.login IN ({placeholders})"

    return query_sql(sql, data=sys_login)


def cpinfo(cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
           search_sys_users=True):
    """
    Return the requested fields for system users or for resellers/admins.

    :param str|list|tuple|None cpuser: login(s) to look up, None for all;
        a single string is wrapped into a one-element list
    :param list|tuple keyls: keys to return, each from SUPPORTED_CPINFO
    :param bool search_sys_users: True to query system users, False to
        query clients of type reseller/admin
    :return: rows with values in the same order as keyls
    :rtype: tuple[tuple]
    :raises NotSupported: if keyls contains a key outside SUPPORTED_CPINFO
    """
    if isinstance(cpuser, str):
        cpuser = [cpuser]

    # just for developers: reject the first unknown key
    for key in keyls:
        if key in SUPPORTED_CPINFO:
            continue
        raise NotSupported(f'Key {key} is not supported for this control panel. '
                           f'Available keys: {SUPPORTED_CPINFO}')

    fetch = _sys_users_info if search_sys_users else _resellers_info
    return fetch(cpuser, keyls)


def get_admin_email(*args, **kwargs):
    """
    Read the 'admin_email' parameter from the ``misc`` table.

    Positional and keyword arguments are accepted and ignored.

    :return: the stored value, or None if the query returns no row
    """
    sql = r"SELECT val FROM misc WHERE param='admin_email';"
    try:
        first_row = query_sql(sql)[0]
        return first_row[0]
    except IndexError:
        return None


def docroot_basic(domain):
    """
    Look up the document root and owner of a domain directly in the database.

    :param str domain: domain name
    :return: first matching row as (www_root, system user login)
    :raises NoDomain: if no hosting record matches the domain
    """
    sql = r"""
    SELECT hosting.www_root, sys_users.login
      FROM hosting
      JOIN domains ON hosting.dom_id=domains.id
      JOIN sys_users ON hosting.sys_user_id=sys_users.id
    WHERE domains.name=%s
    """
    try:
        rows = query_sql(sql, data=(domain,))
        return rows[0]
    except IndexError as e:
        raise NoDomain(f'Cannot obtain document root for {domain}') from e


def docroot(domain):
    """
    Return the document root and owner for a domain.

    When both the real and the effective uid are 0 the database is queried
    directly; otherwise the lookup goes through
    _docroot_under_user_via_custom_bin().

    :param str domain: domain name (surrounding whitespace is stripped)
    :return: lookup result in (doc_root, domain_user) format
    :raises NoDomain: if the lookup yields None
    """
    domain = domain.strip()

    running_as_root = os.getuid() == 0 and os.geteuid() == 0
    lookup = docroot_basic if running_as_root else _docroot_under_user_via_custom_bin
    res = lookup(domain)

    if res is None:
        raise NoDomain(f"Can't obtain document root for domain '{domain}'")
    return res


def reseller_users(resellername):
    """
    List system users whose main domain is sold by the given reseller.

    :param resellername: reseller login; None yields an empty list
    :return list[str]: system user logins
    """
    if resellername is None:
        return []
    sql = """
        SELECT sys_users.login
        FROM clients as reseller
        JOIN domains ON domains.vendor_id=reseller.id
        JOIN hosting ON hosting.dom_id=domains.id
        JOIN sys_users ON hosting.sys_user_id=sys_users.id
        WHERE domains.webspace_id=0 AND reseller.login=%s;
    """
    logins = []
    for (sys_login,) in query_sql(sql, data=(resellername,)):
        logins.append(sys_login)
    return logins


def memoize(f):
    """
    Cache the user -> value map produced by ``f``.

    ``f`` must return a mapping keyed by user. The wrapper returns the
    entry for ``cpuser``; ``f`` is called again, and the whole cached map
    replaced, only when ``cpuser`` is missing from the cached map.
    Extra arguments are forwarded to ``f`` but are not part of the cache key.
    """
    userdomains_map = {}

    @wraps(f)
    def wrapper(cpuser, *args, **kwargs):
        nonlocal userdomains_map
        if cpuser in userdomains_map:
            return userdomains_map[cpuser]
        userdomains_map = f(cpuser, *args, **kwargs)
        return userdomains_map[cpuser]

    return wrapper


@memoize
def userdomains_basic(cpuser, _access=None, _dbname='psa'):
    """
    Load the domains of every system user from the Plesk database.

    The undecorated function returns the map for all users; the ``memoize``
    wrapper caches that map and gives the caller only the list for
    ``cpuser``: (domain_name, www_root) pairs ordered by webspace_id, so the
    main domain (webspace_id == 0) comes first.

    :param str cpuser: Username
    :param dict _access: database credentials, see query_sql()
    :param str _dbname: Database name where is located data
    :return: mapping login -> list of (domain_name, www_root)
    :rtype: dict
    :raises NoPanelUser: User is not found in Plesk database.
    """
    # WARN: ORDER BY columns must be present in SELECT columns for newer Mysql
    # webspace_id == 0 is main domain
    sql = r"""
    SELECT DISTINCT su.login, d.name, h.www_root, d.webspace_id
    FROM domains as d, hosting as h, sys_users as su
    WHERE h.sys_user_id = su.id AND h.dom_id = d.id
    ORDER BY d.webspace_id ASC;
    """
    # _dbname is forwarded so the documented parameter actually selects the
    # database; previously it was silently ignored.
    data = query_sql(sql, as_dict=True, _access=_access, _dbname=_dbname)
    # _user_to_domains_map:
    # { 'user1': [('user1.org', '/var/www/vhosts/user1.com/httpdocs'),
    #             ('mk.user1.org', '/var/www/vhosts/user1.com/mk.user1.org')] }
    _user_to_domains_map = defaultdict(list)
    for data_dict in data:
        _user_to_domains_map[data_dict['login']].append(
            (data_dict['name'], data_dict['www_root']))
    if cpuser not in _user_to_domains_map:
        raise NoPanelUser(
            f'User {cpuser} not found in the database')
    return _user_to_domains_map


def userdomains(cpuser, _access=None, _dbname='psa', as_root=False):
    """
    Return the domains of the given user.

    The database is read directly when the effective uid is 0, when
    ``_access`` is given, or when ``as_root`` is set. Otherwise the result
    is obtained through get_domains_via_custom_binary().

    :param str cpuser: Username
    :param dict _access: database credentials, see query_sql()
    :param str _dbname: Database name where is located data
    :param bool as_root: force the direct database path
    :return: (domain_name, document_root) pairs on the direct path, or the
        payload returned by the helper binary
    :raises NoPanelUser: User is not found in Plesk database.
    :raises ExternalProgramFailed: the helper binary returned an
        unexpected code
    """
    if os.geteuid() == 0 or _access or as_root:
        return userdomains_basic(cpuser, _access, _dbname)

    # Same lookup, but through the rights escalation binary wrapper:
    # here -> binary -> python -> userdomains(as_root=True) -> json on stdout
    rc, res = get_domains_via_custom_binary()
    if rc == 0:
        return res
    if rc == 11:
        raise NoPanelUser(f'User {cpuser} not found in the database')
    raise ExternalProgramFailed(f'Failed to get userdomains: {res}')


def domain_owner(domain, _access=None, _dbname='psa'):
    """
    Return domain owner

    :param str domain: Domain/sub-domain/add-domain name
    :param dict _access: database credentials, see query_sql()
    :param str _dbname: Database name where is located data
    :return: user name or None if domain not found
    :rtype: str
    :raises DuplicateData: if the domain resolves to more than one user
    """
    # A domain matches either by its own name or as the webspace (parent)
    # of a domain with that name.
    sql = r"""
    SELECT DISTINCT `su`.`login`
    FROM `sys_users` `su`, `hosting` `h`, `domains` `d`, `domains` `sd`
    WHERE `h`.`sys_user_id`=`su`.`id` AND `h`.`dom_id`=`d`.`id`
          AND (`d`.`name`=%s OR `d`.`id`=`sd`.`webspace_id` AND `sd`.`name`=%s)"""
    # _access/_dbname are forwarded so the parameters take effect;
    # previously they were accepted but ignored.
    rows = query_sql(sql, (domain, domain), _access=_access, _dbname=_dbname)
    users_list = [row[0] for row in rows]
    # FIXME: how this possible?
    if len(users_list) > 1:
        raise DuplicateData(
            f"domain {domain} belongs to few users: [{','.join(users_list)}]"
        )
    if not users_list:
        return None
    return users_list[0]


def dblogin_cplogin_pairs(cplogin_lst=None, with_system_users=False):
    """
    Not available for Plesk: always raises.

    :param cplogin_lst: ignored
    :param with_system_users: ignored
    :raises NotSupported: always
    """
    message = ('Getting binding credentials in the database to the user name in the system is not currently '
               'supported.')
    raise NotSupported(message)


def homedirs(_sysusers=None, _cpusers=None):
    """
    Detects and returns list of folders contained the home dirs of users of the Plesk

    :param list|tuple|None _sysusers: for testing; (name, home_dir) pairs
        that replace the system passwd data
    :param _cpusers: for testing; panel users (login strings or rows whose
        first element is the login) used instead of cpusers()
    :return: list of folders, which are parent of home dirs of users of the panel
    """
    if _cpusers is None:
        try:
            results = cpusers()
        except NoPackage:
            results = None
    else:
        results = _cpusers

    # cpusers() yields plain login strings while injected data may be row
    # tuples. Indexing a string with [0] would keep only its first character
    # and make the filter below reject every real user, so strings are taken
    # as-is and only rows are unpacked.
    users = set()
    if results is not None:
        users = {line if isinstance(line, str) else line[0] for line in results}

    # Plesk assumes MIN_UID as 10000
    clpwd = ClPwd(10000)
    users_dict = clpwd.get_user_dict()

    # for testing only
    if isinstance(_sysusers, (list, tuple)):
        class pw:
            def __init__(self, name, home_dir):
                self.pw_name = name
                self.pw_dir = home_dir

        users_dict = {name: pw(name, home_dir) for (name, home_dir) in _sysusers}

    parent_dirs = []
    for user_name, user_data in users_dict.items():
        # an empty panel-user set means "do not filter"
        if users and user_name not in users:
            continue
        parent = os.path.dirname(user_data.pw_dir)
        if parent not in parent_dirs:
            parent_dirs.append(parent)

    return parent_dirs


def get_user_login_url(domain):
    """
    Build the panel login URL for a domain.

    :param str domain: domain name
    :return: https URL of the domain on port 8443
    :rtype: str
    """
    return 'https://{}:8443'.format(domain)


def get_reseller_id_pairs():
    """
    Map each reseller login to a synthetic numeric id.

    Plesk has no user associated with reseller, but we need some id
    for our internal purposes, so the database id shifted by UID_MAX is used.

    :return: dict {reseller login: clients.id + UID_MAX}
    :rtype: dict
    """
    sql = """SELECT clients.login, clients.id + %s FROM clients WHERE clients.type='reseller'"""
    pairs = query_sql(sql, data=[UID_MAX])
    return dict(pairs)


def reseller_domains(resellername):
    """
    Map system users of a reseller to their main domain.

    :param str resellername: reseller login; a falsy value yields {}
    :return: dict {system user login: main domain name}
    :rtype: dict[str, str]
    """
    if not resellername:
        return {}

    sql = r"""SELECT sys_users.login AS cplogin,
                     domains.name AS dns
                    FROM sys_users
                    JOIN hosting ON hosting.sys_user_id=sys_users.id
                    JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
                    JOIN clients reseller ON reseller.id=domains.vendor_id
                    WHERE reseller.login=%s
                    """

    login_domain_rows = query_sql(sql, data=[resellername])
    return dict(login_domain_rows)


def _extract_xml_value(xml_string, key):
    """
    Plesk stores some information in simple xml formatted strings.

    :param xml_string: XML document text; None or empty is treated as
        "no data"
    :param str key: tag/path passed to Element.find() on the root element
    :return: text of the first matching element, or None when the input is
        missing or malformed, the element is absent, or it has no text
    """
    if not xml_string:
        # None (e.g. a NULL column) would make the parser raise TypeError
        # instead of ParseError; empty input cannot be parsed either.
        return None
    try:
        elem = ETree.fromstring(xml_string).find(key)
    except ETree.ParseError:
        return None
    return elem.text if elem is not None else None


def get_domains_php_info():
    """
    Return the php version info for each domain that has php enabled.

    Plesk stores the information about the handler in xml format, e.g.:

        <handler>
           <id>plesk-php71-fpm</id>
           <type>fpm</type>
           <version>7.1</version>
           ...
        </handler>

    Each value of the result is a DomainDescription built with the fields
    username, php_version_id, handler_type and display_version, e.g. for
    domain 'cltest.com': username='cltest', php_version_id='plesk-php71',
    handler_type='php-fpm', display_version='plesk-php71'.

    :return: mapping domain name -> DomainDescription
    :rtype: dict
    """
    sql = r"""
    SELECT sys_users.login, d.name, h.php_handler_id, handlers.value
    FROM domains AS d
        JOIN hosting AS h
            ON h.dom_id=d.id
        JOIN sys_users
            ON h.sys_user_id=sys_users.id
        JOIN (SELECT ServiceNodeEnvironment.*
            FROM ServiceNodeEnvironment
            WHERE (serviceNodeId = '1' AND section = 'phphandlers')) AS handlers
                ON handlers.name=h.php_handler_id
    WHERE h.php='true'
        """

    # vendor php_handler_id has only "fpm/cgi/fastcgi" w/o version,
    # so the version has to be taken from the handler xml
    vendor_version_ids = ['cgi', 'fastcgi', 'fpm', 'x-httpd-lsphp-custom']
    # applied in this order to bring php variations to one normal form
    normalization = (
        ('-dedicated', ''),
        ('-fpm', ''),
        ('-fastcgi', ''),
        ('x-httpd-lsphp-', 'alt-php'),
    )

    php_versions = {}
    for username, domain, php_handler_id, handler_xml in query_sql(sql):
        if php_handler_id in vendor_version_ids:
            display_version = \
                f'vendor-php{_extract_xml_value(handler_xml, "version")}'.replace('.', '')
        else:
            display_version = php_handler_id

        handler = _extract_xml_value(handler_xml, key='type') or 'unknown'
        if handler == 'fpm':
            handler = 'php-fpm'
        elif 'x-httpd-lsphp' in php_handler_id:
            handler = 'lsapi'

        for fragment, replacement in normalization:
            display_version = display_version.replace(fragment, replacement)

        php_versions[domain] = DomainDescription(
            username=username,
            php_version_id=display_version,  # not a typo
            handler_type=handler,
            display_version=display_version
        )

    return php_versions


def get_main_username_by_uid(uid: int) -> str:
    """
    Get "main" panel username by uid.

    As root, every passwd entry with this uid is tried and the first one
    known to the panel wins. As a regular user only the first entry is
    checked (the cycle is implemented in the suid binary, see
    scripts/plesk_suid_caller.py).

    :param uid: uid
    :return: 'root' for uid 0, the username, or 'N/A' if user not found
    """
    if uid == 0:
        return 'root'
    try:
        candidates = ClPwd().get_pw_by_uid(uid)
        if os.geteuid() != 0:
            username = candidates[0].pw_name
            userdomains(username)
            return username
        for entry in candidates:
            username = entry.pw_name
            try:
                userdomains(username)
            except NoPanelUser:
                continue
            return username
    except (NoPanelUser, ClPwd.NoSuchUserException):
        pass
    return 'N/A'


class PanelPlugin(GeneralPanelPluginV1):
    def __init__(self):
        """Set Apache MPM defaults and an empty ``httpd -M`` output cache."""
        super().__init__()
        # Config file read by _get_max_request_workers_for_module()
        self.HTTPD_MPM_CONFIG = '/etc/httpd/conf.modules.d/01-cgi.conf'
        # Defaults of MaxRequestWorkers for all possible mpm modules
        self.MPM_MODULES = dict(prefork=256, worker=400, event=400)
        # httpd modules cache: timestamp 0 makes the first lookup refresh it
        self.httpd_modules_ts = 0
        self.httpd_modules = ""

    def getCPName(self):
        """
        Return the control panel name.
        :return: value of the module-level ``__cpname__``
        """
        panel_name = __cpname__
        return panel_name

    def get_cp_description(self):
        """
        Retrieve the panel name and its version.

        The version is the first whitespace-separated token of
        /usr/local/psa/version.

        :return: dict {'name': ..., 'version': ..., 'additional_info': None},
            or None if the info cannot be obtained for any reason
        """
        try:
            with open("/usr/local/psa/version", "r", encoding="utf-8") as version_file:
                version_tokens = version_file.read().split()
            description = {
                'name': __cpname__,
                'version': version_tokens[0],
                'additional_info': None,
            }
            return description
        except Exception:
            return None

    def db_access(self):
        """
        Return credentials for the Plesk database.

        Delegates to the module-level db_access(), which yields a dict with
        the keys 'login' and 'pass'.

        :return: database access data
        :rtype: dict
        """
        credentials = db_access()
        return credentials

    def cpusers(self):
        """
        List the users registered in the control panel.

        Delegates to the module-level cpusers().

        :return: logins of the panel's system users
        :rtype: list
        """
        user_logins = cpusers()
        return user_logins

    def resellers(self):
        """
        List the resellers registered in the control panel.

        Delegates to the module-level resellers().

        :return: reseller logins
        :rtype: list
        """
        reseller_logins = resellers()
        return reseller_logins

    def is_reseller(self, username):
        """
        Tell whether the given login belongs to a reseller.

        Delegates to the module-level is_reseller().

        :type username: str
        :rtype: bool
        """
        verdict = is_reseller(username)
        return verdict

    def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
        """
        Get mapping between system and DB users.

        Delegates to the module-level dblogin_cplogin_pairs(), which raises
        NotSupported for this panel.

        :param list cplogin_lst: usernames to generate the mapping for
        :param bool with_system_users: add system users to the result or not
        """
        pairs = dblogin_cplogin_pairs(cplogin_lst, with_system_users)
        return pairs

    def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
               search_sys_users=True):
        """
        Retrieve info about panel user(s).

        Delegates to the module-level cpinfo().

        :param str|list|tuple|None cpuser: user login(s), None for all
        :param keyls: which fields to return; the values can be:
           cplogin - login of the user
           mail - e-mail of the user
           reseller - login of the reseller/owner of the user
           locale - localization of the user account
           package - name of the user's package
           dns - domain of the user
        :param bool search_sys_users: search among system users (True) or
            among reseller/admin clients (False)
        :return: tuple of tuples with values in the same order as keyls
        :rtype: tuple
        """
        rows = cpinfo(cpuser, keyls, search_sys_users=search_sys_users)
        return rows

    def get_admin_email(self):
        """
        Retrieve the host admin's e-mail address.

        Delegates to the module-level get_admin_email().

        :return: admin e-mail, or None if it is not stored
        """
        admin_email = get_admin_email()
        return admin_email

    def docroot(self, domain):
        """
        Return the document root for a domain.

        Delegates to the module-level docroot().

        :param str domain: domain name
        :return: lookup result in (doc_root, domain_user) format
        """
        found = docroot(domain)
        return found

    @staticmethod
    def useraliases(cpuser, domain):
        """
        Return the aliases of a user's domain.

        :param str cpuser: system user login owning the domain
        :param str domain: domain name
        :return: list of alias names
        """
        sql = """
        SELECT a.name, d.name
        FROM domains AS d
            INNER JOIN domain_aliases AS a
                ON a.dom_id = d.id
            INNER JOIN hosting AS h
                ON h.dom_id = d.id
            INNER JOIN sys_users AS su
                ON h.sys_user_id = su.id
        WHERE su.login = %s AND d.name = %s
        """
        aliases = []
        for row in query_sql(sql, (cpuser, domain)):
            # first column is the alias name, second the domain itself
            aliases.append(row[0])
        return aliases

    def userdomains(self, cpuser):
        """
        Return domain and document root pairs for a control panel user;
        the first domain is the main domain.

        Delegates to the module-level userdomains().

        :param str cpuser: user login
        :return: list of tuples (domain_name, document_root)
        """
        domains = userdomains(cpuser)
        return domains

    def homedirs(self):
        """
        Detect the folders that contain home dirs of Plesk users.

        Delegates to the module-level homedirs().

        :return: list of folders, which are parents of users' home dirs
        """
        parent_dirs = homedirs()
        return parent_dirs

    def reseller_users(self, resellername=None):
        """
        Return the users of a reseller.

        Delegates to the module-level reseller_users(), which returns an
        empty list when the name is None.

        :param resellername: reseller name
        :return list[str]: user names list
        """
        users = reseller_users(resellername)
        return users

    def reseller_domains(self, resellername=None):
        """
        Get dict[user, domain] for a reseller.

        Delegates to the module-level reseller_domains(), which returns an
        empty dict for a falsy name.

        :param resellername: reseller's name
        :rtype: dict[str, str]
        """
        domains_by_user = reseller_domains(resellername)
        return domains_by_user

    def get_user_login_url(self, domain):
        """
        Get the panel login url for a domain.

        Delegates to the module-level get_user_login_url().

        :type domain: str
        :rtype: str
        """
        login_url = get_user_login_url(domain)
        return login_url

    def admins(self):
        """
        List all admin names in the control panel.

        Delegates to the module-level admins().

        :return: set of admin logins
        """
        admin_logins = admins()
        return admin_logins

    def get_reseller_id_pairs(self):
        """
        Return a reseller login -> id mapping.

        Plesk has no user associated with reseller, but we need some id
        for our internal purposes, so it is derived from the database.
        Delegates to the module-level get_reseller_id_pairs().
        """
        id_by_reseller = get_reseller_id_pairs()
        return id_by_reseller

    def domain_owner(self, domain):
        """
        Return the owner of a domain.

        Delegates to the module-level domain_owner().

        :param domain: Domain/sub-domain/add-domain name
        :rtype: str
        :return: user name or None if domain not found
        """
        owner = domain_owner(domain)
        return owner

    def get_domains_php_info(self):
        """
        Return php version information for each domain.

        Delegates to the module-level get_domains_php_info().

        :return: domain to php info mapping
        :rtype: dict
        """
        php_info = get_domains_php_info()
        return php_info

    def get_installed_php_versions(self):
        """
        Get the PHP versions installed in the panel.

        Handlers named like 'alt-phpXY-...' or 'plesk-phpXY-...' are taken
        as is; the vendor handlers ('cgi', 'fastcgi', 'fpm',
        'x-httpd-lsphp-custom') are renamed to 'vendor-phpXY-...' using the
        version from their xml config. Every name is then cut to its first
        two dash-separated parts (e.g. 'alt-php56', 'plesk-php80') and
        duplicates are dropped. Names with an unrecognised prefix are skipped.

        :return: list of PHPDescription, one per detected version
        """
        sql = """
        SELECT ServiceNodeEnvironment.name, ServiceNodeEnvironment.value
        FROM ServiceNodeEnvironment
        WHERE (serviceNodeId = '1' AND section = 'phphandlers')
        """
        # handler names look like:
        # 'alt-php-internal-cgi', 'alt-php44-cgi', 'alt-php44-fastcgi',
        # 'fpm', 'cgi', 'fastcgi', 'x-httpd-lsphp-custom'
        handler_rows = query_sql(sql)
        versioned_name = re.compile(r'^(alt-|plesk-)php+\d+', re.IGNORECASE)
        vendor_handler_names = ['cgi', 'fastcgi', 'fpm', 'x-httpd-lsphp-custom']

        handler_names = []
        for row in handler_rows:
            if versioned_name.match(row[0]):
                handler_names.append(row[0])
        for name, xmlconfig in handler_rows:
            if name in vendor_handler_names:
                handler_names.append(self._cast_to_vendor_name(name, xmlconfig))

        unique_versions = {'-'.join(name.split('-')[:2]) for name in handler_names}

        php_description = []
        for php_name in unique_versions:
            # pick the installation layout by name prefix
            if php_name.startswith(("alt-", "x-httpd-lsphp-")):
                php_root_dir = f'/opt/{php_name.replace("-", "/")}/'
                modules_rel = 'usr/lib64/php/modules/'
                bin_rel = 'usr/bin/php'
                ini_rel = 'link/conf/default.ini'
            elif php_name.startswith("plesk-"):
                php_root_dir = f'/opt/plesk/php/{php_name[-2]}.{php_name[-1]}/'
                modules_rel = 'lib64/php/modules/'
                bin_rel = 'bin/php'
                ini_rel = 'etc/php.ini'
            elif php_name.startswith("vendor-"):
                php_root_dir = '/'
                modules_rel = 'usr/lib64/php/modules/'
                bin_rel = 'bin/php'
                ini_rel = 'etc/php.ini'
            else:
                # unknown php, skip
                continue

            php_description.append(PHPDescription(
                identifier=php_name,
                # the last two characters of the name are major and minor
                version=f'{php_name[-2]}.{php_name[-1]}',
                dir=os.path.join(php_root_dir),
                modules_dir=os.path.join(php_root_dir, modules_rel),
                bin=os.path.join(php_root_dir, bin_rel),
                ini=os.path.join(php_root_dir, ini_rel),
            ))
        return php_description

    def _cast_to_vendor_name(self, name, value):
        """
        Build a 'vendor-phpXY-<name>' handler name.

        :param str name: original handler name
        :param value: handler xml config; its <version> supplies XY
        :return: the composed name with all dots removed
        """
        version = _extract_xml_value(value, "version")
        return f'vendor-php{version}-{name}'.replace('.', '')

    def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
        """
        Return the features reported as unsupported for this panel:
        the Ruby, Python and Node.js selectors.
        """
        unsupported = [
            Feature.RUBY_SELECTOR,
            Feature.PYTHON_SELECTOR,
            Feature.NODEJS_SELECTOR,
        ]
        return tuple(unsupported)

    def _get_active_apache_mpm_module(self) -> Optional[AnyStr]:
        """
        Determines active MPM module for Apache Web Server
        :return: apache_active_module_name
                apache_active_module_name: 'prefork', 'event', 'worker'
        """
        try:
            # Caching httpd output and refresh it only one time in hour
            if time.time() - self.httpd_modules_ts > 3600:
                self.httpd_modules = run_command(["httpd", "-M"])
                self.httpd_modules_ts = time.time()
        except (OSError, IOError, ExternalProgramFailed):
            self.httpd_modules = ""
            self.httpd_modules_ts = time.time()
        for mpm_module in self.MPM_MODULES:
            if f"mpm_{mpm_module}_module" in self.httpd_modules:
                return mpm_module
        return None

    def _get_max_request_workers_for_module(self, apache_module_name: str) \
            -> Tuple[int, str]:
        """
        Determine the MaxRequestWorkers directive value for an apache module.

        Looks the directive up in self.HTTPD_MPM_CONFIG, passing the
        module's entry from self.MPM_MODULES as the default.

        :param apache_module_name: Current apache's module name:
            'prefork', 'event', 'worker'
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Error message (the traceback text on error)
        """
        try:
            outcome = find_module_param_in_config(
                self.HTTPD_MPM_CONFIG,
                apache_module_name,
                'MaxRequestWorkers',
                self.MPM_MODULES[apache_module_name],
            )
        except (OSError, IOError, IndexError, ValueError):
            outcome = 0, format_exc()
        return outcome

    def get_apache_max_request_workers(self) -> Tuple[int, str]:
        """
        Read the MaxRequestWorkers value for the MPM module Apache is running with.
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Error message
        """
        active_mpm = self._get_active_apache_mpm_module()
        if active_mpm is not None:
            return self._get_max_request_workers_for_module(active_mpm)
        # No MPM module was detected, so there is nothing to look up in the config
        return 0, "httpd service doesn't work or mpm modules are absent"

    @staticmethod
    def get_main_username_by_uid(uid: int) -> str:
        """
        Resolve the "main" panel username for the given uid.
        Delegates to the module-level get_main_username_by_uid function.
        :param uid: uid
        :return Username
        """
        main_username = get_main_username_by_uid(uid)
        return main_username

    @staticmethod
    def get_user_emails_list(username: str, domain: str):
        """
        Get e-mail addresses of the client that owns the given domain.
        :param username: not used by the query
        :param domain: domain name used to look up the owning client
        :return: e-mail addresses joined with ',' (empty string when the
                 query returns no rows)
        """
        # The domain name is passed as a bound parameter instead of being
        # interpolated into the statement, so its content cannot alter the SQL
        sql = r"""
        SELECT clients.email
        FROM clients
        WHERE clients.id = (
            SELECT domains.cl_id
            FROM domains
            WHERE domains.name = %s)
        """
        query_result = query_sql(sql, data=[domain])
        return ','.join(item[0] for item in query_result)

    @staticmethod
    def panel_login_link(username):
        """
        Build the base URL of the panel from the output of `plesk login`.
        :param username: not used
        :return: '<scheme>://<netloc>/' of the generated link,
                 or '' when the command produced no output
        """
        login_url = run_command(['/usr/sbin/plesk', 'login'])
        if login_url:
            # Keep only scheme and host, dropping the path and the secret, e.g.
            # https://10.51.32.129/login?secret=... -> https://10.51.32.129/
            url_parts = urlparse(login_url)
            return f'{url_parts.scheme}://{url_parts.netloc}/'
        return ''

    @staticmethod
    def panel_awp_link(username):
        """
        Build the link to the AWP page of the plesk-lvemanager module.
        :param username: passed through to panel_login_link
        :return: AWP page URL, or '' when the panel base link is empty
        """
        base_url = PanelPlugin.panel_login_link(username).rstrip("/")
        if base_url:
            return f'{base_url}/modules/plesk-lvemanager/index.php/awp/index#/'
        return ''

    def get_customer_login(self, username):
        """
        Resolve the customer (client) login by a system user name.

        Occasionally the customer login is required rather than
        the system user name, e.g. when communicating with WHMCS.

        :param username: system user login (sys_users.login)
        :return: clients.login from the first row of the query result
        :raises NoPanelUser: when the query returns no rows
        """
        customer_query = r"""SELECT clients.login
            FROM sys_users
            JOIN hosting ON hosting.sys_user_id=sys_users.id
            JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0
            JOIN clients ON clients.id=domains.cl_id
            WHERE sys_users.login = %s"""

        rows = query_sql(customer_query, data=[username])
        try:
            customer_login = rows[0][0]
        except IndexError as err:
            raise NoPanelUser(f'Unknown user {username}') from err
        return customer_login

    def get_domain_login(self, username, domain):
        """
        Resolve the subscription login (sys_users.login) by a domain name.

        Occasionally the subscription login is required rather than
        the client login, e.g. when communicating with WHMCS.

        One client can create several subscriptions,
        each subscription creates a new login in the sys_users table,
        and one subscription can hold several domains.

        upgrade_url requires subscription login from sys_users.

        :param username: not used by the query
        :param domain: domain name to look the login up by
        :return: sys_users.login from the first row of the query result
        :raises NoPanelUser: when the query returns no rows
        """
        login_query = r"""SELECT sys_users.login
            FROM sys_users
            JOIN hosting ON hosting.sys_user_id=sys_users.id
            JOIN domains ON hosting.dom_id=domains.id AND domains.webspace_id=0 AND domains.name = %s"""

        rows = query_sql(login_query, data=[domain])
        try:
            subscription_login = rows[0][0]
        except IndexError as err:
            raise NoPanelUser(f'Unknown user for domain {domain}') from err
        return subscription_login

    def get_server_ip(self):
        """
        Get the address stored in IP_Addresses with main = 'true'.
        :return: ip_address from the first row of the query result
        :raises NotSupported: when the query returns no rows
        """
        main_ip_query = r"""
        SELECT ip_address FROM IP_Addresses
        WHERE main = 'true'
        """
        rows = query_sql(main_ip_query)
        try:
            main_ip = rows[0][0]
        except IndexError as err:
            raise NotSupported(
                'Unable to detect main ip for this server. '
                'Contact CloudLinux support and report the issue.'
            ) from err
        return main_ip

    def suspended_users_list(self):
        """
        Collect logins of suspended system users.
        A user is selected when a domain joined to it through hosting
        has status == 2.
        :return: list of sys_users logins, one entry per returned row
        """
        suspended_query = r"""
        SELECT su.login FROM sys_users su 
        JOIN hosting h ON su.id = h.sys_user_id 
        JOIN domains d ON h.dom_id = d.id 
        WHERE d.status = 2
        """
        suspended_logins = []
        for row in query_sql(suspended_query):
            # Each row carries a single column: the login
            suspended_logins.append(row[0])
        return suspended_logins

Youez - 2016 - github.com/yon3zu
LinuXploit