Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 52.15.190.187
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /lib/python3.6/site-packages/dnf-plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib/python3.6/site-packages/dnf-plugins//spacewalk.py
#
# Copyright (C) 2015  Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#

from __future__ import absolute_import
from __future__ import unicode_literals
from dnfpluginscore import _, logger

import dnf
import dnf.exceptions
import errno
import json
import librepo
import os
from copy import copy
from dnf.conf.config import PRIO_PLUGINCONFIG

import up2date_client.up2dateAuth
import up2date_client.config
import up2date_client.rhnChannel
import up2date_client.rhnPackageInfo
from rhn.i18n import ustr
from up2date_client import up2dateErrors

# File name of the JSON channel cache; the plugin joins it with dnf's
# persistdir to build the full path.
STORED_CHANNELS_NAME = '_spacewalk.json'

# User-facing messages. Each literal is passed through the `_` callable
# imported from dnfpluginscore, so the text must stay byte-identical.
RHN_DISABLED    = _("CloudLinux Network based repositories will be disabled.")
CHANNELS_DISABLED = _("CloudLinux Network channel support will be disabled.")
COMMUNICATION_ERROR  = _("There was an error communicating with CloudLinux Network server.")
NOT_REGISTERED_ERROR = _("This system is not registered with CloudLinux Network server.")
NOT_SUBSCRIBED_ERROR = _("This system is not subscribed to any channels.")
NO_SYSTEM_ID_ERROR   = _("SystemId could not be acquired.")
USE_RHNREGISTER      = _("You can use rhn_register to register.")
UPDATES_FROM_SPACEWALK = _("This system is receiving updates from CloudLinux Network server.")
# Contains one '%s' placeholder: the rejected GPG key URL.
GPG_KEY_REJECTED     = _("For security reasons packages from CloudLinux Network based repositories can be verified only with locally installed gpg keys. GPG key '%s' has been rejected.")
PROFILE_NOT_SENT     = _("Package profile information could not be sent.")
# Contains one '%s' placeholder: the name of the missing login header.
MISSING_HEADER       = _("Missing required login information for CloudLinux Network: %s")
LEAPP_IN_PROGRESS    = _("Leapp upgrade is running - using cache.")
MUST_BE_ROOT         = _('Spacewalk plugin has to be run under with the root privileges.')

