Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.227.46.202
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/cerberus/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/cerberus/schema.py
from __future__ import absolute_import

from warnings import warn

from cerberus import errors
from cerberus.platform import (
    _str_type,
    Callable,
    Hashable,
    Mapping,
    MutableMapping,
    Sequence,
)
from cerberus.utils import (
    get_Validator_class,
    validator_factory,
    mapping_hash,
    TypeDefinition,
)


class _Abort(Exception):
    pass


class SchemaError(Exception):
    """
    Raised when the validation schema is missing, has the wrong format or contains
    errors."""

    pass


class DefinitionSchema(MutableMapping):
    """A dict-subclass for caching of validated schemas."""

    def __new__(cls, *args, **kwargs):
        if 'SchemaValidator' not in globals():
            global SchemaValidator
            SchemaValidator = validator_factory('SchemaValidator', SchemaValidatorMixin)
            types_mapping = SchemaValidator.types_mapping.copy()
            types_mapping.update(
                {
                    'callable': TypeDefinition('callable', (Callable,), ()),
                    'hashable': TypeDefinition('hashable', (Hashable,), ()),
                }
            )
            SchemaValidator.types_mapping = types_mapping

        return super(DefinitionSchema, cls).__new__(cls)

    def __init__(self, validator, schema):
        """
        :param validator: An instance of Validator-(sub-)class that uses this
                          schema.
        :param schema: A definition-schema as ``dict``. Defaults to an empty
                       one.
        """
        if not isinstance(validator, get_Validator_class()):
            raise RuntimeError('validator argument must be a Validator-' 'instance.')
        self.validator = validator

        if isinstance(schema, _str_type):
            schema = validator.schema_registry.get(schema, schema)

        if not isinstance(schema, Mapping):
            try:
                schema = dict(schema)
            except Exception:
                raise SchemaError(errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema))

        self.validation_schema = SchemaValidationSchema(validator)
        self.schema_validator = SchemaValidator(
            None,
            allow_unknown=self.validation_schema,
            error_handler=errors.SchemaErrorHandler,
            target_schema=schema,
            target_validator=validator,
        )

        schema = self.expand(schema)
        self.validate(schema)
        self.schema = schema

    def __delitem__(self, key):
        _new_schema = self.schema.copy()
        try:
            del _new_schema[key]
        except ValueError:
            raise SchemaError("Schema has no field '%s' defined" % key)
        except Exception as e:
            raise e
        else:
            del self.schema[key]

    def __getitem__(self, item):
        return self.schema[item]

    def __iter__(self):
        return iter(self.schema)

    def __len__(self):
        return len(self.schema)

    def __repr__(self):
        return str(self)

    def __setitem__(self, key, value):
        value = self.expand({0: value})[0]
        self.validate({key: value})
        self.schema[key] = value

    def __str__(self):
        if hasattr(self, "schema"):
            return str(self.schema)
        else:
            return "No schema data is set yet."

    def copy(self):
        return self.__class__(self.validator, self.schema.copy())

    @classmethod
    def expand(cls, schema):
        try:
            schema = cls._expand_logical_shortcuts(schema)
            schema = cls._expand_subschemas(schema)
        except Exception:
            pass

        # TODO remove this with the next major release
        schema = cls._rename_deprecated_rulenames(schema)

        return schema

    @classmethod
    def _expand_logical_shortcuts(cls, schema):
        """
        Expand agglutinated rules in a definition-schema.

        :param schema: The schema-definition to expand.
        :return: The expanded schema-definition.
        """

        def is_of_rule(x):
            return isinstance(x, _str_type) and x.startswith(
                ('allof_', 'anyof_', 'noneof_', 'oneof_')
            )

        for field, rules in schema.items():
            for of_rule in [x for x in rules if is_of_rule(x)]:
                operator, rule = of_rule.split('_', 1)
                rules.update({operator: []})
                for value in rules[of_rule]:
                    rules[operator].append({rule: value})
                del rules[of_rule]
        return schema

    @classmethod
    def _expand_subschemas(cls, schema):
        def has_schema_rule():
            return isinstance(schema[field], Mapping) and 'schema' in schema[field]

        def has_mapping_schema():
            """
            Tries to determine heuristically if the schema-constraints are aimed to
            mappings.
            """
            try:
                return all(
                    isinstance(x, Mapping) for x in schema[field]['schema'].values()
                )
            except TypeError:
                return False

        for field in schema:
            if not has_schema_rule():
                pass
            elif has_mapping_schema():
                schema[field]['schema'] = cls.expand(schema[field]['schema'])
            else:  # assumes schema-constraints for a sequence
                schema[field]['schema'] = cls.expand({0: schema[field]['schema']})[0]

            # TODO remove the last two values in the tuple with the next major release
            for rule in ('keysrules', 'valuesrules', 'keyschema', 'valueschema'):
                if rule in schema[field]:
                    schema[field][rule] = cls.expand({0: schema[field][rule]})[0]

            for rule in ('allof', 'anyof', 'items', 'noneof', 'oneof'):
                if rule in schema[field]:
                    if not isinstance(schema[field][rule], Sequence):
                        continue
                    new_rules_definition = []
                    for item in schema[field][rule]:
                        new_rules_definition.append(cls.expand({0: item})[0])
                    schema[field][rule] = new_rules_definition
        return schema

    def get(self, item, default=None):
        return self.schema.get(item, default)

    def items(self):
        return self.schema.items()

    def update(self, schema):
        try:
            schema = self.expand(schema)
            _new_schema = self.schema.copy()
            _new_schema.update(schema)
            self.validate(_new_schema)
        except ValueError:
            raise SchemaError(errors.SCHEMA_ERROR_DEFINITION_TYPE.format(schema))
        except Exception as e:
            raise e
        else:
            self.schema = _new_schema

    # TODO remove with next major release
    @staticmethod
    def _rename_deprecated_rulenames(schema):
        for field, rules in schema.items():
            if isinstance(rules, str):  # registry reference
                continue

            for old, new in (
                ('keyschema', 'keysrules'),
                ('validator', 'check_with'),
                ('valueschema', 'valuesrules'),
            ):
                if old not in rules:
                    continue

                if new in rules:
                    raise RuntimeError(
                        "The rule '{new}' is also present with its old "
                        "name '{old}' in the same set of rules."
                    )

                warn(
                    "The rule '{old}' was renamed to '{new}'. The old name will "
                    "not be available in the next major release of "
                    "Cerberus.".format(old=old, new=new),
                    DeprecationWarning,
                )
                schema[field][new] = schema[field][old]
                schema[field].pop(old)

        return schema

    def regenerate_validation_schema(self):
        self.validation_schema = SchemaValidationSchema(self.validator)

    def validate(self, schema=None):
        """
        Validates a schema that defines rules against supported rules.

        :param schema: The schema to be validated as a legal cerberus schema
                       according to the rules of the assigned Validator object.
                       Raises a :class:`~cerberus.base.SchemaError` when an invalid
                       schema is encountered.
        """
        if schema is None:
            schema = self.schema
        _hash = (mapping_hash(schema), mapping_hash(self.validator.types_mapping))
        if _hash not in self.validator._valid_schemas:
            self._validate(schema)
            self.validator._valid_schemas.add(_hash)

    def _validate(self, schema):
        if isinstance(schema, _str_type):
            schema = self.validator.schema_registry.get(schema, schema)

        test_schema = {}
        for field, rules in schema.items():
            if isinstance(rules, _str_type):
                test_schema[field] = rules_set_registry.get(rules, rules)
            else:
                test_rules = {}
                for rule, constraint in rules.items():
                    test_rules[rule.replace(" ", "_")] = constraint
                test_schema[field] = test_rules

        if not self.schema_validator(test_schema, normalize=False):
            raise SchemaError(self.schema_validator.errors)


