Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.145.53.196
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/peewee_migrate/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/peewee_migrate/cli.py
"""CLI integration."""
from __future__ import annotations

import logging
import re
import sys
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Pattern, Union

import click
from playhouse.db_url import connect

from .logs import logger
from .models import MIGRATE_TABLE
from .router import Router

if TYPE_CHECKING:
    from peewee_migrate.types import TParams

# Matches a run of whitespace that ends at a line end (``re.M`` lets ``$``
# match at every line end). Not referenced by the code visible in this module.
CLEAN_RE: Pattern = re.compile(r"\s+$", re.M)
# Logging level names; ``get_router`` indexes this list with its ``verbose``
# argument, which the commands fill from the counted ``-v`` option.
VERBOSE: List[str] = ["WARNING", "INFO", "DEBUG", "NOTSET"]


def get_router(
    directory: Optional[Union[str, Path]] = None,
    database: Optional[str] = None,
    migratetable: str = MIGRATE_TABLE,
    verbose: int = 0,
) -> Router:
    """Load and initialize a router.

    If ``directory`` contains a ``conf.py`` file, that file is executed and the
    names ``DATABASE``, ``IGNORE``, ``SCHEMA``, ``MIGRATE_TABLE`` and
    ``LOGGING_LEVEL`` defined by it override the corresponding arguments.

    :param directory: Directory with migrations (and an optional ``conf.py``).
    :param database: Database connection; a string is passed to
        ``playhouse.db_url.connect``, any other value is used as is.
    :param migratetable: Name of the table passed to the router as
        ``migrate_table``.
    :param verbose: Index into ``VERBOSE`` selecting the logging level. Values
        past the end of the list select the last (most verbose) level.
    :returns: The initialized :class:`Router`.
    :raises SystemExit: With status 1 when no database is defined or when the
        router raises ``RuntimeError`` during initialization.
    """
    config: TParams = {}
    # ``-v`` is a counter and therefore unbounded: clamp it so that ``-vvvv``
    # (or more) selects the last level instead of raising IndexError.
    logging_level: str = VERBOSE[min(verbose, len(VERBOSE) - 1)]
    ignore = schema = None

    if directory:
        directory = Path(directory)
        try:
            with directory.joinpath("conf.py").open() as cfg:
                source = cfg.read()
        except IOError:
            # conf.py is optional; a missing/unreadable file means "no overrides".
            pass
        else:
            # NOTE: this runs arbitrary Python from conf.py; the migrations
            # directory must be trusted. The exec is kept outside the ``try``
            # so that I/O errors raised by the config code itself are not
            # silently swallowed.
            code = compile(source, "<string>", "exec", dont_inherit=True)
            exec(code, config, config)
            database = config.get("DATABASE", database)
            ignore = config.get("IGNORE", ignore)
            schema = config.get("SCHEMA", schema)
            migratetable = config.get("MIGRATE_TABLE", migratetable)
            logging_level = config.get("LOGGING_LEVEL", logging_level).upper()

    if isinstance(database, str):
        database = connect(database)

    logger.setLevel(logging_level)

    if not database:
        logger.error("Database is undefined")
        return sys.exit(1)

    try:
        return Router(
            database,
            migrate_table=migratetable,
            migrate_dir=directory,
            ignore=ignore,
            schema=schema,
        )
    except RuntimeError:
        logger.exception("Failed to initialize router")
        return sys.exit(1)


@click.group()
def cli():
    """Migrate database with Peewee ORM."""
    # Runs before any subcommand: set up root logging at INFO.
    root_level = logging.INFO
    logging.basicConfig(level=root_level)


@cli.command()
@click.option("--name", default=None, help="Select migration")
@click.option("--database", default=None, help="Database connection")
@click.option(
    "--directory",
    default="migrations",
    help="Directory where migrations are stored",
)
@click.option("--fake", is_flag=True, default=False, help="Run migration as fake.")
@click.option("--migratetable", default="migratehistory", help="Migration table.")
@click.option("-v", "--verbose", count=True)
def migrate(  # noqa:
    name: Optional[str] = None,
    database: Optional[str] = None,
    directory: Optional[str] = None,
    migratetable: str = MIGRATE_TABLE,
    verbose: int = 0,
    fake: bool = False,  # noqa:
):
    """Migrate database."""
    # Build the router from the CLI options, then run migrations up to ``name``.
    router = get_router(directory, database, migratetable, verbose)
    applied = router.run(name, fake=fake)
    if not applied:
        # Nothing was run, so there is nothing to report.
        return
    summary = ", ".join(applied)
    click.echo("Migrations completed: %s" % summary)


