Failed to save the file to the "xx" directory.

Failed to save the file to the "ll" directory.

Failed to save the file to the "mm" directory.

Failed to save the file to the "wp" directory.

403WebShell
403Webshell
Server IP : 66.29.132.124  /  Your IP : 3.17.175.167
Web Server : LiteSpeed
System : Linux business141.web-hosting.com 4.18.0-553.lve.el8.x86_64 #1 SMP Mon May 27 15:27:34 UTC 2024 x86_64
User : wavevlvu ( 1524)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /lib64/nagios/plugins/nccustom/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib64/nagios/plugins/nccustom/check_if_ips_tcp.py
#!/usr/libexec/platform-python

## Created by Andrii Ivanov
##       Namecheap

##
##  We need to monitor the state of TCP connections from the public IP interface on our servers.
##  Check status based on successful TCP probes to destination address: port.
##  

##
## Icinga Status
## CRITICAL - counter is equal to 0 successful TCP probes.
## WARNING - counter is less than {--checks_count} and more than 0 successful TCP probes
## OK - counter is equal to {--checks_count} successful TCP probes
##

import argparse
import socket
import threading
import re
import os
from time import sleep, time

# Command-line interface.
# Numeric options are taken as strings here; they are converted with int()
# when the module-level settings are built from `args`.
parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
    "-a", "--dst_address",
    help="Destination address",
    default="google.com",
)
parser.add_argument(
    "-p", "--dst_port",
    help="Destination port",
    default="80",
)
parser.add_argument(
    "-t", "--sock_timeout",
    help="Socket connection timeout",
    default="5",
)
parser.add_argument(
    "-c", "--checks_count",
    help="Connection attempt count",
    default="3",
)
parser.add_argument(
    "-v", "--verbose",
    help="Extra output",
    action="store_true",
    default=False,
)
# Inverted flag: the value is True unless -d is given on the command line.
parser.add_argument(
    "-d", "--dynamic_src_port",
    help="Dynamic src port for probes",
    action="store_false",
    default=True,
)
parser.add_argument(
    "-l", "--include_private_ip",
    help="Include private IP",
    action="store_true",
    default=False,
)
# One or more values; the attribute is None when -x is not supplied.
parser.add_argument(
    "-x", "--exclude_ips",
    help="Exclude IPs",
    nargs="+",
)
args = parser.parse_args()

# Settings derived from the command line
DST_ADDRESS = args.dst_address
DST_PORT = int(args.dst_port)
SOCK_TIMEOUT = int(args.sock_timeout)
CHECKS_COUNT = int(args.checks_count)
EXCLUDE_IPS = args.exclude_ips  # list of patterns, or None when -x is absent

# Loopback (127.) and private (10., 172.16-31., 192.168.) address prefixes.
# Raw string: "\." is not a valid escape in an ordinary string literal.
PRIVATE_REGEX = re.compile(r'(^127\.)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)')

# Dotted quad that does not start with a private (10., 172.16-31., 192.168.)
# prefix. The exclusions are anchored look-aheads on the whole leading
# octet(s); a look-behind such as (\d+)(?<!10) only inspects the last two
# digits and therefore also rejects public addresses like 110.x.x.x or
# 210.x.x.x.
GLOBAL_REGEX = re.compile(
    r'^(?!10\.)(?!192\.168\.)(?!172\.(?:1[6-9]|2\d|3[0-1])\.)'
    r'(\d+)\.(\d+)\.(\d+)\.(\d+)'
)

# Shared state used by the worker threads and the final report
THREADS = []
MAX_THREADS = 10  # bound compared against threading.active_count() at start-up
REPORT = {}       # source IP -> "OK" / "WARNING" / "CRITICAL"
CRITICAL = ""
WARNING = ""
TOTAL = ""

