Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 13.58.53.112
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/thread-self/root/lib/python3.6/site-packages/slip/dbus/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/thread-self/root/lib/python3.6/site-packages/slip/dbus/service.py
# -*- coding: utf-8 -*-

# slip.dbus.service -- convenience functions for using dbus-activated
# services
#
# Copyright © 2008, 2009, 2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen <nils@redhat.com>

"This module contains convenience functions for using dbus-activated services."

from __future__ import absolute_import

import dbus
import dbus.service
from six import with_metaclass

from .._wrappers import _glib as GLib

from . import polkit

__all__ = ["Object", "InterfaceType", "set_mainloop"]

__mainloop__ = None


def __glib_quit_cb__():
    global __mainloop__

    # assume a Glib mainloop

    __mainloop__.quit()


__quit_cb__ = __glib_quit_cb__


def set_mainloop(mainloop):
    global __mainloop__
    __mainloop__ = mainloop


def set_quit_cb(quit_cb):
    global __quit_cb__
    __quit_cb__ = quit_cb


def quit_cb():
    global __quit_cb__
    __quit_cb__()


SENDER_KEYWORD = "__slip_dbus_service_sender__"
ASYNC_CALLBACKS = ("__slip_dbus_service_reply_cb__",
                   "__slip_dbus_service_error_cb__")


def wrap_method(method):
    global SENDER_KEYWORD
    global ASYNC_CALLBACKS

    if method._dbus_sender_keyword is not None:
        sender_keyword = method._dbus_sender_keyword
        hide_sender_keyword = False
    else:
        sender_keyword = SENDER_KEYWORD
        hide_sender_keyword = True

    if method._dbus_async_callbacks is not None:
        async_callbacks = method._dbus_async_callbacks
        method_is_async = True
    else:
        async_callbacks = ASYNC_CALLBACKS
        method_is_async = False
    hide_async_callbacks = not method_is_async

    def wrapped_method(self, *p, **k):
        sender = k.get(sender_keyword)
        if sender is not None:
            # i.e. called over the bus, not locally
            reply_cb = k[async_callbacks[0]]
            error_cb = k[async_callbacks[1]]

            if hide_sender_keyword:
                del k[sender_keyword]

            if hide_async_callbacks:
                del k[async_callbacks[0]]
                del k[async_callbacks[1]]

            self.sender_seen(sender)

        action_id = getattr(method, "_slip_polkit_auth_required",
                            getattr(self, "default_polkit_auth_required",
                                    None))

        if sender is not None and action_id:

            def reply_handler(is_auth):
                if is_auth:
                    if method_is_async:

                        # k contains async callbacks, simply pass on reply_cb
                        # and error_cb

                        method(self, *p, **k)
                    else:

                        # execute the synchronous method ...

                        error = None
                        try:
                            result = method(self, *p, **k)
                        except Exception as e:
                            error = e

                        # ... and call the reply or error callback

                        if error:
                            error_cb(error)
                        else:

                            # reply_cb((None,)) != reply_cb()

                            if result is None:
                                reply_cb()
                            else:
                                reply_cb(result)
                else:
                    error_cb(polkit.NotAuthorizedException(action_id))
                self.timeout_restart()

            def error_handler(error):
                error_cb(error)
                self.timeout_restart()

            polkit.IsSystemBusNameAuthorizedAsync(
                sender, action_id,
                reply_handler=reply_handler, error_handler=error_handler)
        else:
            # no action id, or run locally, no need to do anything fancy
            retval = method(self, *p, **k)
            self.timeout_restart()
            return retval

    for attr in (x for x in dir(method) if x[:6] == "_dbus_"):
        if attr == "_dbus_sender_keyword":
            wrapped_method._dbus_sender_keyword = sender_keyword
        elif attr == "_dbus_async_callbacks":
            wrapped_method._dbus_async_callbacks = async_callbacks
        else:
            setattr(wrapped_method, attr, getattr(method, attr))

        # delattr (method, attr)

    wrapped_method.__name__ = method.__name__

    return wrapped_method


class InterfaceType(dbus.service.InterfaceType):

    def __new__(cls, name, bases, dct):

        for (attrname, attr) in dct.items():
            if getattr(attr, "_dbus_is_method", False):
                dct[attrname] = wrap_method(attr)
        return super(InterfaceType, cls).__new__(cls, name, bases, dct)


class Object(with_metaclass(InterfaceType, dbus.service.Object)):

    # timeout & persistence

    persistent = False
    default_duration = 5
    duration = default_duration
    current_source = None
    senders = set()
    connections_senders = {}
    connections_smobjs = {}

    # PolicyKit

    default_polkit_auth_required = None

    def __init__(
        self, conn=None, object_path=None, bus_name=None, persistent=None):

        super(Object, self).__init__(conn, object_path, bus_name)
        if persistent is None:
            self.persistent = self.__class__.persistent
        else:
            self.persistent = persistent

    def _timeout_cb(self):
        if not self.persistent and len(Object.senders) == 0:
            quit_cb()
            return False

        Object.current_source = None
        Object.duration = self.default_duration

        return False

    def _name_owner_changed(self, name, old_owner, new_owner):

        conn = self.connection

        if not new_owner and (old_owner, conn) in Object.senders:
            Object.senders.remove((old_owner, conn))
            Object.connections_senders[conn].remove(old_owner)

            if len(Object.connections_senders[conn]) == 0:
                Object.connections_smobjs[conn].remove()
                del Object.connections_senders[conn]
                del Object.connections_smobjs[conn]

            if not self.persistent and len(Object.senders) == 0 and \
                    Object.current_source is None:
                quit_cb()

    def timeout_restart(self, duration=None):
        if not duration:
            duration = self.__class__.default_duration
        if not Object.duration or duration > Object.duration:
            Object.duration = duration
        if not self.persistent or len(Object.senders) == 0:
            if Object.current_source:
                GLib.source_remove(Object.current_source)
            Object.current_source = \
                GLib.timeout_add(Object.duration * 1000,
                                    self._timeout_cb)

    def sender_seen(self, sender):
        if (sender, self.connection) not in Object.senders:
            Object.senders.add((sender, self.connection))
            if self.connection not in Object.connections_senders:
                Object.connections_senders[self.connection] = set()
                Object.connections_smobjs[self.connection] = \
                    self.connection.add_signal_receiver(
                        handler_function=self._name_owner_changed,
                        signal_name='NameOwnerChanged',
                        dbus_interface='org.freedesktop.DBus',
                        arg1=sender)
            Object.connections_senders[self.connection].add(sender)

Youez - 2016 - github.com/yon3zu
LinuXploit