Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.137.186.26
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/clcagefs.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#

import os
import re
import subprocess
import sys

# Mount-point list file that the functions in this module read and modify
CAGEFS_MP_FILENAME = '/etc/cagefs/cagefs.mp'
# Command-line tool this module runs to create cagefs.mp and to remount CageFS;
# is_cagefs_present() tests for the existence of this path
CAGEFSCTL_TOOL     = '/usr/sbin/cagefsctl'


class CagefsMpConflict(Exception):
    """Raised when a new cagefs.mp entry clashes with an entry already in the file.

    :param new_item: The entry that was about to be added
    :param existing_item: The entry already present in cagefs.mp that conflicts with it
    """

    def __init__(self, new_item, existing_item):
        # Pass the constructor arguments on to Exception so that ``args`` and
        # ``repr()`` are populated and the exception can be rebuilt from ``args``
        # (e.g. by copy/pickle, provided the arguments themselves support it).
        super().__init__(new_item, existing_item)

        self._msg = (
            f"Conflict in adding '{new_item}' to {CAGEFS_MP_FILENAME} "
            f"because of pre-existing alternative specification: '{existing_item}'"
        )

    def __str__(self):
        # The human-readable message takes precedence over the default rendering of args
        return self._msg


class CagefsNotSupportedError(Exception):
    """Signals that the installed CageFS cannot be used by this module."""

    def __init__(self, message):
        # Keep the explicit single-argument constructor; the message ends up in ``args``
        super().__init__(message)


class CagefsMpItem:
    """One line of cagefs.mp: an optional prefix character, a path and an optional mode.

    Comment lines (starting with '#') and blank lines are represented by a
    "dummy" item, which never matches and is never compatible with anything.
    """

    PREFIX_LIST        = '@!%*'
    _PREFIX_MOUNT_RW   = ''
    _PREFIX_MOUNT_RO   = '!'

    def __init__(self, arg):
        """Constructor

        :param arg: Is either path to add to cagefs.mp (optionally preceded by one
                    of the PREFIX_LIST characters) or a raw line read from cagefs.mp
        """

        if arg[:1] == '#':  # is a comment? then init as dummy
            self._path_spec = None
        elif arg.strip() == '':  # init as dummy for empty lines
            self._path_spec = None
        else:
            self._path_spec = arg

    def mode(self, mode):
        """Specify mode as in fluent constructor

        The mode is appended (as ",<octal>") only to items with the '@' prefix;
        for every other item, for dummy items and for ``mode=None`` nothing changes.

        :param mode: Integer permission bits, or None
        :returns: self, to allow call chaining
        """

        if self.prefix() == '@' and mode is not None:
            self._path_spec = f"{self._path_spec},{mode:03o}"

        return self

    def __str__(self):
        # __str__ must return a string; a dummy item holds None, so render it as ''
        if self.is_dummy():
            return ''
        return self._path_spec

    @staticmethod
    def _add_slash(path):
        """Return path terminated by '/', so that prefix comparison respects
        directory boundaries ('/ab/' does not start with '/a/')"""
        if path == '':
            return '/'
        if path[-1] != '/':
            return path + '/'
        return path

    def pre_exist_in(self, another):
        """Tell whether this item's path equals or lies beneath the path of another

        :param another: CagefsMpItem or a raw cagefs.mp line
        :returns: False if either side is a dummy item
        """

        adopted = CagefsMpItem._adopt(another)

        # overkill: just to keep strictly to comparing NULL objects principle
        if self.is_dummy() or adopted.is_dummy():
            return False

        this_path = CagefsMpItem._add_slash(self.path())
        test_preexist_in_path = CagefsMpItem._add_slash(adopted.path())
        return this_path.startswith(test_preexist_in_path)

    def is_compatible_by_prefix_with(self, existing):
        """Tell whether this item may coexist with an already existing item

        Items are compatible when their prefixes are equal, or when the existing
        item has no prefix and this one has the '!' prefix.

        :param existing: CagefsMpItem or a raw cagefs.mp line
        :returns: False if either side is a dummy item
        """

        adopted = CagefsMpItem._adopt(existing)

        # overkill: just to keep strictly to comparing NULL objects principle
        if self.is_dummy() or adopted.is_dummy():
            return False

        if self.prefix() == adopted.prefix():
            return True

        # existing prefix -> prefixes of a new item that are still acceptable
        prefix_compatibility_map = { CagefsMpItem._PREFIX_MOUNT_RW : [CagefsMpItem._PREFIX_MOUNT_RO] }
        null_options = []

        return self.prefix() in prefix_compatibility_map.get(adopted.prefix(), null_options)

    def is_dummy(self):
        """True for items built from a comment or a blank line"""
        return self._path_spec is None

    @staticmethod
    def _adopt(x):
        """Return x itself if it is a CagefsMpItem, otherwise wrap it into one"""
        if isinstance(x, CagefsMpItem):
            return x
        else:
            return CagefsMpItem(x)

    @staticmethod
    def _cut_off_mode(path_spec):
        """Cut off mode from path spec like @/var/run/screen,777

        Only one comma per path spec is allowed ;-)"""

        return path_spec.split(',')[0]

    @staticmethod
    def _cut_off_prefix(path_spec):
        """Strip all leading PREFIX_LIST characters from path spec"""
        return path_spec.lstrip(CagefsMpItem.PREFIX_LIST)

    def path(self):
        """Return the bare path, without prefix and mode

        Not meaningful for dummy items; check is_dummy() first.
        """
        return CagefsMpItem._cut_off_prefix(CagefsMpItem._cut_off_mode(self._path_spec))

    def prefix(self):
        """Return the leading prefix character of the item, or '' if there is none

        Dummy items have no prefix.
        """
        if self.is_dummy():
            return ''

        # Look at the first character itself rather than comparing the spec with
        # the bare path: an unprefixed spec that carries a mode ('/dir,777') also
        # differs from its path, and must not yield '/' as its prefix.
        first_char = self._path_spec[0]
        if first_char in CagefsMpItem.PREFIX_LIST:
            return first_char
        return ''


