Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.142.198.70
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/subsys/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/subsys/malware.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import binascii
import functools
import os
import pwd
import shutil
import time
from collections import defaultdict
from logging import getLogger
from pathlib import Path
from typing import (
    Callable,
    Collection,
    Dict,
    Iterable,
    List,
    TYPE_CHECKING,
    Tuple,
    TypeVar,
    Union,
    cast,
)

from peewee import IntegrityError

from defence360agent.contracts.config import (
    Core,
    HackerTrap,
    MyImunifyConfig,
    UserType,
    choose_use_backups_start_from_date,
    choose_value_from_config,
    should_try_autorestore_malicious,
)
from defence360agent.contracts.permissions import (
    MS_CONFIG_DEFAULT_ACTION_EDIT,
    has_permission,
    myimunify_protection_enabled,
)
from defence360agent.internals.global_scope import g
from defence360agent.model.simplification import run_in_executor
from defence360agent.subsys import web_server
from defence360agent.subsys.panels import hosting_panel
from defence360agent.subsys.panels.base import (
    ModsecVendorsError,
    PanelException,
)
from defence360agent.utils import (
    COPY_TO_MODSEC_MAXTRIES,
    LazyLock,
    atomic_rewrite,
    base64_decode_filename,
    base64_encode_filename,
    log_failed_to_copy_to_modsec,
    retry_on,
    safe_sequence,
)
from imav.contracts.messages import (
    MalwareCleanupRevert,
    MalwareCleanupTask,
)
from imav.malwarelib.config import (
    ADDED_TO_IGNORE,
    CLEANUP,
    CLEANUP_DONE,
    CLEANUP_ON_SCHEDULE,
    CLEANUP_REMOVED,
    DELETED_FROM_IGNORE,
    FAILED_TO_CLEANUP,
    FAILED_TO_DELETE_FROM_IGNORE,
    FAILED_TO_IGNORE,
    FAILED_TO_RESTORE_FROM_BACKUP,
    FAILED_TO_RESTORE_ORIGINAL,
    FAILED_TO_STORE_ORIGINAL,
    FOUND,
    MalwareEvent,
    MalwareEventPostponed,
    MalwareHitStatus,
    MalwareScanResourceType,
    MalwareScanType,
    NOTIFY,
    REQUIRES_MYIMUNIFY_PROTECTION,
    RESTORED_FROM_BACKUP,
    RESTORED_ORIGINAL,
    SUBMITTED_FOR_ANALYSIS,
    UNABLE_TO_CLEANUP,
)
from imav.malwarelib.model import (
    MalwareHistory,
    MalwareHit,
    MalwareHitAlternate,
    MalwareIgnorePath,
    MalwareScan,
)
from imav.malwarelib.scan.mds.report import MalwareDatabaseHitInfo
from imav.malwarelib.subsys.restore_from_backup import restore_files
from imav.malwarelib.utils import hash_path
from imav.malwarelib.utils.submit import submit_in_background
from imav.plugins.event_hook_executor import detected_hook

if TYPE_CHECKING:
    from imav.malwarelib.cleanup.storage import RestoreReport

logger = getLogger(__name__)
#: a type for generic path functions
PathLike = Union[str, bytes, os.PathLike]
#: Declare type variable, to be used in generic functions
T = TypeVar("T")
#: Type variable for generic apply_default_action function
HitInfoType = TypeVar(
    "HitInfoType", MalwareHitAlternate, MalwareDatabaseHitInfo
)


