Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.220.226.147
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/cllicenselib.py
# -*- coding: utf-8 -*-

# CL LICENSE CHECK python lib

#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

# CLASSES:
#
# LicenseData - main class for CL licence check
# : check_license
# : get_ip
# : date_format
# : open_letter_template
# : format_letter
#
# FUNCTIONS:
#
# License update cron              : update_license_timestamp_file
# License update new key           : update license with new key
# Check license                    : check_license
# Return Last License check date   : last_license_check
# Return Template to display       : return_template_to_display

import datetime
import os
import socket
import struct
import subprocess
import sys
import time
from urllib import error, request

import cldetectlib as detect
from clcommon.utils import mod_makedirs

RHN_CHECK_FILE = '/usr/sbin/rhn_check'
RHN_UPDATE_NEW_KEY = '/usr/sbin/rhnreg_ks'
LICENSE_TIMESTAMP_FILE = '/var/lve/lveinfo.ver'
LICENSE_TIMESTAMP_DIR = os.path.dirname(LICENSE_TIMESTAMP_FILE)
RHN_DIRECTORY = '/etc/sysconfig/rhn'
RHN_SYSTEMID = os.path.join(RHN_DIRECTORY, 'systemid')
JWT_TOKEN = os.path.join(RHN_DIRECTORY, 'jwt.token')
JWT_TOKEN_TIMESTAMP_FILE = os.path.join(LICENSE_TIMESTAMP_DIR, 'jwt_last_update_ts')

CACHE_TIME = 259200  # 3 days
JWT_UPDATE_INTERVAL = 3600 * 4

NO_VALID_LICENSE_FOUND_TEMPLATE = '/usr/share/cloudlinux/no_valid_license_screen.txt'
LICENSE_OUT_OF_DATE_EMAIL_TEMPLATE = '/usr/share/cloudlinux/license_out_of_date_email.txt'

TEMPLATES = {
    'Email': {
        'template_file': LICENSE_OUT_OF_DATE_EMAIL_TEMPLATE,
        'error_msg': 'Error: License out of date, email template missing.',
    },
    'NoValid': {
        'template_file': NO_VALID_LICENSE_FOUND_TEMPLATE,
        'error_msg': 'Error: No valid license found, template is missing.',
    },
}

SHOW_IP_LINK = 'http://cloudlinux.com/showip.php'


# License Data Class
class LicenseData:
    _license_last_timestamp = ''
    _server_ip = ''
    _letter_template = ''

    @staticmethod
    def _is_license_needs_update(license_timestamp_file):
        """
        Update license when:

        in case JWT token present:
        - if no timestamp file with jwt token last update time
        - if jwt token was not updated during last 4 hours
        - if jwt token was updated later than our timestamp in file

        in case JWT token is not present, but systemid exists
        (fallback for licensing w/o token)
        - update license only if systemid was changed after last timestamp update

        when no jwt and no systemid - re-update license
        """
        if os.path.exists(JWT_TOKEN):
            if not os.path.exists(JWT_TOKEN_TIMESTAMP_FILE):
                return True
            rhn_updated = int(os.path.getmtime(JWT_TOKEN))
            if rhn_updated < int(time.time()) - JWT_UPDATE_INTERVAL:
                return True
            jwt_last_update_timestamp = read_time_from_file(JWT_TOKEN_TIMESTAMP_FILE)
            if jwt_last_update_timestamp < rhn_updated:
                return True
            return False
        if os.path.exists(RHN_SYSTEMID):
            rhn_updated = os.path.getmtime(RHN_SYSTEMID)
            return rhn_updated > os.path.getmtime(license_timestamp_file)
        return True

    def check_license(self, license_timestamp_file):
        """
        Checks license:
        1. Reads cache file with timestamp
        2. Ensures license must be re-checked:
            - if timestamp in cache file > CACHE_TIME ->
            check if we need to re-update licensing
        """
        try:
            # Get timestamp of last license check
            self._license_last_timestamp = read_time_from_file(license_timestamp_file)
            if (int(time.time()) - self._license_last_timestamp) > CACHE_TIME:
                if self._is_license_needs_update(license_timestamp_file):
                    return update_license_timestamp_file()
                # because if server does not have jwt token and license is not needed update
                # consider no license
                return os.path.exists(JWT_TOKEN)
            return True
        except (IOError, struct.error):
            return False

    # Get IP
    def get_ip(self):
        if not self._server_ip:
            try:
                with request.urlopen(SHOW_IP_LINK) as response:
                    self._server_ip = response.read().decode('utf-8').strip()
            except error.URLError as e:
                print('Error: Get server IP. ' + str(e))
                sys.exit(1)

    # Format Date
    def date_format(self, format):
        return datetime.datetime.fromtimestamp(self._license_last_timestamp).strftime(format)

    # Open letter template
    def open_letter_template(self, template):
        try:
            with open(template, 'r', encoding='utf-8') as f:
                self._letter_template = f.read()
        except IOError as e:
            print('Error: Failed to open template file. ' + str(e))
            sys.exit(1)

    # Format Screen
    def format_letter(self):
        admin_email = detect.getCPAdminEmail()
        if not admin_email:
            return None

        self.get_ip()
        if not self._license_last_timestamp:
            return (
                self._letter_template.replace('%LIC_DATE%', '')
                .replace('%IP%', self._server_ip)
                .replace('%HOSTNAME%', socket.gethostname())
                .replace('%FROM%', admin_email)
            )

        return (
            self._letter_template.replace('%LIC_DATE%', ' since ' + self.date_format('%b %d, %y'))
            .replace('%IP%', self._server_ip)
            .replace('%HOSTNAME%', socket.gethostname())
            .replace('%FROM%', admin_email)
        )


