Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.133.155.253
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools/validate.py
import datetime
import logging
import math
import os
import re
from collections import namedtuple
from functools import wraps

from cerberus.validator import Validator
from defence360agent.contracts.config import (
    ANTIVIRUS_MODE,
    BackupRestore,
    Malware,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.subsys.backup_systems import BackupSystem, get_backend

logger = logging.getLogger(__name__)

SHA256_REGEXP = re.compile("^[A-Fa-f0-9]{64}$")


class ValidationError(Exception):
    def __init__(self, errors, extra_data=None):
        if isinstance(errors, str):
            self.errors = [errors]
        else:
            self.errors = errors
        self.extra_data = extra_data or {}


OrderByBase = namedtuple("OrderByBase", ["column_name", "desc"])


class OrderBy(OrderByBase):
    def __new__(cls, column_name, desc):
        return super().__new__(cls, column_name, desc)

    @classmethod
    def fromstring(cls, ob_string):
        """
        :param ob_string: for example: 'user+', 'id-'
        :return:
        """
        try:
            col_name, sign = re.compile("^(.+)([+|-])").split(ob_string)[1:-1]
            return cls(col_name, sign == "-")
        except ValueError as e:
            raise ValueError(
                "Incorrect order_by: ({}): {}".format(str(e), ob_string)
            )


class SchemaValidator(Validator):
    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.extra_data = {}

    def _normalize_coerce_order_by(self, value):
        if isinstance(value, OrderBy):
            return value
        return OrderBy.fromstring(value)

    def _normalize_coerce_sha256hash(self, value):
        return str(value).strip().lower()

    def _normalize_coerce_scan_db(self, value):
        if ANTIVIRUS_MODE:
            return False
        if value is None:
            return Malware.DATABASE_SCAN_ENABLED
        return value

    def _validate_type_order_by(self, value):
        if isinstance(value, OrderBy):
            return True
        return False

    def _validate_type_sha256hash(self, value: str):
        return SHA256_REGEXP.match(str(value).strip())

    def _validate_is_absolute_path(self, is_absolute_path, field, value):
        """{'type': 'boolean', 'empty': False}"""
        if is_absolute_path:
            if not os.path.isabs(value):
                self._error(field, "Path {} should be absolute".format(value))

    def _validate_isascii(self, isascii, field, value):
        """{'type': 'boolean'}"""
        if isascii:
            try:
                value.encode("ascii")
            except UnicodeEncodeError:
                self._error(field, "Must only contain ascii symbols")

    def _normalize_coerce_int(self, value):
        return int(value)

    def _normalize_default_setter_now(self, document) -> int:
        return math.ceil(datetime.datetime.now().timestamp())

    # for argparser support
    def _validate_cli(self, *args, **kwargs):
        """{'type': 'dict', 'empty': False, 'schema': {
        'users': {'type': 'list', 'allowed': ['non-root', 'root'],
            'empty': False},
        'require_rpc': {'type': 'string', 'empty': True, 'default': 'running',
                        'allowed': ['running', 'stopped', 'any', 'direct']}
        }}
        """

    # for argparser support
    def _validate_help(self, *args, **kwargs):
        """{'type': 'string', 'empty': False}"""

    # for argparser support
    def _validate_positional(self, *args, **kwargs):
        """{'type': 'boolean', 'empty': True, 'default': False}"""

    # metadata for response validation
    def _validate_return_type(self, *args, **kwargs):
        """{'type': 'string', 'empty': True}"""

    def _validate_cli_only(self, *args, **kwargs):
        """{'type': 'boolean', 'empty': False, 'default': False}"""

    def _validate_envvar(self, *args, **kwargs):
        """
        Parameter can be passed via the specified environment variable.
        The value specified via a CLI argument takes precedence.

        The rule's arguments are validated against this schema:
        {'type': 'string', 'empty': False}
        """

    def _validate_envvar_only(self, *args, **kwargs):
        """
        Parameter will only be accepted if provided via environment
        variable specified by `envvar`. It will be rejected if passed as
        a CLI argument.

        The rule's arguments are validated against this schema:
        {'type': 'boolean', 'default': False}
        """

    def _normalize_coerce_path(self, value: str):
        if value:
            return os.path.abspath(value)

        return value

    def _normalize_coerce_backup_system(self, value):
        if isinstance(value, BackupSystem):
            return value

        return get_backend(value)

    def _validator_backup_is_enabled(self, field, value):
        if not (BackupRestore.ENABLED and BackupRestore.backup_system()):
            self._error(field, "Backup is not enabled!")


def validate(validator, hashable, params):
    values = validator.normalized(
        {hashable: params}, always_return_document=True
    )
    if not validator.validate({hashable: values[hashable]}):
        logger.warning(
            "Validation error with command {}, params {}, errors {}".format(
                hashable, params, validator.errors
            )
        )
        raise ValidationError(validator.errors, validator.extra_data)

    return validator.document[hashable]


def validate_middleware(validator):
    def wrapped(f):
        @wraps(f)
        async def wrapper(request, *args, **kwargs):
            hashable = tuple(request["command"])
            request["params"] = validate(
                validator, hashable, request["params"]
            )
            result = await f(request, *args, **kwargs)
            return result

        return wrapper

    return wrapped


def validate_av_plus_license(func):
    """
    Decorator for CLI commands methods that ensures that the AV+ license
    is valid.

    :raises ValidationError:
    """
    exception = ValidationError("ImunifyAV+ license required")

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        if LicenseCLN.is_valid_av_plus():
            return await func(*args, **kwargs)
        raise exception

    if ANTIVIRUS_MODE:
        return async_wrapper
    return func

Youez - 2016 - github.com/yon3zu
LinuXploit