Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.227.134.115
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/whmcs.py
import json
import os
import urllib.parse

import defence360agent.subsys.panels.hosting_panel as hp

from logging import getLogger
from defence360agent.contracts import config
from defence360agent.utils.config import update_config
from defence360agent.myimunify.model import update_users_protection, MyImunify
from defence360agent.utils.wordpress_mu_plugin import (
    MU_PLUGIN_INSTALLATION,
    ADVICE_EMAIL_NOTIFICATION,
    WordPressMuPlugin,
)


# Module-level logger, named after this module.
logger = getLogger(__name__)

# Request keys that update_configs() writes to the "CONTROL_PANEL" config section.
MU_PLUGIN_KEYS = [MU_PLUGIN_INSTALLATION, ADVICE_EMAIL_NOTIFICATION]


class WhmcsConf:
    """
    Read/write data passed by WHMCS.

    Internal use, for commands called from WHMCS only.
    It deliberately saves ALL data coming from WHMCS without any validation
    in order to simplify compatibility with the currently installed
    WHMCS plugin.
    """

    # JSON file that holds the data received from WHMCS
    path = "/var/imunify360/whmcs_data.json"

    def read(self):
        """
        Return the stored data as a dict.

        An empty dict is returned when the file does not exist, cannot be
        read, is not valid JSON, or does not contain a JSON object.
        """
        try:
            with open(self.path, "r") as f:
                raw_data = f.read()
        except FileNotFoundError:
            # nothing has been saved yet, this is not an error
            return {}
        except IOError as e:
            logger.error("Failed to read whmcs data file: %s", str(e))
            return {}

        try:
            data = json.loads(raw_data)
        except (json.JSONDecodeError, ValueError):
            logger.error("Malformed file with whmcs data: %s", raw_data)
            return {}

        # valid JSON that is not an object (list, string, number...) would
        # break save(), which merges new values with dict.update()
        if not isinstance(data, dict):
            logger.error("Malformed file with whmcs data: %s", raw_data)
            return {}

        return data

    def save(self, data):
        """
        Merge `data` into the stored data and write the result back.

        Saves ALL data passed by WHMCS; it deliberately has no validation
        to be as compatible as possible with the currently installed
        WHMCS plugin.
        I/O errors are logged and not raised; a value that cannot be
        serialized to JSON still raises TypeError.
        """
        current_data = self.read()
        # no validation needed
        current_data.update(data)

        # serialize before opening the file: open(..., "w") truncates it, so
        # a serialization failure must happen while the old content is intact
        serialized = json.dumps(current_data, indent=4)

        try:
            with open(self.path, "w") as file:
                file.write(serialized)
        except IOError as e:
            logger.error("Failed to write whmcs data to file: %s", str(e))


async def sync_billing_data(sink, data):
    """
    Pass the MyImunify part of the billing data to mi_update
    and return its result
    """
    return await mi_update(sink, data.get(config.MY_IMUNIFY_KEY))


def convert_to_config_key_value(key, value):
    """
    Translate a (key, value) pair to its config form.

    "status" becomes "enable" and "protection" keeps its name, both with
    a boolean value; "mu_plugin_installation" becomes "smart_advice_allowed"
    with the value untouched. Any other key is returned as is.
    An unknown value for "status" or "protection" raises KeyError.
    """
    if key == "mu_plugin_installation":
        return "smart_advice_allowed", value

    if key == "status":
        status_flags = {"active": True, "inactive": False}
        return "enable", status_flags[value]

    if key == "protection":
        protection_flags = {"enabled": True, "disabled": False}
        return "protection", protection_flags[value]

    return key, value


def convert_from_config_key_value(key, value):
    """
    Translate a config (key, value) pair back to the form used outside.

    "enable" becomes "status" ("active"/"inactive"), "protection" keeps its
    name ("enabled"/"disabled"), "smart_advice_allowed" becomes
    "mu_plugin_installation" with the value untouched.
    Any other key is returned as is.
    """
    if key == "smart_advice_allowed":
        return "mu_plugin_installation", value

    if key == "enable":
        if value:
            return "status", "active"
        return "status", "inactive"

    if key == "protection":
        if value:
            return "protection", "enabled"
        return "protection", "disabled"

    return key, value


async def get_users():
    """
    Return the result of get_users() of the hosting panel
    """
    panel = hp.HostingPanel()
    return await panel.get_users()