def update_malware_history(coro):
    """Decorator responsible for logging malware events into DB"""

    @functools.wraps(coro)
    async def async_wrapper(
        cls,
        path,
        file_owner,
        file_user,
        initiator=None,
        cause=None,
        resource_type=None,
        app_name=None,
        db_host=None,
        db_port=None,
        db_name=None,
        table_name=None,
        table_field=None,
        table_row_inf=None,
        scan_id=None,
        **kwargs,
    ):
        result = await coro(
            cls,
            path=path,
            file_owner=file_owner,
            file_user=file_user,
            initiator=initiator or UserType.ROOT,
            app_name=app_name,
            resource_type=resource_type,
            db_host=db_host,
            db_port=db_port,
            db_name=db_name,
            scan_id=scan_id,
            **kwargs,
        )

        await run_in_executor(
            asyncio.get_event_loop(),
            lambda: MalwareHistory.save_event(
                event=result.title,
                path=path,
                app_name=app_name,
                resource_type=resource_type,
                file_owner=file_owner,
                file_user=file_user,
                initiator=initiator,
                cause=cause,
                db_host=db_host,
                db_port=db_port,
                db_name=db_name,
                table_name=table_name,
                table_field=table_field,
                table_row_inf=table_row_inf,
                scan_id=scan_id,
            ),
        )
        return result

    @functools.wraps(coro)
    def wrapper(
        cls,
        path,
        file_owner,
        file_user,
        initiator=None,
        cause=None,
        resource_type=None,
        app_name=None,
        db_host=None,
        db_port=None,
        db_name=None,
        table_name=None,
        table_field=None,
        table_row_inf=None,
        scan_id=None,
        **kwargs,
    ):
        result = coro(  # coro is a ordinary function here
            cls,
            path=path,
            file_owner=file_owner,
            file_user=file_user,
            initiator=initiator or UserType.ROOT,
            app_name=app_name,
            resource_type=resource_type,
            db_host=db_host,
            db_port=db_port,
            db_name=db_name,
            scan_id=scan_id,
            **kwargs,
        )

        MalwareHistory.save_event(
            event=result.title,
            path=path,
            app_name=app_name,
            resource_type=resource_type,
            file_owner=file_owner,
            file_user=file_user,
            initiator=initiator,
            cause=cause,
            db_host=db_host,
            db_port=db_port,
            db_name=db_name,
            table_name=table_name,
            table_field=table_field,
            table_row_inf=table_row_inf,
            scan_id=scan_id,
        )
        return result

    return async_wrapper if asyncio.iscoroutinefunction(coro) else wrapper


def multiple_update_malware_history(coro):
    """
    Decorator responsible for logging multiple malware events into DB at once.
    Decorated function accepts an iterable of `MalwareHit`s.
    """

    async def wrapper(
        cls, hits: Iterable[MalwareHit], initiator=None, cause=None
    ):
        results = await asyncio.gather(
            *(
                coro(
                    cls,
                    path=hit.orig_file,
                    file_owner=hit.owner,
                    file_user=hit.user,
                )
                for hit in hits
            )
        )
        if not results:
            return results
        MalwareHistory.save_events(
            [
                {
                    "event": result.title,
                    "path": hit.orig_file,
                    "resource_type": hit.resource_type,
                    "app_name": hit.app_name,
                    "file_owner": hit.owner,
                    "file_user": hit.user,
                    "cause": cause or MalwareScanType.MANUAL,
                    "initiator": initiator or UserType.ROOT,
                    "db_host": hit.db_host,
                    "db_port": hit.db_port,
                    "db_name": hit.db_name,
                    "scan_id": hit.scanid,
                }
                for hit, result in zip(hits, results)
            ]
        )
        return results

    return wrapper


def bulk_update_malware_history(coro):
    """
    Decorator responsible for logging multiple malware events into DB at once.
    Decorated function accepts an iterable of `MalwareHit`s.
    """

    async def wrapper(
        cls, hits: Iterable[MalwareHit], cause=None, initiator=None, **kwargs
    ):
        hit_results = await coro(cls, hits, **kwargs)
        if not hit_results:
            return hit_results
        MalwareHistory.save_events(
            [
                {
                    "event": result.title,
                    "path": hit.orig_file,
                    "file_owner": hit.owner,
                    "file_user": hit.user,
                    "cause": cause or MalwareScanType.MANUAL,
                    "initiator": initiator or UserType.ROOT,
                }
                for hit, result in hit_results.items()
            ]
        )
        return hit_results

    return wrapper


def choose_action_for_malicious(username: str) -> Tuple[str, str]:
    if MyImunifyConfig.ENABLED:
        if not myimunify_protection_enabled(username):
            return NOTIFY, username

    if has_permission(MS_CONFIG_DEFAULT_ACTION_EDIT, username):
        return choose_value_from_config(
            "MALWARE_SCANNING", "default_action", username
        )

    return choose_value_from_config("MALWARE_SCANNING", "default_action")


