Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 18.221.147.141
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /lib/python3.6/site-packages/dnf/db/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib/python3.6/site-packages/dnf/db//history.py
# -*- coding: utf-8 -*-

# Copyright (C) 2009, 2012-2018  Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

import calendar
import os
import time

import libdnf.transaction
import libdnf.utils

from dnf.i18n import ucd
from dnf.yum import misc
from dnf.exceptions import DatabaseError

from .group import GroupPersistor, EnvironmentPersistor, RPMTransaction


class RPMTransactionItemWrapper(object):
    """Attribute-style view over a single transaction item.

    Every accessor forwards either to the wrapped item (``self._item``) or,
    for the package fields, to the object returned by its ``getRPMItem()``.
    """

    def __init__(self, swdb, item):
        """Store the owning ``swdb`` object and the wrapped ``item``.

        ``item`` must not be ``None``; this is enforced with an ``assert``.
        """
        assert item is not None
        self._item = item
        self._swdb = swdb

    def __str__(self):
        """Return ``toStr()`` of the object returned by ``getItem()``."""
        inner = self._item.getItem()
        return inner.toStr()

    def __lt__(self, other):
        """Order wrappers by their wrapped items."""
        mine, theirs = self._item, other._item
        return mine < theirs

    def __eq__(self, other):
        """Two wrappers are equal when their wrapped items are equal."""
        mine, theirs = self._item, other._item
        return mine == theirs

    def __hash__(self):
        """Hash exactly like the wrapped item."""
        return self._item.__hash__()

    def match(self, pattern):
        """Report a match for every ``pattern`` (the argument is ignored)."""
        return True

    def is_package(self):
        """Return True when the item exposes an RPM item."""
        rpm_item = self._item.getRPMItem()
        return rpm_item is not None

    def is_group(self):
        """Return True when the item exposes a comps group item."""
        group_item = self._item.getCompsGroupItem()
        return group_item is not None

    def is_environment(self):
        """Return True when the item exposes a comps environment item."""
        env_item = self._item.getCompsEnvironmentItem()
        return env_item is not None

    def get_group(self):
        """Return the comps group item (``None`` if there is none)."""
        group_item = self._item.getCompsGroupItem()
        return group_item

    def get_environment(self):
        """Return the comps environment item (``None`` if there is none)."""
        env_item = self._item.getCompsEnvironmentItem()
        return env_item

    @property
    def name(self):
        """Name reported by the RPM item."""
        rpm_item = self._item.getRPMItem()
        return rpm_item.getName()

    @property
    def epoch(self):
        """Epoch reported by the RPM item."""
        rpm_item = self._item.getRPMItem()
        return rpm_item.getEpoch()

    @property
    def version(self):
        """Version reported by the RPM item."""
        rpm_item = self._item.getRPMItem()
        return rpm_item.getVersion()

    @property
    def release(self):
        """Release reported by the RPM item."""
        rpm_item = self._item.getRPMItem()
        return rpm_item.getRelease()

    @property
    def arch(self):
        """Architecture reported by the RPM item."""
        rpm_item = self._item.getRPMItem()
        return rpm_item.getArch()

    @property
    def evr(self):
        """``epoch:version-release``, or ``version-release`` if epoch is falsy."""
        epoch = self.epoch
        version_release = "{}-{}".format(self.version, self.release)
        if not epoch:
            return version_release
        return "{}:{}".format(epoch, version_release)

    @property
    def nevra(self):
        """NEVRA reported by the RPM item."""
        rpm_item = self._item.getRPMItem()
        return rpm_item.getNEVRA()

    @property
    def action(self):
        """Action stored on the wrapped item."""
        return self._item.getAction()

    @action.setter
    def action(self, value):
        self._item.setAction(value)

    @property
    def reason(self):
        """Reason stored on the wrapped item."""
        return self._item.getReason()

    @reason.setter
    def reason(self, value):
        return self._item.setReason(value)

    @property
    def action_name(self):
        """Action name, or ``""`` when looking it up raises AttributeError."""
        try:
            label = self._item.getActionName()
        except AttributeError:
            return ""
        return label

    @property
    def action_short(self):
        """Short action label, or ``""`` when looking it up raises AttributeError."""
        try:
            label = self._item.getActionShort()
        except AttributeError:
            return ""
        return label

    @property
    def state(self):
        """State stored on the wrapped item."""
        return self._item.getState()

    @state.setter
    def state(self, value):
        self._item.setState(value)

    @property
    def from_repo(self):
        """Repository id stored on the wrapped item."""
        return self._item.getRepoid()

    def ui_from_repo(self):
        """Return the repository id prefixed with ``@``, or ``""`` if it is falsy."""
        repoid = self._item.getRepoid()
        if repoid:
            return "@" + repoid
        return ""

    @property
    def obsoleting(self):
        """Always ``None``."""
        return None

    def get_reason(self):
        """Ask ``self._swdb.rpm`` for the reason of this item."""
        # TODO: get_history_reason
        rpm_transaction = self._swdb.rpm
        return rpm_transaction.get_reason(self)

    @property
    def pkg(self):
        """Package mapped to the wrapped item in ``self._swdb.rpm._swdb_ti_pkg``."""
        item_to_pkg = self._swdb.rpm._swdb_ti_pkg
        return item_to_pkg[self._item]

    @property
    def files(self):
        """``files`` attribute of the mapped package."""
        package = self.pkg
        return package.files

    @property
    def _active(self):
        """Alias of ``pkg``."""
        return self.pkg