@cli.command()
@click.argument("name")
@click.option(
    "--auto",
    default=False,
    is_flag=True,
    help="Scan sources and create db migrations automatically. Supports autodiscovery.",
)
@click.option(
    "--auto-source",
    default=None,
    help=(
        "Set to python module path for changes autoscan (e.g. 'package.models'). "
        "Current directory will be recursively scanned by default."
    ),
)
@click.option("--database", default=None, help="Database connection")
@click.option(
    "--directory",
    default="migrations",
    help="Directory where migrations are stored",
)
@click.option("--migratetable", default="migratehistory", help="Migration table.")
@click.option("-v", "--verbose", count=True)
def create(  # noqa:
    name: Optional[str] = None,
    database: Optional[str] = None,
    directory: Optional[str] = None,
    migratetable: Optional[str] = None,
    verbose: int = 0,
    auto: bool = False,  # noqa:
    auto_source: Optional[str] = None,
):
    """Create a migration."""
    table = migratetable or MIGRATE_TABLE
    router: Router = get_router(directory, database, table, verbose)

    # ``--auto-source`` only takes effect together with ``--auto``; otherwise
    # the plain flag value is forwarded unchanged.
    if auto and auto_source:
        auto_arg = auto_source
    else:
        auto_arg = auto

    migration_name = name or "auto"
    router.create(migration_name, auto=auto_arg)


@cli.command()
@click.option(
    "--count",
    required=False,
    default=1,
    type=int,
    # NOTE(review): the help text mentions a "name", but this command declares
    # no such option or argument; the wording looks stale. Confirm before editing.
    help="Number of last migrations to be rolled back.Ignored in case of non-empty name",
)
@click.option("--database", default=None, help="Database connection")
@click.option(
    "--directory",
    default="migrations",
    help="Directory where migrations are stored",
)
@click.option("--migratetable", default="migratehistory", help="Migration table.")
@click.option("-v", "--verbose", count=True)
def rollback(
    database: Optional[str] = None,
    directory: Optional[str] = None,
    migratetable: Optional[str] = None,
    verbose: int = 0,
    count: int = 1,
):
    """Rollback a migration with the given steps --count of last migrations as integer number"""
    table = migratetable or MIGRATE_TABLE
    router: Router = get_router(directory, database, table, verbose)

    # Refuse to roll back more migrations than have been applied.
    if count > len(router.done):
        details = (count, len(router.done), router.done)
        raise RuntimeError("Unable to rollback %s migrations from %s: %s" % details)

    # One ``router.rollback()`` call per requested step.
    remaining = count
    while remaining > 0:
        router.rollback()
        remaining -= 1


@cli.command()
@click.option("--database", default=None, help="Database connection")
@click.option(
    "--directory",
    default="migrations",
    help="Directory where migrations are stored",
)
@click.option("--migratetable", default="migratehistory", help="Migration table.")
@click.option("-v", "--verbose", count=True)
def list(  # noqa:
    database: Optional[str] = None,
    directory: Optional[str] = None,
    migratetable: Optional[str] = None,
    verbose: int = 0,
):
    """List migrations."""
    table = migratetable or MIGRATE_TABLE
    router: Router = get_router(directory, database, table, verbose)

    # (heading, router attribute) pairs, printed in order with a blank line
    # between sections. Attributes are read lazily, right before printing.
    sections = (
        ("Migrations are done:", "done"),
        ("Migrations are undone:", "diff"),
    )
    for position, (heading, attribute) in enumerate(sections):
        if position:
            click.echo("")
        click.echo(heading)
        click.echo("\n".join(getattr(router, attribute)))


@cli.command()
@click.option("--database", default=None, help="Database connection")
@click.option(
    "--directory",
    default="migrations",
    help="Directory where migrations are stored",
)
@click.option("--migratetable", default="migratehistory", help="Migration table.")
@click.option("-v", "--verbose", count=True)
def merge(
    database: Optional[str] = None,
    directory: Optional[str] = None,
    migratetable: Optional[str] = None,
    verbose: int = 0,
):
    """Merge migrations into one."""
    # Fall back to the package default when no table name was supplied.
    table = migratetable or MIGRATE_TABLE
    router: Router = get_router(directory, database, table, verbose)
    router.merge()

Youez - 2016 - github.com/yon3zu
LinuXploit