Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.145.79.214
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/pyvirtualdisplay/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/pyvirtualdisplay//abstractdisplay.py
import fnmatch
import logging
import os
import select
import subprocess
import tempfile
import time
from threading import Lock

from pyvirtualdisplay import xauth
from pyvirtualdisplay.util import get_helptext, platform_is_osx

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# try:
#     import fcntl
# except ImportError:
#     fcntl = None

# Held in _start1() while a display number is chosen and recorded in
# _USED_DISPLAY_NR_LIST.
_mutex = Lock()
# Held in _popen() around the subprocess.Popen call.
_mutex_popen = Lock()


# Smallest display number returned by _search_for_display().
_MIN_DISPLAY_NR = 1000
# Display numbers already chosen by _start1() in this process; entries are
# only ever appended in this module.
_USED_DISPLAY_NR_LIST = []

# Seconds _start1() keeps polling with xdpyinfo before it gives up.
_X_START_TIMEOUT = 10
# Seconds _start1() sleeps between two xdpyinfo polls.
_X_START_TIME_STEP = 0.1
# Seconds _start1() sleeps instead of polling when xdpyinfo is not installed.
_X_START_WAIT = 0.1


class XStartTimeoutError(Exception):
    """Raised when the X server program gives no usable reply in time."""


class XStartError(Exception):
    """Raised when the X server program cannot be started or is misused."""


def _lock_files():
    tmpdir = "/tmp"
    try:
        ls = os.listdir(tmpdir)
    except FileNotFoundError:
        log.warning("missing /tmp")
        return []
    pattern = ".X*-lock"
    names = fnmatch.filter(ls, pattern)
    ls = [os.path.join(tmpdir, child) for child in names]
    ls = [p for p in ls if os.path.isfile(p)]
    return ls


def _search_for_display():
    """
    Pick a display number that is above every existing X lock file.

    The display number is parsed out of each path returned by
    :func:`_lock_files` (the text between the first ``X`` and the following
    ``-``). Lock files whose name does not contain an integer there (for
    example ``.Xfoo-lock``) are ignored instead of aborting the search with a
    ``ValueError``.

    :return: ``max(_MIN_DISPLAY_NR, highest used number + 3)``, or
        ``_MIN_DISPLAY_NR`` when no usable lock file exists
    :rtype: int
    """
    used = []
    for path in _lock_files():
        try:
            used.append(int(path.split("X")[1].split("-")[0]))
        except ValueError:
            # The pattern ".X*-lock" also matches non-numeric names.
            log.debug("ignoring lock file without display number: %s", path)

    if used:
        return max(_MIN_DISPLAY_NR, max(used) + 3)
    return _MIN_DISPLAY_NR


