Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.119.19.251
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/peewee_migrate/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/peewee_migrate/auto.py
"""Automatically create migrations."""
from __future__ import annotations

from collections.abc import Hashable
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Final,
    Iterable,
    List,
    Optional,
    Type,
    Union,
    cast,
)

import peewee as pw
from playhouse.reflection import Column as VanilaColumn

if TYPE_CHECKING:
    from .migrator import Migrator
    from .types import TModelType, TParams

# One level of indentation used in the generated migration code.
INDENT: Final = "    "
# Line break followed by one level of indentation; used to join generated lines.
NEWLINE: Final = "\n" + INDENT
# Field class name -> module alias used when rendering that field in generated
# code. Class names that are not listed here are rendered with the "pw" alias.
FIELD_MODULES_MAP: Final = {
    "ArrayField": "pw_pext",
    "BinaryJSONField": "pw_pext",
    "DateTimeTZField": "pw_pext",
    "HStoreField": "pw_pext",
    "IntervalField": "pw_pext",
    "JSONField": "pw_pext",
    "TSVectorField": "pw_pext",
}
# Module names whose classes find_field_type() accepts as a field's type.
PW_MODULES: Final = "playhouse.postgres_ext", "playhouse.fields", "peewee"


def fk_to_params(field: pw.ForeignKeyField) -> TParams:
    """Collect the referential-action options set on a foreign key.

    Returns a dict holding ``on_delete`` and/or ``on_update`` (in that order)
    for each option that is not ``None``. Values are wrapped in single quotes
    so they can be pasted into generated code as string literals.
    """
    return {
        option: f"'{action}'"
        for option in ("on_delete", "on_update")
        if (action := getattr(field, option)) is not None
    }


def dtf_to_params(field: pw.DateTimeField) -> TParams:
    """Collect the ``formats`` option of a datetime field.

    ``formats`` is reported only when it is not a ``list`` instance; a list
    value yields an empty dict.
    """
    formats = field.formats
    if isinstance(formats, list):
        return {}

    return {"formats": formats}


# Field class -> callable that extracts the type-specific parameters of a field
# instance as a dict. The table is looked up by exact class (``in`` / ``.get``),
# so a class that is not a key here contributes no extra parameters.
FIELD_TO_PARAMS: Dict[Type[pw.Field], Callable[[Any], TParams]] = {
    pw.CharField: lambda f: {"max_length": f.max_length},
    pw.DecimalField: lambda f: {
        "max_digits": f.max_digits,
        "decimal_places": f.decimal_places,
        "auto_round": f.auto_round,
        "rounding": f.rounding,
    },
    # Foreign keys and datetime fields only report options that are actually set.
    pw.ForeignKeyField: fk_to_params,
    pw.DateTimeField: dtf_to_params,
}


class Column(VanilaColumn):
    """Describe a peewee field so it can be rendered as migration code."""

    field_class: Type[pw.Field]

    def __init__(self, field: pw.Field, **kwargs):
        """Copy the attributes of ``field`` that are needed for rendering.

        Extra keyword arguments are accepted and ignored.
        """
        super().__init__(
            field.name,
            find_field_type(field),
            field.field_type,
            field.null,
            primary_key=field.primary_key,
            column_name=field.column_name,
            index=field.index,
            unique=field.unique,
            extra_parameters=None,
        )

        # Only static defaults are recorded; callables are skipped.
        default = field.default
        if not (default is None or callable(default)):
            self.default = repr(default)

        # Merge in the type-specific parameters, when the field class has any.
        collect_params = FIELD_TO_PARAMS.get(self.field_class)
        if collect_params is not None:
            if self.extra_parameters is None:  # type: ignore[has-type]
                self.extra_parameters = {}

            self.extra_parameters.update(collect_params(field))

        self.rel_model = None
        self.to_field = None

        if not isinstance(field, pw.ForeignKeyField):
            return

        self.to_field = field.rel_field.name
        if field.rel_model == field.model:
            # Self-referential key.
            self.rel_model = "'self'"
        else:
            self.rel_model = "migrator.orm['%s']" % field.rel_model._meta.table_name

    def get_field(self, space: str = " ") -> str:
        """Render ``name = module.Field(...)`` for this column.

        ``space`` is placed on both sides of the ``=`` sign.
        """
        definition = super().get_field()
        module = FIELD_MODULES_MAP.get(self.field_class.__name__, "pw")
        # Split the inherited "name = Field(...)" text at the first "=".
        left, _, right = definition.partition("=")
        return "{name}{space}={space}{module}.{field}".format(
            name=left.strip(), field=right.strip(), space=space, module=module
        )

    def get_field_parameters(self) -> TParams:
        """Return the inherited parameters plus ``default`` when one is set."""
        parameters = super().get_field_parameters()
        if self.default is None:
            return parameters

        parameters["default"] = self.default
        return parameters


