Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.21.105.222
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/cpapi/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/cpapi/plugins/directadmin.py
# -*- coding: utf-8 -*-
"""
CloudLinux API for DirectAdmin control panel
"""

import glob
import os
import re
import subprocess
import sys
import syslog
from traceback import format_exc
from typing import Dict, List, Tuple  # NOQA
from urllib.parse import urlparse

import requests

from clcommon.clconfpars import (
    WebConfigMissing,
    WebConfigParsingError,
    apache_conf_parser,
    load_fast,
    nginx_conf_parser,
    read_unicode_file_with_decode_fallback,
)
from clcommon.clconfpars import load as loadconfig
from clcommon.clpwd import ClPwd
from clcommon.cpapi.cpapicustombin import (
    _docroot_under_user_via_custom_bin,
    get_domains_via_custom_binary,
)
from clcommon.cpapi.cpapiexceptions import (
    CpApiTypeError,
    NoDBAccessData,
    NoDomain,
    NoPanelUser,
    ParsingError,
    ReadFileError,
)
from clcommon.cpapi.GeneralPanel import (
    DomainDescription,
    GeneralPanelPluginV1,
    PHPDescription,
)
from clcommon.cpapi.plugins.universal import (
    get_admin_email as universal_get_admin_email,
)
from clcommon.features import Feature
from clcommon.utils import (
    ExternalProgramFailed,
    find_module_param_in_config,
    get_file_lines,
    grep,
)

# Panel name reported by PanelPlugin.getCPName() and get_cp_description()
__cpname__ = 'DirectAdmin'


# DirectAdmin installation root; the paths below are derived from it
DA_DIR = '/usr/local/directadmin'
# Main panel config (read for 'nginx', 'nginx_proxy', 'system_skin', 'docsroot')
DA_CONF = os.path.join(DA_DIR, 'conf/directadmin.conf')
DA_DATA_DIR = os.path.join(DA_DIR, 'data')
# File read by db_access() for the 'user' and 'passwd' keys
DA_DB_CONF = os.path.join(DA_DIR, 'conf/mysql.conf')
# Contains one sub-directory per panel user; each one holds that user's USER_CONF
DA_USERS_PATH = os.path.join(DA_DATA_DIR, 'users')
# custombuild options (php<N>_mode / php<N>_release, webserver, apache_mpm)
DA_OPT_PATH = os.path.join(DA_DIR, 'custombuild', 'options.conf')
USER_CONF = 'user.conf'
# "domain:user" records, one per line (parsed by _load_domains_owners)
DOMAINOWNERS = '/etc/virtual/domainowners'
ADMIN_DIR = os.path.join(DA_DATA_DIR, 'admin')
RESELLERS_LIST = os.path.join(ADMIN_DIR, 'reseller.list')
ADMINS_LIST = os.path.join(ADMIN_DIR, 'admin.list')
# Captures the name of the directory that directly contains USER_CONF
USER_PATTERN = re.compile(rf'.+/(.+)/{re.escape(USER_CONF)}')


# WARN: Probably will be deprecated for our "official" plugins.
# See pluginlib.detect_panel_fast()
def detect():
    """
    Tell whether DirectAdmin files are present on this host
    :return: True if the DirectAdmin binary or the custombuild ``build`` file exists
    :rtype: bool
    """
    marker_files = (
        '/usr/local/directadmin/directadmin',
        '/usr/local/directadmin/custombuild/build',
    )
    return any(os.path.isfile(marker) for marker in marker_files)


def db_access():
    """
    Read database credentials from DA_DB_CONF
    :return: dict with keys 'login' and 'pass'
    :rtype: dict
    :raises NoDBAccessData: if the file can't be opened or lacks 'user'/'passwd'
    """
    try:
        mysql_conf = loadconfig(DA_DB_CONF)
        credentials = {
            'login': mysql_conf['user'],
            'pass': mysql_conf['passwd'],
        }
    except IOError as err:
        message = 'Can not open file with data to database access; ' + str(err)
        raise NoDBAccessData(message) from err
    except KeyError as err:
        message = f'Can not get database access data from file {DA_DB_CONF}'
        raise NoDBAccessData(message) from err
    return credentials


def cpusers():
    """
    List panel users: every directory under DA_USERS_PATH that contains USER_CONF
    :return: user names
    :rtype: tuple
    """
    conf_glob = os.path.join(DA_USERS_PATH, '*', USER_CONF)
    names = []
    for conf_path in glob.glob(conf_glob):
        found = USER_PATTERN.match(conf_path)
        if found:
            names.append(found.group(1))
    return tuple(names)


def resellers():
    """
    Read reseller names from RESELLERS_LIST, one per line
    :return: stripped lines of the file, in file order
    :rtype: tuple
    """
    with open(RESELLERS_LIST, encoding='utf-8') as list_file:
        return tuple(entry.strip() for entry in list_file)


def admins():
    """
    Read admin names from ADMINS_LIST, one per line
    :return: stripped lines of the file
    :rtype: set
    """
    with open(ADMINS_LIST, encoding='utf-8') as list_file:
        return {entry.strip() for entry in list_file}


def dblogin_cplogin_pairs(cplogin_lst=None, with_system_users=False):
    """
    Get mapping between DB users and panel users
    :param cplogin_lst: user names passed through to the universal implementation
    :param bool with_system_users: also append (name, name) pairs returned by get_da_user()
    :return: pairs produced by the universal ``_dblogin_cplogin_pairs`` helper,
             optionally extended with the panel's own users
    """
    from clcommon.cpapi.plugins.universal import _dblogin_cplogin_pairs  # pylint: disable=import-outside-toplevel
    pairs = _dblogin_cplogin_pairs(cplogin_lst=cplogin_lst, access=db_access())
    if with_system_users:
        system_pairs = tuple(get_da_user(DA_USERS_PATH).items())
        pairs += system_pairs
    return pairs


