脚本小子修养之ESD源码分析(二)
xq17 安全工具 4334浏览 · 2021-07-22 15:39

脚本小子修养之ESD源码分析(二)

章节:

脚本小子修养之开发分布式扫描工具(一)

0x0 前言

​ 关于Celery实现分布式的Online目录扫描已经初步实现了,在使用的过程中,我明白了一个道理:理想很美好,现实很骨感,要想快速挖到洞,神器需在手,开发知识少不了! 不过我也试着安慰自己,说一款工具肯定要在实践中不断迭代才能变得更称手,自己虽然写代码能力很弱鸡,但是多写写多磨磨应该也是可以进步的。

0x1 背景

​ 自己开发的时候非常依赖requests库,为了提高执行速度,会通过使用多线程、协程的方式来进行加速,但是就速度而言,感觉还是差那么一丢丢。后来,知道Python3.4内置了asyncio,异步的编程方式进入我的视线,但是碍于自己弱鸡的学习能力和异步的编程风格不同于多线程,故没去尝试实践,现在因为挖洞需要只能硬着头皮去实践了,后发现自己写的不伦不类,很多东西没考虑进去,如下:

程序是怎么组织的,如何控制具体的速率、如何显示进度、如何处理常见异常,如何做一些结果的实时保存等。

所谓他山之玉可以攻石,esd作为一款优秀的异步子域名枚举工具其优秀的性能,惊艳了我,让我迫不急待学习一番。

0x2 源码结构

1.0版本的源码地址:https://github.com/FeeiCN/ESD/archive/refs/tags/v1.0.zip

:tree -L 2

├── ESD.py # 核心代码
├── LICENSE
├── README.md #介绍
├── data # 存放扫描结果
├── requirements.txt #依赖
├── servers.esd #DNS服务器IP
├── subs.esd # 子域名字典规则
├── targets.txt
└── tests # 测试目录
    └── test_esd.py #测试脚本

可以看到源码结构是较为简单的,而且核心代码仅200多行,很难想到这是一款强大的工具,但恰恰因此,其学习价值非常高。

develop版源码:

git clone https://github.com/FeeiCN/ESD.git
cd ESD
├── ESD
│   ├── __init__.py # 核心代码
│   ├── cacert.pem # https证书
│   ├── key.ini # 第三方API KEY
│   ├── subs-test.esd # 测试子域名列表
│   └── subs.esd # 子域名字典
├── LICENSE
├── README.md
├── setup.py # 安装脚本
└── tests # 测试目录
    └── test_esd.py # 测试脚本

ESD作为Python3包的结构:

├── __init__.py # main函数所在
├── argparser.py # 解析参数
├── banner.py # 打印banner
├── cacert.pem
├── engine.py # 入口run函数
├── key.ini 
├── lib # 功能函数 
│   ├── __init__.py
│   ├── __pycache__
│   ├── basePackage
│   ├── caInfo.py
│   ├── dnsPod.py
│   ├── dnsQuery.py
│   ├── dnsSec.py
│   ├── dnsTransfer.py
│   ├── logger.py
│   ├── searchEngine.py
│   └── thirdpartApi.py
├── subs-test.esd
└── subs.esd # 字典文件

相比于1.0版本,开发版增加了许多功能,代码量也达到了上K行。

0x3 分析1.0版本源码

​ 采取的分析方式,还是以一个完整执行流程作为例子。

0x3.1 测试脚本

可以看到首先从ESD引入EnumSubdomain函数作为所有的入口,很简洁。

然后分别调用load_sub_domain_dictgenerate_general_dicts函数进行默认字典加载和规则字典加载。

0x3.2 入口分析

关于入口,可以参考工具的文档,命令行的调用:

# 安装依赖
pip install -r requirements.txt

# 扫描单个域名
python ESD.py qq.com

# 扫描多个域名(英文逗号分隔)
python ESD.py qq.com,tencent.com

# 扫描文件(文件中每行一个域名)
python ESD.py targets.txt

比较简单,支持3种模式,单域名、多域名、文件,其代码实现:

使用sys.argv来做参数的提取,不是特别优雅但简单,提取域名有正则检验,r'^(([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{2,})$',然后做了判断,符合则添加到domains否则输出错误,这部分其实蛮多余。

DNS规定,域名中的标号都由英文字母和数字组成,每一个标号不超过63个字符,也不区分大小写字母。标号中除连字符(-)外不能使用其他的标点符号。级别最低的域名写在最左边,而级别最高的域名写在最右边。由多个标号组成的完整域名总共不超过255个字符。

