Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.149.251.22
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/cli.py
from collections import defaultdict
import json
import os
import subprocess
import sys
import time
import yaml

PRETTY_JSON_ARGS = {"sort_keys": True, "indent": 2, "separators": (",", ": ")}

EXITCODE_NOT_FOUND = 2
EXITCODE_WARNING = 3
EXITCODE_GENERAL_ERROR = 11

PAGERS = ["/bin/less", "/bin/more"]

SUCCESS, WARNING, ERROR = "success", "warnings", "error"  # see simple_rpc

_CLI_MSG_PREFIX = {WARNING: "WARNING", ERROR: "ERROR"}

EXIT_CODES = {
    SUCCESS: 0,
    WARNING: EXITCODE_WARNING,
    ERROR: EXITCODE_GENERAL_ERROR,
}


def pager(data):
    pager = os.environ.get(
        "PAGER", next((p for p in PAGERS if os.path.isfile(p)), None)
    )
    if pager is None:
        print(data)
    else:
        subprocess.run([pager], input=data.encode(), stdout=sys.stdout)


class TablePrinter:
    def __init__(self):
        self._headers = {}
        self._mappers = defaultdict(list)
        self._right_aligned = {}
        self._widths = {}

    def set_field_properties(
        self,
        field,
        mappers=None,
        max_width=None,
        right_align=False,
        header=None,
    ):
        if mappers:
            self._mappers[field] = mappers
        if max_width:
            self._widths[field] = max_width
        self._right_aligned[field] = right_align
        self._headers[field] = header if header else field.upper()

    def print(self, fields, items, file=sys.stdout):
        headers = [self._headers.get(field, field.upper()) for field in fields]
        widths = [len(field) for field in headers]
        rows = []
        for item in items:
            row = []
            for i, field in enumerate(fields):
                v = item.get(field)
                for mapper in self._mappers[field]:
                    v = mapper(v)
                v = str(v)
                if len(v) > widths[i]:
                    max_width = self._widths.get(field)
                    if max_width and len(v) > max_width:
                        v = v[: max_width - 3] + "..."
                    widths[i] = len(v)
                row.append(v)
            rows.append(row)
        print(self._format_row(headers, widths, False))
        for row in rows:
            print(
                self._format_row(
                    row, widths, self._right_aligned.get(field, False)
                )
            )

    @staticmethod
    def _add_padding(value, width, right_align):
        if right_align:
            return value.rjust(width)
        return value.ljust(width)

    @staticmethod
    def _format_row(columns, widths, right_aligned):
        cols = [
            TablePrinter._add_padding(value, widths[i], right_aligned)
            for i, value in enumerate(columns)
        ]
        return "  ".join(cols)


def n_a(value):
    return value if value is not None else "n/a"


def to_int(value):
    return int(value) if value is not None else value


def extract_field(field):
    def extractor(value):
        if isinstance(value, dict):
            return value.get(field)
        return value

    return extractor


def print_table(data, field_props):
    table = TablePrinter()
    for props in field_props:
        table.set_field_properties(*props)
    table.print([item[0] for item in field_props], data)


def print_incidents(data):
    field_props = (
        ("timestamp", [to_int]),
        ("abuser", [n_a]),
        ("country", [extract_field("code")]),
        ("times", [n_a]),
        ("name", [n_a]),
        ("severity", [n_a]),
    )
    print_table(data, field_props)


def add_ttl(data):
    now = int(time.time())
    for item in data:
        expiration = item.get("expiration", 0)
        if expiration > 0:
            item["ttl"] = expiration - now
        else:
            item["ttl"] = 0


def print_graylist(data):
    add_ttl(data)
    field_props = (
        ("ip",),
        ("ttl",),
        ("country", [extract_field("code")]),
    )
    print_table(data, field_props)


def print_bwlist(data):
    add_ttl(data)
    field_props = (
        ("ip",),
        ("ttl",),
        ("country", [extract_field("code")]),
        ("imported_from",),
        ("comment",),
    )
    print_table(data, field_props)


def guess_printer(data):
    if isinstance(data, (list, tuple)):
        if len(data):
            printer = TablePrinter()
            if isinstance(data[0], dict):
                keys = sorted(data[0].keys())
                printer.set_field_properties(
                    "country", mappers=[extract_field("code")]
                )
                printer.print(keys, data)
            else:
                for item in data:
                    print(item)
    else:
        print(data)


def yaml_printer(data):
    if isinstance(data, str):
        print(data)
    else:
        print(yaml.dump(data, default_flow_style=False))


def json_printer(data):
    if isinstance(data, str):
        print(data)
    else:
        print(json.dumps(data))


def hook_printer(data):
    if isinstance(data, dict):
        print("Status: {}".format(data["status"]))
    else:
        result = []
        for hook in data:
            result.append(
                "Event: {}, Path: {}{}".format(
                    hook["event"],
                    hook["path"],
                    "  native" if hook["native"] else "",
                )
            )
        print("\n".join(result))


PRINTERS = {
    ("config", "show"): json_printer,
    ("eula", "show"): pager,
    ("get",): print_incidents,
    ("whitelist",): print_bwlist,
    ("whitelist", "ip", "list"): print_bwlist,
    ("blacklist",): print_bwlist,
    ("blacklist", "ip", "list"): print_bwlist,
    ("graylist",): print_graylist,
    ("graylist", "ip", "list"): print_graylist,
    ("malware", "on-demand", "status"): yaml_printer,
    ("feature-management", "defaults"): yaml_printer,
    ("feature-management", "show"): yaml_printer,
    ("feature-management", "enable"): yaml_printer,
    ("feature-management", "disable"): yaml_printer,
    ("feature-management", "get"): yaml_printer,
    ("hook", "add"): hook_printer,
    ("hook", "delete"): hook_printer,
    ("hook", "list"): hook_printer,
    ("hook", "add-native"): hook_printer,
}


def print_response(method, result, is_json=False, is_verbose=False):
    if is_json:
        pretty_args = PRETTY_JSON_ARGS if is_verbose else {}
        print(json.dumps(result, **pretty_args))
    else:
        print_fun = PRINTERS.get(method, guess_printer)
        print_fun(result["items"] if result.get("items") is not None else "OK")


def print_warnings(data: dict):
    if not isinstance(data, dict):
        # This can happen, for example, if validation of cli args fails
        return

    for warning in data.get("warnings", []):
        print(warning, file=sys.stderr)


def print_error(
    result, messages, is_json=False, is_verbose=False, *, file=sys.stderr
):
    if is_json:
        pretty_args = PRETTY_JSON_ARGS if is_verbose else {}
        print(json.dumps({result: messages}, **pretty_args))
    else:
        if isinstance(messages, (list, tuple)):
            for msg in messages:
                print("%s: %s" % (_CLI_MSG_PREFIX[result], msg), file=file)
        else:
            print(messages, file=file)

Youez - 2016 - github.com/yon3zu
LinuXploit