class Spacewalk(dnf.Plugin):
    """
    DNF plugin that registers CloudLinux Network (Spacewalk) channels as
    repositories and reports the package profile after a transaction.

    The list of enabled channels is cached as JSON in dnf's persistdir so
    that repositories can still be set up when the server is not contacted.
    """

    name = 'spacewalk'

    def __init__(self, base, cli):
        """
        Read the plugin configuration and, when enabled, activate channels.

        :param base: the dnf base object the plugin is attached to
        :param cli: the dnf CLI object; stored as-is and only used by config()
        """
        super(Spacewalk, self).__init__(base, cli)
        self.base = base
        self.cli = cli
        self.stored_channels_path = os.path.join(self.base.conf.persistdir,
                                                 STORED_CHANNELS_NAME)
        self.connected_to_spacewalk = False
        self.up2date_cfg = {}
        # Plugin settings from the [main] section are applied to a copy of
        # the base configuration object.
        self.conf = copy(self.base.conf)
        self.parser = self.read_config(self.conf)
        if "main" in self.parser.sections():
            for (key, value) in self.parser.items("main"):
                self.conf._set_value(key, value, PRIO_PLUGINCONFIG)
        if not dnf.util.am_i_root():
            logger.warning(MUST_BE_ROOT)
            self.conf.enabled = False
        if not self.conf.enabled:
            return
        logger.debug('initialized Spacewalk plugin')

        self.activate_channels()

    def config(self):
        """ Demand root privileges from the CLI when the plugin is enabled. """
        if not self.conf.enabled:
            return
        # Guard against a missing CLI object so the plugin does not crash
        # with AttributeError when there is nothing to set demands on.
        if self.cli is not None:
            self.cli.demands.root_user = True

    def clnreg(self):
        """ Run the external clnreg_ks tool; its exit status is ignored. """
        os.system('/usr/sbin/clnreg_ks --strict-edition')

    def _fetch_login_info(self):
        """
        Return the up2date login info, registering the system once on error.

        If the first getLoginInfo() call raises RhnServerException, clnreg()
        is run and the call is retried exactly once.  A failure of the retry
        propagates to the caller as RhnServerException.
        """
        try:
            return up2date_client.up2dateAuth.getLoginInfo(
                                                timeout=self.conf.timeout)
        except up2dateErrors.RhnServerException:
            self.clnreg()
        return up2date_client.up2dateAuth.getLoginInfo(
                                                timeout=self.conf.timeout)

    def activate_channels(self, networking=True):
        """
        Add one SpacewalkRepo per enabled channel to the base repositories.

        :param networking: when False, the server is not contacted and the
                           channel list cached in persistdir is used instead

        On communication or registration errors the problem is logged and
        the method returns without adding any repository.
        """
        enabled_channels = {}
        sslcacert = None
        force_http = 0
        # NOTE(review): proxy_url is never assigned a real value in this
        # method, so repositories are always created with proxy=None.
        proxy_url = None
        login_info = None
        cached_channels = self._read_channels_file()
        if not networking:
            # no network communication, use list of channels from persistdir
            enabled_channels = cached_channels
        elif os.path.isfile("/etc/cln_leapp_in_progress"):
            # networking is true, but CLN urls won't be accessible, use cache
            logger.warning(LEAPP_IN_PROGRESS)
            enabled_channels = cached_channels
        else:
            self.up2date_cfg = up2date_client.config.initUp2dateConfig()
            sslcacert = get_ssl_ca_cert(self.up2date_cfg)
            # Plain value, not a tuple: a one-element tuple is always truthy
            # and would permanently suppress the follow-redirects header
            # built by SpacewalkRepo.create_http_headers().
            force_http = self.up2date_cfg['useNoSSLForPackages']

            # try to register the system once if getLoginInfo fails
            try:
                login_info = self._fetch_login_info()
            except up2dateErrors.RhnServerException as e:
                logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e)
                return

            if not login_info:
                logger.error("%s\n%s", NOT_REGISTERED_ERROR, RHN_DISABLED)
                self._write_channels_file({})
                return

            try:
                svrChannels = up2date_client.rhnChannel.getChannelDetails(
                                                              timeout=self.conf.timeout)
            except up2dateErrors.CommunicationError as e:
                logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e)
                return
            except up2dateErrors.NoChannelsError:
                logger.error("%s\n%s", NOT_SUBSCRIBED_ERROR, CHANNELS_DISABLED)
                self._write_channels_file({})
                return
            except up2dateErrors.NoSystemIdError:
                logger.error("%s %s\n%s\n%s", NOT_SUBSCRIBED_ERROR,
                             NO_SYSTEM_ID_ERROR, USE_RHNREGISTER, RHN_DISABLED)
                return
            self.connected_to_spacewalk = True
            logger.info(UPDATES_FROM_SPACEWALK)

            # only channels that report a version are enabled
            for channel in svrChannels:
                if channel['version']:
                    enabled_channels[channel['label']] = dict(channel.items())
            self._write_channels_file(enabled_channels)

        repos = self.base.repos

        for (channel_id, channel_dict) in enabled_channels.items():
            cached_channel = cached_channels.get(channel_id)
            cached_version = None
            if cached_channel:
                cached_version = cached_channel.get('version')
            # per-channel sections of the plugin config override the
            # plugin-wide settings for this repository only
            conf = copy(self.conf)
            if channel_id in self.parser.sections():
                for (key, value) in self.parser.items(channel_id):
                    conf._set_value(key, value, PRIO_PLUGINCONFIG)
            repo = SpacewalkRepo(channel_dict, {
                                    'conf'      : self.base.conf,
                                    'proxy'     : proxy_url,
                                    'timeout'   : conf.timeout,
                                    'sslcacert' : sslcacert,
                                    'force_http': force_http,
                                    'cached_version' : cached_version,
                                    'login_info': login_info,
                                    'gpgcheck': conf.gpgcheck,
                                    'enabled': conf.enabled,
                                })
            repos.add(repo)

        # DEBUG
        logger.debug(enabled_channels)

    def transaction(self):
        """ Update system's profile after transaction. """
        if not self.conf.enabled:
            return
        if not self.connected_to_spacewalk:
            # not connected so nothing to do here
            return
        if self.up2date_cfg['writeChangesToLog'] == 1:
            delta = self._make_package_delta()
            up2date_client.rhnPackageInfo.logDeltaPackages(delta)
        try:
            up2date_client.rhnPackageInfo.updatePackageProfile(
                                                        timeout=self.conf.timeout)
        except up2dateErrors.RhnServerException as e:
            logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, PROFILE_NOT_SENT, e)

    def _read_channels_file(self):
        """
        Return the cached channel mapping, or {} when it is unavailable.

        A missing file, broken JSON, or JSON that is not an object all yield
        an empty dict; any other I/O error is re-raised.
        """
        try:
            with open(self.stored_channels_path, "r") as channels_file:
                channels = json.loads(channels_file.read())
        except IOError as e:
            if e.errno != errno.ENOENT:
                raise
            return {}
        except json.decoder.JSONDecodeError:
            # ignore broken json and recreate it later
            return {}

        # callers use dict methods on the result, so discard anything else
        if not isinstance(channels, dict):
            return {}
        return channels

    def _write_channels_file(self, var):
        """
        Store `var` as indented JSON in the channel cache file.

        A missing target directory (ENOENT) is silently ignored; any other
        I/O error is re-raised.
        """
        try:
            with open(self.stored_channels_path, "w") as channels_file:
                json.dump(var, channels_file, indent=4)
        except IOError as e:
            if e.errno != errno.ENOENT:
                raise

    def _make_package_delta(self):
        """
        Return {'added': [...], 'removed': [...]} for the current transaction.

        Each entry is a (name, version, release, epoch, arch) tuple.
        """
        def describe(pkg):
            return (pkg.name, pkg.version, pkg.release, pkg.epoch, pkg.arch)

        transaction = self.base.transaction
        return {'added'  : [describe(p) for p in transaction.install_set],
                'removed': [describe(p) for p in transaction.remove_set],
               }