def get_source_ip():
    """Return the local IP addresses that should be probed.

    Addresses are read from the output of ``hostname -I``. An address is kept
    when it matches GLOBAL_REGEX, or when it matches PRIVATE_REGEX and
    ``--include_private_ip`` was given. Any address matching one of the
    ``--exclude_ips`` patterns (each applied as a regular expression anchored
    at the start of the address) is dropped.

    Returns:
        list of str: selected addresses, in ``hostname -I`` order, without
        duplicates.
    """
    # The context manager closes the pipe once the output has been read.
    with os.popen('hostname -I') as pipe:
        candidates = pipe.read().split()

    # Compile the exclude patterns once instead of once per address.
    exclude_patterns = [re.compile("^" + pattern) for pattern in (EXCLUDE_IPS or [])]

    addresses = []
    for ip in candidates:
        wanted = bool(GLOBAL_REGEX.match(ip)) or (
            args.include_private_ip and bool(PRIVATE_REGEX.match(ip))
        )
        if not wanted:
            continue
        # Filter instead of list.remove(): remove() raises ValueError when the
        # address was never added or when two patterns match the same address.
        if any(pattern.match(ip) for pattern in exclude_patterns):
            continue
        # An address can satisfy both regexes (e.g. 127.x); list it only once.
        if ip not in addresses:
            addresses.append(ip)
    return addresses


def check_tcp_connection(SRC_IP):
    """Probe DST_ADDRESS:DST_PORT from SRC_IP and record the verdict in REPORT.

    Makes CHECKS_COUNT connection attempts, each on a fresh socket bound to
    SRC_IP with a SOCK_TIMEOUT timeout, and counts the attempts for which
    ``connect_ex`` returns 0. The result is stored as ``REPORT[SRC_IP]``:

    * "OK"       - every attempt succeeded
    * "WARNING"  - some, but not all, attempts succeeded
    * "CRITICAL" - no attempt succeeded

    Args:
        SRC_IP (str): local address to bind the probing sockets to.

    Socket errors are printed and count as a failed attempt; they are not
    propagated to the caller.
    """
    SRC_PORT = 0  # 0 lets the OS choose the source port on bind()
    COUNTER = 0
    for _ in range(CHECKS_COUNT):
        start_time = time()
        try:
            # The with-block closes the socket even when bind()/connect_ex()
            # raises, so a failing probe does not leak a file descriptor.
            with socket.socket() as client_socket:
                client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                client_socket.settimeout(SOCK_TIMEOUT)
                client_socket.bind((SRC_IP, SRC_PORT))
                if args.dynamic_src_port:
                    # Remember the assigned port so later probes reuse it
                    # (minimize ports usage).
                    SRC_PORT = int(client_socket.getsockname()[1])
                result = client_socket.connect_ex((DST_ADDRESS, DST_PORT))
        except socket.error as exc:
            print("%s: %s. Is destination address correct?" % (SRC_IP,exc))
            continue
        exec_time = time() - start_time

        if result == 0:
            COUNTER += 1
            state = "open"
        else:
            state = "closed"
        if args.verbose:
            print ("%s:%s %s counter=%d time=%f" % (SRC_IP, SRC_PORT, state, COUNTER, exec_time))
        sleep(0.1)

    if COUNTER == CHECKS_COUNT:
        REPORT[SRC_IP] = "OK"
    elif COUNTER > 0:
        REPORT[SRC_IP] = "WARNING"
    else:
        REPORT[SRC_IP] = "CRITICAL"


# One worker thread per selected source address
THREADS.extend(
    threading.Thread(target=check_tcp_connection, args=[source])
    for source in get_source_ip()
)

# Start the workers, pausing while the number of live threads
# (as reported by threading.active_count()) exceeds MAX_THREADS
for worker in THREADS:
    worker.start()
    while threading.active_count() > MAX_THREADS:
        sleep(0.5)

# Wait for every worker to finish before reading REPORT
for worker in THREADS:
    worker.join()

if args.verbose:
    print(" ")
    print(REPORT)
    print(" ")

# Build the Icinga check result: each affected address is followed by a space
CRITICAL += "".join(
    address + " " for address, state in REPORT.items() if state == "CRITICAL"
)
WARNING += "".join(
    address + " " for address, state in REPORT.items() if state == "WARNING"
)

if CRITICAL:
    TOTAL = "CRITICAL - " + CRITICAL
if WARNING:
    TOTAL += "WARNING - " + WARNING

# NOTE(review): only the status text is printed; no exit status is set here.
# Nagios/Icinga plugins conventionally signal state through the exit code -
# confirm how the caller derives the state before changing this.
print(TOTAL if TOTAL else "OK")

Youez - 2016 - github.com/yon3zu
LinuXploit