def is_cagefs_present():
    """Report whether the cagefsctl tool exists at its expected location."""
    cagefsctl_found = os.path.exists(CAGEFSCTL_TOOL)
    return cagefsctl_found


def _mk_mount_dir_setup_perm(path, mode=0o755, owner_id=None, group_id=None):
    """Create the directory if it is missing, then apply permissions and ownership.

    :param path: Directory to create (only the last path component is created)
    :param mode: Permission bits to set; None leaves the permissions untouched
    :param owner_id: Numeric owner id; None leaves the owner untouched
    :param group_id: Numeric group id; None leaves the group untouched
    """
    # os.chown() interprets -1 as "leave this id unchanged"
    uid = -1 if owner_id is None else owner_id
    gid = -1 if group_id is None else group_id

    directory_exists = os.path.isdir(path)
    if not directory_exists:
        os.mkdir(path)

    if mode is not None:
        os.chmod(path, mode)

    os.chown(path, uid, gid)


def _remount_cagefs(user=None, remount_in_background=False):
    """Run cagefsctl to remount CageFS; does nothing if cagefsctl is not present.

    :param user: If given, only this user is remounted; otherwise all users are
    :param remount_in_background: If True, the tool is started without waiting
                                  for it to finish
    """
    if not is_cagefs_present():
        return

    if user is not None:
        command = [CAGEFSCTL_TOOL, "--remount", user]
    else:
        command = [CAGEFSCTL_TOOL, "--wait-lock", "--remount-all"]

    # The tool's output is discarded in both modes
    silenced = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}

    if not remount_in_background:
        # Foreground: wait for completion, the exit status is ignored
        subprocess.run(command, check=False, **silenced)
        return

    # Background: start the process and return at once
    subprocess.Popen(command, **silenced)  # pylint: disable=consider-using-with