class TransactionWrapper(object):
    """Read-only, attribute-style view over one transaction object.

    All accessors forward to the wrapped object stored in ``self._trans``.
    """

    # Flags describing a mismatch between this transaction's recorded rpmdb
    # versions and a neighbouring transaction / a supplied rpmdb version.
    # They default to False on the class and are overridden per instance.
    altered_lt_rpmdb = False
    altered_gt_rpmdb = False

    def __init__(self, trans):
        """Wrap ``trans``, the underlying transaction object."""
        self._trans = trans

    @property
    def tid(self):
        """Transaction id (``getId()``)."""
        return self._trans.getId()

    @property
    def cmdline(self):
        """Command line recorded for the transaction."""
        return self._trans.getCmdline()

    @property
    def releasever(self):
        """Release version recorded for the transaction."""
        return self._trans.getReleasever()

    @property
    def beg_timestamp(self):
        """Begin timestamp (``getDtBegin()``)."""
        return self._trans.getDtBegin()

    @property
    def end_timestamp(self):
        """End timestamp (``getDtEnd()``)."""
        return self._trans.getDtEnd()

    @property
    def beg_rpmdb_version(self):
        """rpmdb version recorded at the start of the transaction."""
        return self._trans.getRpmdbVersionBegin()

    @property
    def end_rpmdb_version(self):
        """rpmdb version recorded at the end of the transaction."""
        return self._trans.getRpmdbVersionEnd()

    @property
    def return_code(self):
        """Return 0 when the transaction state is DONE, otherwise 1."""
        # getState() describes the whole transaction, so compare against the
        # transaction-level constant (the same family end() stores) rather
        # than the per-item TransactionItemState one.
        return int(self._trans.getState() != libdnf.transaction.TransactionState_DONE)

    @property
    def loginuid(self):
        """User id recorded for the transaction."""
        return self._trans.getUserId()

    @property
    def data(self):
        """Return the bound ``packages`` method.

        NOTE(review): this yields the method object itself, not the list it
        would produce -- callers have to call the result. Confirm this is
        intended before changing it.
        """
        return self.packages

    @property
    def is_output(self):
        """True when the transaction has any console output."""
        output = self._trans.getConsoleOutput()
        return bool(output)

    @property
    def comment(self):
        """Comment recorded for the transaction."""
        return self._trans.getComment()

    def tids(self):
        """Return a one-element list holding this transaction's id."""
        return [self._trans.getId()]

    def performed_with(self):
        """Always return an empty list."""
        return []

    def packages(self):
        """Return the transaction's items, each wrapped for attribute access.

        Note that the wrapper receives this TransactionWrapper (not an
        SwdbInterface) as its ``swdb`` argument.
        """
        result = self._trans.getItems()
        return [RPMTransactionItemWrapper(self, i) for i in result]

    def output(self):
        """Return the second element of every console output entry."""
        return [i[1] for i in self._trans.getConsoleOutput()]

    def error(self):
        """Always return an empty list."""
        return []

    def compare_rpmdbv(self, rpmdbv):
        """Set ``altered_gt_rpmdb`` when the end rpmdb version differs from ``rpmdbv``."""
        self.altered_gt_rpmdb = self._trans.getRpmdbVersionEnd() != rpmdbv


