Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.224.60.19
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils//resource_limits.py
import asyncio
import logging

from enum import Enum
from os import fsdecode
from pathlib import Path
from typing import List

from defence360agent.utils import OsReleaseInfo

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Executable that this module invokes with the "show" and "run" subcommands.
RUN_WITH_INTENSITY = "/usr/libexec/run-with-intensity"

# Path whose existence has_lvectl() checks.
LVECTL_BIN_PATH = Path("/usr/sbin/lvectl")
# Path whose existence is_lve_active() checks.
PROC_LVE_LIST_PATH = Path("/proc/lve/list")


class LimitsMethod(Enum):
    """Resource limiting methods that run-with-intensity can report.

    Each value is the exact text that get_current_method() compares the
    tool's ``show`` output against.
    """

    NICE = "nice"
    LVE = "lve"
    CGROUPS = "cgroups"


async def get_current_method() -> LimitsMethod:
    """
    Ask the run-with-intensity tool which limiting method it uses.

    Runs ``RUN_WITH_INTENSITY show`` and matches its stripped stdout
    against the known method names.

    :return: the LimitsMethod member matching the tool's output
    :raises LookupError: if the output is not a known method name; the
        message carries both the captured stdout and stderr
    """
    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_exec(
        RUN_WITH_INTENSITY, "show", stdout=pipe, stderr=pipe
    )
    raw_out, raw_err = await process.communicate()
    reported = fsdecode(raw_out).strip()

    # Output names recognised from the tool, mapped to their enum members.
    known_methods = {
        "nice": LimitsMethod.NICE,
        "lve": LimitsMethod.LVE,
        "cgroups": LimitsMethod.CGROUPS,
    }
    if reported in known_methods:
        return known_methods[reported]

    # Anything else is unexpected: surface both streams for diagnosis.
    raise LookupError(
        "Parsing of used limitation method failed\nstdout: {}\nstderr: {}"
        .format(reported, fsdecode(raw_err).strip())
    )


async def create_subprocess(
    cmd: List[str],
    key: str,
    intensity_cpu: int,
    intensity_io: int,
    **subprocess_kwargs
) -> asyncio.subprocess.Process:
    """
    Start ``cmd`` under the run-with-intensity tool with cpu & io limits.

    The launched command line is the tool's ``run`` subcommand with the
    ``--intensity-cpu``, ``--intensity-io`` and ``--key`` options, followed
    by ``cmd`` itself.

    :param cmd: command (program and its arguments) to execute
    :param key: value passed to the tool's ``--key`` option
    :param intensity_cpu: cpu intensity limit
    :param intensity_io: io intensity limit
    :param subprocess_kwargs: keyword arguments forwarded to
        asyncio.create_subprocess_exec
    :return: the started Process
    """
    wrapper_args = [
        RUN_WITH_INTENSITY,
        "run",
        "--intensity-cpu",
        str(intensity_cpu),
        "--intensity-io",
        str(intensity_io),
        "--key",
        key,
    ]
    # The wrapped command goes last, after all of the tool's own options.
    full_argv = wrapper_args + cmd
    return await asyncio.create_subprocess_exec(
        *full_argv, **subprocess_kwargs
    )


def is_lve_active() -> bool:
    """
    Tell whether LVE is the active resource limiter.

    Requires both that PROC_LVE_LIST_PATH exists and that
    OsReleaseInfo.is_cloudlinux() confirms the OS is CloudLinux.
    """
    if not PROC_LVE_LIST_PATH.exists():
        return False
    # The path check alone is not relied on: to avoid possible errors
    # such as DEF-11941, also make sure that the OS is CL.
    return OsReleaseInfo.is_cloudlinux()


def has_lvectl() -> bool:
    """
    Tell whether LVE-utils is installed.

    The check is simply whether LVECTL_BIN_PATH exists.
    """
    lvectl_present = LVECTL_BIN_PATH.exists()
    return lvectl_present

Youez - 2016 - github.com/yon3zu
LinuXploit