Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.222.161.119
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/commons/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/commons/litespeed.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import base64
import urllib.request
import urllib.error
import urllib.parse
import ssl
import os

from lxml import etree
from lvestats.lib.commons.func import get_all_user_domains, normalize_domain


class LiteSpeedException(Exception):
    """Base class for the LiteSpeed-related errors raised by this module."""


class LiteSpeedDisabledException(LiteSpeedException):
    """
    Raised when the LiteSpeed status report can not be fetched for any
    reason other than rejected credentials.
    """


class LiteSpeedInvalidCredentials(LiteSpeedException):
    """
    Raised when the LiteSpeed status page answers with HTTP 401 or 403,
    i.e. the supplied login / password were not accepted.
    """


class LiteSpeedDataMapping:
    """
    Column positions inside one tab-separated row of the LiteSpeed
    status report, as consumed by LiteSpeed.get_user_data.
    """

    # column holding the time since the first request, in seconds
    TIME = 3
    # column holding the virtual host name
    HOST = 8
    # column holding the quoted request line
    REQUEST = 14

    # rows with fewer columns than this are skipped as invalid
    TOTAL_LEN = 15


class LiteSpeed(object):
    """
    Reads information about currently processed requests from the status
    report of a local LiteSpeed web server (webadmin console).

    :param login: user name used for HTTP basic auth on the status page
    :param password: password used for HTTP basic auth on the status page
    """
    # virtual hosts that must never be reported (status rows are bytes)
    IGNORE_HOSTS = [b'_AdminVHost']
    PID_FILE_PATH = '/tmp/lshttpd/lshttpd.pid'
    HTPASSWD_PATH = '/usr/local/lsws/admin/htpasswds/status'
    # seconds to wait for the status page
    HTTP_TIMEOUT = 2
    LS_ADMIN_CONFIG = "/usr/local/lsws/admin/conf/admin_config.xml"

    def __init__(self, login, password):
        self.login = login
        self.password = password

    @staticmethod
    def _get_litespeed_pid():
        """
        Returns pid that is stored in litespeed's pidfile.
        None is returned when either the pidfile or the status
        htpasswd file does not exist.
        :return: str | None
        """
        if not (os.path.isfile(LiteSpeed.PID_FILE_PATH)
                and os.path.isfile(LiteSpeed.HTPASSWD_PATH)):
            return None
        try:
            with open(LiteSpeed.PID_FILE_PATH, encoding='utf-8') as f:
                return f.readline().rstrip(os.linesep)
        except FileNotFoundError:
            # pidfile disappeared between the check above and open();
            # treat it exactly like a missing pidfile
            return None

    @staticmethod
    def is_litespeed_running():
        """
        Checks whether pid is not None.
        :return: bool
        """
        return LiteSpeed._get_litespeed_pid() is not None

    def _get_litespeed_webadmin_port(self):
        """
        Retrieves current LiteSpeed webadmin console port
        :return: LiteSpeed webadmin console port as string
        :raise: LiteSpeedException when the config can not be read or parsed
        """
        try:
            # Part of Litespeed config, containing console port:
            # <?xml version="1.0" encoding="UTF-8"?>
            # <adminConfig>
            #     <listenerList>
            #         <listener>
            #             <name>adminListener</name>
            #             <address>*:7080</address>
            #             <secure>0</secure>
            #         </listener>
            #     </listenerList>
            with open(self.LS_ADMIN_CONFIG, 'r', encoding='utf-8') as f:
                ls_adm_cfg = etree.parse(f).getroot()
            address = ls_adm_cfg.xpath("listenerList/listener/address")[0].text
            # Split on the LAST colon: the host part itself contains colons
            # for bracketed IPv6 addresses such as "[::1]:7080".
            # IndexError (no colon at all) is handled below.
            return address.rsplit(':', 1)[1].strip()
        except (AttributeError, IndexError, ValueError, OSError, IOError,
                etree.LxmlError) as e:
            # etree.LxmlError covers XMLSyntaxError raised for a malformed config
            raise LiteSpeedException(
                "Can't determine current LiteSpeed webadmin console "
                f"port from config {self.LS_ADMIN_CONFIG}: {e}"
            ) from e

    def _get_requests(self):
        """
        Get info about connections from litespeed
        and returns array of rows with data
        :return: list[bytes], non-empty lines of the status report
        :raise: [LiteSpeedInvalidCredentials, LiteSpeedDisabledException]
        """
        status_url = f'http://localhost:{self._get_litespeed_webadmin_port()}/status?rpt=details'
        request = urllib.request.Request(status_url)

        base64string = base64.b64encode(b'%s:%s' % (self.login.encode(), self.password.encode()))
        request.add_header(b"Authorization", b"Basic %s" % base64string)
        # get data from litespeed, check whether http code is 200
        try:
            context = ssl._create_unverified_context()  # pylint: disable=protected-access
            with urllib.request.urlopen(
                request,
                timeout=self.HTTP_TIMEOUT,
                context=context,
            ) as response:
                data = response.read()
        except urllib.error.HTTPError as e:
            if e.code in [401, 403]:
                raise LiteSpeedInvalidCredentials(
                    "Litespeed login / password invalid. "
                    "Please, try restart lvestats service."
                ) from e
            raise LiteSpeedDisabledException(str(e)) from e
        except Exception as e:  # not good, but urllib raises lot of exceptions
            raise LiteSpeedDisabledException(str(e)) from e

        # remove empty lines
        return [row for row in data.split(os.linesep.encode()) if row.strip() != b'']

    def __is_host_valid(self, host):
        """
        Check whether host is not empty and is not one of IGNORE_HOSTS.
        :type host: bytes
        :return: bool
        """
        host = host.strip()
        return bool(host) and host not in self.IGNORE_HOSTS

    def _parse_request_info(self, request: bytes):
        """
        Splits a (possibly double-quoted) request line into its parts.
        http_version is b'' when the line has only two parts.
        :return: (method, url, http_version) or None when the line
                 has neither two nor three parts
        """
        request_info = request.strip(b'"').split()
        if len(request_info) == 3:
            method, url, http_version = request_info
        elif len(request_info) == 2:
            method, url = request_info
            http_version = b''
        else:
            return None
        return method, url, http_version

    def get_user_data(self, username):
        """
        Returns information about processed by user pages.
        :param username:
        :return list[tuple]:
        list of the tuples
        [(Pid, Domain, Http type, Path, Http version, Time),...]
        :raises: LiteSpeedException (including LiteSpeedInvalidCredentials
                 and LiteSpeedDisabledException)
        """
        data_delimiter = b'\t'

        pid = self._get_litespeed_pid()
        all_domains = get_all_user_domains(username)
        normalized_domains = set(map(normalize_domain, all_domains))

        litespeed_requests = []
        for row in self._get_requests():
            fields = row.split(data_delimiter)
            if len(fields) < LiteSpeedDataMapping.TOTAL_LEN:
                # that is not valid request info, skip it...
                continue

            host = fields[LiteSpeedDataMapping.HOST]
            request_line = fields[LiteSpeedDataMapping.REQUEST]

            # time since first request, seconds
            request_time = self.to_float(fields[LiteSpeedDataMapping.TIME])

            if not self.__is_host_valid(host):
                continue
            if normalize_domain(host.decode()) not in normalized_domains:
                # request belongs to some other user's domain
                continue

            request_data = self._parse_request_info(request_line)
            if request_data is not None:
                method, url, http_version = request_data
                litespeed_requests.append((pid, host, method, url, http_version, request_time))

        return litespeed_requests

    @staticmethod
    def to_float(string):
        """
        Converts value to float, if can't return -1.
        :type string: str | bytes
        :rtype: float
        """
        try:
            return float(string)
        except (TypeError, ValueError):
            # TypeError: float() got an unsupported type such as None
            return -1.

Youez - 2016 - github.com/yon3zu
LinuXploit