Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.144.98.43
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/simple_rpc/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/simple_rpc//plesk_stats.py
import datetime
import json
from contextlib import suppress

from imav.malwarelib.model import MalwareHit
from defence360agent.contracts.license import LicenseCLN
from defence360agent.rpc_tools.lookup import RootEndpoints, bind
from defence360agent.rpc_tools.utils import run_in_executor_decorator
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.subsys.panels.plesk.api import list_docroots_domains_users
from defence360agent.utils import atomic_rewrite, Scope
from defence360agent.subsys.panels.plesk import Plesk
from defence360agent.subsys.features import kernel_care


class PleskStatsEndpoints(RootEndpoints):
    """RPC endpoints exposing agent statistics for the Plesk panel.

    The single bound endpoint, ``plesk-stats``, combines infected-site
    statistics, KernelCare update information (when KernelCare is
    installed) and a license validity flag into one response.
    """

    # Cap on the number of domain names returned in "infected_sites";
    # "wsites_infected" still reports the uncapped count.
    MAX_DOMAINS_COUNT = 100

    @bind("plesk-stats")
    async def plesk_stats(self):
        """Collect the statistics returned by the ``plesk-stats`` RPC call.

        Returns:
            A dict ``{"items": {...}}`` whose inner dict holds, in order:
            ``last_modified`` (epoch milliseconds), ``last_modified_str``
            (the same instant rendered as a UTC datetime string), the keys
            produced by ``_domains_stats``, the keys produced by
            ``_get_stats_field_in_plugin_info`` (none when KernelCare is
            not installed) and ``license`` (1 if valid, otherwise 0).

        Raises:
            AssertionError: if the detected hosting panel is not Plesk.
        """
        panel = HostingPanel()
        assert isinstance(panel, Plesk), "only for plesk"
        # Rounded to whole seconds so that both representations below
        # describe exactly the same instant.
        current_timestamp = int(round(datetime.datetime.now().timestamp()))
        last_modified_str = str(
            datetime.datetime.fromtimestamp(
                current_timestamp,
                datetime.timezone.utc,
            )
        )
        domains_stats = await self._domains_stats(
            await list_docroots_domains_users(),
        )
        kernel_stats = await self._get_stats_field_in_plugin_info()
        return {
            "items": {
                "last_modified": current_timestamp * 1000,
                "last_modified_str": last_modified_str,
                **domains_stats,
                **kernel_stats,
                "license": (1 if LicenseCLN.is_valid() else 0),
            }
        }

    @staticmethod
    def _load_previous_kernel_state():
        """Read the kernel state persisted by the previous invocation.

        Returns:
            A dict with the keys ``effective_kernel`` and
            ``first_time_update_available``. Either value is ``None`` when
            the properties file is missing, unreadable as JSON, not a JSON
            object, or lacks a usable value for that key.
        """
        state = {
            "effective_kernel": None,
            "first_time_update_available": None,
        }
        # json.JSONDecodeError (and UnicodeDecodeError) derive from
        # ValueError: a damaged file is treated like a missing one instead
        # of failing the whole endpoint.
        with suppress(FileNotFoundError, ValueError):
            with open(kernel_care.KernelCare.KC_PROPERTIES) as file:
                loaded = json.load(file)
            if isinstance(loaded, dict):
                first_seen = loaded.get("first_time_update_available")
                if not isinstance(first_seen, (int, float)):
                    # Anything but a number cannot be fed to fromtimestamp.
                    first_seen = None
                state = {
                    "effective_kernel": loaded.get("effective_kernel"),
                    "first_time_update_available": first_seen,
                }
        return state

    @classmethod
    async def _get_stats_field_in_plugin_info(cls):
        """Build the KernelCare part of the statistics.

        Compares the currently reported effective kernel with the one
        stored in ``KernelCare.KC_PROPERTIES`` and rewrites that file with
        the current kernel and the "first seen" timestamp on every call.

        Returns:
            ``{}`` when KernelCare is not installed; otherwise a dict with
            ``kernel_uptodate`` and ``outdated_since_days``.
        """
        kernel_care_feature = kernel_care.KernelCare()
        if not await kernel_care_feature.check_installed():
            return {}
        plugin_info = await kernel_care_feature.get_plugin_info()
        effective_kernel = plugin_info["effectiveKernel"]
        update_available = plugin_info["updateCode"] == "1"

        previous = cls._load_previous_kernel_state()
        previous_first_seen = previous["first_time_update_available"]
        now = datetime.datetime.now(tz=datetime.timezone.utc)

        if (
            effective_kernel != previous["effective_kernel"]
            or previous_first_seen is None
        ):
            # Kernel changed, or there is no stored timestamp to continue
            # from: restart the clock at the current moment.
            first_time_update_available = now
        else:
            first_time_update_available = datetime.datetime.fromtimestamp(
                previous_first_seen, datetime.timezone.utc
            )

        if update_available:
            outdated_since_days = (now - first_time_update_available).days
        else:
            outdated_since_days = 0

        atomic_rewrite(
            kernel_care.KernelCare.KC_PROPERTIES,
            json.dumps(
                {
                    "effective_kernel": effective_kernel,
                    "first_time_update_available": first_time_update_available.timestamp(),  # noqa
                }
            ),
            backup=False,
        )
        return {
            # NOTE(review): "kernel_uptodate" is filled from the plugin's
            # "autoUpdate" field, not from "updateCode" - confirm that this
            # is the intended source.
            "kernel_uptodate": plugin_info["autoUpdate"],
            "outdated_since_days": outdated_since_days,
        }

    @staticmethod
    def _is_within_docroot(filename, docroot):
        """Tell whether *filename* is *docroot* itself or lies beneath it.

        The match is done on a path-component boundary, so the docroot
        ``/vhosts/example.com`` does not claim files that belong to
        ``/vhosts/example.com.au``.
        """
        # assumes "/"-separated string paths - TODO confirm for both the
        # stored hit paths and the docroots reported by Plesk
        root = docroot.rstrip("/")
        return filename == root or filename.startswith(root + "/")

    @run_in_executor_decorator
    def _domains_stats(self, plesk_response):
        """Work out which Plesk domains contain infected files.

        Decorated with ``run_in_executor_decorator``; the caller awaits
        the result.

        Args:
            plesk_response: iterable of ``(docroot, domain, user)``
                triples.

        Returns:
            A dict with ``infected_sites`` (at most ``MAX_DOMAINS_COUNT``
            domain names) and ``wsites_infected`` (the uncapped number of
            infected domains).
        """
        # Materialised once because it is scanned again for every docroot.
        infected_files = [
            filename
            for (filename,) in MalwareHit.select(MalwareHit.orig_file)
            .where(MalwareHit.is_infected())
            .tuples()
        ]
        # usually infected_users << total_users (according to ch)
        # so we can compare each hit only with docroots, whose owners are
        # marked as infected in imunify database
        infected_users = {
            row[0]
            for row in MalwareHit.select(MalwareHit.user)
            .where(MalwareHit.is_infected())
            .distinct()
            .tuples()
        }

        infected_sites = [
            domain
            for docroot, domain, user in plesk_response
            if user in infected_users
            and any(
                self._is_within_docroot(filename, docroot)
                for filename in infected_files
            )
        ]

        return {
            "infected_sites": infected_sites[: self.MAX_DOMAINS_COUNT],
            "wsites_infected": len(infected_sites),
        }

Youez - 2016 - github.com/yon3zu
LinuXploit