Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.139.80.194
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/clcommon/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/clcommon/lib/cledition.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#

import os
import subprocess
import warnings
from enum import Enum
from typing import AnyStr

from jwt import exceptions

from clcommon.lib.consts import CLN_JWT_TOKEN_PATH, CL_EDITION_FILE_FOR_USERS
from clcommon.lib.jwt_token import read_jwt, decode_jwt, jwt_token_check
from clcommon.clexception import FormattedException
from clcommon.clcagefs import in_cagefs
# The functions below used to be in this module but were moved to clcommon.utils,
# so they are imported here just for backward compatibility
from clcommon.utils import get_os_version, is_ubuntu, is_secureboot_enabled  # NOQA

# Marker files: CLEditions.get_cl_edition() treats the presence of one of these
# files as the Solo / Admin edition when the file-marker check is not skipped.
CL_SOLO_EDITION_FILE_MARKER = '/etc/cloudlinux-edition-solo'
CL_ADMIN_EDITION_FILE_MARKER = '/etc/cloudlinux-edition-admin'

# Human-readable edition names: returned by get_cl_edition_readable() and used
# as the keys of HUMAN_READABLE_TOKEN_EDITION_PAIRS.
SHARED_PRO_EDITION_HUMAN_READABLE = 'CloudLinux OS Shared Pro'
SHARED_EDITION_HUMAN_READABLE = 'CloudLinux OS Shared'
SOLO_EDITION_HUMAN_READABLE = 'CloudLinux OS Solo'
ADMIN_EDITION_HUMAN_READABLE = 'CloudLinux OS Admin'


class SupportedEditions(Enum):
    """
    CloudLinux editions known to this module.

    Each member's value is the short edition identifier string that the
    detection helpers in this module return and compare against.
    """

    SOLO = 'solo'
    SHARED = 'shared'
    # Also returned by CLEditions.get_from_jwt() for tokens that lack an
    # 'edition' field but carry a truthy 'cl_plus' flag.
    SHARED_PRO = 'shared_pro'
    ADMIN = 'admin'


class CLEditionDetectionError(FormattedException):
    """
    Raised when the CloudLinux edition cannot be detected.

    The message and any extra keyword arguments are handed to
    FormattedException as a dict with 'message' and 'context' keys.
    """

    def __init__(self, message, **context):
        details = {'message': message, 'context': context}
        FormattedException.__init__(self, details)


# Maps a human-readable edition name to the matching SupportedEditions value.
# user_cl_edition() looks up the stripped contents of CL_EDITION_FILE_FOR_USERS
# in this mapping.
HUMAN_READABLE_TOKEN_EDITION_PAIRS = {
    SOLO_EDITION_HUMAN_READABLE: SupportedEditions.SOLO.value,
    SHARED_PRO_EDITION_HUMAN_READABLE: SupportedEditions.SHARED_PRO.value,
    SHARED_EDITION_HUMAN_READABLE: SupportedEditions.SHARED.value,
    ADMIN_EDITION_HUMAN_READABLE: SupportedEditions.ADMIN.value
}