class MalwareAction:
    """
    Responsible for manipulations with malware files.
    As long as each handler function is wrapped in `update_malware_history`,
    arguments should be passed in kwargs form.
    """

    _CALLBACK = defaultdict(set)

    @classmethod
    async def run_callbacks_for(cls, method_name, path, title):
        """Execute callback for specific action"""

        for callback in cls._CALLBACK[method_name]:
            try:
                await callback(path, MalwareEvent(title))
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.exception(
                    "Error '{!r}' happened when run callback {} for"
                    "MalwareAction {} method".format(e, callback, method_name)
                )

    @classmethod
    def add_callback(cls, method_name, coro):
        cls._CALLBACK[method_name].add(coro)

    @classmethod
    @update_malware_history
    async def submit_for_analysis(
        cls, path, type, reason=None, **_
    ) -> MalwareEvent:
        submit_in_background(path, type, reason)
        return MalwareEvent(SUBMITTED_FOR_ANALYSIS)

    @classmethod
    @update_malware_history
    async def ignore(cls, path, resource_type, **_) -> MalwareEvent:
        try:
            await run_in_executor(
                asyncio.get_event_loop(),
                lambda: MalwareIgnorePath.create(
                    path=path, resource_type=resource_type
                ),
            )
        except IntegrityError:
            title = FAILED_TO_IGNORE
        else:
            title = ADDED_TO_IGNORE
        return MalwareEvent(title)

    @classmethod
    @update_malware_history
    def delete_from_ignore_sync(cls, path, **_) -> MalwareEvent:
        deleted = (
            MalwareIgnorePath.delete()
            .where(MalwareIgnorePath.path == path)
            .execute()
        )
        return MalwareEvent(
            DELETED_FROM_IGNORE if deleted else FAILED_TO_DELETE_FROM_IGNORE
        )

    @classmethod
    @update_malware_history
    async def notify(cls, *_, **__):
        # TODO: should be sending email here, but not implemented yet
        return MalwareEvent(FOUND)

    @classmethod
    @update_malware_history
    async def cleanup_failed_restore(cls, *_, **__):
        return MalwareEvent(FAILED_TO_RESTORE_ORIGINAL)

    @classmethod
    @update_malware_history
    async def cleanup_failed_store(cls, *_, **__):
        return MalwareEvent(FAILED_TO_STORE_ORIGINAL)

    @classmethod
    @update_malware_history
    async def cleanup_restored_original(
        cls, *_, initiator: str, report: "RestoreReport" = None, **__
    ):
        if report and (sink := g.get("sink")):
            report.initiator = initiator
            await sink.process_message(MalwareCleanupRevert(report.to_dict()))
        return MalwareEvent(RESTORED_ORIGINAL)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_unable(cls, *_, **__):
        return MalwareEvent(UNABLE_TO_CLEANUP)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_done(cls, path, *_, **__):
        await cls.run_callbacks_for("cleanup", path, CLEANUP_DONE)
        return MalwareEvent(CLEANUP_DONE)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_removed(cls, *_, **__):
        return MalwareEvent(CLEANUP_REMOVED)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_failed(cls, *_, **__):
        return MalwareEvent(FAILED_TO_CLEANUP)

    @classmethod
    @multiple_update_malware_history
    async def cleanup_requires_myimunify_protection(cls, *_, **__):
        return MalwareEvent(REQUIRES_MYIMUNIFY_PROTECTION)

    @classmethod
    async def apply_default_action(
        cls,
        hits: List[MalwareHitAlternate],
        initiator=None,
        cause=None,
        sink=None,
        **kwargs,
    ) -> List[Tuple[MalwareHitAlternate, MalwareEvent, str, bool]]:
        """Perform action with malware which user set in the config"""

        results = []
        for h in hits:
            _, config_owner = choose_action_for_malicious(username=h.user)
            event = await cls.notify(
                file_owner=h.owner,
                file_user=h.user,
                path=h.orig_file,
                initiator=initiator or config_owner,
                cause=cause,
                **kwargs,
            )
            results.append((h, event, NOTIFY, False))

        return results

    @classmethod
    async def multiple(cls, action, hits):
        """
        Apply the action to multiple hits
        :param action: thr action to apply
        :param hits: list of hits
        """
        for hit in hits:
            await action(hit.orig_file, hit.user)

    @classmethod
    def _get_tmp_dir(cls, file_owner):
        hp = hosting_panel.HostingPanel()
        try:
            user = pwd.getpwnam(file_owner)
        except (KeyError, TypeError):
            return Core.TMPDIR
        try:
            tmp_dir = str(hp.base_home_dir(user.pw_dir))
        except (RuntimeError, FileNotFoundError):
            return Core.TMPDIR

        return tmp_dir

    @classmethod
    def _split_hits_on_restore(cls, hits):
        to_restore = []
        not_restore = []
        for hit in hits:
            path = hit.orig_file
            file_ctime = None
            try:
                file_ctime = int(os.path.getctime(path))
            except FileNotFoundError:
                logger.warning(
                    "File %s not found during restore from backup process",
                    safe_sequence.path(path),
                )

            if (
                file_ctime is None
                or MalwareHistory.select()
                .where(
                    MalwareHistory.path == path,
                    MalwareHistory.event == FAILED_TO_RESTORE_FROM_BACKUP,
                    MalwareHistory.ctime >= file_ctime,
                )
                .first()
                is None
            ):
                to_restore.append(hit)
            else:
                not_restore.append(hit)

        return to_restore, not_restore

    @classmethod
    @bulk_update_malware_history
    async def restore_from_backup(
        cls, hits, **kwargs
    ) -> Dict[MalwareHit, MalwareEvent]:
        to_restore, not_restore = cls._split_hits_on_restore(hits)

        for f in not_restore:
            logger.warning(
                "File %s wasn't restored from backup"
                ", because last restore attempt failed",
                safe_sequence.path(f.orig_file),
            )

        user_hits = {}  # Dict[str, List[MalwareHit]]
        for hit in to_restore:
            user_hits.setdefault(hit.user, []).append(hit)

        res = {}  # type: Dict[MalwareHit, MalwareEvent]
        for user, _hits in user_hits.items():
            res.update(
                await cls._restore_from_backup(
                    _hits, file_owner=user, **kwargs
                )
            )
        res.update(
            (hit, MalwareEvent(FAILED_TO_RESTORE_FROM_BACKUP))
            for hit in not_restore
        )
        return res

    @classmethod
    async def _restore_from_backup(
        cls, hits, file_owner, sink=None, **_
    ) -> List[Tuple[MalwareHit, MalwareEvent]]:
        paths = [h.orig_file for h in hits]

        tmp_dir = cls._get_tmp_dir(file_owner)

        restored, failed = await restore_files(
            files=paths,
            until=choose_use_backups_start_from_date(file_owner),
            sink=sink,
            tmp_dir=tmp_dir,
        )
        res = []

        restored_hits = [h for h in hits if h.orig_file in restored]
        failed_hits = [h for h in hits if h.orig_file in failed]

        for p in restored:
            safe_path = safe_sequence.path(p)
            logger.info("File %s was restored from backup", safe_path)

        title = RESTORED_FROM_BACKUP
        res.extend([(rh, MalwareEvent(title)) for rh in restored_hits])

        for p in failed:
            safe_path = safe_sequence.path(p)
            logger.warning("File %s wasn't restored from backup", safe_path)

        title = FAILED_TO_RESTORE_FROM_BACKUP
        res.extend([(fh, MalwareEvent(title)) for fh in failed_hits])

        return res