class MergedTransactionWrapper(TransactionWrapper):
    """View over several transactions merged into one.

    Wraps a ``libdnf.transaction.MergedTransaction``; the per-transaction
    scalar accessors of the base class are overridden to return the
    corresponding ``list*()`` results instead.
    """

    def __init__(self, trans):
        """Start a merged transaction from the wrapper ``trans``."""
        self._trans = libdnf.transaction.MergedTransaction(trans._trans)

    def merge(self, trans):
        """Merge the transaction wrapped by ``trans`` into this one."""
        self._trans.merge(trans._trans)

    @property
    def loginuid(self):
        """User ids of the merged transactions (``listUserIds()``)."""
        return self._trans.listUserIds()

    def tids(self):
        """Ids of the merged transactions (``listIds()``)."""
        return self._trans.listIds()

    @property
    def return_code(self):
        """Return one 0/1 flag per merged state: 0 for DONE, otherwise 1."""
        # listStates() yields transaction-level states, so compare against the
        # transaction-level constant rather than the per-item one.
        done = libdnf.transaction.TransactionState_DONE
        return [int(i != done) for i in self._trans.listStates()]

    @property
    def cmdline(self):
        """Command lines of the merged transactions (``listCmdlines()``)."""
        return self._trans.listCmdlines()

    @property
    def releasever(self):
        """Release versions of the merged transactions (``listReleasevers()``)."""
        return self._trans.listReleasevers()

    @property
    def comment(self):
        """Comments of the merged transactions (``listComments()``)."""
        return self._trans.listComments()

    def output(self):
        """Return the second element of every console output entry."""
        return [i[1] for i in self._trans.getConsoleOutput()]

