Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.219.158.84
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/lib/cledition.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#

import os
import subprocess
import warnings
from enum import Enum
from typing import AnyStr

from jwt import exceptions

from clcommon.lib.consts import CLN_JWT_TOKEN_PATH, CL_EDITION_FILE_FOR_USERS
from clcommon.lib.jwt_token import read_jwt, decode_jwt, jwt_token_check
from clcommon.clexception import FormattedException
from clcommon.clcagefs import in_cagefs
# The function below used to be in this module, but were moved to clcommon.utils
# So, importing them just for backward compatibility
from clcommon.utils import get_os_version, is_ubuntu, is_secureboot_enabled  # NOQA

CL_SOLO_EDITION_FILE_MARKER = '/etc/cloudlinux-edition-solo'
CL_ADMIN_EDITION_FILE_MARKER = '/etc/cloudlinux-edition-admin'

SHARED_PRO_EDITION_HUMAN_READABLE = 'CloudLinux OS Shared Pro'
SHARED_EDITION_HUMAN_READABLE = 'CloudLinux OS Shared'
SOLO_EDITION_HUMAN_READABLE = 'CloudLinux OS Solo'
ADMIN_EDITION_HUMAN_READABLE = 'CloudLinux OS Admin'


class SupportedEditions(Enum):
    """
    Keeps supported CloudLinux editions
    """

    SOLO = 'solo'
    SHARED = 'shared'
    SHARED_PRO = 'shared_pro'
    ADMIN = 'admin'


class CLEditionDetectionError(FormattedException):
    def __init__(self, message, **context):
        FormattedException.__init__(self, {
            'message': message,
            'context': context
        })


HUMAN_READABLE_TOKEN_EDITION_PAIRS = {
    SOLO_EDITION_HUMAN_READABLE: SupportedEditions.SOLO.value,
    SHARED_PRO_EDITION_HUMAN_READABLE: SupportedEditions.SHARED_PRO.value,
    SHARED_EDITION_HUMAN_READABLE: SupportedEditions.SHARED.value,
    ADMIN_EDITION_HUMAN_READABLE: SupportedEditions.ADMIN.value
}


class CLEditions:

    @staticmethod
    def get_from_jwt(token=None, verify_exp=True):
        """
        Note: be careful when modifying this method.
        Passing token in is used in X-Ray,
        ask @dkavchuk or someone else from C-Projects team for details
        :param token:
        :param verify_exp:
        :return:
        """
        if token is None:
            token = read_jwt(CLN_JWT_TOKEN_PATH)
        try:
            jwt = decode_jwt(token, verify_exp=verify_exp)
        except exceptions.PyJWTError as e:
            raise CLEditionDetectionError(
                f'Unable to detect edition from jwt token: {CLN_JWT_TOKEN_PATH}. '
                f'Please, make sure it is not broken, error: {e}'
            ) from e
        try:
            return jwt['edition']
        except KeyError as e:
            # fallback for old format of tokens
            cl_plus = jwt.get('cl_plus')
            if cl_plus is None:
                raise CLEditionDetectionError(
                    f'Unable to detect edition from jwt token: {CLN_JWT_TOKEN_PATH}. '
                    f'Please, make sure it is not broken, error: not enough fields for detection'
                ) from e
            return SupportedEditions.SHARED_PRO.value if cl_plus else SupportedEditions.SHARED.value

    @staticmethod
    def is_package_installed(package_name):
        with subprocess.Popen(
            ['rpm', '-q', package_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            env={**os.environ, **{'LC_ALL': 'C'}},
        ) as proc:
            out, err = proc.communicate()
            if proc.returncode != 0:
                if 'is not installed' in out:
                    return False

                raise CLEditionDetectionError(
                    f'Unable to check if package {package_name} is installed, '
                    f'reason: {out}, {err}'
                )
        return True

    @classmethod
    def get_cl_edition(
        cls,
        skip_jwt_check=False,
        skip_marker_check=False,
        raw_jwt=None,
        verify_exp=True,
    ) -> str | None:
        """
        1. Try to detect CL edition from JWT token;
            If edition field is in token -> return edition
            If edition field is not present -> recheck via cl_plus flag (see get_from_jwt)
            If token is broken -> raise an error about it
        2. Try to detect edition from files.
            If JWT edition retreival was attempted and failed, or if we're in cagefs
            -> get edition from CL_EDITION_FILE_FOR_USERS
            If a edition marker file for Solo or Admin edition is present -> return corresponding edition
            Otherwise, fall back to CL_EDITION_FILE_FOR_USERS again
            Detection from marker files may be switched off using the `skip_marker_check` parameter.

        @param skip_jwt_check: if True, skip checking JWT token for edition.
        @param skip_marker_check: if True, skip checking marker files for edition.
        @param raw_jwt: raw JWT token to use for edition detection. If None, token will be read from file.
        @param verify_exp: if True, verify JWT token expiration time.
        @return: Edition of CloudLinux OS or None if edition could not be determined.

        Note: be careful when modifying this method.
        It is used in X-Ray, ask @dkavchuk or someone else from C-Projects team for details.
        """
        # Skipping both jwt and file checks is not allowed
        if skip_jwt_check and skip_marker_check:
            raise CLEditionDetectionError(
                "Unable to detect edition: neither jwt token check, no file marker check enabled"
            )

        # If we have a valid JWT token, try to get edition from it
        if not skip_jwt_check and os.path.isfile(CLN_JWT_TOKEN_PATH):
            try:
                edition = cls.get_from_jwt(token=raw_jwt, verify_exp=verify_exp)
            except PermissionError:
                edition = user_cl_edition()
            if edition:
                return edition

        # If user is in cagefs, look at the CL_EDITION_FILE_FOR_USERS file first
        if in_cagefs():
            return user_cl_edition()

        # Otherwise, check for edition marker files
        if not skip_marker_check:
            # jwt has no 'edition' field -> ensure it really is Solo via file
            if os.path.isfile(CL_SOLO_EDITION_FILE_MARKER):
                return SupportedEditions.SOLO.value
            elif os.path.isfile(CL_ADMIN_EDITION_FILE_MARKER):
                return SupportedEditions.ADMIN.value
            # Finally, if no marker files are present, try CL_EDITION_FILE_FOR_USERS
            return user_cl_edition()

        # Couldn't detect edition through any means - return None
        return None


def is_cl_solo_edition(skip_jwt_check=True, skip_marker_check=False, verify_exp=True):
    """
    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
     and fallback to file is not applicable
    Note: be careful when modifying this method.
    It is used in X-Ray and SSA,
    ask @dkavchuk or someone else from C-Projects team for details
    """

    edition = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp
    )
    return edition is not None and edition == SupportedEditions.SOLO.value


