Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.142.198.148
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys/features/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys/features/kernel_care.py
import logging
import os
import shutil
import json

from defence360agent.contracts.config import Core
from defence360agent.contracts.license import LicenseCLN
from defence360agent.utils import run_cmd_and_log, OsReleaseInfo, os_version
from defence360agent.subsys.features.abstract_feature import (
    AbstractFeature,
    FeatureError,
    FeatureStatus,
)
from defence360agent import utils
from defence360agent.rpc_tools import exceptions


logger = logging.getLogger(__name__)


class KernelCare(AbstractFeature):
    KC_PROPERTIES = "/var/imunify360/plesk-previous-kernelcare-stats.json"
    KC_SCRIPT_URL = (
        "https://repo.cloudlinux.com/kernelcare/kernelcare_install.sh"
    )
    LOG_DIR = "/var/log/%s" % Core.PRODUCT
    NAME = "KernelCare"
    BIN_PATH = "/usr/bin/kcarectl"
    INSTALL_LOG_FILE_MASK = "%s/install-kernelcare.log.*" % LOG_DIR
    REMOVE_LOG_FILE_MASK = "%s/remove-kernelcare.log.*" % LOG_DIR
    INSTALL_CMD = "curl -s %s | bash" % KC_SCRIPT_URL
    REMOVE_CMD_REDHAT = "yum remove -y kernelcare"
    REMOVE_CMD_DEBIAN = "apt-get -y remove kernelcare"

    _CMD_LIST = [INSTALL_CMD, REMOVE_CMD_REDHAT, REMOVE_CMD_DEBIAN]

    STATUS_MESSAGE = {
        0: "Host is updated to the latest patch level",
        1: "There are no applied patches",
        2: "There are new not applied patches",
        3: "Kernel is unsupported",
    }

    async def _check_installed_impl(self) -> bool:
        return os.path.exists(self.BIN_PATH)

    async def status(self):
        """
        :raises FeatureError: if kernelcare returns unexpected error
        :return: str: feature's current status
        """
        status = await super().status()
        is_feature_installed = (
            status["items"]["status"] == FeatureStatus.INSTALLED
        )
        if not is_feature_installed:
            return status

        ret, out, err = await self.get_output_kcarectl("--status")
        try:
            # EDF is obsolete since 6.1
            status["items"]["edf_supported"] = False
            status["items"]["message"] = self.STATUS_MESSAGE[ret]
        except KeyError:
            raise FeatureError(
                "Unknown error occured while getting status from kcarectl. "
                f"stdout: [{out}], stderr: [{err}], return code: [{ret}]"
            )

        return status

    @AbstractFeature.raise_if_shouldnt_install_now
    async def install(self):
        return await run_cmd_and_log(
            self.INSTALL_CMD,
            self.INSTALL_LOG_FILE_MASK,
            env=dict(os.environ, DEBIAN_FRONTEND="noninteractive"),
        )

    @AbstractFeature.raise_if_shouldnt_remove_now
    async def remove(self):
        if OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN:
            command = self.REMOVE_CMD_DEBIAN
        else:
            command = self.REMOVE_CMD_REDHAT
        return await run_cmd_and_log(command, self.REMOVE_LOG_FILE_MASK)

    async def get_plugin_info(self):
        try:
            output = await self.run_kcarectl("--plugin-info", "--json")
        except FileNotFoundError:
            raise exceptions.RpcError("kcarectl not found")
        except utils.CheckRunError as e:
            if not (e.returncode == 2 and b"--json" in e.stderr):
                raise  # reraise as is
            else:  # unrecognized arguments: --json
                # use RpcError, to get an error
                raise exceptions.RpcError(
                    "Your kcarectl version doesn't support --json option."
                    " Please, update to kernelcare-2.15-2 or newer."
                )
        try:
            results = json.loads(output.decode().partition("--START--")[-1])
        except ValueError:
            raise exceptions.RpcError(
                "Can't decode kcarectl output as json."
                " Try updating to the latest kernelcare version."
            )
        return results

    async def run_kcarectl(self, *options):
        return await utils.check_run((self.BIN_PATH,) + options)

    async def get_output_kcarectl(self, *options):
        ret, out, err = await utils.run([self.BIN_PATH, *options])
        return ret, out.decode(), err.decode()

Youez - 2016 - github.com/yon3zu
LinuXploit