Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.137.159.163
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys/svcctl.py
import asyncio
import logging
import os
import subprocess as su
from typing import Iterable, Union

from defence360agent.contracts.config import Core
from defence360agent.utils import check_run, run, OsReleaseInfo

logger = logging.getLogger(__name__)

DOS_PROTECTOR_SERVICE_NAME = "imunify360-dos-protection"
UAL_SERVICE_NAME = "imunify360-unified-access-logger"
PAM_SERVICE_NAME = "imunify360-pam"
AUDITD_SERVICE_NAME = "imunify-auditd-log-reader"
SCANLOGD_SERVICE_NAME = "imunify360-scanlogd"


def _apply_cmd(func):
    async def wrapper(*args, **kwargs):
        cmd = func(*args, **kwargs)
        logger.debug("check_call(%r)", cmd)
        await check_run(cmd)

    return wrapper


async def _reset_failed_state(
    services: Iterable[Union["_CentOs6", "_SystemctlBased"]]
):
    for s in services:
        await s.reset_failed()
        await s.restart()
        for _ in range(10):
            if await s.is_active():
                break
            logger.warning(
                "Service %s is still not active, sleep for %s seconds", s, 1
            )
            await asyncio.sleep(1)


class _CentOs6:
    SVC_CTL_BIN = "/sbin/service"
    _CHKCONFIG = "/sbin/chkconfig"

    def __init__(self, service_name):
        self._service_name = service_name

    @_apply_cmd
    def start(self):
        return [self.SVC_CTL_BIN, self._service_name, "start"]

    @_apply_cmd
    def stop(self):
        return [self.SVC_CTL_BIN, self._service_name, "stop"]

    @_apply_cmd
    def restart(self):
        return [self.SVC_CTL_BIN, self._service_name, "restart"]

    async def reset_failed(self):
        """Not implemented for Centos6"""
        pass

    @_apply_cmd
    def enable(self, **kwargs):
        return [self._CHKCONFIG, "--add", self._service_name]

    async def is_enabled(self):
        cmd = [self._CHKCONFIG, "--list", self._service_name]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=su.PIPE, stderr=su.DEVNULL
        )
        out, _ = await proc.communicate()
        rc = await proc.wait()
        return rc == 0 and b":on" in out

    def is_enabled_sync(self):
        cmd = [self._CHKCONFIG, "--list", self._service_name]
        cp = su.run(cmd, stdout=su.PIPE, stderr=su.DEVNULL)
        return cp.returncode == 0 and b":on" in cp.stdout

    @_apply_cmd
    def disable(self):
        return [self._CHKCONFIG, "--del", self._service_name]

    mask = disable
    unmask = enable

    async def is_active(self):
        cmd = [self.SVC_CTL_BIN, self._service_name, "status"]
        exit_code, _, _ = await run(cmd)
        return exit_code == 0

    async def activate_socket_service(self):
        if await self.is_enabled() and not await self.is_active():
            await _reset_failed_state((self,))

    def unit_exists(self):
        cp = su.run(
            [self._CHKCONFIG, "--list", self._service_name],
            stdout=su.DEVNULL,
            stderr=su.DEVNULL,
        )
        return cp.returncode == 0


