Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 13.59.130.154
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/aiohttp/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/aiohttp//locks.py
import asyncio
import collections
from typing import Any, Deque, Optional


class EventResultOrError:
    """Event asyncio lock helper class.

    Wraps the Event asyncio lock allowing either to awake the
    locked Tasks without any error or raising an exception.

    thanks to @vorpalsmith for the simple design.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._exc: Optional[BaseException] = None
        self._event = asyncio.Event()
        self._waiters: Deque[asyncio.Future[Any]] = collections.deque()

    def set(self, exc: Optional[BaseException] = None) -> None:
        self._exc = exc
        self._event.set()

    async def wait(self) -> Any:
        waiter = self._loop.create_task(self._event.wait())
        self._waiters.append(waiter)
        try:
            val = await waiter
        finally:
            self._waiters.remove(waiter)

        if self._exc is not None:
            raise self._exc

        return val

    def cancel(self) -> None:
        """Cancel all waiters"""
        for waiter in self._waiters:
            waiter.cancel()

Youez - 2016 - github.com/yon3zu
LinuXploit