class SwdbInterface(object):
    """Facade over ``libdnf.transaction.Swdb`` used to record and query history.

    The database object and the ``rpm`` / ``group`` / ``env`` helpers are
    created lazily on first access. ``close()`` drops all of them again and
    is also invoked from ``__del__``.
    """

    def __init__(self, db_dir, releasever=""):
        """Remember where the database lives; nothing is opened yet.

        :param db_dir: directory that contains (or will contain) the database
        :param releasever: release version stored with new transactions;
            converted with ``str()``
        """
        # TODO: record all vars
        # TODO: remove releasever from options
        self.releasever = str(releasever)
        self._rpm = None
        self._group = None
        self._env = None
        self._addon_data = None
        self._swdb = None
        self._db_dir = db_dir
        # (file_descriptor, line) tuples buffered until end() writes them out
        self._output = []

    def __del__(self):
        self.close()

    @property
    def rpm(self):
        """Lazily created ``RPMTransaction`` bound to this interface."""
        if self._rpm is None:
            self._rpm = RPMTransaction(self)
        return self._rpm

    @property
    def group(self):
        """Lazily created ``GroupPersistor`` bound to this interface."""
        if self._group is None:
            self._group = GroupPersistor(self)
        return self._group

    @property
    def env(self):
        """Lazily created ``EnvironmentPersistor`` bound to this interface."""
        if self._env is None:
            self._env = EnvironmentPersistor(self)
        return self._env

    @property
    def dbpath(self):
        """Database file path: ``db_dir`` joined with the default database name."""
        return os.path.join(self._db_dir, libdnf.transaction.Swdb.defaultDatabaseName)

    @property
    def swdb(self):
        """ Lazy initialize Swdb object

        :raises DatabaseError: when opening the database raises RuntimeError
        """
        if not self._swdb:
            # _db_dir == persistdir which is prepended with installroot already
            try:
                self._swdb = libdnf.transaction.Swdb(self.dbpath)
            except RuntimeError as ex:
                raise DatabaseError(str(ex))
            self._swdb.initTransaction()
            # TODO: vars -> libdnf
        return self._swdb

    def transform(self, input_dir):
        """Run ``libdnf.transaction.Transformer`` from ``input_dir`` into ``dbpath``."""
        transformer = libdnf.transaction.Transformer(input_dir, self.dbpath)
        transformer.transform()

    def close(self):
        """Forget the current transaction id and helpers and close the database.

        Safe to call repeatedly, and on an instance whose ``__init__`` did not
        run to completion.
        """
        try:
            del self._tid
        except AttributeError:
            pass
        self._rpm = None
        self._group = None
        self._env = None
        # close() is also reached from __del__, which may run on an object
        # that never got its _swdb attribute assigned; look it up defensively
        # so finalisation does not raise AttributeError.
        swdb = getattr(self, "_swdb", None)
        if swdb:
            swdb.closeTransaction()
            swdb.closeDatabase()
        self._swdb = None
        self._output = []

    @property
    def path(self):
        """Path reported by the database object (``getPath()``)."""
        return self.swdb.getPath()

    def reset_db(self):
        """Call ``resetDatabase()`` on the database object and return its result."""
        return self.swdb.resetDatabase()

    # TODO: rename to get_last_transaction?
    def last(self, complete_transactions_only=True):
        """Return the last transaction wrapped, or ``None`` if there is none.

        ``complete_transactions_only`` is currently ignored.
        """
        # TODO: complete_transactions_only
        t = self.swdb.getLastTransaction()
        if not t:
            return None
        return TransactionWrapper(t)

    # TODO: rename to: list_transactions?
    def old(self, tids=None, limit=0, complete_transactions_only=False):
        """Return wrapped transactions, in reverse of the order the database lists them.

        :param tids: optional iterable of transaction ids (anything ``int()``
            accepts); when non-empty only those transactions are returned
        :param limit: currently ignored
        :param complete_transactions_only: currently ignored
        """
        # a set makes the per-transaction membership test below constant time
        wanted = {int(i) for i in tids or []}
        result = [TransactionWrapper(i) for i in self.swdb.listTransactions()]
        # TODO: move to libdnf
        if wanted:
            result = [i for i in result if i.tid in wanted]

        # populate altered_lt_rpmdb and altered_gt_rpmdb: flag both neighbours
        # whenever a transaction does not start from the rpmdb version the
        # previous one in the list ended with
        for prev_trans, trans in zip(result, result[1:]):
            if trans._trans.getRpmdbVersionBegin() != prev_trans._trans.getRpmdbVersionEnd():
                trans.altered_lt_rpmdb = True
                prev_trans.altered_gt_rpmdb = True
        return result[::-1]

    def get_current(self):
        """Return the database's current transaction, wrapped."""
        return TransactionWrapper(self.swdb.getCurrent())

    def set_reason(self, pkg, reason):
        """Set reason for package

        Adds a REASON_CHANGE item for ``pkg`` to the database, marks it DONE
        and returns it.
        """
        rpm_item = self.rpm._pkg_to_swdb_rpm_item(pkg)
        repoid = self.repo(pkg)
        action = libdnf.transaction.TransactionItemAction_REASON_CHANGE
        ti = self.swdb.addItem(rpm_item, repoid, action, reason)
        ti.setState(libdnf.transaction.TransactionItemState_DONE)
        return ti

    '''
    def package(self, pkg):
        """Get SwdbPackage from package"""
        return self.swdb.package(str(pkg))
    '''

    def repo(self, pkg):
        """Get repository of package"""
        return self.swdb.getRPMRepo(str(pkg))

    def package_data(self, pkg):
        """Get package data for package

        Returns the wrapped transaction item, or ``None`` when the database
        has none for ``str(pkg)``.
        """
        # trans item is returned
        result = self.swdb.getRPMTransactionItem(str(pkg))
        if result is None:
            return result
        result = RPMTransactionItemWrapper(self, result)
        return result