def get_da_user(path, quiet=True):
    """
    Collect users that have at least one domain.

    A user is every non-hidden sub-directory of ``path`` whose ``domains.list``
    file has a non-empty first line.

    :param str path: directory holding one sub-directory per user
    :param bool quiet: when False, unreadable ``domains.list`` files are reported on stderr
    :return: mapping user name -> user name
    :rtype: dict
    """
    users = {}
    # The directory is listed by explicit path instead of chdir()-ing into it:
    # the working directory is process-wide state, it was left changed whenever
    # an exception other than IOError escaped the loop, and a relative ``path``
    # ended up being resolved twice (by chdir and again in the file name).
    for user_name in os.listdir(path):
        # the former glob('./*') never matched hidden entries; keep skipping them
        if user_name.startswith('.'):
            continue
        user_dir = os.path.join(path, user_name)
        if not os.path.isdir(user_dir):
            continue
        file_domains = os.path.join(user_dir, 'domains.list')
        try:
            with open(file_domains, encoding='utf-8') as f:
                # an empty first line-read means the file is empty: no domains
                if f.readline():
                    users[user_name] = user_name
        except IOError:
            if not quiet:
                sys.stderr.write("No file " + file_domains)
    return users


def cpinfo(cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'), search_sys_users=True):
    """
    Retrieve info about panel user(s) from their user.conf files
    :param str|list|tuple|None cpuser: user login(s); None means all users from cpusers()
    :param keyls: requested fields, any of:
        cplogin, package, mail, reseller, dns, locale
    :param bool search_sys_users: accepted for interface compatibility, not used here
    :return: one list per user that has a user.conf, values ordered like ``keyls``
    :rtype: list
    :raises CpApiTypeError: if ``cpuser`` has an unsupported type
    """
    if isinstance(cpuser, str):
        requested_users = [cpuser]
    elif isinstance(cpuser, (list, tuple)):
        requested_users = tuple(cpuser)
    elif cpuser is None:
        requested_users = cpusers()
    else:
        raise CpApiTypeError(funcname='cpinfo', supportedtypes='str|unicode|list|tuple',
                             received_type=type(cpuser).__name__)

    def _field_reader(field):
        # plain lookup of a single user.conf option
        return lambda config: config.get(field)

    def _reseller_of(config):
        # a reseller is its own reseller; everyone else belongs to the creator
        if config.get('usertype') == 'reseller':
            return config.get('username')
        return config.get('creator')

    readers = {
        'cplogin': _field_reader('username'),
        'package': _field_reader('package'),
        'mail': _field_reader('email'),
        'reseller': _reseller_of,
        'dns': _field_reader('domain'),
        'locale': _field_reader('language'),
    }
    selected_readers = [readers[key] for key in keyls]

    rows = []
    for login in requested_users:
        conf_path = os.path.join(DA_USERS_PATH, login, USER_CONF)
        if not os.path.exists(conf_path):
            # users without a config are silently left out of the result
            continue
        conf = load_fast(conf_path)
        rows.append([read(conf) for read in selected_readers])
    return rows


def _docroot_under_root(domain):
    # type: (str) -> Tuple[str, str]
    """
    Old method for getting doc_root for domain under root
    Finds the owner through /etc/virtual/domainowners, then searches the
    owner's domains returned by userdomains()
    :return: (doc_root, username) cortege, or None when nothing matches
    """
    owners = _load_domains_owners()
    # The first known domain that equals the requested one, or is its parent,
    # decides the supposed owner
    owner = next(
        (
            user
            for known_domain, user in owners.items()
            if domain == known_domain or domain.endswith(f'.{known_domain}')
        ),
        None,
    )
    if owner is None:
        return None
    for entry in userdomains(owner):
        if domain in entry:
            return entry[1], owner
    return None


def _docroot_under_user_old_mechanism(domain):
    # type: (str) -> Tuple[str, str]
    """
    Old method for getting doc_root for domain under user
    Looks the domain up among the directories of the current user's ~/domains
    :return: (doc_root, username) cortege, or None when the domain isn't found
    """
    pw_entry = ClPwd().get_pw_by_uid(os.getuid())[0]
    for record in _get_domains_list_as_user(pw_entry.pw_dir):
        if record['server_name'] != domain:
            continue
        return record['document_root'], pw_entry.pw_name
    return None


def docroot(domain):
    # type: (str) -> Tuple[str, str]
    """
    Retrieves document root for domain
    :param domain: Domain to determine doc_root (surrounding whitespace is ignored)
    :return: Cortege: (doc_root, domain_user)
    :raises NoDomain: if no document root was found
    """
    domain = domain.strip()

    # Panel files are read directly only when both real and effective uid are
    # root; otherwise the lookup goes through the custom binary
    if os.getuid() == 0 and os.geteuid() == 0:
        lookup = _docroot_under_root
    else:
        lookup = _docroot_under_user_via_custom_bin

    found = lookup(domain)
    if found is None:
        raise NoDomain(f"Can't obtain document root for domain '{domain}'")
    return found


def _is_nginx_installed():
    """
    Check DA_CONF for a non-zero 'nginx' or 'nginx_proxy' option
    :rtype: bool
    """
    da_config = loadconfig(DA_CONF)
    if int(da_config.get('nginx', 0)):
        return True
    return bool(int(da_config.get('nginx_proxy', 0)))


def _get_domains_list_as_root(user_path):
    """
    Parse the user's nginx.conf (when nginx is enabled) or httpd.conf
    :param user_path: path to DA directory of user's profile
    :return: parsed web server config; empty list if the config file is missing
    :rtype: list
    :raises ParsingError: if the config can't be parsed
    """
    try:
        if _is_nginx_installed():
            parser, conf_name = nginx_conf_parser, 'nginx.conf'
        else:
            parser, conf_name = apache_conf_parser, 'httpd.conf'
        return parser(os.path.join(user_path, conf_name))
    except WebConfigParsingError as e:
        raise ParsingError(e.message) from e
    except WebConfigMissing:
        return []


def _get_domains_list_as_user(user_home):
    # type: (str) -> List[Dict[str, str, bool]]
    """
    Get domains list for user from ~/domains directory as user.
    Method DOESN'T search subdomains, because it's almost impossible detect by user's
    folders without privileges escalation
    :param user_home: path to user home
    :return: list of dictionaries {'server_name': 'domain', 'document_root': 'doc_root', 'ssl': False}
    """

    domains_dir = 'domains'
    doc_root_dir = 'public_html'

    domains_list = []
    domains_path = os.path.join(user_home, domains_dir)
    # Searching main domains
    # All directories of main domains are saved in ~/domains directory
    for domain_dir in os.listdir(domains_path):
        domain_path = os.path.join(domains_path, domain_dir)
        doc_root_path = os.path.join(domains_path, domain_dir, doc_root_dir)
        if os.path.isdir(domain_path) and os.path.isdir(doc_root_path):
            domains_list.append({
                    'server_name': domain_dir,
                    'document_root': doc_root_path,
                    'ssl': False,
            })
        else:
            continue

    return domains_list


def userdomains(cpuser, as_root=False):
    # type: (str, bool) -> List[Tuple[str, str]]
    """
    Get user's domains with their document roots
    :param cpuser: panel user name
    :param as_root: read the web server config directly even if euid isn't 0
    :return list: (domain, document_root) pairs; empty if the user has no DA directory
        Example:
        [('cltest1.com', '/home/cltest1/domains/cltest1.com/public_html'),
         ('mk.cltest1.com', '/home/cltest1/domains/cltest1.com/public_html/mk'),
         ('cltest11.com', '/home/cltest1/domains/cltest11.com/public_html')
         ]
    :raises NoPanelUser: custom binary returned code 11
    :raises ExternalProgramFailed: custom binary returned any other non-zero code
    """
    user_path = os.path.join(DA_USERS_PATH, cpuser)
    read_directly = os.geteuid() == 0 or as_root

    if not os.path.exists(user_path):
        return []

    # ~/public_html, when it is a symlink, points into the main domain's directory
    public_html = os.path.join(os.path.expanduser('~' + cpuser), 'public_html')
    main_docroot = ''
    if os.path.exists(public_html) and os.path.islink(public_html):
        main_docroot = os.path.realpath(public_html)

    if read_directly:
        ordered = []
        for vhost in _get_domains_list_as_root(user_path):
            if vhost['ssl'] is True:
                continue
            pair = (vhost['server_name'], vhost['document_root'])
            if vhost['server_name'] in main_docroot:
                # main domain goes to the head of the list
                ordered.insert(0, pair)
            else:
                ordered.append(pair)
        return ordered

    # Unprivileged caller: same result through the rights escalation binary wrapper
    # call path: here -> binary -> python(diradmin euid) -> userdomains(as_root=True) -> print json result to stdout
    rc, res = get_domains_via_custom_binary()
    if rc == 0:
        return res
    if rc == 11:
        raise NoPanelUser(f'User {cpuser} not found in the database')
    raise ExternalProgramFailed(f'Failed to get userdomains: {res}')


def homedirs():
    """
    Detects folders that contain home dirs of DirectAdmin users
    Only system users that have a user.conf under DA_USERS_PATH are considered
    :return: list of unique parent folders of those users' home dirs
    """
    passwd_entries = ClPwd().get_user_dict()
    parent_dirs = {
        os.path.dirname(pw_entry.pw_dir)
        for login, pw_entry in passwd_entries.items()
        if os.path.exists(os.path.join(DA_USERS_PATH, login, USER_CONF))
    }
    return list(parent_dirs)


def domain_owner(domain):
    """
    Look the domain up in the /etc/virtual/domainowners map
    :param domain: domain name exactly as it is listed in the file
    :return: user name or None if domain not found
    """
    owners = _load_domains_owners()
    return owners.get(domain)


@GeneralPanelPluginV1.cache_call(panel_parker=[DOMAINOWNERS])
def _load_domains_owners() -> Dict[str, str]:
    """
    Build domain -> user map from /etc/virtual/domainowners
    Every non-empty line must consist of exactly two ':'-separated fields
    :raises ReadFileError: if the file can't be read
    """
    try:
        raw_text = read_unicode_file_with_decode_fallback(DOMAINOWNERS)
        records = raw_text.splitlines()
    except OSError as e:
        raise ReadFileError(str(e)) from e

    owners = {}
    for record in records:
        record = record.strip()
        if not record:
            # blank lines carry no data
            continue
        domain_name, user_name = record.split(':')
        owners[domain_name.strip()] = user_name.strip()
    return owners


def reseller_users(resellername):
    """
    Return users of a reseller, the reseller itself included
    Names from the reseller's users.list that have no system account are dropped
    :param resellername: reseller name; return empty list if None
    :return list[str]: user names list; empty if users.list can't be read
    """
    if resellername is None:
        return []

    system_users = ClPwd().get_user_dict()
    list_path = os.path.join(DA_USERS_PATH, resellername, 'users.list')
    try:
        with open(list_path, encoding='utf-8') as list_file:
            listed = [entry.strip() for entry in list_file]
    except (IOError, OSError):
        return []

    listed.append(resellername)
    # the intersection filters out users that linger in the list file
    # but no longer exist in the system
    return list(set(system_users) & set(listed))


def reseller_domains(resellername=None):
    """
    Map every user of the given reseller to the 'domain' value of its user.conf
    :type resellername: str
    :return: dict {username: main_domain}; empty list if resellername is None
    """
    if resellername is None:
        return []
    login_dns_rows = cpinfo(reseller_users(resellername), keyls=('cplogin', 'dns'))
    return {login: dns for login, dns in login_dns_rows}


def get_admin_email():
    """
    Read the 'email' option from the user.conf of the 'admin' user
    :return: admin's email, or the universal plugin's value if the option is absent
    """
    admin_conf = loadconfig(os.path.join(DA_USERS_PATH, 'admin', USER_CONF))
    # the fallback is computed unconditionally, after the config was loaded
    fallback_email = universal_get_admin_email()
    return admin_conf.get('email', fallback_email)


def is_reseller(username):
    """
    Check if given user is reseller;
    :type username: str
    :rtype: bool
    :return: True if the user's user.conf has usertype 'reseller';
             False if it has another usertype or the user has no user.conf
    :raise: ParsingError if user.conf exists but has no 'usertype' option
    """
    user_config = os.path.join(DA_USERS_PATH, username, USER_CONF)
    if not os.path.exists(user_config):
        return False
    try:
        return loadconfig(user_config)['usertype'] == 'reseller'
    except (KeyError, IndexError) as e:
        # A missing option in the loaded config surfaces as KeyError (see
        # db_access); IndexError stays listed for backward compatibility.
        raise ParsingError('User config exists, but no usertype given') from e


def get_user_login_url(domain):
    """
    Build the panel login url for a domain: plain http on port 2222
    :type domain: str
    :rtype: str
    """
    return 'http://{}:2222'.format(domain)


def _get_da_php_config():
    """
    Read php<N>_mode / php<N>_release pairs from custombuild options.conf
    Numbering starts at 1 and stops at the first N for which either option is missing
    :return: map {'<N>': {'handler_type': ..., 'php_version_id': ...}};
             None if options.conf can't be read
    """
    try:
        options = loadconfig(DA_OPT_PATH)
    except (IOError, OSError):
        return None

    php_slots = {}
    slot = 1
    while True:
        mode_key = f'php{slot}_mode'
        release_key = f'php{slot}_release'
        if mode_key not in options or release_key not in options:
            break
        php_slots[str(slot)] = {
            'handler_type': options[mode_key],
            'php_version_id': options[release_key],
        }
        slot += 1
    return php_slots


def _get_php_code_info_for_domain(domain, owner):
    """
    Return the 'php1_select' option from the domain's config
    :param domain: domain name
    :param owner: user that owns the domain
    :return: php code in DA as a string; '1' if the config can't be read,
             the option is absent, or the option is '0'
    """
    conf_path = os.path.join(DA_USERS_PATH, str(owner), 'domains', str(domain) + '.conf')
    try:
        domain_conf = loadconfig(conf_path)
    except (IOError, OSError):
        return '1'

    selected = domain_conf.get('php1_select')
    # None - DA custombuild has only one php version
    # '0' - the user selected the default PHP version of DA custombuild
    if selected is not None and selected != '0':
        return selected
    return '1'


def _get_subdomains(all_domains, mapped_all_domains):
    subdomains = []
    for domain in all_domains:
        if domain[0] in mapped_all_domains.keys():
            continue
        subdomains.append(domain[0])
    return subdomains


def get_domains_php_info():
    """
    Return php version information for each domain
    Covers the domains listed in /etc/virtual/domainowners plus, per owner,
    names from userdomains() that are absent from that file and the aliases
    found by _useraliases().
    :return: domain to php info mapping; values are DomainDescription objects
        built with the fields shown below plus ``display_version``.
        Empty dict if custombuild options.conf can't be read.
    Example output:
    {'cltest.com': {'handler_type': 'mod_php',
                     'php_version_id': '7.1',
                     'username': 'cltest'},
     'cltest2.com': {'handler_type': 'fastcgi',
                      'php_version_id': '7.3',
                      'username': 'kek_2'},
     'cltest3.com': {'handler_type': 'suphp',
                      'php_version_id': '5.5',
                      'username': 'cltest3'},
     'omg.kek': {'handler_type': 'php-fpm',
                  'php_version_id': '5.2',
                  'username': 'cltest'}}
    :rtype: dict[str, DomainDescription]
    """
    # returns only main domains
    map_domain_user = _load_domains_owners()
    result_map = {}
    php_da_map = _get_da_php_config()
    if php_da_map is None:
        return result_map

    # Group domains by owner so that userdomains() is called once per user
    owner_to_domains: dict[str, list[str]] = {}
    for domain, owner in map_domain_user.items():
        owner_to_domains.setdefault(owner, []).append(domain)

    for owner, domains in owner_to_domains.items():
        all_domains_in_httpd_file = userdomains(owner)

        # get safely to not break something to other teams
        try:
            subdomains = _get_subdomains(all_domains_in_httpd_file, map_domain_user)
        except Exception:
            subdomains = []

        for domain in domains:
            php_info_code = _get_php_code_info_for_domain(domain, owner)
            if php_info_code not in php_da_map \
                    or php_da_map[php_info_code]['php_version_id'] == 'no':
                # 'no' means that php_release specified in user's config
                # does not exist in custombuild options.conf
                php_info_code = '1'

            # NOTE(review): raises KeyError if options.conf defines no php1_* pair
            php_info = php_da_map[php_info_code]

            # aliases are best-effort: any failure means "no aliases"
            try:
                domain_aliases = _useraliases(owner, domain)
            except Exception:
                domain_aliases = []

            # https://forum.directadmin.com/threads/sub-domain-different-php-version.58426/
            # subdomain version should be the same as main domain
            # NOTE: ``subdomains`` is per owner, so with several domains per
            # owner each subdomain entry is rewritten on every iteration and
            # ends up with the php info of the owner's last domain
            for domain_entity in [domain] + subdomains + domain_aliases:
                result_map[domain_entity] = DomainDescription(
                    username=owner,
                    php_version_id=php_info['php_version_id'],
                    handler_type=php_info['handler_type'],
                    display_version=f'php{php_info["php_version_id"].replace(".", "")}'
                )

    return result_map


def _get_installed_alt_php_versions():
    """
    Gets installed alt-phpXY - could be chosen via CloudLinux PHP Selector w/o being compiled via custombuild
    A version counts as installed when /opt/alt/php<digits>/usr/bin/php exists
    :return: list of PHPDescription
    """
    alt_root = '/opt/alt/'
    dir_name_re = re.compile(r'^php\d+$')

    found = []
    for entry in os.listdir(alt_root):
        entry_path = os.path.join(alt_root, entry)
        if not os.path.isdir(entry_path):
            continue
        if not dir_name_re.match(entry):
            continue
        if not os.path.exists(f'{entry_path}/usr/bin/php'):
            continue

        # 'php74' -> '74' -> '7.4': the first digit is taken as the major version
        digits = entry.replace('php', '')
        description = PHPDescription(
            identifier=f'alt-{entry}',
            version=f'{digits[:1]}.{digits[1:]}',
            dir=f'{entry_path}/',
            modules_dir=os.path.join(entry_path, 'usr/lib64/php/modules/'),
            bin=os.path.join(entry_path, 'usr/bin/php'),
            ini=os.path.join(entry_path, 'link/conf/default.ini'),
        )
        found.append(description)
    return found


def _get_da_php_extension_dir(directadmin_php_dir):
    return subprocess.run(
        [f'{directadmin_php_dir}bin/php-config', '--extension-dir'],
        text=True,
        capture_output=True,
        check=False,
    ).stdout


def _get_compiled_custombuild_versions():
    """
    Gets compiled phpXY - could be chosen via DirectAdmin PHP Selector
    Only releases from options.conf whose /usr/local/phpXY/ directory exists are returned
    :return: list of PHPDescription; empty if options.conf can't be read
    """
    php_slots = _get_da_php_config()
    if php_slots is None:
        return []

    # php_slots example:
    # {'1': {'handler_type': 'php-fpm', 'php_version_id': '7.4'},
    # '2': {'handler_type': 'php-fpm', 'php_version_id': '8.0'},
    # '3': {'handler_type': 'php-fpm', 'php_version_id': 'no'},
    # '4': {'handler_type': 'php-fpm', 'php_version_id': 'no'}}

    compiled = []
    for slot in php_slots.values():
        release = slot['php_version_id']
        if release == 'no':
            # slot isn't configured
            continue

        short_id = f'php{release.replace(".", "")}'
        install_dir = f'/usr/local/{short_id}/'
        if not os.path.exists(install_dir):
            continue

        extension_dir = _get_da_php_extension_dir(install_dir)
        if extension_dir:
            extension_dir = extension_dir.strip()

        compiled.append(PHPDescription(
            identifier=short_id,
            version=release,
            dir=install_dir,
            modules_dir=extension_dir,
            bin=os.path.join(install_dir, 'bin/php'),
            ini=os.path.join(install_dir, 'lib/php.ini'),
        ))
    return compiled


def _get_aliases(path):
    """
    Parse user aliases file and return data
    """
    if not os.path.exists(path):
        return []
    data = []
    try:
        with open(path, encoding='utf-8') as f:
            data = f.readlines()
    except IOError as e:
        syslog.syslog(syslog.LOG_WARNING, f'Can`t open file "{path}" due to : "{e}"')
    return [record.strip().split('=')[0] for record in data]

def _useraliases(cpuser, domain):
    """
    Return aliases of a user's domain, read from the domain's .pointers file
    :param str|unicode cpuser: user login
    :param str|unicode domain: domain name
    :return list of aliases
    """
    pointers_file = f'/usr/local/directadmin/data/users/{cpuser}/domains/{domain}.pointers'
    return _get_aliases(pointers_file)


class PanelPlugin(GeneralPanelPluginV1):
    HTTPD_CONFIG_FILE = '/etc/httpd/conf/httpd.conf'
    HTTPD_MPM_CONFIG = '/etc/httpd/conf/extra/httpd-mpm.conf'
    HTTPD_INFO_CONFIG = '/etc/httpd/conf/extra/httpd-info.conf'

    def __init__(self):
        """
        Initialise the base plugin and store the path of DirectAdmin's admin list
        """
        super().__init__()
        # same location as the module-level ADMINS_LIST constant
        self.ADMINS_LIST = os.path.join(ADMIN_DIR, 'admin.list')

    def getCPName(self):
        """
        Return panel name
        :return: the module-level ``__cpname__`` value ('DirectAdmin')
        :rtype: str
        """
        return __cpname__

    def get_cp_description(self):
        """
        Retrieve panel name and its version by running ``directadmin v``
        :return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': None}
            ('version' is an empty string if no version-like word is printed)
            or None if can't get info
        """
        def _looks_like_version(token):
            # digits with optional dots, e.g. '1.642'
            return token.replace('.', '').isdigit()

        try:
            command = ['/usr/local/directadmin/directadmin', 'v']
            with subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            ) as proc:
                stdout_text, _ = proc.communicate()

            # output may differ (depending on version):
            # 'Version: DirectAdmin v.1.642'
            # 'DirectAdmin v.1.643 55acaa256ec6ed99b9aaec1050de793b298f62b0'
            # 'DirectAdmin 1.644 55acaa256ec6ed99b9aaec1050de793b298f62b0'
            version = ''
            for word in stdout_text.split():
                candidate = word.lstrip('v.')
                if _looks_like_version(candidate):
                    # the first version-like word wins
                    version = candidate
                    break
            return {'name': __cpname__, 'version': version, 'additional_info': None}
        except Exception:
            return None

    def db_access(self):
        """
        Getting access data to mysql database, read from DirectAdmin's mysql.conf.
        For example {'login': 'da_admin', 'pass': '9pJUv38sAqqW'}

        :return: dict with keys 'login' and 'pass'
        :rtype: dict
        :raises: NoDBAccessData
        """
        return db_access()

    def cpusers(self):
        """
        Generates a list of cpusers registered in the control panel
        (users that have a user.conf in DirectAdmin's users directory)

        :return: names of cpusers registered in the control panel
        :rtype: tuple
        """
        return cpusers()

    def resellers(self):
        """
        Generates a list of resellers in the control panel

        :return: names of resellers read from DirectAdmin's reseller.list
        :rtype: tuple
        """
        return resellers()

    def is_reseller(self, username):
        """
        Check if given user is reseller;
        Delegates to the module-level is_reseller()
        :type username: str
        :rtype: bool
        """
        return is_reseller(username)

    # unlike admins(), this method works fine in post_create_user
    # hook; looks like directadmin updates admins.list a little bit later
    # then calls post_create_user.sh
    def is_admin(self, username):
        """
        Return True if the user's user.conf declares usertype 'admin'
        :param str username: user to check
        :return: bool; False if the user has no user.conf
        """
        conf_path = os.path.join(DA_USERS_PATH, username, USER_CONF)
        if os.path.exists(conf_path):
            return load_fast(conf_path)['usertype'] == 'admin'
        return False

    def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
        """
        Get mapping between system and DB users
        Delegates to the module-level dblogin_cplogin_pairs()
        @param cplogin_lst :list: list with usernames for generate mapping
        @param with_system_users :bool: add system users to result list or no.
                                        default: False
        """
        return dblogin_cplogin_pairs(cplogin_lst, with_system_users)

    def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns', 'locale'),
               search_sys_users=True):
        """
        Retrieves info about panel user(s)
        :param str|unicode|list|tuple|None cpuser: user login
        :param keyls: list of data which is necessary to obtain the user,
                        the values can be:
           cplogin - name/login user control panel
           mail - Email users
           reseller - name reseller/owner users
           locale - localization of the user account
           package - User name of the package
           dns - domain of the user
        :param bool search_sys_users: search for cpuser in sys_users or in control panel users (e.g. for Plesk);
                                      the DirectAdmin implementation does not use it
        :return: returns a list of lists of data in the same sequence as specified keys in keyls
        :rtype: list
        """
        return cpinfo(cpuser, keyls, search_sys_users=search_sys_users)

    def get_admin_email(self):
        """
        Retrieve admin email address
        Delegates to the module-level get_admin_email()
        :return: Host admin's email
        """
        return get_admin_email()

    def docroot(self, domain):
        """
        Return document root for domain
        :param str|unicode domain:
        :return Cortege: (document_root, owner)
        :raises NoDomain: if document root can't be obtained
        """
        return docroot(domain)

    @staticmethod
    def useraliases(cpuser, domain):
        """
        Return aliases of a user's domain
        :param str|unicode cpuser: user login
        :param str|unicode domain: domain name
        :return list of aliases
        """
        return _useraliases(cpuser, domain)

    def userdomains(self, cpuser):
        """
        Return domain and document root pairs for control panel user
        first domain is main domain
        :param str|unicode cpuser: user login
        :return list of tuples (domain_name, document_root)
        """
        return userdomains(cpuser)

    def homedirs(self):
        """
        Detects and returns list of folders contained the home dirs of users of the DirectAdmin
        :return: list of folders, which are parent of home dirs of users of the panel
        """
        return homedirs()

    def reseller_users(self, resellername=None):
        """
        Return reseller users
        :param resellername: reseller name; an empty list is returned if None
        :return list[str]: user names list
        """
        return reseller_users(resellername)

    def reseller_domains(self, resellername=None):
        """
        Get dict[user, domain]
        :param resellername: reseller's name; an empty list is returned if None
        :rtype: dict[str, str|None]
        """
        return reseller_domains(resellername)

    def get_user_login_url(self, domain):
        """
        Get login url for current panel;
        :type domain: str
        :return: 'http://<domain>:2222'
        :rtype: str
        """
        return get_user_login_url(domain)

    def admins(self):
        """
        List all admins names in given control panel
        :return: set of strings read from DirectAdmin's admin.list
        """
        return admins()

    def domain_owner(self, domain):
        """
        Return domain's owner
        :param domain: domain name as listed in /etc/virtual/domainowners
        :rtype: str
        :return: user name or None if domain not found
        """
        return domain_owner(domain)

    def get_domains_php_info(self):
        """
        Return php version information for each domain
        Delegates to the module-level get_domains_php_info()
        :return: domain to php info mapping (values are DomainDescription objects)
        :rtype: dict
        """
        return get_domains_php_info()

    @staticmethod
    def _get_da_skin_name():
        """
        Retrieve current DA skin name from directadmin.conf
        :return: value of `system_skin` if present, otherwise the last path
            component of `docsroot`; None if neither option is set
        """
        da_config = loadconfig(DA_CONF)

        if 'system_skin' not in da_config:
            # before DA 1.664 the skin was configured through `docsroot`,
            # e.g. './data/skins/evolution'
            legacy_docsroot = da_config.get('docsroot', None)
            if legacy_docsroot is None:
                return None
            return legacy_docsroot.rsplit('/', 1)[-1]

        # starting from DA 1.664 `docsroot` option was replaced by `system_skin`
        return da_config['system_skin']

    @staticmethod
    def get_encoding_name():
        """
        Retrieve encoding name, used for package/reseller names
        :return: 'utf8' unless the current skin is 'enhanced'; for that skin the
            'lang_encoding' value of its language file (falling back to 'utf8')
        """
        fallback_encoding = 'utf8'
        if PanelPlugin._get_da_skin_name() != 'enhanced':
            return fallback_encoding

        # For enhanced skin the encoding is read from its config
        # :LANG_ENCODING=iso-8859-1  see LU-99 for more info
        lang_file = os.path.join(DA_DIR, "data/skins/enhanced/lang/en/lf_standard.html")
        # Option in file is 'LANG_ENCODING', but key is lowercase
        return loadconfig(lang_file).get('lang_encoding', fallback_encoding)

    def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
        """
        Return CloudLinux features reported as unsupported by this plugin
        :return: one-element tuple with Feature.RUBY_SELECTOR
        """
        return (
            Feature.RUBY_SELECTOR,
        )

    @staticmethod
    def get_apache_ports_list() -> List[int]:
        """
        Retrieves active httpd's ports from the ``Listen`` directives of httpd's config
        :return: unique ports in order of appearance; [80] if no port could be
            parsed; None if the config file can't be read
        """
        # cat /etc/httpd/conf/httpd.conf | grep Listen
        try:
            lines = get_file_lines(PanelPlugin.HTTPD_CONFIG_FILE)
        except (OSError, IOError):
            return None
        lines = [line.strip() for line in lines]

        ports = []
        for line in grep('Listen', match_any_position=False, multiple_search=True, data_from_file=lines):
            # Directive syntax: Listen [IP-address:]portnumber [protocol]
            # line examples:
            # Listen 80
            # Listen 0.0.0.0:80
            # Listen [::]:80
            # Listen 192.170.2.1:8443 https
            fields = line.split()
            # skip other directives that merely start with 'Listen'
            # (e.g. ListenBacklog) and a bare 'Listen' without arguments
            if len(fields) < 2 or fields[0] != 'Listen':
                continue
            try:
                # the port is whatever follows the last ':' - this also
                # works for bracketed IPv6 addresses and for a bare port
                port = int(fields[1].rsplit(':', 1)[-1])
            except ValueError:
                continue
            if port not in ports:
                ports.append(port)
        return ports or [80]

    @staticmethod
    def _get_active_web_server_params() -> Tuple[str, str]:
        """
        Read the active web server and apache MPM from DA's options.conf
        (directives 'webserver' and 'apache_mpm').
        :return: tuple (active_web_server_name, apache_active_module_name)
            active_web_server_name: 'apache', 'nginx', 'nginx_apache', 'litespeed', 'openlitespeed', etc
            apache_active_module_name: 'prefork', 'event', 'worker' ('auto' is reported as 'worker')
            A value stays None when its directive was not found or when
            reading/parsing options.conf failed before it was reached
        """
        server_name = None
        mpm_name = None
        try:
            # Shell equivalent:
            # cat /usr/local/directadmin/custombuild/options.conf | grep webserver
            # webserver=apache
            #   webserver can be: apache, nginx, nginx_apache, litespeed, openlitespeed.
            conf_lines = get_file_lines(DA_OPT_PATH)
            matched = list(grep('^apache_mpm|^webserver', fixed_string=False, match_any_position=False,
                                multiple_search=True, data_from_file=conf_lines))
            # matched example: ['webserver=apache\n', 'apache_mpm=auto\n']
            for raw in matched:
                fields = raw.strip().split('=')
                key = fields[0]
                if key == 'webserver':
                    server_name = fields[1]
                elif key == 'apache_mpm':
                    # modules are 'prefork', 'event', 'worker'. 'auto' == 'worker'
                    mpm_name = 'worker' if fields[1] == 'auto' else fields[1]
        except (OSError, IOError, IndexError):
            # Best effort: report whatever was collected before the failure
            pass
        return server_name, mpm_name

    def _get_max_request_workers_for_module(self, apache_module_name: str) -> Tuple[int, str]:
        """
        Determine MaxRequestWorkers directive value for specified apache module.
        Reads the config file referenced by ``self.HTTPD_MPM_CONFIG``
        (/etc/httpd/conf/extra/httpd-mpm.conf) via ``find_module_param_in_config``.
        :param apache_module_name: Current apache's module name: 'prefork', 'event', 'worker'
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Error message; on error it is the formatted traceback
        """
        try:
            return find_module_param_in_config(self.HTTPD_MPM_CONFIG,
                                               apache_module_name,
                                               'MaxRequestWorkers')
        except (OSError, IOError, IndexError, ValueError):
            # Read/parse failures are reported to the caller as (0, traceback text)
            # instead of being raised
            return 0, format_exc()

    def get_apache_max_request_workers(self) -> Tuple[int, str]:
        """
        Get current maximum request apache workers from httpd's config
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Error message
        """
        web_server_name, apache_active_module_name = self._get_active_web_server_params()
        if web_server_name is None or apache_active_module_name is None:
            return 0, f"There was error during read/parse {DA_OPT_PATH}. Apache collector will not work"
        if web_server_name != "apache":
            return 0, f"DA is configured for web server '{web_server_name}'; but 'apache' is needed. " \
                      "Apache collector will not work"
        return self._get_max_request_workers_for_module(apache_active_module_name)

    def _get_httpd_status_uri(self) -> str:
        """
        Find the apache mod_status URI in the config referenced by
        ``self.HTTPD_INFO_CONFIG`` (/etc/httpd/conf/extra/httpd-info.conf).

        The URI is taken from the most recent ``<Location ...>`` line seen
        before the first ``SetHandler server-status`` line.
        :return Apache mod_status URI or None if error/not found
        """
        opening_tag = '<Location'
        try:
            # Shell equivalent:
            # grep -B 2 'SetHandler server-status' /etc/httpd/conf/extra/httpd-info.conf
            #
            # <Location /server-status>
            #     SetHandler server-status
            last_location = None
            for raw in get_file_lines(self.HTTPD_INFO_CONFIG):
                stripped = raw.strip()
                if stripped.startswith(opening_tag):
                    # Remember the latest Location start tag
                    last_location = stripped
                elif last_location and stripped.startswith('SetHandler server-status'):
                    # Handler found: the URI is the tag without '<Location' and '>'
                    return last_location.replace(opening_tag, '').replace('>', '').strip()
        except (OSError, IOError):
            pass
        return None

    def get_apache_connections_number(self):
        """
        Retrieves Apache's connections number (from apache's mod_status)
        :return: tuple (conn_num, message)
            conn_num - current connections number, 0 if error
            message - OK/Trace
        """
        web_server_name, _ = self._get_active_web_server_params()
        if web_server_name is None:
            return 0, f"There was error during read/parse {DA_OPT_PATH}. Apache collector will not work"
        if web_server_name != "apache":
            return 0, f"DA is configured for web server '{web_server_name}'; but 'apache' is needed. " \
                      "Apache collector will not work"
        try:
            # curl localhost/server-status?auto | grep "Total Accesses"
            # Total Accesses: 25
            location_uri = self._get_httpd_status_uri()
            if location_uri is None:
                return 0, "Can't found mod_status URI in configs"
            url = f'http://127.0.0.1{location_uri}?auto'
            response = requests.get(url, timeout=5)
            if response.status_code != 200:
                return 0, f"GET {url} response code is {response.status_code}"
            s_response = response.content.decode('utf-8')
            s_response_list = s_response.split('\n')
            out_list = list(grep("Total Accesses", data_from_file=s_response_list))
            # out_list example: ['Total Accesses: 200']
            s_total_accesses = out_list[0].split(':')[1].strip()
            return int(s_total_accesses), 'OK'
        except Exception:
            return 0, format_exc()

    @staticmethod
    def get_installed_php_versions():
        """
        Returns installed alt-php(s) on the server
        compiled phpXY via custombuild and alt-phpXY has different paths

        also user could choose version via PHP selector which was not compiled with custombuild
        (will be absent in DA configs)

        :return: result of ``_get_installed_alt_php_versions()`` concatenated (``+``)
            with the result of ``_get_compiled_custombuild_versions()``,
            alt-php entries first
        """
        return _get_installed_alt_php_versions() + _get_compiled_custombuild_versions()

    def get_server_ip(self):
        """
        Return the first entry of DirectAdmin's admin IP list.
        :return: first line of /usr/local/directadmin/data/admin/ip.list with
            surrounding whitespace stripped; '' if the file is missing or empty
        """
        list_path = '/usr/local/directadmin/data/admin/ip.list'
        if os.path.exists(list_path):
            with open(list_path, encoding='utf-8') as ip_file:
                entries = ip_file.readlines()
            if entries:
                return entries[0].strip()
        return ''

    @staticmethod
    def get_user_emails_list(username: str, domain: str):
        user_conf = f'/usr/local/directadmin/data/users/{username}/user.conf'
        if not os.path.exists(user_conf):
            return ''

        user_conf = load_fast(user_conf)
        return user_conf.get('email', '')

    @staticmethod
    def panel_login_link(username):
        generated_login = subprocess.run(['/usr/local/directadmin/directadmin',
                                          '--create-login-url', f'user={username}'],
                                         capture_output=True, text=True, check=False).stdout
        # http://server-206-252-237-2.da.direct:2222/api/login/url?key=0SrJm1CNAIh34w4Fk8Kp4ohypUFp_pMm
        if len(generated_login) == 0:
            return ''
        parsed = urlparse(generated_login)
        return f'{parsed.scheme}://{parsed.netloc}/'

    @staticmethod
    def panel_awp_link(username):
        """
        Build the link to the AWP plugin page inside the user's panel.
        :param username: panel user name
        :return: '<panel base url>/evo/user/plugins/awp#/' or '' when the
            panel base URL is empty
        """
        base_url = PanelPlugin.panel_login_link(username).rstrip("/")
        return f'{base_url}/evo/user/plugins/awp#/' if base_url else ''

    def suspended_users_list(self):
        """
        Collect panel users whose DirectAdmin user config has ``suspended=yes``.

        Users without a config file are skipped.
        :return: list of suspended user names, in the order given by ``cpusers()``
        """
        suspended = []
        for name in cpusers():
            conf_path = os.path.join(DA_USERS_PATH, name, USER_CONF)
            if os.path.exists(conf_path) and load_fast(conf_path).get('suspended') == 'yes':
                suspended.append(name)
        return suspended

Youez - 2016 - github.com/yon3zu
LinuXploit