def subscribe_to_malware_action(action, coro):
    MalwareAction.add_callback(action, coro)


class HackerTrapHitsSaver:
    BASE_DIR = HackerTrap.DIR
    BASE_PD_DIR = HackerTrap.DIR_PD
    NAME = HackerTrap.NAME
    MAX_HITS_COUNT = 1000  # lets do 1000 files for now, see how it works
    SECONDS_BEFORE_CLEAN = 24 * 60 * 60  # 24 hours between cleanups
    STANDALONE_MARK = "-SA-"
    LOCK = LazyLock()

    @classmethod
    def _filepath(cls, filename=None) -> Path:
        name = filename or cls.NAME
        return Path(cls.BASE_DIR, name)

    @classmethod
    def _clean_filepath(cls) -> Path:
        return Path(cls.BASE_DIR, cls.NAME + ".clean")

    @classmethod
    def _write(cls, file_list: List[Path], filename=None):
        try:
            atomic_rewrite(
                cls._filepath(filename),
                b"\n".join(base64_encode_filename(name) for name in file_list),
                backup=False,
                allow_empty_content=True,
                permissions=0o644,
            )
        except OSError as oe:
            logger.error("Unable to write HackerTrap file: %r", oe)

    @classmethod
    def _extend(cls, file_list: List[T], files_to_add: List[T]) -> List[T]:
        """
        adds files_to_add to file_list
        the method has side_effect (file_list will be modified)
        yet, given that it is private class method -- we can do it
        :param file_list: existing files
        :param files_to_add: files to add
        :return: joined list, limited to MAX_HITS_COUNT
        """
        file_set = set(file_list)  # we will use it to speed up lookups
        _file_list = file_list.copy()
        for file in files_to_add:
            # if we are re-adding file, re-add it at the bottom,
            # so it doesn't rotate out too fast
            if file in file_set:
                _file_list.remove(file)
            _file_list.append(file)
        return _file_list[-cls.MAX_HITS_COUNT :]

    @staticmethod
    def _clean_list(file_list: Iterable[PathLike]) -> List[PathLike]:
        """
        This method checks if any of the files on the list is present
        and removes that entry from the list
        :param file_list: list of files
        :return: new list of files, in the same order, with files that exist
        skipped
        """
        return [file for file in file_list if not os.path.exists(file)]

    @classmethod
    def _should_clean(cls, file_mtime, current_time):
        return current_time - file_mtime > cls.SECONDS_BEFORE_CLEAN

    @classmethod
    def _clean_file(cls, file_list: Iterable[PathLike]):
        """
        We will use extra file to track last time we cleaned
        For that we will use mtime of that file
        :param file_list: list to clean
        :return: cleaned list
        """
        p = cls._clean_filepath()
        if p.exists():
            if cls._should_clean(p.stat().st_mtime, time.time()):
                p.write_bytes(b"")
                file_list = cls._clean_list(file_list)
        else:
            p.write_bytes(b"")

        return file_list

    @classmethod
    def _read(cls, filename=None, skip_exists=True) -> List[Path]:
        try:
            file_list: List[bytes] = (
                cls._filepath(filename).read_bytes().split()
            )
            decoded_file_list: List[Path] = []
            for file in file_list:
                try:
                    decoded_file_list.append(base64_decode_filename(file))
                except binascii.Error as e:
                    logger.error(
                        "Can't decode filepath [%r] with error [%r]", file, e
                    )
            return (
                cls._clean_file(decoded_file_list)
                if skip_exists
                else decoded_file_list
            )

        except FileNotFoundError:
            return []

    @classmethod
    async def add_hits(cls, files_to_add: List[Path], *args, **kwargs):
        """Same behavior as for separate hit."""

        await cls._add_hits(files_to_add, *args, **kwargs)
        await cls.update_sa_hits(files_to_add=[], files_to_remove=files_to_add)

    @classmethod
    async def _add_hits(cls, files_to_add: List[Path], *args, **kwargs):
        try:
            file_list: List[Path] = cls._read()
            result: List[Path] = cls._extend(file_list, files_to_add)
            cls._write(result)
            await cls._copy_to_modsec_rules(cls.NAME)
        except OSError as oe:
            logger.error("Unable to read HackerTrap file %r", oe)

    @classmethod
    async def add_hit(cls, file_to_add: Path, *args, **kwargs):
        """When storing separate hit it needs to be added to
        malware_found_b64.list
        and excluded from malware_sa_found_b64.list as well from
        proactive/dangerous/[hash]"""
        return await cls.add_hits([file_to_add])

    @classmethod
    async def init(cls):
        await cls.add_hits([])

    @classmethod
    @retry_on(
        FileNotFoundError,
        max_tries=COPY_TO_MODSEC_MAXTRIES,
        on_error=log_failed_to_copy_to_modsec,
        silent=True,
    )
    async def _copy_to_modsec_rules(cls, malware_list_name):
        hp = hosting_panel.HostingPanel()

        try:
            vendor = await hp.get_i360_vendor_name()
        except (ModsecVendorsError, PanelException) as e:
            logger.warning(str(e))
            return False
        try:
            target = await hp.build_vendor_file_path(vendor, malware_list_name)
        except ModsecVendorsError as e:
            logger.exception("Can't get malware found list file: %s", e)
            return False

        found_list = Path(HackerTrap.DIR, malware_list_name)
        target_tmp = target.with_suffix(target.suffix + ".tmp")

        if (
            target.exists()
            and target.stat().st_size == found_list.stat().st_size
            and target.read_bytes() == found_list.read_bytes()
        ):
            logger.info("Nothing to update")
            return False

        try:
            shutil.copy(str(found_list), str(target_tmp))
            target_tmp.rename(target)
            return True
        except FileNotFoundError as e:
            raise e
        except OSError as e:
            logger.error("Failed to copy malware found list: %s", e)
            return False

    @classmethod
    def _get_exists_hash_files(cls):
        with os.scandir(cls.BASE_PD_DIR) as it:
            return [entry.name for entry in it if entry.is_file()]

    @classmethod
    def _create_hash_files(cls, files):
        for fname in files:
            (Path(cls.BASE_PD_DIR) / Path(fname)).touch(0o644)

    @classmethod
    def _remove_hash_files(cls, files):
        for fname in files:
            (Path(cls.BASE_PD_DIR) / Path(fname)).unlink()

    @classmethod
    def _update_sa_hash_files(cls):
        """
        SA hits stored for PD as sha256 hash of full path in
        HackerTrap.DIR_PD. Not more than MAX_HITS_COUNT files in dir.
        Remove older (by mtime) files first.
        """
        try:
            saved_files_list = cls._read(
                filename=HackerTrap.SA_NAME, skip_exists=False
            )
            hash_file_list = [
                hash_path(path) for path in saved_files_list if path
            ]
            exists_hash_file_list = cls._get_exists_hash_files()
            files_to_create = set(hash_file_list) - set(exists_hash_file_list)
            files_to_delete = set(exists_hash_file_list) - set(hash_file_list)
            cls._create_hash_files(files_to_create)
            cls._remove_hash_files(files_to_delete)
        except OSError as e:
            logger.warning(
                "HackerTrap error: %r%s",
                e,
                f" ({e.filename!r})" if e.filename else "",
            )

    @classmethod
    def _update_sa_hit_list(
        cls, files_to_add: List[Path], files_to_remove: List[Path]
    ) -> bool:
        """
        Update file of malware standalone list.
        Return True if malware standalone list was changed otherwise False.
        """
        try:
            saved_list: List[Path] = cls._read(
                filename=HackerTrap.SA_NAME, skip_exists=False
            )
            extended_list: List[Path] = cls._extend(saved_list, files_to_add)
            updated_list = [
                path for path in extended_list if path not in files_to_remove
            ]
            if updated_list != saved_list:
                cls._write(updated_list, filename=HackerTrap.SA_NAME)
                return True
        except OSError as e:
            logger.error("HackerTrap error: %s", e)
        return False

    @classmethod
    async def update_sa_hits(
        cls, files_to_add: List[Path], files_to_remove: List[Path]
    ):
        if files_to_add or files_to_remove:
            async with cls.LOCK:
                if cls._update_sa_hit_list(files_to_add, files_to_remove):
                    if await cls._copy_to_modsec_rules(HackerTrap.SA_NAME):
                        await web_server.graceful_restart()
                    cls._update_sa_hash_files()

    @classmethod
    async def reset_sa_hits(cls):
        """
        Re-populate HackerTrap records using data from database
        """
        # WARN: It is critically important to check the 'resource_type'!
        # In some cases when scanning DB for malwares the results contain
        # '-SA-' mark in the 'type' column. For instance:
        #   SMW-SA-20634-php.bkdr.wp.fakeplugin-0
        # What happens next:
        # 1) New 'MalwareHit' records appear, with 'resource_type'=="DB" and
        # 'orig_file'=="path to a root directory".
        # 2) The config 'malware_standalone_b64.list' gets these paths to root
        # directories, instead of paths to scripts.
        # 3) The action 'pmFromFile' in the modsec rule 77316817 (and some
        # others) matches 'SCRIPT_FILENAME' variable with lines in the config.
        # 4) The matching in the modsec module is not a strict comparison,
        # but the occurrence of a string within a string.
        # For instance, when the config contains the line:
        #   /home/domain/public_html
        # Than all the paths are match with it:
        #   /home/domain/public_html/admin.php
        #   /home/domain/public_html/cms/main.php
        # As the result of all above, the modsec rule makes false-positive
        # conclusion and blocks the request.
        # To prevent that, the 'resource_type' must be equal to 'FILE'.
        resource_type = MalwareScanResourceType.FILE.value
        async with cls.LOCK:
            files = (
                MalwareHit.select(MalwareHit.orig_file)
                .where(
                    # Only standalone malicious files that was found,
                    # but not yet cleared/restored
                    MalwareHit.status.in_(
                        [
                            MalwareHitStatus.FOUND,
                            MalwareHitStatus.CLEANUP_STARTED,
                            MalwareHitStatus.RESTORE_FROM_BACKUP_STARTED,
                        ]
                    ),
                    MalwareHit.malicious,
                    MalwareHit.type.contains(cls.STANDALONE_MARK),
                    MalwareHit.resource_type == resource_type,
                )
                .order_by(MalwareHit.timestamp.desc())
                .limit(cls.MAX_HITS_COUNT)
                .tuples()
            )
            cls._write(
                [os.fsencode(f) for [f] in files], filename=HackerTrap.SA_NAME
            )
            if await cls._copy_to_modsec_rules(HackerTrap.SA_NAME):
                await web_server.graceful_restart()
            cls._update_sa_hash_files()