class SpacewalkRepo(dnf.repo.Repo):
    """
    Repository object for Spacewalk. Uses up2date libraries.
    """
    # login headers that must be present in login_info for every request
    needed_headers = ['X-RHN-Server-Id',
                      'X-RHN-Auth-User-Id',
                      'X-RHN-Auth',
                      'X-RHN-Auth-Server-Time',
                      'X-RHN-Auth-Expire-Offset']

    def __init__(self, channel, opts):
        """
        Build a repository from a channel description.

        :param channel: mapping with at least the keys 'label', 'name',
                        'url' (iterable of base urls), 'gpg_key_url' and
                        'version'
        :param opts: mapping of options; the keys read here are 'conf',
                     'sslcacert', 'proxy', 'cached_version', 'login_info',
                     'timeout', 'gpgcheck', 'force_http' and 'enabled'
        :raises dnf.exceptions.RepoError: when login_info lacks a required
                                          header (only on dnf versions that
                                          provide set_http_headers)
        """
        super(SpacewalkRepo, self).__init__(ustr(channel['label']),
                                            opts.get('conf'))
        # dnf stuff
        self.name = ustr(channel['name'])
        self.baseurl = [url + '/GET-REQ/' + self.id for url in channel['url']]
        self.sslcacert = opts.get('sslcacert')
        self.proxy = opts.get('proxy')
        try:
            self.gpgkey = get_gpg_key_urls(channel['gpg_key_url'])
        except InvalidGpgKeyLocation as e:
            logger.warning(GPG_KEY_REJECTED, dnf.i18n.ucd(e))
            self.gpgkey = []
        # channel version differs from the cached one: expire metadata
        # almost immediately
        if channel['version'] != opts.get('cached_version'):
            self.metadata_expire = 1

        # spacewalk stuff
        self.login_info = opts.get('login_info')
        self.keepalive = 0
        self.bandwidth = 0
        self.retries = 1
        self.throttle = 0
        self.timeout = opts.get('timeout')
        self.gpgcheck = opts.get('gpgcheck')
        self.force_http = opts.get('force_http')

        if opts.get('enabled'):
            self.enable()
        else:
            self.disable()

        if hasattr(self, 'set_http_headers'):
            # dnf > 4.0.9  on RHEL 8, Fedora 29/30
            http_headers = self.create_http_headers()
            if http_headers:
                self.set_http_headers(http_headers)

    def create_http_headers(self):
        """
        Return the list of "Name: value" header strings for this repository.

        An empty list is returned when there is no login info.

        :raises dnf.exceptions.RepoError: when a required login header is
                                          missing from login_info
        """
        http_headers = []
        if not self.login_info:
            return http_headers
        for header in self.needed_headers:
            if header not in self.login_info:
                error = MISSING_HEADER % header
                raise dnf.exceptions.RepoError(error)
            if self.login_info[header] in (None, ''):
                # This doesn't work due to bug in librepo (or even deeper in libcurl)
                # the workaround below can be removed once BZ#1211662 is fixed
                #http_headers.append("%s;" % header)
                http_headers.append("%s: \r\nX-libcurl-Empty-Header-Workaround: *" % header)
            else:
                http_headers.append("%s: %s" % (header, self.login_info[header]))
        if not self.force_http:
            http_headers.append("X-RHN-Transport-Capability: follow-redirects=3")

        return http_headers

    def _handle_new_remote(self, destdir, mirror_setup=True):
        """ Return the parent's librepo handle with the login headers set. """
        # this function is called only on dnf < 3.6.0 (up to Fedora 29)
        handle = super(SpacewalkRepo, self)._handle_new_remote(destdir, mirror_setup)
        http_headers = self.create_http_headers()
        if http_headers:
            handle.setopt(librepo.LRO_HTTPHEADER, http_headers)
        return handle