class _SystemctlBased:
    SVC_CTL_BIN = "systemctl"

    def __init__(self, service_name):
        self._service_name = service_name

    @_apply_cmd
    def start(self):
        return [self.SVC_CTL_BIN, "start", self._service_name]

    @_apply_cmd
    def stop(self):
        return [self.SVC_CTL_BIN, "stop", self._service_name]

    @_apply_cmd
    def restart(self):
        return [self.SVC_CTL_BIN, "restart", self._service_name]

    @_apply_cmd
    def _enable_now(self, *, now: bool):
        return [
            self.SVC_CTL_BIN,
            "enable",
            *(["--now"] if now else []),
            self._service_name,
        ]

    async def enable(self, *, now: bool):
        await self._enable_now(now=now)

        # WARN: Ubuntu 16.04 demonstrates very special behavior of the
        # `systemcl enable --now` command - if the unit is stopped it
        # wouldn't be started. We need to handle that case.
        # TODO: Remove this case on dropping support for Ubuntu 16.04.
        osinfo = {}
        try:
            OsReleaseInfo.dict_from_file(osinfo)
        except (FileNotFoundError, PermissionError):
            return
        if osinfo.get("ID", "").lower() != "ubuntu":
            return
        if osinfo.get("VERSION_ID", "") == "16.04":
            await self.restart()

    async def is_enabled(self):
        cmd = [self.SVC_CTL_BIN, "is-enabled", self._service_name]
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=su.DEVNULL, stderr=su.DEVNULL
        )
        await proc.communicate()
        rc = await proc.wait()
        return rc == 0

    def is_enabled_sync(self):
        cmd = [self.SVC_CTL_BIN, "is-enabled", self._service_name]
        rc = su.call(cmd, stdout=su.DEVNULL, stderr=su.DEVNULL)
        return rc == 0

    @_apply_cmd
    def disable(self, *, now: bool):
        return [
            self.SVC_CTL_BIN,
            "disable",
            *(["--now"] if now else []),
            self._service_name,
        ]

    @_apply_cmd
    def reload(self):
        return [self.SVC_CTL_BIN, "reload", self._service_name]

    @_apply_cmd
    def mask(self):
        """
        It was created for imunify360-webshield which required masking as far
        This is no more relevant but let it stay
        is started by 'Wants=' in imunify360.service
        """
        return [self.SVC_CTL_BIN, "mask", self._service_name]

    @_apply_cmd
    def unmask(self):
        """
        It was created for imunify360-webshield which required masking as far
        This is no more relevant but let it stay
        is started by 'Wants=' in imunify360.service
        """
        return [self.SVC_CTL_BIN, "unmask", self._service_name]

    async def is_active(self):
        cmd = [self.SVC_CTL_BIN, "is-active", self._service_name]
        exit_code, _, _ = await run(cmd)
        return exit_code == 0

    @_apply_cmd
    def reset_failed(self):
        return [self.SVC_CTL_BIN, "reset-failed", self._service_name]

    async def activate_socket_service(self):
        agent_service_socket = adaptor(f"{self._service_name}.socket")
        if (
            await agent_service_socket.is_enabled()
            and not await agent_service_socket.is_active()
        ):
            await _reset_failed_state((self, agent_service_socket))

    def unit_exists(self):
        cp = su.run(
            [self.SVC_CTL_BIN, "cat", self._service_name],
            stdout=su.DEVNULL,
            stderr=su.DEVNULL,
        )
        return cp.returncode == 0


class _CentOs7(_SystemctlBased):
    SVC_CTL_BIN = "/usr/bin/systemctl"


class _DebianUbuntu(_SystemctlBased):
    SVC_CTL_BIN = "/bin/systemctl"


class MinidaemonService(_CentOs6):
    def __init__(self, service_name="minidaemon"):
        self._service_name = service_name

    @_apply_cmd
    def restart(self, service=None):
        cmd = [self.SVC_CTL_BIN, self._service_name, "restart"]  # restart all
        if service:
            cmd += [service]  # restart specific child process
        return cmd


def adaptor(service_name, *, include_centos6=True):
    for a in (
        _DebianUbuntu,
        _CentOs7,
        *((_CentOs6,) if include_centos6 else tuple()),
    ):
        if os.path.exists(a.SVC_CTL_BIN):
            return a(service_name)
    else:
        raise RuntimeError("Cannot instantiate appropriate adaptor.")


def imunify360_service():
    return adaptor(Core.SVC_NAME)


def imunify360_dos_protector_service():
    try:
        return adaptor(DOS_PROTECTOR_SERVICE_NAME, include_centos6=False)
    except RuntimeError:
        logger.info("DOS Protector service is not available on this system")
        return None


def imunify360_ual_service():
    return adaptor(UAL_SERVICE_NAME)


def imunify360_pam_service():
    return adaptor(PAM_SERVICE_NAME)


def imunify360_scanlogd_service():
    return adaptor(SCANLOGD_SERVICE_NAME)


def imunify360_auditd_service():
    unit = adaptor(AUDITD_SERVICE_NAME)
    if unit.unit_exists():
        return adaptor(AUDITD_SERVICE_NAME)
    logger.info("Auditd-log-reader service is not available on this system")
    return None

Youez - 2016 - github.com/yon3zu
LinuXploit