Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.118.7.18
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/cpapi/GeneralPanel/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/cpapi/GeneralPanel/general_panel.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

# General class, implementing common methods, using all cpapi plugins by default

import collections
import functools
import json
import os
import subprocess

from pwd import getpwuid

from clcommon.features import (
    Feature,
)
from clcommon.cpapi.cpapiexceptions import (
    NotSupported,
    CPAPIExternalProgramFailed
)
from clcommon.lock import acquire_lock


def _not_supported(func):
    def _exception(*a, **kw):
        raise NotSupported(f'"{func.__name__}" api not supported')
    return _exception


# Executable started by GeneralPanelPluginV1._run_long_script();
# stored in the instance attribute _custom_script_name by __init__
GET_CP_PACKAGE_SCRIPT = '/usr/bin/getcontrolpaneluserspackages'
# File read by get_hosting_accounts_count(); its content is converted with int()
PANEL_USERS_COUNT_FILE = '/var/lve/panel_users_count'

# Directory in which cache_call() keeps its "<function name>.cache" JSON files
CPAPI_CACHE_STORAGE = '/var/clcpapi'


class GeneralPanelPluginV1:

    def __init__(self):
        """
        Set the default external script used by _run_long_script().
        """
        # path of the executable that _run_long_script() launches
        self._custom_script_name = GET_CP_PACKAGE_SCRIPT


    def invalidate_cpapi_cache(self):
        """
        Invalidate cpapi cache. Does nothing in this class.
        :return: None
        """
        pass

    @staticmethod
    def is_cache_valid(cpapi_cache, panel_markers):
        for marker in panel_markers:
            if not os.path.exists(marker):
                return False
        if not os.path.exists(cpapi_cache):
            return False

        for marker in panel_markers:
            # if at least 1 marker is older -> cache is invalid
            if os.path.getmtime(marker) > os.path.getmtime(cpapi_cache):
                return False
        return True

    @staticmethod
    def rewrite_cpapi_cache(actual_data, cache_file):
        """
        Store fresh api data in the cache file as JSON.

        The file content is ``{"data": actual_data}``; writing happens inside
        the acquire_lock() context for ``<cache_file>.lock``.
        :param actual_data: data to put into the cache
        :param cache_file: path to the cache file to (re)write
        :return: None
        """
        lock_path = cache_file + '.lock'
        payload = {'data': actual_data}
        try:
            with acquire_lock(lock_path), \
                    open(cache_file, 'w', encoding='utf-8', errors='surrogateescape') as cache_fd:
                json.dump(payload, cache_fd, indent=4)
        except PermissionError:
            # running as user/mockbuild we may have no rights to update
            # the cache; this is fine, just leave the cache as it is
            pass

    @staticmethod
    def cache_call(**decorator_kwargs):
        def decorator(func):
            def wrapper(*args, **kwargs):
                cache_file = os.path.join(CPAPI_CACHE_STORAGE, func.__name__ + ".cache")
                cache_valid = GeneralPanelPluginV1.is_cache_valid(cache_file, decorator_kwargs['panel_parker'])
                if os.path.exists(cache_file) and cache_valid:
                    try:
                        with open(cache_file, "r", encoding='utf-8', errors='surrogateescape') as f:
                            data = json.load(f)['data']
                    except Exception:
                        # fallback to get data via api call and re-write broken json
                        data = func(*args, **kwargs)
                        GeneralPanelPluginV1.rewrite_cpapi_cache(data, cache_file)
                else:
                    data = func(*args, **kwargs)
                    GeneralPanelPluginV1.rewrite_cpapi_cache(data, cache_file)
                return data

            wrapper.__cached_func__ = func
            return wrapper
        return decorator

    def getCPName(self):
        """
        Return panel name
        :rtype: str
        :return: Name of panel; always "GeneralPanel" for this class
        """
        return "GeneralPanel"

    def _run_long_script(self, args):
        """
        Just wraps long script calls.
        :param args: arguments to pass
        :return: stdout, stderr
        """
        with subprocess.Popen(
            [self._custom_script_name] + args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        ) as p:
            out, err = p.communicate()
            returncode = p.returncode

        return out, err, returncode

    def admin_packages(self, raise_exc=False):
        """
        Return list of available admin's packages
        :param raise_exc: raise exception on exit code != 0
        :return: List of packages. For example
            ['BusinessPackage', 'Package2']
        """
        packages_list = []
        stdout, stderr, returncode = self._run_long_script(['--list-packages'])
        if raise_exc and returncode != 0:
            raise CPAPIExternalProgramFailed(
                stderr or f'Failed to get information about packages: {stdout}')
        for line in stdout.strip().split('\n'):
            if line != '':
                packages_list.append(line)
        return packages_list

    def get_reseller_users(self, reseller):
        reseller_users = {}
        out, _, _ = self._run_long_script(['--list-reseller-users=' + str(reseller)])
        for line in out.strip().split('\n'):
            if ',' not in line:
                continue
            line = line.split(',')
            reseller_users[int(line[0])] = {
                'package': line[1],
                'reseller': reseller}
        return reseller_users

    def get_uids_list_by_package(self, package_name, reseller_name=None):
        """
        Retrieves uid list for package
        :param package_name: Package name
        :param reseller_name: Reseller name. None for admin's package
        :rtype: List
        :return: List of uids
            Example: [1000, 1002, 1006, 1007, 1008]
        """
        uid_list = []
        try:
            args = ['--package=' + str(package_name)]
            if reseller_name is not None:
                args.append('--reseller='+str(reseller_name))
            stdout, _, _ = self._run_long_script(args)
            uid_list = stdout.split('\n')
            del uid_list[len(uid_list) - 1]
        except (OSError, IOError, AttributeError):
            pass
        return uid_list

    def list_all(self, raise_exc=False):
        uid_package_map = {}
        out, _, returncode = self._run_long_script(['--list-all'])
        if raise_exc and returncode != 0:
            raise CPAPIExternalProgramFailed(
                f"Failed to get list of users and their packages: {out}")
        # if script prints error - skip output processing
        if 'error:' not in out:
            for line in out.split('\n'):
                line = line.split(' ', 1)
                if len(line) == 2:
                    uid_package_map[int(line[0])] = line[1]
        return uid_package_map

    def list_users(self, raise_exc=False):
        users = {}
        out, err, returncode = self._run_long_script(['--list-users'])
        if raise_exc and returncode != 0:
            raise CPAPIExternalProgramFailed(
                err or f'Failed to get information about users: {out}')
        for line in out.strip().split('\n'):
            if ',' not in line:
                continue
            line = line.split(',')
            users[int(line[0])] = {'package': line[1], 'reseller': line[2]}
        return users

    def resellers_packages(self, raise_exc=False):
        """
        Return dictionary, contains available resellers packages, grouped by resellers
        :return: Dictionary.
            Example:
            {'res1': ['BusinessPackage', 'UltraPackage', 'Package'],
             'res2': ['SimplePackage', 'Package'] }
        """
        resellers_packages = collections.defaultdict(list)
        out, err, returncode = self._run_long_script(['--list-resellers-packages'])
        if raise_exc and returncode != 0:
            raise CPAPIExternalProgramFailed(
                err or f'Failed to get information about reseller package: {out}')
        # packages_users output format:
        # {'res1': ['BusinessPackage', 'UltraPackage', 'Package'],
        #              'res2': ['SimplePackage', 'Package'] }
        lines = out.split('\n')
        for line in lines:
            line = line.strip()
            # Pass empty and invalid lines
            if not line:
                continue
            # 0 - reseller_name, 1 - package_name
            line_parts = line.split(' ', 1)
            if len(line_parts) != 2:
                continue
            res_name, pack_name = line_parts[0], line_parts[1]
            resellers_packages[res_name].append(pack_name)
        return resellers_packages

    def reseller_package_by_uid(self, user_id):
        # Get package for user
        out, _, _ = self._run_long_script(['--userid=' + str(user_id)])
        package = out.split('\n').pop(0)
        out, _, _ = self._run_long_script(['--get-user-reseller=' + str(user_id)])
        reseller = out.split('\n').pop(0)
        return reseller, package

    def admins(self):
        """
        List all admins names in given control panel
        :rtype: List
        :return: list of strings; this class always returns ['root']
        """
        return ['root']

    def is_admin(self, username):
        """
        Return True if username is in the list returned by admins()
        :param str username: user to check
        :return: bool
        """
        return username in self.admins()

    @_not_supported
    def get_cp_description(self):
        """
        Retrieve panel name and its version
        :return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'}
            or None if can't get info
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def resellers(self):
        """
        Generates a list of resellers in the control panel
        :return: tuple of cpusers registered in the control panel
        :rtype: tuple
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def is_reseller(self, username):
        """
        Check if user is reseller
        :type username: str
        :rtype: bool
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def db_access(self):
        """
        Getting root access to mysql database.
        For example {'login': 'root', 'db': 'mysql', 'host': 'localhost', 'pass': '9pJUv38sAqqW'}
        :return: root access to mysql database
        :rtype: dict
        :raises NoDBAccessData: listed for implementations of this call
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def dblogin_cplogin_pairs(self, cplogin_lst=None, with_system_users=False):
        """
        Returns a list of pairs: database user login - control panel user login
        For example:
            (('nata2_someuse', 'nata2'), ('testsome_dfrtbus', 'testsome'))
        :param list|tuple|None cplogin_lst: list of control panel users
        :param bool with_system_users: add system users to dbmapping
        :return: list of pairs: database user login - control panel user login
        :rtype: tuple
        :raises NoPackage: listed for implementations of this call
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def cpinfo(self, cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns'),
               search_sys_users=True):
        """
        Retrieves information about panel users
        :param str|unicode|list|tuple|None cpuser: user login
        :param keyls: list of data items to obtain for each user, the values can be:
               cplogin - name/login of the control panel user
               mail - email of the user
               reseller - name of the reseller/owner of the user
               locale - localization of the user account
               package - name of the user's package
               dns - domain of the user
               userid - user's uid
        :param search_sys_users: not described in the original documentation
        :return: returns a tuple of tuples of data in the same sequence as specified keys in keyls
        :rtype: tuple
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def get_admin_emails_list(self):
        """
        Gets admin emails list
        :rtype: List
        :return: List: ['admin1@mail.com', 'admin2@mail.com' ]
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def docroot(self, domain):
        """
        Return document root for domain
        :param domain: domain name
        :return: Tuple: (document_root, owner)
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def userdomains(self, cpuser):
        """
        Return domain and document root pairs for control panel user;
        first domain is main domain
        :param str|unicode cpuser: user login
        :rtype: List
        :return: list of tuples (domain_name, document_root)
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def homedirs(self):
        """
        Detects and returns list of folders containing the home dirs of the panel users
        :rtype: List
        :return: list of folders, which are parents of home dirs of users of the panel
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def reseller_users(self, resellername=None):
        """
        Return reseller users
        :param resellername: reseller name; autodetect name if None
        :rtype: List
        :return: list[str]: user names list
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def reseller_domains(self, resellername=None):
        """
        Return reseller users and their main domains
        :param resellername: reseller name; autodetect name if None
        :rtype: dict
        :return: dict[str, str]: pairs user <==> domain
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def get_user_login_url(self, domain):
        """
        Get login url for current panel
        :type domain: str
        :rtype: str
        :return: Panel login URL
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def get_reseller_id_pairs(self):
        """
        Get dict reseller => id
        Optional method for panels without hard
        link reseller <=> system user
        :rtype: dict[str,int]
        :return: {'res1': id1}
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    # not documented yet
    @_not_supported
    def get_domains_php_info(self):
        """
        Retrieves dictionary with information about php versions for each domain
        {
            'domain.com': {
                'php_version_id': 'ea-php70'
                'php_handler': lsapi | fpm | cgi | fastcgi
        }}
        :rtype: dict
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def get_installed_php_versions(self):
        """
        Retrieves list of php versions installed in panel
        :rtype: list
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    # not documented yet
    @_not_supported
    def get_system_php_info(self):
        """
        Retrieves dictionary with system information about php
        :rtype: dict
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @_not_supported
    def get_admin_locale(self):
        """
        Get admin locale (not described further in the original documentation)
        :rtype: str
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    @staticmethod
    @_not_supported
    def get_encoding_name():
        """
        Retrieve encoding name, used for package/reseller names, from panel
        :return: not specified in the original documentation
        :raises NotSupported: always in this class (method is replaced by _not_supported)
        """
        pass

    def get_unsupported_cl_features(self) -> tuple[Feature, ...]:
        """
        Return tuple of CloudLinux features that cannot
        be used with current control panel.
        :raises NotImplementedError: always in this class, no features are listed here
        """
        raise NotImplementedError()

    @staticmethod
    def get_apache_ports_list():
        """
        Retrieves active httpd's ports from httpd's config
        :return: list of apache's ports; this class reads no config
            and always returns [80]
        """
        return [80]

    @staticmethod
    def get_apache_connections_number():
        """
        Retrieves Apache's connections number (from mod_status)
        For CM
        :return: tuple (connections number, message);
            always (0, "OK") in this class
        """
        # Unsupported panel
        return 0, "OK"

    @staticmethod
    def get_apache_max_request_workers():
        """
        Get current maximum request apache workers from httpd's config
        :return: tuple (max_req_num, message)
            max_req_num - Maximum request apache workers number or 0 if error
            message - OK/Trace
            Always (0, "OK") in this class
        """
        # Unsupported panel
        return 0, "OK"

    @staticmethod
    def get_main_username_by_uid(uid: int) -> str:
        """
        Get "main" panel username by uid.
        :param uid: uid
        :return Username
        """
        try:
            # Get username by id
            return getpwuid(uid).pw_name
        except KeyError:
            pass
        return 'N/A'

    @staticmethod
    def get_user_emails_list(username: str, domain: str) -> str:
        """
        Get emails of the user. Both arguments are ignored in this class.
        :return: always an empty string
        """
        return ''

    @staticmethod
    def panel_login_link(username):
        """
        Get panel login link for the user. The argument is ignored in this class.
        :return: always an empty string
        """
        return ''

    @staticmethod
    def panel_awp_link(username):
        """
        Get panel "awp" link for the user. The argument is ignored in this class.
        :return: always an empty string
        """
        return ''

    @staticmethod
    def get_hosting_accounts_count() -> int:
        """
        Get users count
        :return: number of users read from PANEL_USERS_COUNT_FILE;
            0 if the file does not exist, is empty or holds only whitespace
        :raises ValueError: if the file content is not a number
        """
        if not os.path.isfile(PANEL_USERS_COUNT_FILE):
            return 0
        with open(PANEL_USERS_COUNT_FILE, 'r', encoding='utf-8') as f:
            # strip before the emptiness check: a file with just a newline
            # must be handled like an empty one, int('\n') raises ValueError
            count = f.read().strip()
        return int(count) if count else 0

    def get_customer_login(self, username):
        """
        99% of control panels log-in user with same username
        as system user has, except for Plesk
        :param username: system user name
        :return: the given username, unchanged
        """
        return username

    def get_domain_login(self, username, domain):
        """
        99% of control panels log-in user with same username
        as system user has, in the Plesk panel we need a subscription login
        :param username: system user name
        :param domain: domain name; ignored in this class
        :return: the given username, unchanged
        """
        return username

    def get_server_ip(self):
        """
        Get ip of the server that is configured in control panel to be "main".
        :raises NotSupported: always in this class
        """
        raise NotSupported('Unable to detect main ip for this server. '
                           'Contact CloudLinux support and report the issue.')

    def suspended_users_list(self):
        """
        Get list of suspended users.
        :return: always an empty list in this class
        """
        return []

Youez - 2016 - github.com/yon3zu
LinuXploit