Crawler Project: An Efficient ChromeDriver-Driven Tool for Precise Retrieval and Use of Aizhan Data
Introduction
The tool is built on chromedriver.
It can cope with Aizhan's anti-crawling measures and uses multithreading.
As white hats, we mainly use this tool to batch-query domain weights: the Butian vulnerability platform bases its weight information chiefly on Aizhan, so quick domain lookups during vulnerability hunting make it easier to help keep cyberspace secure.
Development Approach
- Use chromedriver to simulate a normal user's behavior
- Extract the target fields with xpath
- Some data may still slip through during the crawl, so any domain that returns nothing is queried once more to avoid gaps
- Use tqdm to show a progress bar and improve the user experience
- Use multithreading to speed up the crawl
- Reduce every input to its main domain, then deduplicate (dropping ip addresses as well) to streamline the crawl; a sketch of this step follows the list
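The normalization step in the last bullet can be illustrated with a minimal sketch. It uses the same tldextract library the tool itself relies on; the helper name normalize_domains and the simplified IPv4-only pattern are illustrative, not part of the tool:

import re
import tldextract

ip_pattern = re.compile(r'(?:\d{1,3}\.){3}\d{1,3}')  # simplified IPv4 check; the tool also matches IPv6 forms

def normalize_domains(lines):
    # Reduce each entry to its registrable main domain, drop IP addresses, then deduplicate
    domains = []
    for line in lines:
        ext = tldextract.extract(line.strip())
        if not ext.suffix:  # no public suffix: likely an IP or junk, skip it
            continue
        name = f"{ext.domain}.{ext.suffix}"
        if not ip_pattern.search(name):
            domains.append(name)
    return list(set(domains))  # dedupe; order is not preserved

print(normalize_domains(['https://www.example.com/path', 'blog.example.com', '1.2.3.4']))
# -> ['example.com']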
Development Design
- To realize the ideas above, we need to design a handful of functions:
- The user points us at a .txt file listing the domains to crawl; read it and store the entries in a list
- Process the entries in the list (main-domain extraction, ip removal, deduplication)
- Create a thread pool for the crawl (the threads invoke the query functions below)
- Design the domain query around chromedriver; we need two functions, so that when the first fails to fetch data the second is called
- Finally, write the query results to a .csv file
The overall framework looks like this:
import argparse

def file_Read(path):
    pass
def data_Processing(datas):
    pass
def Multi_threading(domains):
    pass
def Weight_query(domain):
    pass
def Weight_query_B(domain):
    pass
def Data_writing(Weight_data):
    pass

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Website weight lookup")
    parser.add_argument('-u', '--url', type=str, help='Query the weight of a single domain!')
    parser.add_argument('-f', '--file', type=str, help='Query domain weights in bulk!')
    args = parser.parse_args()
    if args.url:
        pass
    if args.file:
        Weight_data = []  # list that collects the crawled rows
        datas = file_Read(args.file)
        domains = data_Processing(datas)
        domains = list(set(domains))  # deduplicate
        Multi_threading(domains)  # build the thread pool; the query functions are invoked from inside it
        Data_writing(Weight_data)
Implementation
Here we mainly look at creating the thread pool and driving chromedriver to crawl the data; the rest is fairly simple, and the full code is provided at the end.
- Multithreading
The pool is capped at 10 threads. chromedriver is fairly resource-hungry, and in testing 10 workers turned out to be the best fit; of course, adjust this to suit your own situation.

def Multi_threading(domains):
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            # Use tqdm to display a progress bar
            with tqdm(total=len(domains), desc="Querying domain weights") as progress_bar:
                # Submit every task to the pool and collect the Future objects
                futures = [executor.submit(Weight_query, domain) for domain in domains]
                # Step through the Futures, updating the progress bar as each task completes
                for future in concurrent.futures.as_completed(futures):
                    progress_bar.update(1)
    except Exception as e:
        pass
- Data crawling
The key point here is the fallback call at the end. Repeated calls are awkward to coordinate across threads, and we also want to avoid an infinite loop, so the two functions differ only in how they fail: when Weight_query finds nothing it calls Weight_query_B, and when Weight_query_B also finds nothing the domain is appended to a list of misses that is reported back to the user at the end.
def Weight_query(domain):
    try:
        target = 'https://www.aizhan.com/cha/' + domain
        service = Service(r'./chromedriver.exe', service_log_path=logFilename)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        try:
            driver.get(target)
            # Wait up to 10 seconds for the rank widget to render before grabbing the page source
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//*[@id="baidurank_br"]/img')))
            page_source = driver.page_source
        finally:
            driver.quit()  # always release the browser, even on a timeout
        tree = etree.HTML(page_source)
        baidu = tree.xpath('//*[@id="baidurank_br"]/img/@alt')[0]
        yidong = tree.xpath('//*[@id="baidurank_mbr"]/img/@alt')[0]
        three60 = tree.xpath('//*[@id="360_pr"]/img/@alt')[0]
        SM = tree.xpath('//*[@id="sm_pr"]/img/@alt')[0]
        sogou = tree.xpath('//*[@id="sogou_pr"]/img/@alt')[0]
        google = tree.xpath('//*[@id="google_pr"]/img/@alt')[0]
        ICP_ICP = tree.xpath('//*[@id="icp_icp"]')[0]
        ICP_company = tree.xpath('//*[@id="icp_company"]')[0]
        line_to_write = f"{domain},{ICP_ICP.text},{ICP_company.text},{baidu},{yidong},{three60},{SM},{sogou},{google}"
        line_to_write = line_to_write.replace('\n', '')  # strip newlines embedded in scraped fields
        line_to_write = line_to_write + "\n"             # terminate the CSV row
        Query_results(line_to_write)
    except Exception as e:
        Weight_query_B(domain)  # first attempt failed: retry once via the fallback
def Weight_query_B(domain):
    try:
        target = 'https://www.aizhan.com/cha/' + domain
        service = Service(r'./chromedriver.exe', service_log_path=logFilename)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        try:
            driver.get(target)
            # Wait up to 10 seconds for the rank widget to render before grabbing the page source
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//*[@id="baidurank_br"]/img')))
            page_source = driver.page_source
        finally:
            driver.quit()  # always release the browser, even on a timeout
        tree = etree.HTML(page_source)
        baidu = tree.xpath('//*[@id="baidurank_br"]/img/@alt')[0]
        yidong = tree.xpath('//*[@id="baidurank_mbr"]/img/@alt')[0]
        three60 = tree.xpath('//*[@id="360_pr"]/img/@alt')[0]
        SM = tree.xpath('//*[@id="sm_pr"]/img/@alt')[0]
        sogou = tree.xpath('//*[@id="sogou_pr"]/img/@alt')[0]
        google = tree.xpath('//*[@id="google_pr"]/img/@alt')[0]
        ICP_ICP = tree.xpath('//*[@id="icp_icp"]')[0]
        ICP_company = tree.xpath('//*[@id="icp_company"]')[0]
        line_to_write = f"{domain},{ICP_ICP.text},{ICP_company.text},{baidu},{yidong},{three60},{SM},{sogou},{google}"
        line_to_write = line_to_write.replace('\n', '')  # strip newlines embedded in scraped fields
        line_to_write = line_to_write + "\n"             # terminate the CSV row
        Query_results(line_to_write)
    except Exception as e:
        err_domains.append(domain)  # give up and record the domain as unresolved
- chromedriver configuration
chromedriver produces a lot of log output while running, so we configure a few options to improve the experience.
chrome_options = webdriver.ChromeOptions()
# Configure ChromeOptions so the browser runs in the background
chrome_options.add_argument('--headless')    # do not open a browser window
chrome_options.add_argument('--no-sandbox')  # disable the sandbox when running on Linux
chrome_options.add_argument('--log-level=3') # only log fatal browser errors
chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])  # silence chromedriver's console logging
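As an aside, newer Chrome releases (roughly Chrome 109 onwards) also ship a rewritten headless mode that behaves more like a regular window. Whether it fares better or worse against anti-bot checks varies, so treat the following line as an optional experiment rather than part of the tool:

chrome_options.add_argument('--headless=new')  # newer headless mode; would replace the plain --headless flag above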
- Usage
That covers the main ideas and methods of the implementation; the remaining optimizations are simple, so we won't dwell on them here, and the full source is provided at the end for reference.
Let's take a look at how it's used:
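Invocation looks like this (assuming the script is saved as aizhan_weight.py; the filename is illustrative):

python aizhan_weight.py -u example.com   # query a single domain
python aizhan_weight.py -f domains.txt   # bulk query; results land in a timestamped .csv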
Usage Notes
- The tool is built on chromedriver, so your local chrome version must match the chromedriver version. Check your browser version, grab the matching build from the ChromeDriver download link provided here, and place it in the same directory as these files. (A hedged alternative that avoids manual version matching is sketched below.)
- Before running, check that you can actually reach Aizhan.
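If you would rather not match driver versions by hand: Selenium 4.6+ bundles Selenium Manager, which resolves a matching driver automatically, and the third-party webdriver-manager package offers the same for older setups. A minimal sketch, assuming one of these is available (the tool itself does not use them):

# Option 1: Selenium 4.6+ resolves a matching chromedriver on its own
from selenium import webdriver
driver = webdriver.Chrome()  # no explicit driver path needed

# Option 2: the third-party webdriver-manager package (pip install webdriver-manager)
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))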
Full Code
A follow would be much appreciated ^_^
Here is the complete code. There is certainly still plenty of room for optimization, and suggestions from fellow white hats are welcome.
I have also packaged the code as a .exe to spare you the environment setup:
Aizhan crawler (exe version)
import argparse
import re
import sys
from datetime import datetime
import concurrent.futures
import tldextract
from rich import print as rprint
from lxml import etree
from tqdm import tqdm
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By

chrome_options = webdriver.ChromeOptions()
# Configure ChromeOptions so the browser runs in the background
chrome_options.add_argument('--headless')    # do not open a browser window
chrome_options.add_argument('--no-sandbox')  # disable the sandbox when running on Linux
chrome_options.add_argument('--log-level=3') # only log fatal browser errors
chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])  # silence chromedriver's console logging
# Formatted console output
def error(date, body):
    rprint("[[bold green]" + date + "[/bold green]] [[bold red]Error[/bold red]] > " + body)

def info(date, body):
    rprint("[[bold green]" + date + "[/bold green]] [[bold blue]Info[/bold blue]] > " + body)

def prompt(label, body):
    rprint("[[bold green]" + label + "[/bold green]]: [bold blue]" + body + "[/bold blue]")
def file_Read(path):
    try:
        datas = []
        with open(path, 'r', encoding='utf-8') as file:
            while True:
                data = file.readline()
                if not data:  # an empty string means the file has been fully read
                    break
                datas.append(data)
        return datas
    except Exception as e:
        error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Failed to read the file; check the path and file type!')
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Program exited')
        sys.exit()
def data_Processing(datas):
    domains_one = []
    for data in tqdm(datas, desc="Initializing data"):
        try:
            domain_name = tldextract.extract(data)
            if not domain_name.suffix:  # skip lines without a valid public suffix (e.g. bare IPs)
                continue
            domain_name = f"{domain_name.domain}.{domain_name.suffix}"
            domains_one.append(domain_name)
        except Exception as e:
            continue
    ip_pattern = re.compile(
        r'(?:\d{1,3}\.){3}\d{1,3}'                    # IPv4
        r'|(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}'  # simplified IPv6 (no compressed form)
        r'|(?:[A-Fa-f0-9]{1,4}:){1,7}:?'              # partial match for compressed IPv6
    )
    def contains_ip_address(s):
        return bool(ip_pattern.search(s))
    domains = [item for item in domains_one if not contains_ip_address(item)]
    return domains
def Weight_query_B(domain):
    try:
        target = 'https://www.aizhan.com/cha/' + domain
        service = Service(r'./chromedriver.exe', service_log_path=logFilename)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        try:
            driver.get(target)
            # Wait up to 10 seconds for the rank widget to render before grabbing the page source
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//*[@id="baidurank_br"]/img')))
            page_source = driver.page_source
        finally:
            driver.quit()  # always release the browser, even on a timeout
        tree = etree.HTML(page_source)
        baidu = tree.xpath('//*[@id="baidurank_br"]/img/@alt')[0]
        yidong = tree.xpath('//*[@id="baidurank_mbr"]/img/@alt')[0]
        three60 = tree.xpath('//*[@id="360_pr"]/img/@alt')[0]
        SM = tree.xpath('//*[@id="sm_pr"]/img/@alt')[0]
        sogou = tree.xpath('//*[@id="sogou_pr"]/img/@alt')[0]
        google = tree.xpath('//*[@id="google_pr"]/img/@alt')[0]
        ICP_ICP = tree.xpath('//*[@id="icp_icp"]')[0]
        ICP_company = tree.xpath('//*[@id="icp_company"]')[0]
        line_to_write = f"{domain},{ICP_ICP.text},{ICP_company.text},{baidu},{yidong},{three60},{SM},{sogou},{google}"
        line_to_write = line_to_write.replace('\n', '')  # strip newlines embedded in scraped fields
        line_to_write = line_to_write + "\n"             # terminate the CSV row
        Query_results(line_to_write)
    except Exception as e:
        err_domains.append(domain)  # give up and record the domain as unresolved
def Weight_query(domain):
    try:
        target = 'https://www.aizhan.com/cha/' + domain
        service = Service(r'./chromedriver.exe', service_log_path=logFilename)
        driver = webdriver.Chrome(service=service, options=chrome_options)
        try:
            driver.get(target)
            # Wait up to 10 seconds for the rank widget to render before grabbing the page source
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//*[@id="baidurank_br"]/img')))
            page_source = driver.page_source
        finally:
            driver.quit()  # always release the browser, even on a timeout
        tree = etree.HTML(page_source)
        baidu = tree.xpath('//*[@id="baidurank_br"]/img/@alt')[0]
        yidong = tree.xpath('//*[@id="baidurank_mbr"]/img/@alt')[0]
        three60 = tree.xpath('//*[@id="360_pr"]/img/@alt')[0]
        SM = tree.xpath('//*[@id="sm_pr"]/img/@alt')[0]
        sogou = tree.xpath('//*[@id="sogou_pr"]/img/@alt')[0]
        google = tree.xpath('//*[@id="google_pr"]/img/@alt')[0]
        ICP_ICP = tree.xpath('//*[@id="icp_icp"]')[0]
        ICP_company = tree.xpath('//*[@id="icp_company"]')[0]
        line_to_write = f"{domain},{ICP_ICP.text},{ICP_company.text},{baidu},{yidong},{three60},{SM},{sogou},{google}"
        line_to_write = line_to_write.replace('\n', '')  # strip newlines embedded in scraped fields
        line_to_write = line_to_write + "\n"             # terminate the CSV row
        Query_results(line_to_write)
    except Exception as e:
        Weight_query_B(domain)  # first attempt failed: retry once via the fallback
def Query_results(result):
    Weight_data.append(result)  # list.append is atomic under the GIL, so worker threads can share this list
def Multi_threading(domains):
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            # Use tqdm to display a progress bar
            with tqdm(total=len(domains), desc="Querying domain weights") as progress_bar:
                # Submit every task to the pool and collect the Future objects
                futures = [executor.submit(Weight_query, domain) for domain in domains]
                # Step through the Futures, updating the progress bar as each task completes
                for future in concurrent.futures.as_completed(futures):
                    progress_bar.update(1)
    except Exception as e:
        pass
def Data_writing(Weight_data):
    try:
        filename = formatted_now + '.csv'  # timestamped name, so each run writes a fresh file
        with open(filename, 'a', encoding='utf-8') as file:
            file.write("domain_name,ICP,ICP_company,Baidu_weight,yidong_weight,360_weight,SM_weight,sogou_weight,google_weight\n")
            for line in tqdm(Weight_data, desc=f"Writing results to file: {filename}"):
                file.write(line)
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), f'Data written; see file: {filename}')
    except Exception as e:
        error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Failed to write the results!')
def logFile():
    try:
        with open(logFilename, 'a', encoding='utf-8') as file:
            file.write("Log file")  # touch the chromedriver log file
    except Exception as e:
        sys.exit()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Website weight lookup")
    parser.add_argument('-u', '--url', type=str, help='Query the weight of a single domain!')
    parser.add_argument('-f', '--file', type=str, help='Query domain weights in bulk!')
    args = parser.parse_args()
    if args.url:
        now = datetime.now()
        formatted_now = now.strftime("%Y%m%d%H%M%S")
        logFilename = formatted_now + '.log'
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Welcome to the weight query system! This is version 3.0!')
        print(r'''
 _______ ____ ____ __
 / ____\ \ / /\ \ / / \/ |
| | \ \_/ / \ \ /\ / /| \ / |
| | \ / \ \/ \/ / | |\/| |
| |____ | | \ /\ / | | | |
 \_____| |_| \/ \/ |_| |_|
''')
        domain_name = args.url
        try:
            domain_name = tldextract.extract(domain_name)
            domain_name = f"{domain_name.domain}.{domain_name.suffix}"
        except Exception as e:
            error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Failed to process the domain; check the domain you passed in!')
        try:
            Weight_data = []
            err_domains = []  # Weight_query_B appends here when even the retry fails
            Weight_query(domain_name)
            Weight_data = Weight_data[0].replace('\n', '')
            Weight_data = Weight_data.split(',')
            info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Query results:')
            prompt('Domain', Weight_data[0])
            prompt('ICP', Weight_data[1])
            prompt('Company', Weight_data[2])
            prompt('Baidu weight', Weight_data[3])
            prompt('Mobile weight', Weight_data[4])
            prompt('360 weight', Weight_data[5])
            prompt('Shenma', Weight_data[6])
            prompt('Sogou', Weight_data[7])
            prompt('Google PR', Weight_data[8])
        except Exception as e:
            error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Oops! Something seems to have gone wrong...')
    elif args.file:
        print(r'''
 _______ ____ ____ __
 / ____\ \ / /\ \ / / \/ |
| | \ \_/ / \ \ /\ / /| \ / |
| | \ / \ \/ \/ / | |\/| |
| |____ | | \ /\ / | | | |
 \_____| |_| \/ \/ |_| |_|
''')
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Welcome to the weight query system! This is version 3.0!')
        err_domains = []
        now = datetime.now()
        formatted_now = now.strftime("%Y%m%d%H%M%S")
        logFilename = formatted_now + '.log'
        Weight_data = []
        datas = file_Read(args.file)
        domains = data_Processing(datas)
        domains = list(set(domains))
        Multi_threading(domains)
        info(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'No results for these domains: ' + str(err_domains))
        Data_writing(Weight_data)
    else:
        print(r'''
 _______ ____ ____ __
 / ____\ \ / /\ \ / / \/ |
| | \ \_/ / \ \ /\ / /| \ / |
| | \ / \ \/ \/ / | |\/| |
| |____ | | \ /\ / | | | |
 \_____| |_| \/ \/ |_| |_|
''')
        error(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "That's not how you use it! Try --help...")