[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(\.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+

解析完脚本参数之后,进行for循环,依次传入域名到EnumSudDomain类中,然后调用run()执行枚举。

0x3.3 核心代码

核心类EnumSubDomain,先分析初始化构造函数:

获取脚本的绝对路径,获取目标域名,解析servers.esd获取dns查询需要的dns服务器,然后加载进了aiodns的DNSResolver函数,定义了self.general_dicts存放字典列表,完成了类的初始化功能。

下面分析核心类定义的功能函数:

(1) 异步dns查询函数:query

第一步先拼接二级子域名sub,然后调用self.solver.query进行DNS的A记录查询,利用await关键字等待异步查询返回的结果,然后取出所有查询的host,放入self.process进行处理,如果dns解析失败,会获取失败的code,捕捉不常见的异常,最终返回subret二元组

(2)存储结果函数: process

这个函数主要是将dns查询得到的IP存入到以查询子域名为键值,排序解析得到的IP组作为键值的self.data字典中,并记录相关日志信息。

(3)加载字典函数:

通过with上下文管理打开绝对路径的文件,有利用资源的利用和释放,然后for循环读取文件内容,通过if判断 跳过#注释的内容,如果匹配到行中存在{letter}或者{number}的标识符存在,就会传入到self.generate_general_dicts函数,进行规则的生成,然后合并全部结果用set进行去重,转化list返回结果。

大量去重的话,笔者推荐list(dict.fromkeys([1,2,1,2])),这样速度会更快。

(4)根据规则生成字典函数:generate_gengeral_dicts

可以看到先统计了字母和数字标识符的数目,然后用itertools.product生成相应长度的全排列字母和数字的组合,然后通过for循环一次替换相应长度的标识符,最终添加到self.general_dicts

这种方式只能处理连续的标识符,对于{letter}{number}{letter}则没办法支持解析。

(5)任务入口函数:run

可以看到先记录开始执行时间start_time,然后查询一个不存在子域名enumsubdomain-feei如果返回解析IP说明存在泛解析,程序终止运行。然后接着就是加载字典,封装异步query任务列表tasks,传入给self.start函数进行处理,然后调用self.loop.run_until_complete等待异步请求执行完成。接着就是定义一个时间戳错作为名称,一个域名作为名称的路径,然后遍历self.data的内容()(通过调用self.query->self.process->self.data)写入到这两个路径中,最后输出日志信息和执行时间。

这个异步并发有一个好处就是基本不用锁来控制结果列表的写入,因为程序还是顺序执行的,依序访问资源。

(6)处理子域名query任务列表:start函数

调用了limited_concurrency_coroutines处理tasks列表,然后for遍历返回的函数,await等待执行完成。

(7)控制并发请求数量:limited_concurrency_coroutines

通过itertoolsislice函数默认分割10000个任务,存入asyncio.ensure_future(c)包装为futures列表,然后写了个while循环判断,列表是否大于0,大于0,则yield调用first_to_finish函数,通过循环任务,判断f.done任务执行完毕,则remove掉任务列表中相应的函数,然后取next,添加新任务到到任务列表,这样可以保持任务的数量一直<=limit=10000,其中可以看到对迭代的异常进行捕捉,防止遍历到0时next报错。

这个函数控制任务数量的方法并不是很好,代码逻辑不优雅,也不直观。

0x3.4 小结

​ 这个程序执行流程就是加载目标->初始化EnumSubDomain类->同步函数加载字典->封装dns查询的任务列表->控制并发数量->执行->等待程序全部执行成功->输出self.data的结果data目录。

0x4 分析Develop版本源码

Develop的改动相比于1.0来说是比较大的,主要方面体现在功能的丰富度和泛解析的处理上。

0x4.1 执行流程

运行程序,查看输出,可以看到先确定目标->生成62w子域名->检测DNS服务器状态->判断是否存在泛解析->执行扫描->动态输出进度条

核心的DNS域名爆破扫描完成之后,启用一些辅助模块:CA证书查询、DNS域传输漏洞、搜索引擎,但是效果并不是很好。

0x4.2 源码分析

入口从main函数开始,使用optparse库来处理复杂的传参

然后接着就是一些常规化的赋值处理(代理设置,第三方引擎选用、是否开启debug模式等...)

其中在检验域名的时候还加了tldextract.extrace(p).suffix判断后缀是否为空,精细化了点。

最后就是遍历目标域名,依次初始化EnumSubDomain类对象,执行run(),这里可以注意传入的参数,包括了域名、返回内容过滤、dns服务器IP列表、第三方key等信息。

跟入EnumSubDomain__init__初始化函数:

这里有几个可以关注的点,就是dns选取的时候,作者对其做了一些性能分类,比如AliDNS223.5.5.5就挺稳定的,然后就是创建异步事件循环,其中在代码开头设置了使用uvloop做代理,这进一步提升了异步的性能,而且使用非常简单,只需要在脚本的全局处使用如下一行代码:

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

接下来跟入程序真正的核心功能入口:run

先正常加载字典,其中load_sub_domain_dict函数做了些优化,适配了其他字典格式,

继续回到run函数,接着尝试检查dns服务器是否可用,

查询dns可不可用,其中self.check主要是构造socket请求,检查是否超时,如果dns可用,就会设置为DNS查询的服务器,去查询一个存在的目标子域名,返回解析的结果存起来,继续循环下一个dns服务器,比较不同的解析结果。

如果其中不同的DNS服务器返回不同的解析结果,说明目标域名做了泛解析和CDN,esd不支持对这个爆破,会直接跳过,后面直接进行rsc检查。

如果查询不到不存在的域名的解析结果,说明目标域名是非泛解析的,否则,接下来会进行DNS+RSC的爆破模式。

ESD会去尝试获取两个不存在且不相同的域名的HTML返回内容,如果没办法建立连接,则设置self.skip_rsc=True来跳过rsc过滤泛解析,因为rsc需要建立连接获取内容来做比较,没有连接就没有内容可以比较了。

然后进入self.start函数,开始执行子域名爆破,尝试跟进。

async def start(self, tasks, tasks_num):
        """
        Limit the number of coroutines for reduce memory footprint
        :param tasks:
        :return:
        """
        if self.process_bar:
            for res in tqdm(self.limited_concurrency_coroutines(tasks, self.coroutine_count), bar_format="%s{l_bar}%s{bar}%s{r_bar}%s" % (Fore.YELLOW, Fore.YELLOW, Fore.YELLOW, Fore.RESET), total=tasks_num):
                await res
        else:
            for res in self.limited_concurrency_coroutines(tasks, self.coroutine_count):
                await res

和之前一样,采用限制limited_concurrency_coroutines并发任务数来减少内存占用,这里比较好的一点是支持进度条了,通过采用tqdm处理异步任务列表,await等待执行完成,来更新进度条,默认100000

通过使用:

mytime python3 my_test.py

可以看到峰值的时候,内存占用也才30M左右,这个要是可以动态控制就好了(先获取可用内存的大小,然后动态分配资源来达到,但是完全动态挺难实现的,隔一段时间修改或许可以)

核心功能函数是DNS查询:query

其中代码有些许冗余,判断解析出的子域名IP是否为不存在的域名得到的泛解析IP为子集就行了,没必要再去做==判断,如果目标域名不存在泛解析,并且fuzz的子域名不在用户定义的子域名范围,则直接保存结果到self.data

然后接下来就是调用搜索引擎、证书透明度获取到域名等,最后汇总在一起,来做RSC。

如果当前目标域名存在泛解析且没有设置self.skip_rsc,则会进入RSC比对。

if self.is_wildcard_domain and not self.skip_rsc:
        # Response similarity comparison
        total_subs = set(subs + list(subdomains) + transfer_info + ca_subdomains)
        self.wildcard_subs = list(set(subs).union(total_subs))
        logger.info('Enumerates {len} sub domains by DNS mode in {tcd}.'.format(len=len(self.data), tcd=str(datetime.timedelta(seconds=time_consume_dns))))
        logger.info('Will continue to test the distinct({len_subs}-{len_exist})={len_remain} domains used by RSC, the speed will be affected.'.format(len_subs=len(subs), len_exist=len(self.data),
                                                                                                                                                      len_remain=len(self.wildcard_subs)))
        self.coroutine_count = self.coroutine_count_request
        self.remainder = len(self.wildcard_subs)
        tasks = (self.similarity(sub) for sub in self.wildcard_subs)
        self.loop.run_until_complete(self.start(tasks, len(self.wildcard_subs)))

        # Distinct last domains use RSC
        # Maybe misinformation
        # self.distinct()

        time_consume_request = int(time.time() - dns_time)
        logger.info('Requests time consume {tcr}'.format(tcr=str(datetime.timedelta(seconds=time_consume_request))))
    # RS(redirect/response) domains

这里面total_subsself.wildcard_subs应该是相同的,集合是自动去重的,获取了前面多个途径拿到的子域名,然后设置self.coroutine_count_request = 100控制RSC的请求频率,然后传入self.similarity比较相似度,传入self.start做频率控制(前面分析过)

下面我们主要看看RSC是怎么进行过滤的。

先简单处理然后拼接成完整的域名例如,如: sub='admin' -> admin.qq.com。

然后判断这个完整域名是否出现过在之前回显的内容中,如果在,则去掉。(这一步其实可以省略

然后就是拼接出full_domain用于发起http请求,定义2个情况,302跳到主域名或者自身的就跳过。

接下来就是发出请求,判断是否为跳转,是的话,则获取跳转地址,判断跳转了多少次,同时将新出现的子域名分别添加到self.domains_rsself.domains_rs_processed列表。

接下来则是,如果域名没有HTTP服务,则直接return,跳过该域名,不处理,有的话,则提取返回的response的HTML中出现的域名,分别添加到self.domains_rsself.domains_rs_processed列。

下面是直接比较返回的html的长度是否与之前传入的一个不存在的子域名得到的html长度相等,相等的话,则说明他们是一样的。(这个其实算一点优化,因为长度相等,说明他们确实蛮相似的)

如果不等于,则判断sub为二级还是三级,分别与随机不存在的二级域名、三级域名返回的html做相似度比较,如果相似度大于>0.8,则说明他们是相似的,直接跳过,否则则说明他们确实不同,会再与用户定义的过滤内容做比较,如果返回的HTML内容不存在用户定义的需要过滤的多个关键字,则说明该子域是一个有效的子域,写入到self.data字典,用于最终结果的输出。

其实ESD做的事情就是,目标->判断是否为泛解析然后分两种情况

(1)非泛解析,直接进行DNS爆破

(2)是泛解析,判断不同DNS返回的IP是否相同->相同则DNS爆破,过滤掉解析为泛解析的IP的域名。

如果返回的IP不同,则直接跳过DNS爆破流程,后面直接通过RSC来过滤出泛解析域名。

0x5 个人拙见

​ ESD V1.0版本,功能单一只DNS爆破子域名,不支持泛解析爆破,稍显美中不足。ESD Develop则支持了多个途径去收集子域名,但我觉得真的没必要,核心只要做好 DNS爆破+RSC就可以了,完全没必要花费时间和精力去做其他的收集途径,所以我都没去看那部分的代码(因为这个内容很容易被其他工具来替代,并且会比自己写的更好)。其中的DNS爆破功能,ESD简单地使用aiodns来完成异步查询,简化了开发流程,异步的使用其实并不难,通过使用asyncio.ensure_future分装好异步任务到一个列表,遍历等待执行结果,执行则是通过self.loop.run_until_complete来启动异步任务,不过ESD控制了并发时的内存占用,但这个取值是写死的,高性能的机器或许就没办法充分利用性能。ESD同时支持进度条展示,这是挺好的,人性化设计。

而关于RSC部分的实现,正如ESD作者所说:https://github.com/FeeiCN/ESD/wiki

RSC的局限性还是挺大的,换句话而言就是要发送几十万的http请求,还要做相似度的比较,而且比较的结果也只能处理比较明显的泛解析类型,这个性价比其实并不高。

关于作者提到的DNS服务器优化方面,从代码上和wiki看,只能说没有很好解决这几个问题,只能说是做了一些简单的判断和优化,但是呈现的效果并没有让人很惊喜。

​ 目前而言,子域名爆破重难点就是如何解决泛解析问题、如何解决第三方API收集的出现子域名是店铺、或者类似QQ空间那种冗余度高的子域名。

单独设置的子域名可以覆盖泛域名解析

我个人觉得 http://sh3ll.me/archives/201704041222.txt 提到的利用TTL值可以有效过滤一部分泛解析情况,而且性能客观,可纳入考虑范围。

而关于冗余度高的那种域名,我初步想法判断子域名中数字的连续长度>=4,统计出现的比例,如果占据了全部结果的>=40%,那么只随机保留5%或者3%这类型的子域名,,这个具体的数字可以多实践微调。

0x6 总结

​ 本文先阐述了学习的动机和背景,然后从源码结构作为切入点,开始分析两个版本的源码的执行流程,以便更好理解ESD功能实现的逻辑,最后给出了一些自己的见解。不得不感叹,子域名爆破技术都已经发展好久了,一直以来,子域名的有效性并没有纳入重点考虑,而是更测重于速度的提升、字典的优化,以便获取到更多的结果,至于结果直接丢进扫描器慢慢挂着来跑就是了。

0 条评论
某人
表情
可输入 255