# FIXME
# all routines below should go to rhn-client-tools because they are shared
# between yum-rhn-plugin and dnf-plugin-spacewalk
def get_gpg_key_urls(key_url_string):
    """
    Split a whitespace separated string of gpg key urls and validate them.

    Every url has to pass is_valid_gpg_key_url(), i.e. be a file:// url
    located in /etc/pki/rpm-gpg/.
    Return the urls as a list of strings.
    Raises InvalidGpgKeyLocation carrying the first url that is rejected.
    """
    urls = key_url_string.split()
    rejected = [url for url in urls if not is_valid_gpg_key_url(url)]
    if rejected:
        raise InvalidGpgKeyLocation(rejected[0])
    return urls

class InvalidGpgKeyLocation(Exception):
    """Raised by get_gpg_key_urls() with the gpg key url that was rejected."""

def is_valid_gpg_key_url(key_url):
    """
    Tell whether key_url is a file:// url pointing below /etc/pki/rpm-gpg/.

    The url must contain exactly one '://' separator, its scheme must be
    'file' (case-insensitive) and the normalized path must start with
    '/etc/pki/rpm-gpg/'.
    """
    pieces = key_url.split('://')
    if len(pieces) == 2:
        scheme, location = pieces
        if scheme.lower() == 'file':
            # normalize first so that '..' components cannot escape the
            # key directory
            return os.path.normpath(location).startswith('/etc/pki/rpm-gpg/')
    return False

class BadSslCaCertConfig(Exception):
    """Raised when the up2date configuration has no usable 'sslCACert'."""


def get_ssl_ca_cert(up2date_cfg):
    """
    Return the CA certificate path configured in up2date_cfg.

    up2date_cfg must support `in` and item access.  When the 'sslCACert'
    value is a list, its first element is returned; otherwise the value is
    returned unchanged.
    Raises BadSslCaCertConfig if 'sslCACert' is missing or empty.
    """
    if not ('sslCACert' in up2date_cfg and up2date_cfg['sslCACert']):
        raise BadSslCaCertConfig

    ca_certs = up2date_cfg['sslCACert']
    if isinstance(ca_certs, list):
        return ca_certs[0]

    return ca_certs

Youez - 2016 - github.com/yon3zu
LinuXploit