def setup_mount_dir_cagefs(path, added_by=None, mode=0o755, owner_id=None, group_id=None,
                           prefix='', remount_cagefs=False, remount_in_background=False):
    """
    Add mount point to /etc/cagefs/cagefs.mp

    :param path: Directory path to be added in cagefs.mp and mounted
                 from within setup_mount_dir_cagefs().
                 If this directory does not exist, then it is created.

    :param added_by: package or component, mount dir relates to, or whatever will
                     stay in cagefs.mp with "# next line is added by..." comment

    :param mode: If is not None: Regardless of whether directory exists or not prior this call,
                 its permissions will be set to mode.

    :param owner_id: Regardless of whether directory exists or not prior this call,
                     its owner id will be set to.
                     If None, the owner won't be changed.

    :param group_id: Regardless of whether directory exists or not prior this call,
                     its group id will be set to.
                     If None, the group won't be changed.

    :param prefix: Mount point prefix. Default is mount as RW.
                   Pass '!' to add read-only mount point.
                   Refer CageFS section at http://docs.cloudlinux.com/ for more options.

    :param remount_cagefs: If True, cagefs skeleton will be automatically
                           remounted to apply changes. The remount is done only
                           when a new line was actually added to cagefs.mp.

    :param remount_in_background: If True, cagefs remount will be done in separate
                    background process, without waiting for completion

    :returns: None

    The directory is created and its permissions are set even if CageFS is not
    installed; in that case nothing else is done.

    Propagates native OSError if directory setup or access to cagefs.mp fails.

    Raises CagefsMpConflict if path is already covered by an entry in cagefs.mp
    whose prefix is not compatible with the prefix param.
    """

    _mk_mount_dir_setup_perm(path, mode, owner_id, group_id)

    if not is_cagefs_present():
        return

    # Create cagefs.mp if absent. It will be merged when cagefsctl --init.
    if not os.path.exists(CAGEFS_MP_FILENAME):
        subprocess.run(
            [CAGEFSCTL_TOOL, "--create-mp"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )

    add_new_line_to_cagefs_mp()
    # ^^
    # Hereafter we will not care if there was
    # 'no newline at the end of file'

    item_added = False

    with open(CAGEFS_MP_FILENAME, "r+", encoding="utf-8") as f:
        new_item = CagefsMpItem(prefix + path).mode(mode)

        # existing lines that already cover the new path
        trim_nl_iter = (file_line.rstrip() for file_line in f)
        pre_exist_option = [x for x in trim_nl_iter if new_item.pre_exist_in(x)]

        if not pre_exist_option:
            f.seek(0, 2)  # 2: seek to the end of file

            if added_by is not None:
                # no newline is allowed
                added_by = added_by.strip()
                print("# next line is added by ", added_by, file=f)
            print(new_item, file=f)
            item_added = True

        elif not new_item.is_compatible_by_prefix_with(pre_exist_option[-1]):
            raise CagefsMpConflict(new_item, pre_exist_option[-1])

    # Remount only after the file has been closed: the lines printed above sit in
    # the file object's buffer until then, so cagefsctl started earlier could
    # read cagefs.mp without the new mount point.
    if item_added and remount_cagefs:
        _remount_cagefs(remount_in_background=remount_in_background)


def _get_cagefs_mp_lines():
    """Read cagefs.mp and return its lines as a list, line endings included."""
    with open(CAGEFS_MP_FILENAME, encoding="utf-8") as mp_file:
        return list(mp_file)


def _write_cagefs_mp_lines(lines):
    """Overwrite cagefs.mp with the given lines.

    The lines are written as they are; no line endings are added.
    """
    with open(CAGEFS_MP_FILENAME, mode="w", encoding="utf-8") as mp_file:
        outcome = mp_file.writelines(lines)
    return outcome

# next line is added by
def add_new_line_to_cagefs_mp():
    """
    Make sure the /etc/cagefs/cagefs.mp file ends with a new line symbol.

    The file is rewritten only when it is not empty and its last line
    lacks the trailing new line.
    """
    mp_lines = _get_cagefs_mp_lines()

    # empty file: nothing to terminate
    if not mp_lines or mp_lines[0] == '':
        return

    last_line = mp_lines[-1]
    if last_line[-1] == '\n':
        return

    mp_lines[-1] = last_line + '\n'
    _write_cagefs_mp_lines(mp_lines)


def remove_mount_dir_cagefs(path, remount_cagefs=False, remount_in_background=False):
    """
    Drop from cagefs.mp every line that specifies exactly the given path
    (with or without a prefix character and a mode suffix)
    :param str path: Path whose mount point lines should be removed.
    :param bool remount_cagefs: Remount cagefs skeleton after the file was changed
    :param remount_in_background: If True, cagefs remount will be done in separate
           background process, without waiting for completion
    :return: Nothing
    """
    current_lines = _get_cagefs_mp_lines()

    mount_line = re.compile(rf'^[{CagefsMpItem.PREFIX_LIST}]?{re.escape(path)}(,\d+)?$')
    kept_lines = []
    for line in current_lines:
        if mount_line.match(line) is None:
            kept_lines.append(line)

    # the file is rewritten (and cagefs remounted) only if some line was dropped
    if len(kept_lines) != len(current_lines):
        _write_cagefs_mp_lines(kept_lines)
        if remount_cagefs:
            _remount_cagefs(remount_in_background=remount_in_background)


def in_cagefs():
    """Tell whether the process runs inside cagefs.

    The presence of the /var/.cagefs directory is taken as the indicator."""
    marker_dir = '/var/.cagefs'
    return os.path.isdir(marker_dir)


def _is_cagefs_enabled(user):
    """
    Tell whether cagefs is enabled for the given user

    Returns False when the cagefsctl module cannot be imported.
    Raises CagefsNotSupportedError when the check fails with AttributeError
    (reported as an unsupported CageFS version).
    """
    cagefs_lib_dir = '/usr/share/cagefs/'
    try:
        # make the cagefsctl module importable from the CageFS directory
        if cagefs_lib_dir not in sys.path:
            sys.path.append(cagefs_lib_dir)
        import cagefsctl  # pylint: disable=import-outside-toplevel
    except ImportError:
        return False

    try:
        enabled = bool(cagefsctl.is_user_enabled(user))
    except AttributeError as e:
        raise CagefsNotSupportedError(
            'ERROR: CageFS version is unsupported. Please update CageFS.'
        ) from e

    return enabled

Youez - 2016 - github.com/yon3zu
LinuXploit