403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.119.124.204
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/clsummary/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/clsummary/rpm_packages_statistics.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
from subprocess import CalledProcessError, Popen, PIPE
from typing import Dict, AnyStr, Optional, List

from clcommon.evr_utils import serialize_evr
from clcommon.utils import is_ubuntu

app_logger = logging.getLogger('cloudlinux-summary.get_rpm_packages_info')


def get_rpm_packages_info() -> List[Optional[Dict[AnyStr, Optional[AnyStr]]]]:
    """
    Get full info about all of rpm packages:
        - name
        - epoch
        - version
        - release
        - arch
        - serialized_version
    """
    result = []
    rpm_db_error_pattern = 'Thread died in Berkeley DB library'
    is_ubuntu_os = is_ubuntu()
    if is_ubuntu_os:
        rpm_cmd = "dpkg-query -f '${Package}\t${Version}\t${Architecture}\n' -W"
    else:
        rpm_cmd = "rpm -qa --queryformat '%{name}\t%|epoch?{%{epoch}}:{None}|\t%{version}\t%{release}\t%{arch}\n'"
    error_message = 'Can\'t get information about rpm packages, because'
    rpm_db_warn_msg = 'Server has broken rpmdb. We can\'t get ' \
                      'statistics about rpm packages and skip its getting.'
    returncode = 0
    try:
        with Popen(
            rpm_cmd,
            stdout=PIPE,
            stderr=PIPE,
            shell=True,
            executable='/bin/bash',
            text=True,
        ) as proc:
            stdout, stderr = proc.communicate()
            returncode = proc.returncode
        # We shouldn't send event to Sentry if rpmdb is broken,
        # because we can't do anything for getting statistics about rpm packages
        # and can only skip this case
        if rpm_db_error_pattern in stderr:
            app_logger.warning(
                rpm_db_warn_msg,
            )
            return result
    except (CalledProcessError, OSError) as exception:
        app_logger.exception(
            '%s exception "%s',
            error_message,
            exception
        )
        return result

    if returncode != 0:
        app_logger.error(
            '%s command "%s" return non-zero code "%s"',
            error_message,
            rpm_cmd,
            returncode,
            extra={
                'stdout': stdout,
                'stderr': stderr,
            }
        )
        return result
    lines = stdout.strip().split('\n')
    for line in lines:
        try:
            name, epoch, version, release, arch = parse_package_manager_output(line, is_ubuntu_os)
        except ValueError:
            app_logger.error(
                'The result of call "%s" has an invalid line "%s". '
                'It should contain five elements.',
                rpm_cmd,
                line,
                extra={
                    'stdout': stdout,
                }
            )
            continue
        epoch = None if epoch == 'None' else epoch
        result.append({
            'name': name,
            'epoch': epoch,
            'version': version,
            'release': release,
            'arch': arch,
            'serialized_version': serialize_evr([
                epoch,
                version,
                release,
            ])
        })
    return result


def parse_package_manager_output(line, is_ubuntu_os):
    """
    rpm -qa already returns data in needed format, e.g:
    lve-utils       None    6.2.3   2.el7.cloudlinux.1639593336.cloudlinux.1639595623       x86_64
    but, dpkg-query output needed to be parsed a bit, cause version column cannot be split by util
    to epoch:version:release
    lve-utils       6.2.2.1639220776        amd64
    """
    if is_ubuntu_os:
        name, version, arch = line.split('\t')
        # deb package version [epoch:]upstream_version[-debian_revision]
        epoch = None
        if ':' in version:
            epoch, version = version.split(':')
        version, *release = version.split('-')
        release = '-'.join(release) if release else None
    else:
        name, epoch, version, release, arch = line.split('\t')
    return name, epoch, version, release, arch

Youez - 2016 - github.com/yon3zu
LinuXploit