Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.119.253.198
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools/utils.py
import pickle
import asyncio
import os
import socket
from contextlib import suppress
from functools import lru_cache, wraps
from itertools import chain
from pathlib import Path
from typing import Optional, Tuple, Generator

import yaml
import psutil

from defence360agent.contracts.config import SimpleRpc as Config
from defence360agent.model.simplification import run_in_executor
from defence360agent.rpc_tools.validate import ValidationError
from defence360agent.utils import (
    AV_PID_PATH,
    IM360_NON_RESIDENT_PID_PATH,
    IM360_RESIDENT_PID_PATH,
    antivirus_mode,
    is_centos6_or_cloudlinux6,
    is_root_user,
)


def rpc_is_running():
    """Check if non-resident agent instance is running"""
    # we use socket activation, so we could not use socket for this purpose
    # check process instead
    rpc_process_pid_path = (
        AV_PID_PATH if antivirus_mode.enabled else IM360_NON_RESIDENT_PID_PATH
    )
    if rpc_process_pid_path.exists():
        current_pid = os.getpid()
        with suppress(Exception):
            pid = int(rpc_process_pid_path.read_text())
            if is_centos6_or_cloudlinux6():
                # imunify agent process is a child process of minidaemon
                pid = psutil.Process(pid).children()[0].pid
            return pid != current_pid and psutil.pid_exists(pid)
    return False


def is_running():
    """Check if the agent instance is running"""
    if Config.SOCKET_ACTIVATION:
        return rpc_is_running()

    if IM360_RESIDENT_PID_PATH.exists():
        current_pid = os.getpid()
        pid = int(IM360_RESIDENT_PID_PATH.read_text())
        return pid != current_pid and psutil.pid_exists(pid)
    return False


def _find_schema(base=None, schema_dir="schema"):
    for path in find_schema_files(base, schema_dir):
        if (p := path.with_suffix(".pickle")).exists():
            document = pickle.loads(p.read_bytes())
        else:
            document = yaml.safe_load(path.read_text())

        for k, v in document.items():
            # converting keys - from strings to tuples
            yield tuple(k.split(" ")), v


def find_schema_files(
    base=None, schema_dir="schema"
) -> Generator[Path, None, None]:
    p = base / schema_dir
    for path in [*p.rglob("*.yaml"), *p.rglob("*.yml")]:
        yield path


def get_schema_paths(paths: Optional[Tuple[Path]] = None):
    result = list(paths)[:] if paths else []
    path = Path(__file__).parent.parent / "simple_rpc"
    result.extend(
        [
            path,
            path.parent / "feature_management" / "rpc",
        ]
    )
    return result


@lru_cache(1)
def prepare_schema(paths):
    schema = []
    for base_path in get_schema_paths(paths):
        schema.append(_find_schema(base_path))
    return dict(chain(*schema))


def run_in_executor_decorator(f):
    @wraps(f)
    async def wrapper(*args, **kwargs):
        return await run_in_executor(
            asyncio.get_event_loop(), lambda: f(*args, **kwargs)
        )

    return wrapper


def generate_warnings(
    affected,
    not_affected,
    dest_listname,
    all_list,
    success_warning,
    failure_warning,
    in_another_list_warning=None,
):
    """
    :param list affected: IPs that were changed during operation
    :param list of tuples || list of str not_affected: IPs & it's listnames
            that weren't changed during operation
    :param list all_list: list of all IPs that take place in operation
    :param str success_warning: msg if IP was changed
    :param str failure_warning: msg if IPs wasn't changed and it's absent
            in any other lists
    :param str in_another_list_warning: msg if IPs wasn't changed , however
            it present in another list
    :return list of st warnings: msg to be printed
    """
    warnings = []
    for item in not_affected:
        record, listname = item["rec"], item.get("listname", dest_listname)
        if listname and in_another_list_warning:
            warnings.append(in_another_list_warning.format(record, listname))
        else:
            warnings.append(failure_warning.format(record, dest_listname))

    num_deleted = len(affected)
    total_num = len(all_list)

    if total_num > 1 and total_num != num_deleted:
        warnings.append(success_warning.format(num_deleted, total_num))

    if warnings:
        raise ValidationError(warnings)

    return {}

Youez - 2016 - github.com/yon3zu
LinuXploit