class AbstractDisplay(object):
    """
    Common parent for X servers (Xvfb,Xephyr,Xvnc)

    Subclasses provide the command line through :meth:`_cmd` and may inspect
    the program's help text in :meth:`_check_flags`. Instances can be used as
    context managers: ``__enter__`` calls :meth:`start`, ``__exit__`` calls
    :meth:`stop`.
    """

    def __init__(self, program, use_xauth, retries, extra_args, manage_global_env):
        """
        :param program: name of the X server program; its help text is read
            with ``get_helptext`` and it is used in error messages
        :param use_xauth: if true, an Xauthority file is created on start and
            removed on stop
        :param retries: number of start attempts when ``-displayfd`` is not used
        :param extra_args: list appended to the result of :meth:`_cmd`
        :param manage_global_env: if true, ``$DISPLAY`` of this process is
            redirected on start and restored on stop
        :raises xauth.NotFoundError: if ``use_xauth`` is set but xauth is not
            installed
        """
        self._extra_args = extra_args
        self._retries = retries
        self._program = program
        self.stdout = None
        self.stderr = None
        self.old_display_var = None
        self._subproc = None
        self.display = None
        self._is_started = False
        self._manage_global_env = manage_global_env
        self._reset_global_env = False
        self._pipe_wfd = None
        self._retries_current = 0

        helptext = get_helptext(program)
        self._has_displayfd = "-displayfd" in helptext
        if not self._has_displayfd:
            log.debug("-displayfd flag is missing.")
        PYVIRTUALDISPLAY_DISPLAYFD = os.environ.get("PYVIRTUALDISPLAY_DISPLAYFD")
        if PYVIRTUALDISPLAY_DISPLAYFD:
            log.debug("PYVIRTUALDISPLAY_DISPLAYFD=%s", PYVIRTUALDISPLAY_DISPLAYFD)
            # '0'->false, '1'->true
            self._has_displayfd = bool(int(PYVIRTUALDISPLAY_DISPLAYFD))
        else:
            # TODO: macos: displayfd is available on XQuartz-2.7.11 but it doesn't work, always 0 is returned
            if platform_is_osx():
                self._has_displayfd = False

        self._check_flags(helptext)

        if use_xauth and not xauth.is_installed():
            raise xauth.NotFoundError()

        self._use_xauth = use_xauth
        self._old_xauth = None
        self._xauth_filename = None

    def _check_flags(self, helptext):
        """Hook called from ``__init__`` with the program's help text; no-op here."""
        pass

    def _cmd(self):
        """
        Return the command line of the X server.

        Must be implemented by subclasses. The result is concatenated with
        ``extra_args`` and passed to :class:`subprocess.Popen`.
        """
        raise NotImplementedError()

    def _redirect_display(self, on):
        """
        on:
         * True -> set $DISPLAY to virtual screen
         * False -> set $DISPLAY to original screen

        :param on: bool
        """
        d = self.new_display_var if on else self.old_display_var
        if d is None:
            log.debug("unset $DISPLAY")
            try:
                del os.environ["DISPLAY"]
            except KeyError:
                log.warning("$DISPLAY was already unset.")
        else:
            log.debug("set $DISPLAY=%s", d)
            os.environ["DISPLAY"] = d

    def _env(self):
        """Return a copy of ``os.environ`` with ``DISPLAY`` set to the new display."""
        env = os.environ.copy()
        env["DISPLAY"] = self.new_display_var
        return env

    def start(self):
        """
        start display

        With ``-displayfd`` support the server is started once and reports its
        own display number. Otherwise a display number is searched for and the
        start is retried up to ``retries`` times on :class:`XStartError`.

        :raises XStartError: if called twice or if all retries failed
        :rtype: self
        """
        if self._is_started:
            raise XStartError(self, "Display was started twice.")
        self._is_started = True

        if self._has_displayfd:
            self._start1_has_displayfd()
        else:
            i = 0
            while True:
                self._retries_current = i + 1
                try:
                    self._start1()
                    break
                except XStartError:
                    log.warning("start failed %s", i + 1)
                    time.sleep(0.05)
                    i += 1
                    if i >= self._retries:
                        raise XStartError(
                            "No success after %s retries. Last stderr: %s"
                            % (self._retries, self.stderr)
                        )
        if self._manage_global_env:
            self._redirect_display(True)
            self._reset_global_env = True
        # The docstring promises self (same as stop()).
        return self

    def _popen(self, use_pass_fds):
        """
        Spawn ``self._command`` with stdout and stderr piped.

        :param use_pass_fds: if true, the write end of the displayfd pipe is
            inherited by the child process
        """
        # Output cached from a previous attempt must not hide the output of
        # the process started now (_read_stdout_stderr reads only once).
        self.stdout = None
        self.stderr = None
        with _mutex_popen:
            if use_pass_fds:
                self._subproc = subprocess.Popen(
                    self._command,
                    pass_fds=[self._pipe_wfd],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    shell=False,
                )
            else:
                self._subproc = subprocess.Popen(
                    self._command,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    shell=False,
                )

    def _start1_has_displayfd(self):
        """
        Start the server and read the display number it writes to a pipe.

        :raises XStartError: if the program exits before replying
        :raises XStartTimeoutError: if the program never replies
        """
        # stdout doesn't work on osx -> create own pipe
        rfd, self._pipe_wfd = os.pipe()
        try:
            self._command = self._cmd() + self._extra_args
            log.debug("command: %s", self._command)

            self._popen(use_pass_fds=True)

            self.display = int(self._wait_for_pipe_text(rfd))
        finally:
            # Close both ends even when the start fails, otherwise the file
            # descriptors leak.
            os.close(rfd)
            os.close(self._pipe_wfd)

        self.new_display_var = ":%s" % int(self.display)

        if self._use_xauth:
            self._setup_xauth()

        # https://github.com/ponty/PyVirtualDisplay/issues/2
        # https://github.com/ponty/PyVirtualDisplay/issues/14
        self.old_display_var = os.environ.get("DISPLAY", None)

    def _start1(self):
        """
        Start the server on a searched display number and wait until it answers.

        The server is polled with ``xdpyinfo`` until it succeeds, the process
        exits or ``_X_START_TIMEOUT`` seconds have passed. If ``xdpyinfo`` is
        not installed, a fixed sleep is used instead.

        :raises XStartError: if the process exited early
        :raises XStartTimeoutError: if the xdpyinfo check never succeeded
        """
        with _mutex:
            self.display = _search_for_display()
            while self.display in _USED_DISPLAY_NR_LIST:
                self.display += 1
            self.new_display_var = ":%s" % int(self.display)

            _USED_DISPLAY_NR_LIST.append(self.display)

        self._command = self._cmd() + self._extra_args
        log.debug("command: %s", self._command)

        self._popen(use_pass_fds=False)

        if self._use_xauth:
            self._setup_xauth()

        # https://github.com/ponty/PyVirtualDisplay/issues/2
        # https://github.com/ponty/PyVirtualDisplay/issues/14
        self.old_display_var = os.environ.get("DISPLAY", None)

        # wait until X server is active
        start_time = time.time()

        d = self.new_display_var
        ok = False
        xdpyinfo_stderr = None  # stderr text of the last xdpyinfo run
        time.sleep(0.05)  # give time for early exit
        while True:
            if not self.is_alive():
                break

            try:
                xdpyinfo = subprocess.Popen(
                    ["xdpyinfo"],
                    env=self._env(),
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    shell=False,
                )
                _, xdpyinfo_stderr = xdpyinfo.communicate()
                exit_code = xdpyinfo.returncode
            except FileNotFoundError:
                log.warning(
                    "xdpyinfo was not found, X start can not be checked! Please install xdpyinfo!"
                )
                time.sleep(_X_START_WAIT)  # old method
                ok = True
                break

            if exit_code == 0:
                log.info('Successfully started X with display "%s".', d)
                ok = True
                break

            if time.time() - start_time >= _X_START_TIMEOUT:
                break
            time.sleep(_X_START_TIME_STEP)
        if not self.is_alive():
            log.warning("process exited early. stderr:%s", self.stderr)
            msg = "Failed to start process: %s"
            raise XStartError(msg % self)
        if not ok:
            msg = 'Failed to start X on display "%s" (xdpyinfo check failed, stderr:[%s]).'
            # Report the captured output, not the (closed) pipe object.
            raise XStartTimeoutError(msg % (d, xdpyinfo_stderr))

    def _wait_for_pipe_text(self, rfd):
        """
        Read one line of ASCII text from ``rfd``, one byte at a time.

        :param rfd: file descriptor of the read end of the pipe
        :return: the text before the first newline
        :rtype: str
        :raises XStartError: if the program is no longer running
        :raises XStartTimeoutError: if no full line arrived within 10 minutes
        """
        s = ""
        start_time = time.time()
        while True:
            (rfd_changed_ls, _, _) = select.select([rfd], [], [], 0.1)
            if not self.is_alive():
                raise XStartError(
                    "%s program closed. command: %s stderr: %s"
                    % (self._program, self._command, self.stderr)
                )
            if rfd in rfd_changed_ls:
                c = os.read(rfd, 1)
                if c == b"\n":
                    break
                s += c.decode("ascii")

            # this timeout is for "eternal" hang. see #62
            if time.time() - start_time >= 600:  # = 10 minutes
                raise XStartTimeoutError(
                    "No reply from program %s. command:%s"
                    % (
                        self._program,
                        self._command,
                    )
                )
        return s

    def stop(self):
        """
        stop display

        Restores ``$DISPLAY`` if it was redirected, kills the process if it is
        still running and removes the Xauthority set-up.

        :raises XStartError: if called before :meth:`start`
        :rtype: self
        """
        if not self._is_started:
            raise XStartError("stop() is called before start().")

        if self._reset_global_env:
            self._redirect_display(False)

        if self.is_alive():
            try:
                self._subproc.kill()
            except OSError as oserror:
                log.debug("exception in terminate:%s", oserror)

            self._subproc.wait()
            self._read_stdout_stderr()
        if self._use_xauth:
            self._clear_xauth()
        return self

    def _read_stdout_stderr(self):
        """Collect the output of the process once and cache it on the instance."""
        if self.stdout is None:
            (self.stdout, self.stderr) = self._subproc.communicate()

            log.debug("stdout=%s", self.stdout)
            log.debug("stderr=%s", self.stderr)

    def _setup_xauth(self):
        """
        Set up the Xauthority file and the XAUTHORITY environment variable.
        """
        handle, filename = tempfile.mkstemp(
            prefix="PyVirtualDisplay.", suffix=".Xauthority"
        )
        self._xauth_filename = filename
        os.close(handle)
        # Save old environment
        self._old_xauth = {}
        self._old_xauth["AUTHFILE"] = os.getenv("AUTHFILE")
        self._old_xauth["XAUTHORITY"] = os.getenv("XAUTHORITY")

        os.environ["AUTHFILE"] = os.environ["XAUTHORITY"] = filename
        cookie = xauth.generate_mcookie()
        xauth.call("add", self.new_display_var, ".", cookie)

    def _clear_xauth(self):
        """
        Clear the Xauthority file and restore the environment variables.

        Does nothing when there is no saved environment (xauth was never set
        up, or it was already cleared). A file or variable that has already
        disappeared is not an error.
        """
        if self._old_xauth is None:
            return
        try:
            os.remove(self._xauth_filename)
        except FileNotFoundError:
            log.debug("Xauthority file was already removed: %s", self._xauth_filename)
        for varname in ["AUTHFILE", "XAUTHORITY"]:
            old_value = self._old_xauth[varname]
            if old_value is None:
                # pop() instead of del: the variable may be gone already
                os.environ.pop(varname, None)
            else:
                os.environ[varname] = old_value
        self._old_xauth = None

    def __enter__(self):
        """used by the :keyword:`with` statement"""
        self.start()
        return self

    def __exit__(self, *exc_info):
        """used by the :keyword:`with` statement"""
        self.stop()

    def is_alive(self):
        """
        Tell whether the process is running.

        When the process has exited, its output is collected as a side effect.

        :rtype: bool
        """
        if not self._subproc:
            return False
        rc = self._subproc.poll()
        if rc is not None:
            # proc exited
            self._read_stdout_stderr()
        return rc is None

    @property
    def pid(self):
        """
        PID (:attr:`subprocess.Popen.pid`)

        ``None`` when no process has been started.

        :rtype: int
        """
        if self._subproc:
            return self._subproc.pid
        return None

Youez - 2016 - github.com/yon3zu
LinuXploit