Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.145.81.47
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/uidconverter.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import pwd
from typing import Dict, Optional  # NOQA

from sqlalchemy.engine import Engine  # NOQA
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound

from clcommon import cpapi
from clcommon.clproc import LIMIT_LVP_ID
from lveapi import NameMap
from lvestats.lib.commons.func import deserialize_lve_id, serialize_lve_id
from lvestats.orm import user

__author__ = 'shaman'


__all__ = ('uid_to_username', 'username_to_uid')

# Lazily created NameMap instance shared by this module; populated by _get_name_map()
_name_map_cache = None  # type: NameMap
# Lazily built reseller id -> reseller name mapping; populated by _get_reseller_name_from_panel()
_id_reseller_map_cache = None  # type: Dict[int, str]


def _get_name_map():
    # type: () -> NameMap
    """
    Return the module-wide NameMap, creating and linking it on first use.

    Later calls return the same cached instance.
    """
    global _name_map_cache
    if _name_map_cache is not None:
        return _name_map_cache

    fresh_map = NameMap()
    fresh_map.link_xml_node()
    # Cache only after link_xml_node() returned, so a failure leaves the cache unset
    _name_map_cache = fresh_map
    return fresh_map


def uid_to_username_local(uid):
    # type: (int) -> Optional[str]
    """
    Look up the login name for ``uid`` in the local password database.

    Returns None when ``pwd`` has no entry for that uid.
    """
    try:
        passwd_entry = pwd.getpwuid(uid)
    except KeyError:
        return None
    return passwd_entry.pw_name


def _uid_to_reseller_local(uid):
    # type: (int) -> Optional[str]
    """
    Resolve a reseller name for ``uid`` using local sources only.

    Sources are tried in order: the cached NameMap, the local password
    database, and finally the control panel's reseller id pairs.
    """
    mapped_name = _get_name_map().get_name(uid)
    if mapped_name is not None:
        return mapped_name

    try:
        return pwd.getpwuid(uid).pw_name
    except KeyError:
        # No passwd entry for this id: fall back to the panel's reseller list
        return _get_reseller_name_from_panel(uid)


def _get_reseller_name_from_panel(uid):
    # type: (int) -> Optional[str]
    """
    Return the reseller name the control panel associates with ``uid``.

    The pairs from cpapi.get_reseller_id_pairs() are inverted once and kept
    in a module-level cache. If the panel raises cpapi.NotSupported the cache
    becomes an empty dict, so every lookup returns None.
    """
    global _id_reseller_map_cache
    if _id_reseller_map_cache is None:
        try:
            pairs = cpapi.get_reseller_id_pairs()
            inverted = {value: key for key, value in pairs.items()}
        except cpapi.NotSupported:
            inverted = {}
        _id_reseller_map_cache = inverted
    return _id_reseller_map_cache.get(uid)


def _reseller_to_uid_local(name):
    # type: (str) -> Optional[int]
    """
    Return the id that the cached NameMap holds for reseller ``name``.
    """
    name_map = _get_name_map()
    return name_map.get_id(name)


def reseller_to_uid(name):
    """
    Return the id of reseller ``name`` serialized with LIMIT_LVP_ID.

    Returns None when the local name map has no id for that name.
    """
    local_id = _reseller_to_uid_local(name)
    if local_id is None:
        return None
    return serialize_lve_id(LIMIT_LVP_ID, local_id)


def username_to_uid_local(username):
    # type: (str) -> Optional[int]
    """
    Look up the uid for ``username`` in the local password database.

    Returns None when ``pwd`` has no entry with that name.
    """
    try:
        passwd_entry = pwd.getpwnam(username)
    except KeyError:
        return None
    return passwd_entry.pw_uid