class CLEditions:
    """Edition-detection helpers grouped as static/class methods."""

    @staticmethod
    def get_from_jwt(token=None, verify_exp=True):
        """
        Extract the edition from a JWT token.

        Note: be careful when modifying this method.
        Passing token in is used in X-Ray,
        ask @dkavchuk or someone else from C-Projects team for details

        :param token: raw token; when None it is read from CLN_JWT_TOKEN_PATH
        :param verify_exp: forwarded to decode_jwt
        :return: the token's 'edition' field; for tokens without that field,
                 the shared_pro or shared value chosen by the 'cl_plus' flag
        :raises CLEditionDetectionError: when decoding fails with a PyJWTError,
                 or the token has neither 'edition' nor a non-None 'cl_plus'
        """
        raw_token = read_jwt(CLN_JWT_TOKEN_PATH) if token is None else token

        try:
            payload = decode_jwt(raw_token, verify_exp=verify_exp)
        except exceptions.PyJWTError as err:
            raise CLEditionDetectionError(
                f'Unable to detect edition from jwt token: {CLN_JWT_TOKEN_PATH}. '
                f'Please, make sure it is not broken, error: {err}'
            ) from err

        try:
            return payload['edition']
        except KeyError as missing_field:
            # Old-format tokens have no 'edition'; they only carry 'cl_plus'.
            legacy_flag = payload.get('cl_plus')
            if legacy_flag is not None:
                if legacy_flag:
                    return SupportedEditions.SHARED_PRO.value
                return SupportedEditions.SHARED.value
            raise CLEditionDetectionError(
                f'Unable to detect edition from jwt token: {CLN_JWT_TOKEN_PATH}. '
                f'Please, make sure it is not broken, error: not enough fields for detection'
            ) from missing_field

    @staticmethod
    def is_package_installed(package_name):
        """
        Ask ``rpm -q`` whether a package is installed.

        :param package_name: name passed to ``rpm -q``
        :return: True when rpm exits with 0, False when it exits non-zero and
                 its stdout contains 'is not installed'
        :raises CLEditionDetectionError: on any other non-zero exit
        """
        # LC_ALL=C so that the 'is not installed' text checked below is not localized.
        rpm_env = dict(os.environ, LC_ALL='C')
        query = ['rpm', '-q', package_name]
        with subprocess.Popen(
            query,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            env=rpm_env,
        ) as rpm:
            stdout_text, stderr_text = rpm.communicate()
            exit_code = rpm.returncode

        if exit_code == 0:
            return True
        if 'is not installed' in stdout_text:
            return False
        raise CLEditionDetectionError(
            f'Unable to check if package {package_name} is installed, '
            f'reason: {stdout_text}, {stderr_text}'
        )

    @classmethod
    def get_cl_edition(cls, skip_jwt_check=False, skip_marker_check=False,
                       raw_jwt=None, verify_exp=True):
        """
        Detect the current CloudLinux edition.

        Order of checks:
        1. JWT token (unless skip_jwt_check is set, and only when the token
           file CLN_JWT_TOKEN_PATH exists): the result of get_from_jwt() is
           returned if it is truthy. A PermissionError while reading the token
           falls back to user_cl_edition(). A broken token raises.
        2. Inside cagefs: the result of user_cl_edition() is returned as is.
        3. File markers (unless skip_marker_check is set): the Solo marker,
           then the Admin marker, then user_cl_edition(), and finally the
           regular Shared edition.
        When nothing above produced a result, None is returned.

        Note: be careful when modifying this method.
        It is used in X-Ray, ask @dkavchuk or someone else from C-Projects team
        for details

        :raises CLEditionDetectionError: when both checks are skipped, or when
                 get_from_jwt() fails to detect the edition
        """
        jwt_allowed = not skip_jwt_check
        markers_allowed = not skip_marker_check

        # skipping both jwt and file checks is not allowed
        if not (jwt_allowed or markers_allowed):
            raise CLEditionDetectionError(
                'Unable to detect edition: neither jwt token check, no file marker check enabled')

        if jwt_allowed and os.path.isfile(CLN_JWT_TOKEN_PATH):
            try:
                detected = cls.get_from_jwt(token=raw_jwt, verify_exp=verify_exp)
            except PermissionError:
                # The token exists but is not readable by the current user.
                detected = user_cl_edition()

            if detected:
                return detected

        # In case if user in cagefs.
        if in_cagefs():
            return user_cl_edition()

        if not markers_allowed:
            return None

        # Fallback to file markers; Solo is checked before Admin.
        marker_editions = (
            (CL_SOLO_EDITION_FILE_MARKER, SupportedEditions.SOLO),
            (CL_ADMIN_EDITION_FILE_MARKER, SupportedEditions.ADMIN),
        )
        for marker_path, marker_edition in marker_editions:
            if os.path.isfile(marker_path):
                return marker_edition.value

        from_user_file = user_cl_edition()
        return from_user_file if from_user_file else SupportedEditions.SHARED.value


def is_cl_solo_edition(skip_jwt_check=True, skip_marker_check=False, verify_exp=True):
    """
    Tell whether the detected edition is CloudLinux OS Solo.

    Detection is delegated to CLEditions.get_cl_edition() with the given
    arguments. Note that skip_jwt_check defaults to True here.

    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
     and fallback to file is not applicable
    Note: be careful when modifying this method.
    It is used in X-Ray and SSA,
    ask @dkavchuk or someone else from C-Projects team for details
    """
    detected = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp,
    )
    if detected is None:
        return False
    return detected == SupportedEditions.SOLO.value


def is_cl_shared_edition(skip_jwt_check=False, skip_marker_check=False, verify_exp=True):
    """
    Tell whether the detected edition is CloudLinux OS Shared.

    Detection is delegated to CLEditions.get_cl_edition() with the given
    arguments.

    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
     and fallback to file is not applicable
    """
    detected = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp,
    )
    if detected is None:
        return False
    return detected == SupportedEditions.SHARED.value


def is_cl_shared_pro_edition(skip_jwt_check=False,
                             skip_marker_check=False,
                             verify_exp=True):
    """
    Tell whether the detected edition is CloudLinux OS Shared Pro.

    Detection is delegated to CLEditions.get_cl_edition() with the given
    arguments.

    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
     and fallback to file is not applicable
    """
    detected = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp,
    )
    if detected is None:
        return False
    return detected == SupportedEditions.SHARED_PRO.value

def is_cl_shared_pro_edition_safely(**kwargs):
    """
    Same check as is_cl_shared_pro_edition(), but an edition detection error
    is suppressed and reported as False.

    Keyword arguments are forwarded to is_cl_shared_pro_edition() unchanged.
    """
    try:
        is_shared_pro = is_cl_shared_pro_edition(**kwargs)
    except CLEditionDetectionError:
        is_shared_pro = False
    return is_shared_pro