class UnvalidatedSchema(DefinitionSchema):
    def __init__(self, schema={}):
        if not isinstance(schema, Mapping):
            schema = dict(schema)
        self.schema = schema

    def validate(self, schema):
        pass

    def copy(self):
        # Override ancestor's copy, because
        # UnvalidatedSchema does not have .validator:
        return self.__class__(self.schema.copy())


class SchemaValidationSchema(UnvalidatedSchema):
    def __init__(self, validator):
        self.schema = {
            'allow_unknown': False,
            'schema': validator.rules,
            'type': 'dict',
        }


class SchemaValidatorMixin(object):
    """
    This validator mixin provides mechanics to validate schemas passed to a Cerberus
    validator.
    """

    def __init__(self, *args, **kwargs):
        kwargs.setdefault('known_rules_set_refs', set())
        kwargs.setdefault('known_schema_refs', set())
        super(SchemaValidatorMixin, self).__init__(*args, **kwargs)

    @property
    def known_rules_set_refs(self):
        """The encountered references to rules set registry items."""
        return self._config['known_rules_set_refs']

    @property
    def known_schema_refs(self):
        """The encountered references to schema registry items."""
        return self._config['known_schema_refs']

    @property
    def target_schema(self):
        """The schema that is being validated."""
        return self._config['target_schema']

    @property
    def target_validator(self):
        """The validator whose schema is being validated."""
        return self._config['target_validator']

    def _check_with_bulk_schema(self, field, value):
        # resolve schema registry reference
        if isinstance(value, _str_type):
            if value in self.known_rules_set_refs:
                return
            else:
                self.known_rules_set_refs.add(value)
            definition = self.target_validator.rules_set_registry.get(value)
            if definition is None:
                self._error(field, 'Rules set definition %s not found.' % value)
                return
            else:
                value = definition

        _hash = (
            mapping_hash({'turing': value}),
            mapping_hash(self.target_validator.types_mapping),
        )
        if _hash in self.target_validator._valid_schemas:
            return

        validator = self._get_child_validator(
            document_crumb=field,
            allow_unknown=False,
            schema=self.target_validator.rules,
        )
        validator(value, normalize=False)
        if validator._errors:
            self._error(validator._errors)
        else:
            self.target_validator._valid_schemas.add(_hash)

    def _check_with_dependencies(self, field, value):
        if isinstance(value, _str_type):
            pass
        elif isinstance(value, Mapping):
            validator = self._get_child_validator(
                document_crumb=field,
                schema={'valuesrules': {'type': 'list'}},
                allow_unknown=True,
            )
            if not validator(value, normalize=False):
                self._error(validator._errors)
        elif isinstance(value, Sequence):
            if not all(isinstance(x, Hashable) for x in value):
                path = self.document_path + (field,)
                self._error(path, 'All dependencies must be a hashable type.')

    def _check_with_items(self, field, value):
        for i, schema in enumerate(value):
            self._check_with_bulk_schema((field, i), schema)

    def _check_with_schema(self, field, value):
        try:
            value = self._handle_schema_reference_for_validator(field, value)
        except _Abort:
            return

        _hash = (mapping_hash(value), mapping_hash(self.target_validator.types_mapping))
        if _hash in self.target_validator._valid_schemas:
            return

        validator = self._get_child_validator(
            document_crumb=field, schema=None, allow_unknown=self.root_allow_unknown
        )
        validator(self._expand_rules_set_refs(value), normalize=False)
        if validator._errors:
            self._error(validator._errors)
        else:
            self.target_validator._valid_schemas.add(_hash)

    def _check_with_type(self, field, value):
        value = set((value,)) if isinstance(value, _str_type) else set(value)
        invalid_constraints = value - set(self.target_validator.types)
        if invalid_constraints:
            self._error(
                field, 'Unsupported types: {}'.format(', '.join(invalid_constraints))
            )

    def _expand_rules_set_refs(self, schema):
        result = {}
        for k, v in schema.items():
            if isinstance(v, _str_type):
                result[k] = self.target_validator.rules_set_registry.get(v)
            else:
                result[k] = v
        return result

    def _handle_schema_reference_for_validator(self, field, value):
        if not isinstance(value, _str_type):
            return value
        if value in self.known_schema_refs:
            raise _Abort

        self.known_schema_refs.add(value)
        definition = self.target_validator.schema_registry.get(value)
        if definition is None:
            path = self.document_path + (field,)
            self._error(path, 'Schema definition {} not found.'.format(value))
            raise _Abort
        return definition

    def _validate_logical(self, rule, field, value):
        """{'allowed': ('allof', 'anyof', 'noneof', 'oneof')}"""
        if not isinstance(value, Sequence):
            self._error(field, errors.BAD_TYPE)
            return

        validator = self._get_child_validator(
            document_crumb=rule,
            allow_unknown=False,
            schema=self.target_validator.validation_rules,
        )

        for constraints in value:
            _hash = (
                mapping_hash({'turing': constraints}),
                mapping_hash(self.target_validator.types_mapping),
            )
            if _hash in self.target_validator._valid_schemas:
                continue

            validator(constraints, normalize=False)
            if validator._errors:
                self._error(validator._errors)
            else:
                self.target_validator._valid_schemas.add(_hash)