def diff_one(model1: TModelType, model2: TModelType, **kwargs) -> List[str]:  # noqa:
    """Find difference between given peewee models.

    ``model1`` is treated as the target state and ``model2`` as the state to
    migrate from: fields present only in ``model1`` are added, fields present
    only in ``model2`` are dropped, and fields present in both are compared.

    Differences are walked in declaration order (the iteration order of
    ``_meta.fields`` / ``_meta.indexes``) rather than in ``set`` order, so the
    generated code is identical from run to run.

    Extra keyword arguments are forwarded to ``create_fields`` and
    ``change_fields``.

    Returns a list of code snippets, one per migration operation.
    """
    changes: List[str] = []

    meta1, meta2 = model1._meta, model2._meta  # type: ignore[]
    field_names1 = meta1.fields
    field_names2 = meta2.fields

    # Add fields
    names1 = [name for name in field_names1 if name not in field_names2]
    if names1:
        fields = [field_names1[name] for name in names1]
        changes.append(create_fields(model1, *fields, **kwargs))

    # Drop fields
    names2 = [name for name in field_names2 if name not in field_names1]
    if names2:
        changes.append(drop_fields(model1, *names2))

    # Change fields
    fields_ = []
    nulls_ = []
    indexes_ = []
    for name in field_names1:
        if name not in field_names2:
            continue  # newly added field, handled above

        field1, field2 = field_names1[name], field_names2[name]
        diff = compare_fields(field1, field2)
        # Nullability and index changes get dedicated operations, so they are
        # taken out of the diff before deciding whether the field itself changed.
        null = diff.pop("null", None)
        index = diff.pop("index", None)

        if diff:
            fields_.append(field1)

        if null is not None:
            nulls_.append((name, null))

        if index is not None:
            # ``index`` is the (index, unique) pair built by field_to_params().
            indexes_.append((name, index[0], index[1]))

    if fields_:
        changes.append(change_fields(model1, *fields_, **kwargs))

    for name, null in nulls_:
        changes.append(change_not_null(model1, name, null=null))

    for name, index, unique in indexes_:
        if index is True or unique is True:
            # Replace an existing index instead of stacking a second one.
            if field_names2[name].unique or field_names2[name].index:
                changes.append(drop_index(model1, name))
            changes.append(add_index(model1, name, unique=unique))
        else:
            changes.append(drop_index(model1, name))

    # Check additional compound indexes
    indexes1 = meta1.indexes
    indexes2 = meta2.indexes
    wanted = set(indexes1)
    existing = set(indexes2)

    # Drop compound indexes (dict.fromkeys de-duplicates while keeping order)
    for index in dict.fromkeys(indexes2):
        if index in wanted:
            continue
        if isinstance(index[0], (list, tuple)) and len(index[0]) > 1:
            changes.append(drop_index(model1, name=index[0]))

    # Add compound indexes
    for index in dict.fromkeys(indexes1):
        if index in existing:
            continue
        if isinstance(index[0], (list, tuple)) and len(index[0]) > 1:
            changes.append(add_index(model1, name=index[0], unique=index[1]))

    return changes


def diff_many(
    models1: List[TModelType],
    models2: List[TModelType],
    migrator: Optional[Migrator] = None,
    *,
    reverse=False,
) -> List[str]:
    """Build the migration operations that turn ``models2`` into ``models1``.

    Both lists are ordered with ``pw.sort_models`` (and reversed when
    ``reverse`` is true) and matched by table name. The result lists, in this
    order: per-model diffs for tables present on both sides, creations for
    tables only in ``models1``, removals for tables only in ``models2``.
    """
    ordered1 = cast(List["TModelType"], pw.sort_models(models1))  # type: ignore[]
    ordered2 = cast(List["TModelType"], pw.sort_models(models2))  # type: ignore[]

    if reverse:
        ordered1 = ordered1[::-1]
        ordered2 = ordered2[::-1]

    by_table1 = {cast(str, model._meta.table_name): model for model in ordered1}  # type: ignore[]
    by_table2 = {cast(str, model._meta.table_name): model for model in ordered2}  # type: ignore[]

    changes: List[str] = []

    # Tables present on both sides
    for table, model in by_table1.items():
        if table in by_table2:
            changes.extend(diff_one(model, by_table2[table], migrator=migrator))

    # Add models
    changes.extend(
        create_model(model, migrator=migrator)
        for table, model in by_table1.items()
        if table not in by_table2
    )

    # Remove models
    changes.extend(
        remove_model(model) for table, model in by_table2.items() if table not in by_table1
    )

    return changes


