Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.117.231.160
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/subsys/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/subsys/ainotify.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
from collections import namedtuple
import asyncio
import ctypes
import errno
import logging
import os
import struct
import platform

from defence360agent.subsys import sysctl

# A single inotify notification:
#   path   - watched path the event's watch descriptor maps to
#   flags  - event bitmask (see the Inotify constants)
#   cookie - kernel cookie taken from the event header
#   name   - name field of the event (bytes, NUL padding stripped)
#   wd     - watch descriptor the event was reported for
Event = namedtuple("Event", "path flags cookie name wd")

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)


class Inotify:
    """
    Thin ctypes binding for the inotify functions exported by libc.
    See `man inotify` for details
    """

    # Event bits; values mirror the kernel's inotify definitions.
    ACCESS = 1 << 0  #: File was accessed
    MODIFY = 1 << 1  #: File was modified
    ATTRIB = 1 << 2  #: Metadata changed
    CLOSE_WRITE = 1 << 3  #: Writable file was closed
    CLOSE_NOWRITE = 1 << 4  #: Unwritable file closed
    OPEN = 1 << 5  #: File was opened
    MOVED_FROM = 1 << 6  #: File was moved from X
    MOVED_TO = 1 << 7  #: File was moved to Y
    CREATE = 1 << 8  #: Subfile was created
    DELETE = 1 << 9  #: Subfile was deleted
    DELETE_SELF = 1 << 10  #: Self was deleted
    MOVE_SELF = 1 << 11  #: Self was moved

    UNMOUNT = 1 << 13  #: Backing fs was unmounted
    Q_OVERFLOW = 1 << 14  #: Event queue overflowed
    IGNORED = 1 << 15  #: File was ignored

    ONLYDIR = 1 << 24  #: only watch the path if it is a directory
    DONT_FOLLOW = 1 << 25  #: don't follow a sym link
    EXCL_UNLINK = 1 << 26  #: exclude events on unlinked objects
    MASK_ADD = 1 << 29  #: add to the mask of an already existing watch
    ISDIR = 1 << 30  #: event occurred against dir
    ONESHOT = 1 << 31  #: only send event once

    # Name of the C library to load: "libc.dylib" on Darwin, "libc.so.6"
    # everywhere else.
    _n = "libc.dylib" if platform.system() == "Darwin" else "libc.so.6"
    # use_errno=True lets ctypes.get_errno() report the errno of a failed call
    _libc = ctypes.CDLL(_n, use_errno=True)

    # Fixed-size header of an inotify event: wd, mask, cookie, name length
    event_prefix = struct.Struct("iIII")

    @staticmethod
    def _call(method, *args):
        """
        Invoke a libc function by name and translate a -1 return value
        into an OSError carrying the errno reported by ctypes.
        :param method: name of the libc function to call
        :param args: positional arguments forwarded to the function
        :return: the function's return value when it is not -1
        :raises OSError: when the function returns -1
        """
        c_function = getattr(Inotify._libc, method)
        result = c_function(*args)
        if result != -1:
            return result
        code = ctypes.get_errno()
        raise OSError(code, os.strerror(code))

    @staticmethod
    def init():
        """
        Create a new inotify instance.
        See `man inotify_init` for details
        :return: file descriptor of the new inotify instance
        :raises OSError: when the underlying call fails
        """
        return Inotify._call("inotify_init")

    @staticmethod
    def add_watch(fd, path, mask):
        """
        Register a watch on an inotify instance. Per the original notes,
        calling it again with the same :fd: and :path: but a different mask
        changes the flags of the existing watch.
        See `man inotify_add_watch` for details
        :param fd: file descriptor returned by `init()`
        :param path: path to file or directory to watch
        :param mask: bitmask of events to monitor
        :return: watch descriptor
        :raises OSError: when the underlying call fails
        """
        return Inotify._call("inotify_add_watch", fd, path, mask)

    @staticmethod
    def rm_watch(fd, wd):
        """
        Drop a watch from an inotify instance.
        :param fd: file descriptor of inotify instance
        :param wd: watch descriptor, returned by `add_watch()`
        :return: zero
        :raises OSError: when the underlying call fails
        """
        return Inotify._call("inotify_rm_watch", fd, wd)

    @staticmethod
    def unpack_prefix(data):
        """
        Decode the fixed-size header of an inotify event.
        See `man inotify` for details
        :param data: bytestring of exactly `event_prefix.size` bytes
        :return: tuple of (wd, flag, cookie, length)
        """
        wd, flag, cookie, length = Inotify.event_prefix.unpack(data)
        return wd, flag, cookie, length

    @staticmethod
    def unpack_name(data):
        """
        Decode the name field of an inotify event.
        See `man inotify` for details
        :param data: bytestring holding the name field
        :return: name as bytes with trailing NUL bytes stripped
        """
        (raw_name,) = struct.unpack(f"{len(data)}s", data)
        return raw_name.rstrip(b"\x00")


