Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.149.242.223
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model/infected_domain.py
import itertools
import logging
import time

from peewee import (
    CharField,
    FloatField,
    IntegerField,
    TextField,
)

from defence360agent.model import instance, Model

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class InfectedDomainList(Model):
    """Domains flagged with a bad reputation (Reputation Management feature).

    Each row ties a hosting-panel user and a domain to one threat reported
    by a reputation engine.
    """

    id = IntegerField(primary_key=True)
    #: Hosting-panel username the domain belongs to (may be NULL).
    username = CharField(null=True)
    #: The domain name itself.
    name = CharField(null=False)
    #: Threat category reported by the reputation engine,
    #: e.g. "SOCIAL_ENGINEERING".
    threat_type = CharField(null=False)
    #: When Imunify first noticed the bad reputation of this domain.
    timestamp = FloatField()
    #: Reputation engine that reported it, e.g. "google-safe-browsing".
    vendor = TextField(null=True)

    class Meta:
        database = instance.db
        db_table = "infected_domain_list"

    @classmethod
    def get_by_user(cls, existing_users, offset=0, limit=50):
        """Return one page of flagged domains owned by *existing_users*.

        Rows are grouped per (username, domain) pair. Because the user
        filter runs in Python, offset and limit are applied in Python too
        instead of being pushed into the SQL query.

        :param existing_users: container of usernames whose rows are kept
        :param offset: number of leading (username, domain) groups to skip
        :param limit: maximum number of groups to put on the page
        :return: ``(page, total)`` -- the selected groups and the number of
            groups that matched the user filter before paging
        """
        ordered = cls.select().order_by(
            cls.username, cls.name, cls.timestamp.desc()
        )
        owned_rows = filter(
            lambda rec: rec["username"] in existing_users, ordered.dicts()
        )

        page = []
        total = 0
        # groupby relies on the query being sorted by (username, name).
        for (username, name), rows in itertools.groupby(
            owned_rows, key=lambda rec: (rec["username"], rec["name"])
        ):
            position = total
            total += 1
            # Keep counting groups even when this one is outside the page.
            if len(page) >= limit or position < offset:
                continue
            threats = []
            for rec in rows:
                threats.append(
                    {
                        "type": rec["threat_type"],
                        "vendor": rec["vendor"],
                        "timestamp": rec["timestamp"],
                    }
                )
            page.append(
                {"username": username, "domain": name, "threats": threats}
            )
        return page, total

    @classmethod
    def refresh_domains(cls, domains, domains_to_users):
        """
        Replace the stored reputation data with a fresh report.

        The table is emptied and refilled inside one transaction. A threat
        that was already stored (same domain, threat type and vendor) keeps
        its previous timestamp; new threats get the current time.

        :param domains: reputation data from server
        :param domains_to_users: domain -> users mapping from hosting panel
        :return:
        """
        # Snapshot the old timestamps before the table is wiped.
        first_seen = {}
        for row in cls.select().dicts():
            key = (row["name"], row["threat_type"], row["vendor"])
            first_seen[key] = row["timestamp"]

        safe_browsing_vendors = (
            "google-safe-browsing",
            "yandex-safe-browsing",
        )
        phishing_feed_vendors = ("phishtank", "openphish")

        with instance.db.atomic():
            cls.delete().execute()
            now = time.time()
            for info in domains:
                domain = info["query"]
                if domain not in domains_to_users:
                    logger.warning("Users for domain %s not found.", domain)
                    continue
                for user in domains_to_users[domain]:
                    vendor = info["vendor"]
                    # Fallback for vendors without a dedicated rule below.
                    threat_type = "THREAT_TYPE_UNSPECIFIED"
                    if vendor in safe_browsing_vendors:
                        threat_type = info["details"]["threat_type"]
                    elif vendor == "spamhaus":
                        threat_type = info["details"]
                    elif vendor in phishing_feed_vendors:
                        # https://cloudlinux.atlassian.net/wiki/spaces/IPT/pages/929759302/4.1+Multiple+vendors+in+Reputation+Management+ver.4.2 # noqa: E501
                        threat_type = "spam domain"

                    lookup = (domain, threat_type, vendor)
                    if lookup in first_seen:
                        detected_at = first_seen[lookup]
                    else:
                        detected_at = now
                    cls.create(
                        username=user,
                        name=domain,
                        threat_type=threat_type,
                        vendor=vendor,
                        timestamp=detected_at,
                    )

Youez - 2016 - github.com/yon3zu
LinuXploit