####


class Registry(object):
    """
    A registry to store and retrieve schemas and parts of it by a name that can be used
    in validation schemas.

    :param definitions: Optional, initial definitions.
    :type definitions: any :term:`mapping`
    """

    def __init__(self, definitions={}):
        self._storage = {}
        self.extend(definitions)

    def add(self, name, definition):
        """
        Register a definition to the registry. Existing definitions are replaced
        silently.

        :param name: The name which can be used as reference in a validation
                     schema.
        :type name: :class:`str`
        :param definition: The definition.
        :type definition: any :term:`mapping`
        """
        self._storage[name] = self._expand_definition(definition)

    def all(self):
        """
        Returns a :class:`dict` with all registered definitions mapped to their name.
        """
        return self._storage

    def clear(self):
        """Purge all definitions in the registry."""
        self._storage.clear()

    def extend(self, definitions):
        """
        Add several definitions at once. Existing definitions are
        replaced silently.

        :param definitions: The names and definitions.
        :type definitions: a :term:`mapping` or an :term:`iterable` with
                           two-value :class:`tuple` s
        """
        for name, definition in dict(definitions).items():
            self.add(name, definition)

    def get(self, name, default=None):
        """
        Retrieve a definition from the registry.

        :param name: The reference that points to the definition.
        :type name: :class:`str`
        :param default: Return value if the reference isn't registered.
        """
        return self._storage.get(name, default)

    def remove(self, *names):
        """
        Unregister definitions from the registry.

        :param names: The names of the definitions that are to be
                      unregistered.
        """
        for name in names:
            self._storage.pop(name, None)


class SchemaRegistry(Registry):
    @classmethod
    def _expand_definition(cls, definition):
        return DefinitionSchema.expand(definition)


class RulesSetRegistry(Registry):
    @classmethod
    def _expand_definition(cls, definition):
        return DefinitionSchema.expand({0: definition})[0]


schema_registry, rules_set_registry = SchemaRegistry(), RulesSetRegistry()

Youez - 2016 - github.com/yon3zu
LinuXploit