class Watcher:
    """
    Asynchronous watcher for inotify events.

    Registers an inotify descriptor as a reader on the given loop, maps
    watch descriptors back to the watched paths and hands every decoded
    event to a coroutine callback. When no callback is supplied, events are
    put on an internal queue and can be consumed with `get_event()`.
    """

    _CHUNK_SIZE = 1024  # bytes requested from the inotify fd per read
    _MAX_WATCH_RETRIES = 3  # ENOSPC retries per `watch()` call
    _WATCHERS_RAISE_COEFF = 1.5  # growth factor used by _raise_user_watches
    _MAX_USER_WATCHES = "fs.inotify.max_user_watches"

    def __init__(self, loop, coro_callback=None):
        """
        :param loop: event loop providing add_reader/remove_reader/create_task
        :param coro_callback: coroutine function called with each `Event`;
            defaults to putting the event on the internal queue
        """
        self._loop = loop
        self._fd = Inotify.init()
        self._queue = asyncio.Queue()
        self._callback = coro_callback or self._queue.put
        self._loop.add_reader(self._fd, self._read)
        self._reset_state()

    def _reset_state(self):
        """Forget all watches and drop any buffered, unparsed data."""
        self.paths = {}  # watch descriptor -> path
        self.descriptors = {}  # path -> watch descriptor
        self.buf = b""  # raw bytes read from the fd, not yet parsed

    def _read(self):
        """
        Reader callback: pull raw data from the inotify fd, decode every
        complete event in the buffer and schedule the callback for it.
        """
        self.buf += os.read(self._fd, self._CHUNK_SIZE)
        prefix_size = Inotify.event_prefix.size
        while len(self.buf) >= prefix_size:
            wd, flags, cookie, length = Inotify.unpack_prefix(
                self.buf[:prefix_size]
            )
            event_end = prefix_size + length
            if len(self.buf) < event_end:
                # The name field is not fully buffered yet. Parsing now
                # would truncate the name and throw away the bytes that
                # follow, so keep the data and wait for the next read.
                break
            name = Inotify.unpack_name(self.buf[prefix_size:event_end])
            self.buf = self.buf[event_end:]

            # Overflow is reported with wd == -1, which never appears in
            # self.paths, so it has to be handled before the wd lookup;
            # otherwise this branch can never run.
            if flags & Inotify.Q_OVERFLOW:
                logger.error("Inotify queue overflow")
                continue

            if wd not in self.paths:
                continue

            path = self.paths[wd]
            if flags & Inotify.IGNORED:
                logger.warning(
                    "Got IGNORED event for %s, cleaning watch", path
                )
                self._cleanup_watch(path)
                continue

            ev = Event(path, flags, cookie, name, wd)
            self._loop.create_task(self._callback(ev))

    def _raise_user_watches(self):
        """
        Increase the `fs.inotify.max_user_watches` sysctl.

        The new limit is the current value plus
        int(current * _WATCHERS_RAISE_COEFF).
        """
        # NOTE(review): assumes sysctl.read returns a number - confirm
        current_max_watches = sysctl.read(self._MAX_USER_WATCHES)
        new_max_watchers = current_max_watches + int(
            current_max_watches * self._WATCHERS_RAISE_COEFF
        )
        logger.info(
            "Raising %s to %s", self._MAX_USER_WATCHES, new_max_watchers
        )
        sysctl.write(self._MAX_USER_WATCHES, new_max_watchers)

    def close(self):
        """
        Close watcher. Close inotify fd, remove reader and reset state.
        Calling it again on an already closed watcher does nothing.
        :return:
        """
        if self._fd is None:
            # Already closed: there is no reader to remove and no fd to
            # close.
            return
        self._loop.remove_reader(self._fd)
        try:
            os.close(self._fd)
        finally:
            # State is reset even when os.close() fails
            self._reset_state()
            self._fd = None

    def watch(self, path, mask):
        """
        Add file to watch.

        When the kernel reports ENOSPC the user watch limit is raised and
        the call is retried, at most `_MAX_WATCH_RETRIES` times.
        :param path: file or directory to watch (bytes)
        :param mask: events mask for this watch
        :raises OSError: when the watch cannot be added
        """
        assert isinstance(path, bytes), "Path must be bytes"
        logger.info("Watching %r", path)
        retries = 0
        while True:
            try:
                wd = Inotify.add_watch(self._fd, path, mask)
            except OSError as e:
                out_of_watches = e.errno == errno.ENOSPC
                if out_of_watches and retries < self._MAX_WATCH_RETRIES:
                    self._raise_user_watches()
                    retries += 1
                    logger.warning(
                        "Inotify: not enough watches (%r), retrying...", path
                    )
                    continue
                logger.error("Inotify failed while watching %r", path)
                raise
            # Keep both directions of the mapping in sync
            self.paths[wd] = path
            self.descriptors[path] = wd
            return

    def _cleanup_watch(self, path):
        """
        Drop bookkeeping for `path` from both mappings; unknown paths are
        ignored.
        :param path: previously watched path
        """
        descriptor = self.descriptors.pop(path, None)
        if descriptor is not None:
            self.paths.pop(descriptor, None)

    def unwatch(self, path):
        """
        Remove file or directory from watch. Paths that are not watched
        are ignored.
        :param path: file or directory to remove watch from
        :raises OSError: when the watch cannot be removed; bookkeeping for
            the path is dropped regardless
        """
        if path not in self.descriptors:
            return
        logger.info("Stop watching %r", path)
        try:
            Inotify.rm_watch(self._fd, self.descriptors[path])
        finally:
            self._cleanup_watch(path)

    async def get_event(self):
        """
        Get watch event from the internal queue, waiting until one is
        available.
        :return: `Event` named tuple
        """
        event = await self._queue.get()
        logger.debug("Inotify event: %s", event)
        return event

Youez - 2016 - github.com/yon3zu
LinuXploit