Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.146.178.220
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /usr/lib/python3.6/site-packages/slip/dbus/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3.6/site-packages/slip/dbus/polkit.py
# -*- coding: utf-8 -*-

# slip.dbus.polkit -- convenience decorators and functions for using PolicyKit
# with dbus services and clients
#
# Copyright © 2008, 2009, 2012, 2013, 2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Nils Philippsen <nils@redhat.com>

"""This module contains convenience decorators and functions for using
PolicyKit with dbus services and clients."""

from __future__ import absolute_import

import collections
import dbus
from decorator import decorator
from functools import reduce

from .constants import method_call_no_timeout

__all__ = ["require_auth", "enable_proxy", "AUTHFAIL_DONTCATCH",
           "NotAuthorizedException", "AreAuthorizationsObtainable",
           "IsSystemBusNameAuthorizedAsync"]


def require_auth(polkit_auth):
    """Decorator for DBus service methods.

    Specify that a user needs a specific PolicyKit authorization `polkit_auth´
    to execute it."""

    def require_auth_decorator(method):
        assert hasattr(method, "_dbus_is_method")

        setattr(method, "_slip_polkit_auth_required", polkit_auth)
        return method

    return require_auth_decorator


AUTH_EXC_PREFIX = \
    "org.fedoraproject.slip.dbus.service.PolKit.NotAuthorizedException."


class AUTHFAIL_DONTCATCH(object):
    pass


def enable_proxy(
    func=None, authfail_result=AUTHFAIL_DONTCATCH, authfail_exception=None,
    authfail_callback=None):

    """Decorator for DBus proxy methods.

    Let's you (optionally) specify either a result value or an exception type
    and a callback which is returned, thrown or called respectively if a
    PolicyKit authorization doesn't exist or can't be obtained in the DBus
    mechanism, i.e. an appropriate DBus exception is thrown.

    An exception constructor may and a callback must accept an `action_id´
    parameter which will be set to the id of the PolicyKit action for which
    authorization could not be obtained.

    Examples:

    1) Return `False´ in the event of an authorization problem, and call
    `error_handler´:

        def error_handler(action_id=None):
            print "Authorization problem:", action_id

        class MyProxy(object):
            @polkit.enable_proxy(authfail_result=False,
                                 authfail_callback=error_handler)
            def some_method(self, ...):
                ...

    2) Throw a `MyAuthError´ instance in the event of an authorization problem:

        class MyAuthError(Exception):
            def __init__(self, *args, **kwargs):
                action_id = kwargs.pop("action_id")
                super(MyAuthError, self).__init__(*args, **kwargs)
                self.action_id = action_id

        class MyProxy(object):
            @polkit.enable_proxy(authfail_exception=MyAuthError)
            def some_method(self, ...):
                ..."""

    assert(func is None or isinstance(func, collections.Callable))

    assert(
        authfail_result in (None, AUTHFAIL_DONTCATCH) or
        authfail_exception is None)
    assert(
        authfail_callback is None or
        isinstance(authfail_callback, collections.Callable))
    assert(
        authfail_exception is None or
        issubclass(authfail_exception, Exception))

    def _enable_proxy(func, *p, **k):

        try:
            return func(*p, **k)
        except dbus.DBusException as e:
            exc_name = e.get_dbus_name()

            if not exc_name.startswith(AUTH_EXC_PREFIX):
                raise

            action_id = exc_name[len(AUTH_EXC_PREFIX):]

            if authfail_callback is not None:
                authfail_callback(action_id=action_id)

            if authfail_exception is not None:
                try:
                    af_exc = authfail_exception(action_id=action_id)
                except:
                    af_exc = authfail_exception()
                raise af_exc

            if authfail_result is AUTHFAIL_DONTCATCH:
                raise

            return authfail_result

    if func is not None:
        return decorator(_enable_proxy, func)
    else:
        def decorate(func):
            return decorator(_enable_proxy, func)
        return decorate