def model_to_code(model_cls: TModelType, **kwargs) -> str:
    """Render the given model as the source of a ``pw.Model`` subclass.

    The output contains one line per field (a ``PrimaryKeyField`` named
    ``id`` is left out) followed by an inner ``Meta`` class carrying the table
    name and, when set, the schema, composite primary key and indexes.
    Extra keyword arguments are forwarded to ``field_to_code``.
    """
    template = "class {classname}(pw.Model):\n{fields}\n\n{meta}"
    meta = model_cls._meta  # type: ignore[]

    field_lines = [
        field_to_code(field, **kwargs)
        for field in meta.sorted_fields
        if not isinstance(field, pw.PrimaryKeyField) or field.name != "id"
    ]
    fields = INDENT + NEWLINE.join(field_lines)

    # Optional Meta entries are appended only when they have a value.
    meta_lines = ["class Meta:", f'{INDENT}table_name = "{meta.table_name}"']
    if meta.schema:
        meta_lines.append(f'{INDENT}schema = "{meta.schema}"')
    if isinstance(meta.primary_key, pw.CompositeKey):
        meta_lines.append(
            f"{INDENT}primary_key = pw.CompositeKey{meta.primary_key.field_names!r}"
        )
    if meta.indexes:
        meta_lines.append(f"{INDENT}indexes = {meta.indexes!r}")
    meta_code = INDENT + NEWLINE.join(meta_lines)

    return template.format(classname=model_cls.__name__, fields=fields, meta=meta_code)


def create_model(model_cls: TModelType, **kwargs) -> str:
    """Render the model source decorated with ``@migrator.create_model``.

    Extra keyword arguments are forwarded to ``model_to_code``.
    """
    model_source = model_to_code(model_cls, **kwargs)
    return "@migrator.create_model\n" + model_source


def remove_model(model_cls: TModelType, **kwargs) -> str:
    """Render a ``migrator.remove_model`` call for the model's table.

    Extra keyword arguments are accepted and ignored.
    """
    table_name = model_cls._meta.table_name  # type: ignore[]
    return "migrator.remove_model('%s')" % table_name


def create_fields(model_cls: TModelType, *fields: pw.Field, **kwargs) -> str:
    """Render a ``migrator.add_fields`` call that adds ``fields`` to the model's table.

    Each field is rendered on its own indented line without spaces around
    ``=``. Extra keyword arguments are forwarded to ``field_to_code``.
    """
    table_name = model_cls._meta.table_name  # type: ignore[]
    separator = "," + NEWLINE
    definitions = separator.join(
        field_to_code(field, space=False, **kwargs) for field in fields
    )
    return "migrator.add_fields(%s'%s', %s)" % (NEWLINE, table_name, NEWLINE + definitions)


def drop_fields(model_cls: TModelType, *fields: Union[str, pw.Field], **kwargs) -> str:
    """Generate migrations to remove fields.

    ``fields`` are the names of the fields to remove. Objects exposing a
    ``name`` attribute (such as field instances) are accepted too and are
    replaced by that name, so the generated call always lists string names
    instead of an object ``repr``.

    Extra keyword arguments are accepted and ignored.
    """
    meta = model_cls._meta  # type: ignore[]
    names = [
        field if isinstance(field, str) else getattr(field, "name", field)
        for field in fields
    ]
    return "migrator.remove_fields('%s', %s)" % (
        meta.table_name,
        ", ".join(map(repr, names)),
    )


def field_to_code(field: pw.Field, *, space: bool = True, **kwargs) -> str:
    """Render ``field`` as a ``name = module.Field(...)`` assignment.

    ``space`` controls whether the ``=`` sign is surrounded by spaces. Extra
    keyword arguments are forwarded to ``Column``.
    """
    padding = " " if space else ""
    return Column(field, **kwargs).get_field(padding)