#    def reason(self, pkg):
#        """Get reason for package"""
#        result = self.swdb.resolveRPMTransactionItemReason(pkg.name, pkg.arch, -1)
#        return result

    # TODO: rename to begin_transaction?
    def beg(self, rpmdb_version, using_pkgs, tsis, cmdline=None, comment=""):
        """Begin a new transaction and return its id.

        :param rpmdb_version: rpmdb version at the start; stored via ``str()``
        :param using_pkgs: not used by this method
        :param tsis: not used by this method
        :param cmdline: command line to record (``""`` when falsy)
        :param comment: comment to record

        The id is also kept in ``self._tid`` until ``close()``.
        """
        try:
            self.swdb.initTransaction()
        except Exception:
            # Best effort: the swdb property already calls initTransaction()
            # when it first opens the database, so a failure of this extra
            # call is ignored. Only ordinary exceptions are swallowed --
            # KeyboardInterrupt / SystemExit still propagate.
            pass

        tid = self.swdb.beginTransaction(
            int(calendar.timegm(time.gmtime())),
            str(rpmdb_version),
            cmdline or "",
            int(misc.getloginuid()),
            comment)
        self.swdb.setReleasever(self.releasever)
        self._tid = tid

        return tid

    def pkg_to_swdb_rpm_item(self, po):
        """Create an RPM item in the database from package ``po`` and return it.

        Copies name, epoch (0 when falsy), version, release and arch.
        """
        rpm_item = self.swdb.createRPMItem()
        rpm_item.setName(po.name)
        rpm_item.setEpoch(po.epoch or 0)
        rpm_item.setVersion(po.version)
        rpm_item.setRelease(po.release)
        rpm_item.setArch(po.arch)
        return rpm_item

    def log_scriptlet_output(self, msg):
        """Buffer every line of ``msg`` for the running transaction.

        Does nothing when no transaction has been begun or ``msg`` is empty.
        """
        if not hasattr(self, '_tid'):
            return
        if not msg:
            return
        for line in msg.splitlines():
            line = ucd(line)
            # logging directly to database fails if transaction runs in a background process
            self._output.append((1, line))

    '''
    def _log_errors(self, errors):
        for error in errors:
            error = ucd(error)
            self.swdb.log_error(self._tid, error)
    '''

    def end(self, end_rpmdb_version="", return_code=None, errors=None):
        """Flush buffered output and end the running transaction.

        :param end_rpmdb_version: rpmdb version at the end; stored via ``str()``
        :param return_code: transaction state to store; when ``None`` it is
            ERROR if any item in ``self.rpm`` is in the ERROR state, else DONE
        :param errors: not used by this method

        Returns ``None`` without touching the database when ``beg()`` never
        set a transaction id.
        """
        if not hasattr(self, '_tid'):
            return  # Failed at beg() time

        if return_code is None:
            # return_code/state auto-detection
            return_code = libdnf.transaction.TransactionState_DONE
            for tsi in self.rpm:
                if tsi.state == libdnf.transaction.TransactionItemState_ERROR:
                    return_code = libdnf.transaction.TransactionState_ERROR
                    break

        for file_descriptor, line in self._output:
            self.swdb.addConsoleOutputLine(file_descriptor, line)
        self._output = []

        self.swdb.endTransaction(
            int(time.time()),
            str(end_rpmdb_version),
            return_code,
        )

        # Closing and cleanup is done in the close() method.
        # It is important to keep data around after the transaction ends
        # because it's needed by plugins to report installed packages etc.

    # TODO: ignore_case, more patterns
    def search(self, patterns, ignore_case=True):
        """ Search for history transactions which contain specified
            packages a la "yum list". Returns transaction ids.

            ``ignore_case`` is currently ignored. """
        return self.swdb.searchTransactionsByRPM(patterns)

    def user_installed(self, pkg):
        """Returns True if package is user installed"""
        reason = self.swdb.resolveRPMTransactionItemReason(pkg.name, pkg.arch, -1)
        if reason == libdnf.transaction.TransactionItemReason_USER:
            return True
        # if reason is not known, consider a package user-installed
        # because it was most likely installed via rpm
        if reason == libdnf.transaction.TransactionItemReason_UNKNOWN:
            return True
        return False

    def get_erased_reason(self, pkg, first_trans, rollback):
        """Get reason of package before transaction being undone. If package
        is already installed in the system, keep its reason.

        :param pkg: package being installed
        :param first_trans: id of first transaction being undone
        :param rollback: True if transaction is performing a rollback"""
        if rollback:
            # return the reason at the point of rollback; we're setting that reason
            result = self.swdb.resolveRPMTransactionItemReason(pkg.name, pkg.arch, first_trans)
        else:
            result = self.swdb.resolveRPMTransactionItemReason(pkg.name, pkg.arch, -1)

        # consider unknown reason as user-installed
        if result == libdnf.transaction.TransactionItemReason_UNKNOWN:
            result = libdnf.transaction.TransactionItemReason_USER
        return result

Youez - 2016 - github.com/yon3zu
LinuXploit