class NotAuthorizedException(dbus.DBusException):

    """Exception which a DBus service method throws if an authorization
    required for executing it can't be obtained."""

    _dbus_error_name = \
        "org.fedoraproject.slip.dbus.service.PolKit.NotAuthorizedException"

    def __init__(self, action_id, *p, **k):

        self._dbus_error_name = self.__class__._dbus_error_name + "." +\
            action_id
        super(NotAuthorizedException, self).__init__(*p, **k)


class PolKit(object):

    """Convenience wrapper around polkit."""

    _dbus_name = 'org.freedesktop.PolicyKit1'
    _dbus_path = '/org/freedesktop/PolicyKit1/Authority'
    _dbus_interface = 'org.freedesktop.PolicyKit1.Authority'

    __interface = None
    __bus = None
    __bus_name = None
    __signal_receiver = None

    @classmethod
    def _on_name_owner_changed(cls, name, old_owner, new_owner):
        if name == cls._dbus_name and PolKit.__bus:
            PolKit.__bus.remove_signal_receiver(PolKit.__signal_receiver)
            PolKit.__bus = None
            PolKit.__signal_receiver = None
            PolKit.__interface = None

    @property
    def _bus(self):
        if not PolKit.__bus:
            PolKit.__bus = dbus.SystemBus()
            PolKit.__signal_receiver = PolKit.__bus.add_signal_receiver(
                handler_function=self._on_name_owner_changed,
                signal_name='NameOwnerChanged',
                dbus_interface='org.freedesktop.DBus',
                arg0=self._dbus_name)
        return PolKit.__bus

    @property
    def _bus_name(self):
        if not PolKit.__bus_name:
            PolKit.__bus_name = self._bus.get_unique_name()
        return PolKit.__bus_name

    @property
    def _interface(self):
        if not PolKit.__interface:
            try:
                PolKit.__interface = dbus.Interface(self._bus.get_object(
                    self._dbus_name, self._dbus_path),
                    self._dbus_interface)
            except dbus.DBusException:
                pass
        return PolKit.__interface

    @property
    def _polkit_present(self):
        return bool(self._interface)

    def __dbus_system_bus_name_uid(self, system_bus_name):
        bus_object = self._bus.get_object(
            'org.freedesktop.DBus', '/org/freedesktop/DBus')
        bus_interface = dbus.Interface(bus_object, 'org.freedesktop.DBus')
        try:
            uid = bus_interface.GetConnectionUnixUser(system_bus_name)
        except:
            uid = None
        return uid

    def __authorization_is_obtainable(self, authorization):
        if not self._polkit_present:
            return True

        (is_authorized, is_challenge, details) = \
            self._interface.CheckAuthorization(
                ("system-bus-name", {"name": self._bus_name}),
                authorization, {}, 0, "")

        return is_authorized or is_challenge

    def AreAuthorizationsObtainable(self, authorizations):
        if not self._polkit_present:
            return True

        if not isinstance(authorizations, (tuple, list, set)):
            authorizations = (authorizations,)

        obtainable = \
            reduce(
                lambda x, y: x and self.__authorization_is_obtainable(y),
                authorizations, True)

        return obtainable

    def IsSystemBusNameAuthorizedAsync(
        self, system_bus_name, action_id, reply_handler, error_handler,
        challenge=True, details={}):

        if not self._polkit_present:
            return reply_handler(action_id is None or
                self.__dbus_system_bus_name_uid(system_bus_name) == 0)

        flags = 0
        if challenge:
            flags |= 0x1

        def reply_cb(args):
            (is_authorized, is_challenge, details) = args
            reply_handler(is_authorized)

        self._interface.CheckAuthorization(
            ("system-bus-name", {"name": system_bus_name}),
            action_id, details, flags, "",
            reply_handler=reply_cb, error_handler=error_handler,
            timeout=method_call_no_timeout)


__polkit = PolKit()


def AreAuthorizationsObtainable(authorizations):
    return __polkit.AreAuthorizationsObtainable(authorizations)


def IsSystemBusNameAuthorizedAsync(
    system_bus_name, action_id, reply_handler, error_handler, challenge=True,
    details={}):

    return __polkit.IsSystemBusNameAuthorizedAsync(
        system_bus_name, action_id, reply_handler, error_handler, challenge,
        details)

Youez - 2016 - github.com/yon3zu
LinuXploit