def is_cl_shared_edition(skip_jwt_check=False, skip_marker_check=False, verify_exp=True):
    """
    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
     and fallback to file is not applicable
    """

    edition = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp
    )
    return edition is not None and edition == SupportedEditions.SHARED.value


def is_cl_shared_pro_edition(skip_jwt_check=False,
                             skip_marker_check=False,
                             verify_exp=True):
    """
    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
     and fallback to file is not applicable
    """

    edition = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp
    )
    return edition is not None and edition == SupportedEditions.SHARED_PRO.value


def is_cl_shared_pro_edition_safely(**kwargs):
    """
    Supress edition checking error and returns False
    """
    try:
        return is_cl_shared_pro_edition(**kwargs)
    except CLEditionDetectionError:
        return False


def is_cl_admin_edition(skip_jwt_check=False,
                        skip_marker_check=False,
                        verify_exp=True):
    """
    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
     and fallback to file is not applicable
    """

    edition = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp
    )
    return edition is not None and edition == SupportedEditions.ADMIN.value


def get_cl_edition_readable() -> AnyStr:
    """
    Function returns current edition of CL:
    - CloudLinux OS Shared
    - CloudLinux OS Shared Pro
    - CloudLinux OS Solo
    - Error string
    """

    try:
        if is_cl_solo_edition(skip_jwt_check=True):
            return SOLO_EDITION_HUMAN_READABLE
        elif is_cl_admin_edition(skip_jwt_check=True):
            return ADMIN_EDITION_HUMAN_READABLE
    except CLEditionDetectionError:
        return SHARED_EDITION_HUMAN_READABLE

    success_flag, error_message, _ = jwt_token_check()

    # This error message indicates that user could not read jwt.token because not enough privileges
    # We have to check existence of token file, to be sure that privilege error acquired
    if 'read error' in error_message and (os.path.isfile(CLN_JWT_TOKEN_PATH) or in_cagefs()):
        with open(CL_EDITION_FILE_FOR_USERS, 'r', encoding='utf-8') as f:
            return f.read().strip()

    if success_flag:
        return SHARED_PRO_EDITION_HUMAN_READABLE
    else:
        return SHARED_EDITION_HUMAN_READABLE


def print_skip_message_on_solo():
    """Just print skip message"""

    warnings.warn(
        'as part of unification effort, features might not map directly to editions',
        DeprecationWarning,
    )
    print('CloudLinux Solo edition detected! \n'
          'Command is skipped, because it is unsupported and unneeded on current edition')


# NOTE(vlebedev): To properly avoid circular imports, these 2 functions' implementations
#                 are moved to "utils" submodule of "clcommon.cpapi" as it is the level
#                 where all necessary panel stuff is imported and constructed and which
#                 uses "clcommon.lib" module as a dependency. In other words "clcommon.lib" lays
#                 lower in the modules hierarchy and should not import from "clcommon.cpapi").
def skip_without_lve():
    warnings.warn(
        'since v3.4.1 use "clcommon.cpapi.utils:skip_without_lve" instead',
        DeprecationWarning,
    )
    from clcommon.cpapi.utils import skip_without_lve  # pylint: disable=cyclic-import,import-outside-toplevel
    return skip_without_lve()


def lve_supported_or_exit(f):
    warnings.warn(
        'since v3.4.1 use "clcommon.cpapi.utils:lve_supported_or_exit" instead',
        DeprecationWarning,
    )
    from clcommon.cpapi.utils import lve_supported_or_exit  # pylint: disable=cyclic-import,import-outside-toplevel
    return lve_supported_or_exit(f)


def user_cl_edition():
    """
    This function is used as a workaround for users to be able to get the CL Edition.
    Crontab will write output of `cldetect --detect-edition` command to CL_EDITION_FILE_FOR_USERS, with 0644 permission.
    """
    if os.path.exists(CL_EDITION_FILE_FOR_USERS):
        with open(CL_EDITION_FILE_FOR_USERS, "r", encoding="utf-8") as f:
            edition = f.read().strip()

        return HUMAN_READABLE_TOKEN_EDITION_PAIRS.get(edition)
    return None


def is_container() -> bool:
    """
    Determines is this system running inside container
    """
    return os.path.exists('/etc/cloudlinux-container')

Youez - 2016 - github.com/yon3zu
LinuXploit