def compare_fields(field1: pw.Field, field2: pw.Field, **kwargs) -> Dict:
    """Return the parameters of ``field1`` that differ from ``field2``.

    When the two fields resolve to different field types the result is
    ``{"cls": True}``. Otherwise it holds every ``(name, value)`` pair of
    ``field1`` -- including ``null`` -- that ``field2`` does not share.
    Extra keyword arguments are accepted and ignored.
    """
    if find_field_type(field1) != find_field_type(field2):
        return {"cls": True}

    wanted = field_to_params(field1)
    wanted["null"] = field1.null
    current = field_to_params(field2)
    current["null"] = field2.null

    current_items = set(current.items())
    return {key: value for key, value in wanted.items() if (key, value) not in current_items}


def field_to_params(field: pw.Field, **kwargs) -> TParams:
    """Collect the comparable parameters of ``field``.

    Starts from the type-specific parameters registered in
    ``FIELD_TO_PARAMS``, adds ``default`` when it is a hashable non-callable
    value, and adds ``index`` as the pair ``(index and not unique, unique)``.
    Any ``backref`` entry is removed. Extra keyword arguments are ignored.
    """
    collect = FIELD_TO_PARAMS.get(find_field_type(field))
    params = {} if collect is None else collect(field)

    default = field.default
    is_static = default is not None and not callable(default)
    if is_static and isinstance(default, Hashable):
        params["default"] = default

    params["index"] = (field.index and not field.unique, field.unique)

    # Ignore backref
    params.pop("backref", None)
    return params


def change_fields(model_cls: TModelType, *fields: pw.Field, **kwargs) -> str:
    """Render a ``migrator.change_fields`` call that redefines ``fields``.

    Fields are rendered without spaces around ``=`` and separated by a comma
    and an indented line break. Extra keyword arguments are accepted and
    ignored.
    """
    table_name = model_cls._meta.table_name  # type: ignore[]
    separator = "," + NEWLINE
    definitions = separator.join(field_to_code(field, space=False) for field in fields)
    return "migrator.change_fields('%s', %s)" % (table_name, definitions)


def change_not_null(model_cls: TModelType, name: str, *, null: bool) -> str:
    """Render the call that changes the NOT NULL state of field ``name``.

    A truthy ``null`` yields ``migrator.drop_not_null``; otherwise
    ``migrator.add_not_null`` is produced.
    """
    if null:
        operation = "drop_not_null"
    else:
        operation = "add_not_null"

    table_name = model_cls._meta.table_name  # type: ignore[]
    return "migrator.%s('%s', %s)" % (operation, table_name, repr(name))


def add_index(model_cls: TModelType, name: Union[str, Iterable[str]], *, unique: bool) -> str:
    """Generate a ``migrator.add_index`` call for one or several columns.

    ``name`` is either a single column name or an iterable of column names.
    Every column is rendered as its own quoted argument, so one-element
    tuples and arbitrary iterables produce valid code as well (stripping
    brackets from ``repr(name)`` left a dangling comma for ``('a',)``).
    ``unique`` is passed through to the generated call.
    """
    meta = model_cls._meta  # type: ignore[]
    if isinstance(name, str):
        columns = repr(name)
    else:
        columns = ", ".join(map(repr, name))
    return f"migrator.add_index('{meta.table_name}', {columns}, unique={unique})"


def drop_index(model_cls: TModelType, name: Union[str, Iterable[str]]) -> str:
    """Generate a ``migrator.drop_index`` call for one or several columns.

    ``name`` is either a single column name or an iterable of column names.
    Every column is rendered as its own quoted argument, so one-element
    tuples and arbitrary iterables produce valid code as well (stripping
    brackets from ``repr(name)`` left a dangling comma for ``('a',)``).
    """
    meta = model_cls._meta  # type: ignore[]
    if isinstance(name, str):
        columns = repr(name)
    else:
        columns = ", ".join(map(repr, name))
    return f"migrator.drop_index('{meta.table_name}', {columns})"


def find_field_type(field: pw.Field) -> Type[pw.Field]:
    """Resolve the class used to describe ``field`` in migrations.

    Walks the MRO of the field's class, starting with the class itself, and
    returns the first class defined in one of ``PW_MODULES``. When no class
    in the MRO matches, the field's own class is returned.
    """
    own_type = type(field)
    return next(
        (cls for cls in own_type.mro() if cls.__module__ in PW_MODULES),
        own_type,
    )

Youez - 2016 - github.com/yon3zu
LinuXploit