"""
Payload implementation for coroutines as data provider.

As a simple case, you can upload data from file::

    @aiohttp.streamer
    async def file_sender(writer, file_name=None):
        with open(file_name, 'rb') as f:
            chunk = f.read(2**16)
            while chunk:
                await writer.write(chunk)
                chunk = f.read(2**16)

Then you can use `file_sender` like this::

    async with session.post('http://httpbin.org/post',
                            data=file_sender(file_name='huge_file')) as resp:
        print(await resp.text())

.. note:: The coroutine must accept `writer` as its first argument.
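
Because `@aiohttp.streamer` is deprecated in favor of async generators, the
same upload can be sketched without it. This is a minimal illustration, not
part of this module; `file_chunks` and `chunk_size` are hypothetical names::

    async def file_chunks(file_name, chunk_size=2**16):
        # Yield the file's contents in chunks of at most chunk_size bytes.
        with open(file_name, 'rb') as f:
            while chunk := f.read(chunk_size):
                yield chunk

The resulting generator, e.g. `file_chunks('huge_file')`, can be passed as
the `data` argument of `session.post` in place of `file_sender(...)`.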

"""

import types
import warnings
from typing import Any, Awaitable, Callable, Dict, Tuple

from .abc import AbstractStreamWriter
from .payload import Payload, payload_type

__all__ = ("streamer",)


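class _stream_wrapper:
    """Bind a streamer coroutine function to its call arguments."""

    def __init__(
        self,
        coro: Callable[..., Awaitable[None]],
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        # types.coroutine() makes generator-based coroutines awaitable;
        # native ``async def`` functions are returned unchanged.
        self.coro = types.coroutine(coro)
        self.args = args
        self.kwargs = kwargs

    async def __call__(self, writer: AbstractStreamWriter) -> None:
        # The writer is always passed as the first positional argument.
        await self.coro(writer, *self.args, **self.kwargs)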


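class streamer:
    """Deprecated decorator that wraps a coroutine function as a payload source.

    The DeprecationWarning is emitted when the decorator is applied.
    """

    def __init__(self, coro: Callable[..., Awaitable[None]]) -> None: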
        warnings.warn(
            "@streamer is deprecated, use async generators instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self.coro = coro

    def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper:
        return _stream_wrapper(self.coro, args, kwargs)


@payload_type(_stream_wrapper)
class StreamWrapperPayload(Payload):
    async def write(self, writer: AbstractStreamWriter) -> None:
        await self._value(writer)


@payload_type(streamer)
class StreamPayload(StreamWrapperPayload):
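    def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:
        # A bare streamer is called with no extra arguments to obtain
        # the _stream_wrapper that the parent payload expects.
        super().__init__(value(), *args, **kwargs)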

    async def write(self, writer: AbstractStreamWriter) -> None:
        await self._value(writer)