async def mi_update(sink, requested_myimunify_data):
    """
    Applies the parameters passed in requested_myimunify_data, does nothing
    (and returns None) when it is empty
    - a truthy "status" is stored via WhmcsConf
    - config values are updated via update_configs
    - WordPressMuPlugin is notified about the status and mu-plugin setting
    - if "protection" is passed, it is applied to the passed "users"
      (all panel users when none are passed); users not known to the
      panel are skipped
    Returns the result of get_current_whmcs_data for the affected users
    """
    if not requested_myimunify_data:
        logger.info("Nothing to update for Myimunify")
        return None

    whmcs_activation_status = requested_myimunify_data.get("status")
    if whmcs_activation_status:
        # no validation needed
        WhmcsConf().save({"status": whmcs_activation_status})

    await update_configs(sink, requested_myimunify_data)

    mu_plugin = WordPressMuPlugin()
    mu_plugin.prepare_for_mu_plugin_installation(
        whmcs_activation_status,
        requested_myimunify_data.get(MU_PLUGIN_INSTALLATION),
    )

    requested_protection = requested_myimunify_data.get("protection")
    if not requested_protection:
        return await get_current_whmcs_data([])

    panel_users = await get_users()
    candidates = requested_myimunify_data.get("users", []) or panel_users
    # keep only the users that the panel knows about
    known_users = [name for name in candidates if name in panel_users]

    if not known_users:
        logger.warning("No users to update protection for")
        return await get_current_whmcs_data(known_users)

    logger.info(
        "Updating protection status for users=%s",
        str(known_users),
    )
    _, protection_flag = convert_to_config_key_value(
        "protection", requested_protection
    )
    await update_users_protection(sink, known_users, protection_flag)

    return await get_current_whmcs_data(known_users)


async def update_configs(sink, requested_myimunify_data):
    """
    Writes the supported parameters from requested_myimunify_data to config
    - "purchase_page_url" (and "status" unless the license is MyImunify
      freemium) go to the MyImunify section
    - keys listed in MU_PLUGIN_KEYS go to the "CONTROL_PANEL" section
    Does nothing if none of those parameters is passed
    """
    # those params are stored in config
    mi_config_parameters = ["purchase_page_url"]
    if not config.is_mi_freemium_license():
        mi_config_parameters.append("status")

    mi_section = {}
    for name, raw_value in requested_myimunify_data.items():
        if name in mi_config_parameters:
            conf_key, conf_value = convert_to_config_key_value(
                name, raw_value
            )
            mi_section[conf_key] = conf_value

    control_panel_section = {}
    for name, raw_value in requested_myimunify_data.items():
        if name in MU_PLUGIN_KEYS:
            conf_key, conf_value = convert_to_config_key_value(
                name, raw_value
            )
            control_panel_section[conf_key] = conf_value

    config_dict = {}
    if mi_section:
        config_dict[config.MY_IMUNIFY_KEY] = mi_section
    if control_panel_section:
        config_dict["CONTROL_PANEL"] = control_panel_section

    if not config_dict:
        return

    logger.info("Updating config with data: %s", str(config_dict))
    # only the sections collected above are passed to the config update
    await update_config(sink, config_dict)


async def get_users_info(users):
    """
    Returns a list of {"user": ..., "protection": "enabled"/"disabled"}
    built from MyImunify records of the passed users
    if no users passed - returns for all records
    """
    query = MyImunify.select()
    if users:
        query = query.where(MyImunify.user.in_(users))

    users_info = []
    for record in query.dicts():
        user = record["user"]
        _, protection = convert_from_config_key_value(
            "protection", record["protection"]
        )
        users_info.append({"user": user, "protection": protection})
    return users_info


async def get_current_whmcs_data(users):
    """
    Returns the current configuration and user protection status
    as a flat dict:
    - every key of the MyImunify config section, converted with
      convert_from_config_key_value (e.g. 'status': 'active/inactive')
    - MU_PLUGIN_INSTALLATION and ADVICE_EMAIL_NOTIFICATION taken from
      the "CONTROL_PANEL" config section (None when not present)
    - 'protection': result of get_users_info for the passed users
    """
    conf_data = config.ConfigFile().config_to_dict()

    # a missing (or empty) section is treated as an empty one, so that
    # reading the config never fails on an absent section
    mi_data = conf_data.get(config.MY_IMUNIFY_KEY) or {}
    cp_data = conf_data.get("CONTROL_PANEL") or {}

    current_config = dict(
        convert_from_config_key_value(param, value)
        for param, value in mi_data.items()
    )

    current_config[MU_PLUGIN_INSTALLATION] = convert_from_config_key_value(
        "smart_advice_allowed", cp_data.get("smart_advice_allowed")
    )[1]
    current_config[ADVICE_EMAIL_NOTIFICATION] = cp_data.get(
        ADVICE_EMAIL_NOTIFICATION
    )

    current_config["protection"] = await get_users_info(users)
    return current_config


def get_upgrade_url_link(username, domain):
    """
    Builds the upgrade link: configured purchase page url (with exactly one
    trailing slash) followed by a query string with the provisioning
    parameters, username, domain and the server ip from the hosting panel
    """
    base_url = config.MyImunifyConfig.PURCHASE_PAGE_URL.rstrip("/")
    query_params = {
        "m": "cloudlinux_advantage",
        "action": "provisioning",
        "suite": "my_imunify_account_protection",
        "username": username,
        "domain": domain,
        "server_ip": hp.HostingPanel().get_server_ip(),
    }
    return base_url + "/?" + urllib.parse.urlencode(query_params)

Youez - 2016 - github.com/yon3zu
LinuXploit