def _uid_to_username_db(uid, server_id, db_engine):
    # type: (int, str, Engine) -> Optional[str]
    """
    Fetch the user name stored in the database for ``uid`` on ``server_id``.

    Returns None when no row matches. The session opened for the lookup is
    always closed before returning.
    """
    db_session = sessionmaker(bind=db_engine)()
    try:
        record = db_session.query(user).filter(
            user.uid == uid,
            user.server_id == server_id,
        ).one()
    except NoResultFound:
        record = None
    finally:
        db_session.close()

    if record is None:
        return None
    return record.user_name


def _uid_to_reseller_db(uid, server_id, db_engine):
    # type: (int, str, Engine) -> Optional[str]
    """
    Resolve a reseller name for ``uid`` from the database.

    Currently delegates to the plain user lookup.
    """
    # FIXME: store reseller's names in db (centralized database on Plesk/DA)
    reseller_name = _uid_to_username_db(uid, server_id, db_engine)
    return reseller_name


def _username_to_uid_db(username, server_id, db_engine):
    # type: (str, str, Engine) -> Optional[int]
    """
    Fetch the uid stored in the database for ``username`` on ``server_id``.

    Returns None when no row matches. The session opened for the lookup is
    always closed before returning.
    """
    db_session = sessionmaker(bind=db_engine)()
    try:
        record = db_session.query(user).filter(
            user.user_name == username,
            user.server_id == server_id,
        ).one()
    except NoResultFound:
        record = None
    finally:
        db_session.close()

    if record is None:
        return None
    return record.uid


def uid_to_username(uid, local_server_id, server_id, db_engine):
    # type: (int, str, str, Engine) -> Optional[str]
    """
    Resolve the user (or reseller) name that corresponds to ``uid``.

    ``uid`` is first split by deserialize_lve_id() into a plain id and a
    reseller flag, which selects the reseller or the user lookup functions.
    When server_id equals local_server_id the local lookup is tried first,
    and the database is consulted only if it yields nothing.

    Raises ValueError if any argument is None.
    """
    if None in (uid, local_server_id, server_id, db_engine):
        raise ValueError("All parameters should be specified and not None for uid_to_username()")

    uid, is_reseller = deserialize_lve_id(uid)
    if is_reseller:
        resolve_locally, resolve_in_db = _uid_to_reseller_local, _uid_to_reseller_db
    else:
        resolve_locally, resolve_in_db = uid_to_username_local, _uid_to_username_db

    # Local sources are only meaningful for the server we are running on
    local_name = resolve_locally(uid) if server_id == local_server_id else None
    if local_name:
        return local_name
    return resolve_in_db(uid, server_id, db_engine)


def username_to_uid(username, local_server_id, server_id, db_engine):
    # type: (str, str, str, Engine) -> Optional[int]
    """
    Lookups for uid in local user database (/etc/passwd) and global one (table lve_stats2_user).
    If local_server_id equals to server_id then local user database has priority.

    Returns the uid, or None when neither source knows the user name.
    Raises ValueError if any argument is None.
    """
    if None in (username, local_server_id, server_id, db_engine):
        raise ValueError("All parameters should be specified and not None for username_to_uid()")

    if server_id == local_server_id:
        result = username_to_uid_local(username)
        # Compare against None explicitly: uid 0 (root) is a valid local result
        # and must not be mistaken for "not found" and sent to the database.
        if result is not None:
            return result

    return _username_to_uid_db(username, server_id, db_engine)


def convert_id_to_lvp_id(any_id):
    # type: (int) -> int
    """
    Return ``any_id`` in LVP (reseller) form.

    If deserialize_lve_id() flags the id as a reseller id it is returned
    unchanged; otherwise it is serialized with LIMIT_LVP_ID.
    """
    _, is_reseller = deserialize_lve_id(any_id)
    return any_id if is_reseller else serialize_lve_id(LIMIT_LVP_ID, any_id)


def convert_name_to_lvp_id(name):
    # type: (str) -> int
    """
    Return the LVP id for reseller ``name``, or -1 when reseller_to_uid()
    gives no (or a falsy) id.
    """
    resolved_id = reseller_to_uid(name)
    return resolved_id if resolved_id else -1

Youez - 2016 - github.com/yon3zu
LinuXploit