# License Data Class Object
License = LicenseData()


def read_time_from_file(file: str):
    with open(file, 'rb') as f:
        return struct.unpack('i', f.read())[0]


def write_time_to_file(file: str, timestamp: int):
    with open(file, 'wb') as f:
        f.write(struct.pack('i', timestamp))


# License update cron
def update_license_timestamp_file():
    try:
        if not os.path.isdir(LICENSE_TIMESTAMP_DIR):
            mod_makedirs(LICENSE_TIMESTAMP_DIR, 0o755)
        with subprocess.Popen(
            [RHN_CHECK_FILE],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        ) as proc:
            proc.communicate()
            if proc.returncode == 0:
                write_time_to_file(LICENSE_TIMESTAMP_FILE, int(time.time()))
                if os.path.exists(JWT_TOKEN):
                    write_time_to_file(JWT_TOKEN_TIMESTAMP_FILE, int(os.path.getmtime(JWT_TOKEN)))
                return True
            return False
    except (OSError, IOError):
        return False


# update license with new key
def update_license_with_key(key):
    try:
        if not os.path.isdir(LICENSE_TIMESTAMP_DIR):
            mod_makedirs(LICENSE_TIMESTAMP_DIR, 0o755)
        with subprocess.Popen(
            [RHN_UPDATE_NEW_KEY, '--activationkey=' + key, '--force'],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        ) as proc:
            out, _ = proc.communicate()
            if proc.returncode == 0:
                write_time_to_file(LICENSE_TIMESTAMP_FILE, int(time.time()))
                if os.path.exists(JWT_TOKEN):
                    write_time_to_file(JWT_TOKEN_TIMESTAMP_FILE, int(os.path.getmtime(JWT_TOKEN)))
                print('OK')
                return True

            print(out.strip())
            return False
    except (OSError, IOError):
        print('Error: New key activation failed, please try again later.')
        return False


# Returns:
# True == license is Ok
# False == license expired or not found
def check_license():
    # Check for license timestamp file in /var/lve/lveinfo.ver
    if os.path.isfile(LICENSE_TIMESTAMP_FILE):
        return License.check_license(LICENSE_TIMESTAMP_FILE)
    # try to re-update license file if jwt token is present
    # or systemid file as fallback when jwt absent (goDaddy)
    if os.path.exists(JWT_TOKEN) or os.path.exists(RHN_SYSTEMID):
        return update_license_timestamp_file()
    return False


# Return Last license check Date
def last_license_check(is_valid):
    if is_valid:
        return 'OK'
    # Check for license timestamp file in /var/lve/lveinfo.ver
    if os.path.isfile(LICENSE_TIMESTAMP_FILE):
        return 'No valid license found, last successful check was on ' + License.date_format('%b %d, %y')
    return 'No valid license found.'


def get_email_template():
    return get_template_to_display(TEMPLATES['Email'])


def get_novalid_template():
    return get_template_to_display(TEMPLATES['NoValid'])


# Return Template Email, No Valid license found template.
def get_template_to_display(template):
    # check for template in /usr/share/cloudlinux
    if not os.path.isfile(template['template_file']):
        return template['error_msg']

    if not check_license():
        # set template text into License class attr.
        License.open_letter_template(template['template_file'])

        # Return Formatted Template
        return License.format_letter()

    return None

Youez - 2016 - github.com/yon3zu
LinuXploit