def is_cl_admin_edition(skip_jwt_check=False,
                        skip_marker_check=False,
                        verify_exp=True):
    """
    Tell whether the detected edition is CloudLinux OS Admin.

    Detection is delegated to CLEditions.get_cl_edition() with the given
    arguments.

    Allow skip_jwt_check ONLY if it not critical when license is not valid
    Use skip_marker_check=True when validity of license is critical
     and fallback to file is not applicable
    """
    detected = CLEditions.get_cl_edition(
        skip_jwt_check=skip_jwt_check,
        skip_marker_check=skip_marker_check,
        verify_exp=verify_exp,
    )
    if detected is None:
        return False
    return detected == SupportedEditions.ADMIN.value


def get_cl_edition_readable() -> str:
    """
    Return the human-readable name of the current CloudLinux edition.

    Possible results:
    - CloudLinux OS Solo
    - CloudLinux OS Admin
    - CloudLinux OS Shared
    - CloudLinux OS Shared Pro
    - the stripped contents of CL_EDITION_FILE_FOR_USERS, when the JWT token
      could not be read by the current user

    Solo and Admin are detected first, without consulting the JWT token. If
    that detection raises CLEditionDetectionError, the Shared name is
    returned. Otherwise the outcome of jwt_token_check() decides between
    Shared Pro (check succeeded) and Shared.
    """

    try:
        if is_cl_solo_edition(skip_jwt_check=True):
            return SOLO_EDITION_HUMAN_READABLE
        if is_cl_admin_edition(skip_jwt_check=True):
            return ADMIN_EDITION_HUMAN_READABLE
    except CLEditionDetectionError:
        return SHARED_EDITION_HUMAN_READABLE

    success_flag, error_message, _ = jwt_token_check()

    # This error message indicates that user could not read jwt.token because not enough privileges
    # We have to check existence of token file, to be sure that privilege error acquired
    if 'read error' in error_message and (os.path.isfile(CLN_JWT_TOKEN_PATH) or in_cagefs()):
        try:
            with open(CL_EDITION_FILE_FOR_USERS, 'r', encoding='utf-8') as f:
                return f.read().strip()
        except FileNotFoundError:
            # The edition file has not been written yet. Like user_cl_edition(),
            # tolerate its absence and fall through to the token-check result
            # instead of failing with a traceback.
            pass

    if success_flag:
        return SHARED_PRO_EDITION_HUMAN_READABLE
    return SHARED_EDITION_HUMAN_READABLE


def print_skip_message_on_solo():
    """
    Emit a DeprecationWarning and print the notice that the command is
    skipped on the Solo edition.
    """
    deprecation_note = 'as part of unification effort, features might not map directly to editions'
    skip_notice = (
        'CloudLinux Solo edition detected! \n'
        'Command is skipped, because it is unsupported and unneeded on current edition'
    )

    warnings.warn(deprecation_note, category=DeprecationWarning)
    print(skip_notice)


# NOTE(vlebedev): To properly avoid circular imports, the implementations of these 2 functions
#                 were moved to the "utils" submodule of "clcommon.cpapi", as that is the level
#                 where all necessary panel stuff is imported and constructed and which
#                 uses the "clcommon.lib" module as a dependency. In other words, "clcommon.lib" sits
#                 lower in the module hierarchy and should not import from "clcommon.cpapi".
def skip_without_lve():
    """
    Deprecated shim: warns, then forwards to
    clcommon.cpapi.utils.skip_without_lve and returns its result.
    """
    warnings.warn(
        'since v3.4.1 use "clcommon.cpapi.utils:skip_without_lve" instead',
        category=DeprecationWarning,
    )
    # Imported at call time rather than at module level to avoid a circular import.
    from clcommon.cpapi.utils import skip_without_lve as _delegate  # pylint: disable=cyclic-import,import-outside-toplevel
    return _delegate()


def lve_supported_or_exit(f):
    """
    Deprecated shim: warns, then forwards ``f`` to
    clcommon.cpapi.utils.lve_supported_or_exit and returns its result.
    """
    warnings.warn(
        'since v3.4.1 use "clcommon.cpapi.utils:lve_supported_or_exit" instead',
        category=DeprecationWarning,
    )
    # Imported at call time rather than at module level to avoid a circular import.
    from clcommon.cpapi.utils import lve_supported_or_exit as _delegate  # pylint: disable=cyclic-import,import-outside-toplevel
    return _delegate(f)


def user_cl_edition():
    """Workaround that lets unprivileged users learn the CL edition.

    Crontab will write to CL_EDITION_FILE_FOR_USERS, output of cldetect --detect-edition command with 0644 permission.

    Returns the SupportedEditions value matching the stripped file contents,
    or None when the file does not exist or its contents are not a known
    human-readable edition name.
    """
    if not os.path.exists(CL_EDITION_FILE_FOR_USERS):
        return None

    with open(CL_EDITION_FILE_FOR_USERS, 'r', encoding='utf-8') as edition_file:
        readable_name = edition_file.read().strip()

    return HUMAN_READABLE_TOKEN_EDITION_PAIRS.get(readable_name)


def is_container() -> bool:
    """
    Report whether the '/etc/cloudlinux-container' marker exists, which this
    module takes to mean the system is running inside a container.
    """
    container_marker = '/etc/cloudlinux-container'
    return os.path.exists(container_marker)

Youez - 2016 - github.com/yon3zu
LinuXploit