class MalwareActionIm360(MalwareAction):
    @classmethod
    def _get_handler(cls, action) -> Callable:
        possible_actions = {
            NOTIFY: cls.notify,
            CLEANUP: cls.postpone(
                MalwareCleanupTask,
                post_action=cls.detect,
                action=CLEANUP,
            ),
            CLEANUP_ON_SCHEDULE: cls.postpone(
                MalwareCleanupTask,
                post_action=cls.detect,
                action=CLEANUP_ON_SCHEDULE,
            ),
        }
        try:
            result = possible_actions[action]
        except KeyError:
            result = possible_actions[NOTIFY]
            logger.error(
                "There is no such action '%s'. Config is invalid", action
            )
        return result

    @staticmethod
    def postpone(message, **kwargs):
        async def wrapper(*_, initiator, cause, **__):
            return MalwareEventPostponed(
                message, initiator=initiator, cause=cause, **kwargs
            )

        return wrapper

    @classmethod
    async def detect(cls, scan_id, sink, **_):
        scan = MalwareScan.get(scanid=scan_id)
        await detected_hook(
            sink,
            scan_id,
            scan.type,
            scan.started,
            scan.path,
            scan.total_resources,
        )

    @classmethod
    @bulk_update_malware_history
    async def restore_from_backup(
        cls, hits, **kwargs
    ) -> Dict[MalwareHit, MalwareEvent]:
        to_restore, not_restore = cls._split_hits_on_restore(hits)

        for f in not_restore:
            logger.warning(
                "File %s wasn't restored from backup"
                ", because last restore attempt failed",
                safe_sequence.path(f.orig_file),
            )

        user_hits = {}  # type: Dict[str, List[MalwareHit]]
        for hit in to_restore:
            user_hits.setdefault(hit.user, []).append(hit)

        res = {}  # type: Dict[MalwareHit, MalwareEvent]
        for user, _hits in user_hits.items():
            res.update(
                await cls._restore_from_backup(
                    _hits, file_owner=user, **kwargs
                )
            )
        res.update(
            (hit, MalwareEvent(FAILED_TO_RESTORE_FROM_BACKUP))
            for hit in not_restore
        )
        return res

    @classmethod
    def _split_hits_on_restore(cls, hits):
        to_restore = []
        not_restore = []
        for hit in hits:
            path = hit.orig_file
            file_ctime = None
            try:
                file_ctime = int(os.path.getctime(path))
            except FileNotFoundError:
                logger.warning(
                    "File %s not found during restore from backup process",
                    safe_sequence.path(path),
                )

            if (
                file_ctime is None
                or MalwareHistory.select()
                .where(
                    MalwareHistory.path == path,
                    MalwareHistory.event == FAILED_TO_RESTORE_FROM_BACKUP,
                    MalwareHistory.ctime >= file_ctime,
                )
                .first()
                is None
            ):
                to_restore.append(hit)
            else:
                not_restore.append(hit)

        return to_restore, not_restore

    @classmethod
    async def _restore_from_backup(
        cls, hits, file_owner, sink=None, **_
    ) -> List[Tuple[MalwareHit, MalwareEvent]]:
        paths = [h.orig_file for h in hits]

        tmp_dir = cls._get_tmp_dir(file_owner)

        restored, failed = await restore_files(
            files=paths,
            until=choose_use_backups_start_from_date(file_owner),
            sink=sink,
            tmp_dir=tmp_dir,
        )
        res = []

        restored_hits = [h for h in hits if h.orig_file in restored]
        failed_hits = [h for h in hits if h.orig_file in failed]

        for p in restored:
            safe_path = safe_sequence.path(p)
            logger.info("File %s was restored from backup", safe_path)

        title = RESTORED_FROM_BACKUP
        res.extend([(rh, MalwareEvent(title)) for rh in restored_hits])

        for p in failed:
            safe_path = safe_sequence.path(p)
            logger.warning("File %s wasn't restored from backup", safe_path)

        title = FAILED_TO_RESTORE_FROM_BACKUP
        res.extend([(fh, MalwareEvent(title)) for fh in failed_hits])

        return res

    @classmethod
    def _get_tmp_dir(cls, file_owner):
        hp = hosting_panel.HostingPanel()
        try:
            user = pwd.getpwnam(file_owner)
        except (KeyError, TypeError):
            return Core.TMPDIR
        try:
            tmp_dir = str(hp.base_home_dir(user.pw_dir))
        except (RuntimeError, FileNotFoundError):
            return Core.TMPDIR

        return tmp_dir

    @classmethod
    async def apply_default_action(
        cls,
        hits: Collection[HitInfoType],
        initiator=None,
        cause=None,
        sink=None,
        resource_type=None,
        **kwargs,
    ) -> List[Tuple[HitInfoType, MalwareEvent, str, bool]]:
        """Perform action with malware which user set in the config"""

        to_restore = [
            hit
            for hit in hits
            if should_try_autorestore_malicious(hit.user)
            # restore from backup does not apply to db scans
            and not isinstance(hit, MalwareDatabaseHitInfo)
        ]
        restore_events = await cls.restore_from_backup(
            to_restore, initiator=initiator, sink=sink, cause=cause, **kwargs
        )
        # FIXME: remove this mapping
        # when we start to store UID instead of username in the db
        panel_users = set(await hosting_panel.HostingPanel().get_users())
        uid_to_name = {
            pw.pw_uid: pw.pw_name
            for pw in pwd.getpwall()
            if pw.pw_name in panel_users
        }
        res = []
        for hit in hits:
            if isinstance(hit, MalwareDatabaseHitInfo):
                owner = uid_to_name.get(hit.owner, str(hit.owner))
                user = uid_to_name.get(hit.user, str(hit.user))
                path = cast(MalwareDatabaseHitInfo, hit).path
            else:
                owner = hit.owner
                user = hit.user
                path = cast(MalwareHitAlternate, hit).orig_file

            action, config_owner = choose_action_for_malicious(user)

            if hit in restore_events and restore_events[hit].successful:
                res.append((hit, restore_events[hit], action, True))
                continue

            handler_kw_args = kwargs.copy()
            if isinstance(hit, MalwareDatabaseHitInfo):
                handler_kw_args["db_name"] = hit.db_name
                handler_kw_args["db_host"] = hit.db_host
                handler_kw_args["db_port"] = hit.db_port

                handler_kw_args["table_name"] = hit.table_name
                handler_kw_args["table_field"] = hit.table_field
                handler_kw_args["table_row_inf"] = hit.table_row_inf

                handler_kw_args["scan_id"] = hit.scan_id

            handler = cls._get_handler(action)
            event = await handler(
                path=path,
                file_owner=owner,
                file_user=user,
                cause=cause,
                initiator=initiator or config_owner,
                sink=sink,
                app_name=hit.app_name,
                resource_type=resource_type,
                **handler_kw_args,
            )

            res.append((hit, event, action, False))

        return res

Youez - 2016 - github.com/yon3zu
LinuXploit