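sqlmap Source Code Analysis
Preface:
I recently had the sudden idea of studying a security tool, so I analyzed sqlmap, which is one of the most common tools and also exceptionally well written. I am still a beginner, so if anything in this analysis is wrong, corrections from more experienced readers are very welcome! Q'w'Q
0x000 sqlmap execution flowchart
First, here is a flowchart of the sqlmap source code: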
https://www.processon.com/view/5835511ce4b0620292bd7285
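0x001 Entry point analysis: sqlmap.py
Let's start with sqlmap.py. The top of the file only imports the standard packages and some modules from lib, so we skip that for now and look at the main function.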
try:
    dirtyPatches() # Place for "dirty" Python related patches
    resolveCrossReferences() # Place for cross-reference resolution
    checkEnvironment()
    setPaths(modulePath())
    banner()
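This part initializes sqlmap.
checkEnvironment() checks the current runtime environment, such as the sqlmap installation directory and the Python environment.
setPaths(modulePath()) calls modulePath() and passes its result to setPaths().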
def modulePath():
    """
    This will get us the program's directory, even if we are frozen
    using py2exe
    """
    try:
        # Try to set `_` to the path of the running executable or of the current file
        # `weAreFrozen()` tells whether the program has been packaged (frozen)
        _ = sys.executable if weAreFrozen() else __file__
    except NameError:
        # On NameError, fall back to inspect.getsourcefile() to get the file path
        _ = inspect.getsourcefile(modulePath)
    # Return the directory part of that path, converted using the filesystem encoding
    return getUnicode(os.path.dirname(os.path.realpath(_)), encoding=sys.getfilesystemencoding() or UNICODE_ENCODING)
'''
weAreFrozen()
    Returns whether we are frozen via py2exe.
    This will affect how we find out where we are located.
getUnicode()
    Returns the unicode representation of the supplied value
'''
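setPaths() does what its docstring says: "Sets absolute paths for project directories and files".
Because of setPaths(modulePath()), sqlmap runs without path errors no matter which directory we launch it from.
banner() then prints the sqlmap version information and the fun ASCII art.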
# Store original command line options for possible later restoration
args = cmdLineParser()
cmdLineOptions.update(args.__dict__ if hasattr(args, "__dict__") else args)
# If args has a `__dict__` attribute, it is probably a class instance, so the contents of `__dict__` are used to update cmdLineOptions
# If args has no `__dict__` attribute, args itself is used to update cmdLineOptions
initOptions(cmdLineOptions)

if checkPipedInput():
    conf.batch = True

if conf.get("api"):
    # heavy imports
    from lib.utils.api import StdDbOut
    from lib.utils.api import setRestAPILog

    # Overwrite system standard output and standard error to write
    # to an IPC database
    sys.stdout = StdDbOut(conf.taskid, messagetype="stdout")
    sys.stderr = StdDbOut(conf.taskid, messagetype="stderr")
    setRestAPILog()

conf.showTime = True
dataToStdout("[!] legal disclaimer: %s\n\n" % LEGAL_DISCLAIMER, forceOutput=True)
dataToStdout("[*] starting @ %s\n\n" % time.strftime("%X /%Y-%m-%d/"), forceOutput=True)

init()
.....
Next comes argument parsing.
All command line options are configured in lib/parse/cmdline.py, where we can see:
parser.add_argument("--hh", dest="advancedHelp", action="store_true",
    help="Show advanced help message and exit")
parser.add_argument("--version", dest="showVersion", action="store_true",
    help="Show program's version number and exit")
parser.add_argument("-v", dest="verbose", type=int,
    help="Verbosity level: 0-6 (default %d)" % defaults.verbose)
Showing the help and showing the version are both defined in lib/parse/cmdline.py.
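To make the flow from the parser to cmdLineOptions concrete, here is a minimal sketch using the standard argparse module. It is not sqlmap's actual parser: the usage string, the argument list passed to parse_args() and the name cmd_line_options are my own assumptions for illustration. Only the two add_argument() calls and the update() expression mirror the excerpts above.

```python
import argparse

# Assumed usage string; sqlmap builds its own in cmdLineParser()
parser = argparse.ArgumentParser(usage="python sqlmap.py [options]")
parser.add_argument("--hh", dest="advancedHelp", action="store_true",
                    help="Show advanced help message and exit")
parser.add_argument("-v", dest="verbose", type=int,
                    help="Verbosity level: 0-6")

# Parse a hypothetical command line instead of sys.argv
args = parser.parse_args(["-v", "3"])

# Same expression as in main(): a Namespace has __dict__, a plain dict does not
cmd_line_options = {}
cmd_line_options.update(args.__dict__ if hasattr(args, "__dict__") else args)
```

After this runs, cmd_line_options maps each dest name (verbose, advancedHelp) to its parsed value, which is the shape that initOptions() receives.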
The parsed args are then copied into cmdLineOptions, which is passed on to initOptions:
def initOptions(inputOptions=AttribDict(), overrideOptions=False):
    _setConfAttributes()
    _setKnowledgeBaseAttributes()
    _mergeOptions(inputOptions, overrideOptions)
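This initializes the runtime state: it sets up the configuration and knowledge base attributes and merges in the supplied options, including those from a configuration file.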
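Continuing on, the program calls checkPipedInput(), which checks whether input is being piped into sqlmap through standard input (for example cat /tmp/req.txt | python sqlmap.py -r -). If it is, conf.batch is set to True, so sqlmap does not stop to ask interactive questions.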
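A piped-input check of this kind usually comes down to asking whether standard input is attached to a terminal. The sketch below is my own approximation of the idea, not sqlmap's code; the function name check_piped_input and its stream parameter are hypothetical, added so the check can be exercised without a real shell pipe.

```python
import os
import sys

def check_piped_input(stream=None):
    """Return True when the stream is backed by a file descriptor that is not a terminal."""
    stream = sys.stdin if stream is None else stream
    try:
        return not os.isatty(stream.fileno())
    except (AttributeError, ValueError, OSError):
        # No usable file descriptor (e.g. an in-memory stream): treat as not piped
        return False

# Demonstration with an OS-level pipe standing in for "cat file | program"
read_fd, write_fd = os.pipe()
with os.fdopen(read_fd) as piped:
    piped_result = check_piped_input(piped)
os.close(write_fd)
```

With a pipe as input the function returns True, which is the situation in which main() switches on conf.batch.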
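If the conf dictionary contains the key api, the modules below it are imported and the system standard output and standard error are replaced so that everything written to them goes to an IPC database. setRestAPILog() is called as well. We skip this part for now.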
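The trick of replacing sys.stdout with an object that has a write() method can be shown in a few lines. This is only a sketch of the general technique: CollectingOut is a hypothetical stand-in that appends to a list, whereas sqlmap's StdDbOut writes to its IPC database, and I use contextlib.redirect_stdout here so the replacement is undone automatically.

```python
import contextlib

class CollectingOut(object):
    """Hypothetical file-like object: records every write together with a message type."""

    def __init__(self, sink, messagetype="stdout"):
        self.sink = sink
        self.messagetype = messagetype

    def write(self, value):
        self.sink.append((self.messagetype, value))
        return len(value)

    def flush(self):
        pass

sink = []
with contextlib.redirect_stdout(CollectingOut(sink)):
    print("hello")  # goes to sink instead of the terminal

captured = "".join(value for _, value in sink)
```

Anything printed inside the with block ends up in sink, tagged with its message type, which is roughly what lets the REST API collect a task's output.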
Then init() is called:
def init():
    """
    Set attributes into both configuration and knowledge base singletons
    based upon command line and configuration file options.
    """
    .........
It is also used to initialize configuration settings.
if not conf.updateAll:
    # Postponed imports (faster start)
    if conf.smokeTest:
        from lib.core.testing import smokeTest
        os._exitcode = 1 - (smokeTest() or 0)
        # os._exitcode = 1 - (smokeTest() or 0): a truthy result (test passed) gives exit code 0, a falsy result (test failed) gives exit code 1
    elif conf.vulnTest:
        from lib.core.testing import vulnTest
        os._exitcode = 1 - (vulnTest() or 0)
    else:
        from lib.controller.controller import start
        if conf.profile:
            from lib.core.profiling import profile
            globals()["start"] = start
            profile()
            # This will run the program and present profiling data in a nice looking graph
        else:
            try:
                if conf.crawlDepth and conf.bulkFile:
                    targets = getFileItems(conf.bulkFile)

                    for i in xrange(len(targets)):
                        target = None

                        try:
                            kb.targets = OrderedSet()
                            target = targets[i]

                            if not re.search(r"(?i)\Ahttp[s]*://", target):
                                target = "http://%s" % target

                            infoMsg = "starting crawler for target URL '%s' (%d/%d)" % (target, i + 1, len(targets))
                            logger.info(infoMsg)

                            crawl(target)
                        except Exception as ex:
                            if target and not isinstance(ex, SqlmapUserQuitException):
                                errMsg = "problem occurred while crawling '%s' ('%s')" % (target, getSafeExString(ex))
                                logger.error(errMsg)
                            else:
                                raise
                        else:
                            if kb.targets:
                                start()
                else:
                    start()
            except Exception as ex:
                os._exitcode = 1

                if "can't start new thread" in getSafeExString(ex):
                    errMsg = "unable to start new threads. Please check OS (u)limits"
                    logger.critical(errMsg)
                    raise SystemExit
                else:
                    raise
In the long block of code above, sqlmap chooses between different actions based on the settings in conf, and finally starts the main program.
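One small detail in the crawl branch is how each target from the bulk file is normalized: if it does not start with an http or https scheme, http:// is prepended. The regular expression below is copied from the excerpt; the helper name normalize_target is mine and only exists to isolate that one step.

```python
import re

def normalize_target(target):
    """Prepend http:// when the target has no http(s) scheme (case-insensitive match at string start)."""
    if not re.search(r"(?i)\Ahttp[s]*://", target):
        target = "http://%s" % target
    return target

plain = normalize_target("www.example.com/item?id=1")
already = normalize_target("HTTPS://www.example.com/")
```

Targets that already carry a scheme, in any letter case, are left untouched.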
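A few words on smokeTest() and vulnTest():
smokeTest(): checks for invalid regular expressions, scans the Python modules, imports them dynamically, and so on.
vulnTest(): automatically runs vulnerability tests against vulnserver, invoking sqlmap and comparing the output with the expected results to verify that the vulnerabilities are detected.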
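The expression 1 - (smokeTest() or 0) is easy to misread, so here is the arithmetic on its own. This is a sketch that assumes the test functions return a truthy value on success and a falsy value (False or None) on failure; to_exit_code is a hypothetical helper name.

```python
def to_exit_code(test_passed):
    """Map a truthy 'passed' result to exit code 0 and a falsy one to exit code 1."""
    # `or 0` turns None into 0, and True behaves as the integer 1
    return 1 - (test_passed or 0)

passed_code = to_exit_code(True)
failed_code = to_exit_code(False)
none_code = to_exit_code(None)
```

This follows the usual shell convention where exit code 0 means success.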
The second half of main() handles the errors that can be raised while sqlmap.py is running, for example:
elif "_mkstemp_inner" in excMsg:
    errMsg = "there has been a problem while accessing temporary files"
    logger.critical(errMsg)
    raise SystemExit
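Since my Python skills are not that solid, I will mainly focus on how the injection is carried out and how sqlmap decides whether an injection exists.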
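0x002 Argument parsing in sqlmap
Let's go back to the cmdLineParser() function.
It first checks whether argv was supplied; if not, it falls back to the system command line arguments.
It then calls checkSystemEncoding() to check the system encoding settings, so that character encodings can be handled correctly later on.
It calls os.path.basename(argv[0]) to get the script file name and converts it to a suitable encoding with getUnicode().
It builds the usage string for the command line tool. Depending on the current environment, the executable path and the script file name are combined into one usage string.
It creates the ArgumentParser and sets its usage to the string built in the previous step.
After that comes the list of options (Target options and so on).
Next we return to the init() function.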
def init():
    """
    Set attributes into both configuration and knowledge base singletons
    based upon command line and configuration file options.
    """
    _useWizardInterface()
    setVerbosity() # set how verbose sqlmap's output is
    _saveConfig() # save the command line options to a sqlmap configuration INI file
    _setRequestFromFile() # read the target request(s) from the supplied file
    _cleanupOptions() # clean up the option values
    _cleanupEnvironment() # clean up the environment
    _purge() # safely remove the sqlmap data directory
    _checkDependencies() # check for missing dependencies
    _createHomeDirectories()
    _createTemporaryDirectory()
    _basicOptionValidation()
    _setProxyList()
    _setTorProxySettings()
    _setDNSServer()
    _adjustLoggingFormatter()
    _setMultipleTargets()
    _listTamperingFunctions()
    _setTamperingFunctions() # set up the tamper scripts
    _setPreprocessFunctions()
    _setPostprocessFunctions()
    _setTrafficOutputFP()
    _setupHTTPCollector()
    _setHttpChunked()
    _checkWebSocket()
    parseTargetDirect()

    if any((conf.url, conf.logFile, conf.bulkFile, conf.requestFile, conf.googleDork, conf.stdinPipe)):
        _setHostname()
        _setHTTPTimeout()
        _setHTTPExtraHeaders()
        _setHTTPCookies()
        _setHTTPReferer()
        _setHTTPHost()
        _setHTTPUserAgent()
        _setHTTPAuthentication()
        _setHTTPHandlers()
        _setDNSCache()
        _setSocketPreConnect()
        _setSafeVisit()
        _doSearch()
        _setStdinPipeTargets()
        _setBulkMultipleTargets()
        _checkTor()
        _setCrawler()
        _findPageForms()
        _setDBMS()
        _setTechnique()

    _setThreads()
    _setOS()
    _setWriteFile()
    _setMetasploit()
    _setDBMSAuthentication()
    loadBoundaries()
    loadPayloads()
    _setPrefixSuffix()
    update()
    _loadQueries()
Here we look at _setRequestFromFile() first. The question is: when we pass a text file with -r, how does sqlmap read it and turn it into parameters?
if conf.requestFile:
    for requestFile in re.split(PARAMETER_SPLITTING_REGEX, conf.requestFile):
        requestFile = safeExpandUser(requestFile)
        url = None
        seen = set()

        if not checkFile(requestFile, False):
            errMsg = "specified HTTP request file '%s' " % requestFile
            errMsg += "does not exist"
            raise SqlmapFilePathException(errMsg)
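It first calls re.split() to split the conf.requestFile value we passed in. Looking in settings.py we find PARAMETER_SPLITTING_REGEX = r"[,|;]", so several request files can be given, separated by a comma, a pipe or a semicolon.
It then calls safeExpandUser() on each path so that the path is resolved reliably.
Finally it calls checkFile() to make sure the file exists and is readable. Otherwise it raises an error with the message "specified HTTP request file '...' does not exist".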
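The splitting step can be tried in isolation. In this sketch the regular expression is the one quoted from settings.py, while split_request_files is a hypothetical helper and os.path.expanduser() is my assumed stand-in for what safeExpandUser() does (expanding a leading ~ to the home directory).

```python
import os
import re

PARAMETER_SPLITTING_REGEX = r"[,|;]"

def split_request_files(value):
    """Split a -r value on ',', '|' or ';' and expand '~' in each resulting path."""
    return [os.path.expanduser(path) for path in re.split(PARAMETER_SPLITTING_REGEX, value)]

files = split_request_files("a.txt,b.txt;c.txt|d.txt")
```

Inside a character class the | is a literal pipe, so all three separators are treated the same way.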
Once the file has been confirmed to be fine, sqlmap starts extracting the target URLs:
for target in parseRequestFile(requestFile):
    url = target[0]
    if url not in seen:
        kb.targets.add(target)
        if len(kb.targets) > 1:
            conf.multipleTargets = True
        seen.add(url)
Following into parseRequestFile():
def parseRequestFile(reqFile, checkParams=True):
    """
    Parses WebScarab and Burp logs and adds results to the target URL list
    >>> handle, reqFile = tempfile.mkstemp(suffix=".req")
    >>> content = b"POST / HTTP/1.0\\nUser-agent: foobar\\nHost: www.example.com\\n\\nid=1\\n"
    >>> _ = os.write(handle, content)
    >>> os.close(handle)
    >>> next(parseRequestFile(reqFile)) == ('http://www.example.com:80/', 'POST', 'id=1', None, (('User-agent', 'foobar'), ('Host', 'www.example.com')))
    True
    """
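I do not show the implementation of this function here, but the docstring shows what it returns. If we pass in a WebScarab or Burp log, the function yields one tuple per request (the doctest reads the first one with next()), and target[0] is the URL. Judging by the doctest, the remaining items are the HTTP method, the request body, a cookie value (None here) and the headers.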
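To get a feel for how a raw request turns into that tuple, here is a deliberately tiny parser that reproduces the doctest result. It is not sqlmap's implementation. parse_raw_request is a hypothetical function that assumes plain "\n" line endings, a Host header without a port, and no cookie handling at all (the fourth item is always None).

```python
def parse_raw_request(raw, scheme="http"):
    """Toy parser: raw HTTP request text -> (url, method, data, cookie, headers)."""
    head, _, body = raw.partition("\n\n")
    lines = head.splitlines()

    # Request line, e.g. "POST / HTTP/1.0"
    method, path, _version = lines[0].split(" ", 2)

    headers = []
    host = None
    for line in lines[1:]:
        name, _, value = line.partition(":")
        value = value.strip()
        headers.append((name, value))
        if name.lower() == "host":
            host = value

    port = 443 if scheme == "https" else 80
    url = "%s://%s:%d%s" % (scheme, host, port, path)
    return url, method, body.strip() or None, None, tuple(headers)

target = parse_raw_request("POST / HTTP/1.0\nUser-agent: foobar\nHost: www.example.com\n\nid=1\n")
```

The resulting target has the same five-item shape as in the docstring, with the URL in target[0].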
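Back in _setRequestFromFile(): if the url is not yet in seen, the target is added to kb.targets and the url is added to seen, so each URL is only queued once. If kb.targets ends up with more than one entry, conf.multipleTargets is set to True.
What follows in the function is the handling for a second request, which we skip for now.
After _setRequestFromFile() has run, and provided the request file we passed in was fine, kb.targets holds the parsed targets and seen holds their URLs.
The second half of init() then initializes the remaining settings based on them.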
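The de-duplication logic with seen is a common pattern and can be written down without any sqlmap objects. In this sketch collect_targets is a hypothetical function, a plain list stands in for kb.targets, and the returned boolean stands in for conf.multipleTargets.

```python
def collect_targets(parsed_targets):
    """Keep the first target for each URL; report whether more than one target was kept."""
    targets = []
    seen = set()
    for target in parsed_targets:
        url = target[0]
        if url not in seen:
            targets.append(target)
            seen.add(url)
    return targets, len(targets) > 1

parsed = [
    ("http://www.example.com:80/", "POST", "id=1"),
    ("http://www.example.com:80/", "POST", "id=2"),   # same URL, skipped
    ("http://www.example.com:80/login", "POST", "user=a"),
]
targets, multiple = collect_targets(parsed)
```

Here two of the three parsed requests survive, so the multiple-targets flag comes out True.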
0x003 How sqlmap detects SQL injection
We go straight to the start() function in lib.controller.controller.
Because start() is quite long, we only pick out the important parts for analysis.
parseTargetUrl()

testSqlInj = False

if PLACE.GET in conf.parameters and not any((conf.data, conf.testParameter)):
    for parameter in re.findall(r"([^=]+)=([^%s]+%s?|\Z)" % (re.escape(conf.paramDel or "") or DEFAULT_GET_POST_DELIMITER, re.escape(conf.paramDel or "") or DEFAULT_GET_POST_DELIMITER), conf.parameters[PLACE.GET]):
        paramKey = (conf.hostname, conf.path, PLACE.GET, parameter[0])

        if paramKey not in kb.testedParams:
            testSqlInj = True
            break
else:
    paramKey = (conf.hostname, conf.path, None, None)
    if paramKey not in kb.testedParams:
        testSqlInj = True

if testSqlInj and conf.hostname in kb.vulnHosts:
    if kb.skipVulnHost is None:
        message = "SQL injection vulnerability has already been detected "
        message += "against '%s'. Do you want to skip " % conf.hostname
        message += "further tests involving it? [Y/n]"
        kb.skipVulnHost = readInput(message, default='Y', boolean=True)

    testSqlInj = not kb.skipVulnHost

if not testSqlInj:
    infoMsg = "skipping '%s'" % targetUrl
    logger.info(infoMsg)
    continue
First, parseTargetUrl() is called to parse the target:
def parseTargetUrl():
    """
    Parse target URL and set some attributes into the configuration singleton
    >>> pushValue(conf.url)
    >>> conf.url = "https://www.test.com/?id=1"
    >>> parseTargetUrl()
    >>> conf.hostname
    'www.test.com'
    >>> conf.scheme
    'https'
    >>> conf.url = popValue()
    """
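It simply parses the URL, for example to tell whether the scheme is http or https and to extract the hostname.
testSqlInj indicates whether the target needs to be tested for SQL injection. Its initial value is False.
The code then checks whether the request has GET parameters and whether neither conf.data nor conf.testParameter is set.
If that condition holds, each GET parameter is examined: if its key is not in kb.testedParams yet, testSqlInj is set to True. Otherwise a single key for the host and path as a whole is checked in the same way.
If the host to be tested is already in kb.vulnHosts, the list of hosts known to be vulnerable, the user is asked whether to skip further tests against it.
The logic of the regular expression is to first match a run of characters that are not =, which is the parameter name, then the = that separates key and value, and then the value, up to and including the next parameter delimiter (& by default).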
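The regular expression is easier to follow when it is run on a sample query string. In the sketch below the pattern and the fallback to & are taken from the excerpt; get_parameter_names and its param_del argument are hypothetical names that play the role of conf.paramDel.

```python
import re

DEFAULT_GET_POST_DELIMITER = '&'

def get_parameter_names(query, param_del=None):
    """Return the parameter names found in a query string, in order of appearance."""
    delimiter = re.escape(param_del or "") or DEFAULT_GET_POST_DELIMITER
    pattern = r"([^=]+)=([^%s]+%s?|\Z)" % (delimiter, delimiter)
    # Each match is a (name, value) pair; the value may keep its trailing delimiter
    return [name for name, _ in re.findall(pattern, query)]

names = get_parameter_names("id=1&name=foo")
custom = get_parameter_names("a=1;b=2", param_del=";")
```

Only the first group (parameter[0] in start()) is used to build paramKey, so the trailing delimiter that ends up in the value group does not matter.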
if testSqlInj:
    try:
        if place == PLACE.COOKIE:
            pushValue(kb.mergeCookies)
            kb.mergeCookies = False

        check = heuristicCheckSqlInjection(place, parameter)

        if check != HEURISTIC_TEST.POSITIVE:
            if conf.smart or (kb.ignoreCasted and check == HEURISTIC_TEST.CASTED):
                infoMsg = "skipping %sparameter '%s'" % ("%s " % paramType if paramType != parameter else "", parameter)
                logger.info(infoMsg)
                continue

        infoMsg = "testing for SQL injection on %sparameter '%s'" % ("%s " % paramType if paramType != parameter else "", parameter)
        logger.info(infoMsg)

        # This is where the SQL injection testing really starts
        injection = checkSqlInjection(place, parameter, value)
        proceed = not kb.endDetection
        injectable = False

        if getattr(injection, "place", None) is not None:
            if NOTE.FALSE_POSITIVE_OR_UNEXPLOITABLE in injection.notes:
                kb.falsePositives.append(injection)
            else:
                injectable = True

                kb.injections.append(injection)

                if not kb.alerted:
                    if conf.alert:
                        infoMsg = "executing alerting shell command(s) ('%s')" % conf.alert
                        logger.info(infoMsg)
                        try:
                            process = subprocess.Popen(conf.alert, shell=True)
                            process.wait()
                        except Exception as ex:
                            errMsg = "error occurred while executing '%s' ('%s')" % (conf.alert, getSafeExString(ex))
                            logger.error(errMsg)

                    kb.alerted = True

                # In case when user wants to end detection phase (Ctrl+C)
                if not proceed:
                    break

                msg = "%sparameter '%s' " % ("%s " % injection.place if injection.place != injection.parameter else "", injection.parameter)
                msg += "is vulnerable. Do you want to keep testing the others (if any)? [y/N] "

                if not readInput(msg, default='N', boolean=True):
                    proceed = False
                    paramKey = (conf.hostname, conf.path, None, None)
                    kb.testedParams.add(paramKey)

        if not injectable:
            warnMsg = "%sparameter '%s' does not seem to be injectable" % ("%s " % paramType if paramType != parameter else "", parameter)
            logger.warning(warnMsg)

    finally:
        if place == PLACE.COOKIE:
            kb.mergeCookies = popValue()
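This code is where the SQL injection testing actually happens.
It first runs a heuristic check for SQL injection through heuristicCheckSqlInjection().
If the heuristic test succeeds, check equals HEURISTIC_TEST.POSITIVE. If it does not, the parameter is skipped only when conf.smart is set, or when the result is HEURISTIC_TEST.CASTED and kb.ignoreCasted is set. In all other cases sqlmap still goes on to checkSqlInjection().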
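The skip condition is a compact boolean expression, so here it is as a standalone function with a few cases. This is a sketch: HeuristicTest is a hypothetical enum that stands in for sqlmap's HEURISTIC_TEST, and should_skip with its smart and ignore_casted arguments mirrors conf.smart and kb.ignoreCasted.

```python
from enum import Enum

class HeuristicTest(Enum):
    # Hypothetical stand-in for sqlmap's HEURISTIC_TEST values
    CASTED = 1
    NEGATIVE = 2
    POSITIVE = 3

def should_skip(check, smart=False, ignore_casted=False):
    """Mirror the condition in start(): skip only non-positive results, and only in two cases."""
    if check != HeuristicTest.POSITIVE:
        if smart or (ignore_casted and check == HeuristicTest.CASTED):
            return True
    return False

negative_default = should_skip(HeuristicTest.NEGATIVE)
negative_smart = should_skip(HeuristicTest.NEGATIVE, smart=True)
casted_ignored = should_skip(HeuristicTest.CASTED, ignore_casted=True)
positive_smart = should_skip(HeuristicTest.POSITIVE, smart=True)
```

So a negative heuristic result alone does not stop the full test; it only does so in smart mode.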
Let's look at the code of heuristicCheckSqlInjection(), the heuristic SQL injection check:
def heuristicCheckSqlInjection(place, parameter):
    if conf.skipHeuristics:
        return None

    # Get the original parameter value
    origValue = conf.paramDict[place][parameter]
    paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place

    prefix = ""
    suffix = ""
    randStr = ""

    # prefix and suffix are taken from conf
    if conf.prefix or conf.suffix:
        if conf.prefix:
            prefix = conf.prefix

        if conf.suffix:
            suffix = conf.suffix

    # Keep generating until randStr contains exactly one single quote and exactly one double quote
    while randStr.count('\'') != 1 or randStr.count('\"') != 1:
        # randomStr() returns a random string with the given number of characters
        # Here the length is 10 and the alphabet is HEURISTIC_CHECK_ALPHABET
        # HEURISTIC_CHECK_ALPHABET is defined in lib/core/settings.py:
        # HEURISTIC_CHECK_ALPHABET = ('"', '\'', ')', '(', ',', '.')
        # So this generates a random string of length 10 made up of characters from HEURISTIC_CHECK_ALPHABET
        randStr = randomStr(length=10, alphabet=HEURISTIC_CHECK_ALPHABET)

    # Turn heuristic mode on
    kb.heuristicMode = True

    # Build the payload from the random string; if conf has no prefix and suffix, this is just randStr
    payload = "%s%s%s" % (prefix, randStr, suffix)
    # Pass the request place, the parameter and the generated payload (randStr) to agent.payload()
    # For ?id=1 this gives: payload delimiter, 1 followed by the random string, payload delimiter
    payload = agent.payload(place, parameter, newValue=payload)
    page, _, _ = Request.queryPage(payload, place, content=True, raise404=False)

    kb.heuristicPage = page
    kb.heuristicMode = False

    # Extract possible file paths from the returned page (also done with regex matching)
    parseFilePaths(page)
    # Check whether the response contains a recognizable DBMS error; True if one was matched, False otherwise
    result = wasLastResponseDBMSError()

    infoMsg = "heuristic (basic) test shows that %sparameter '%s' might " % ("%s " % paramType if paramType != parameter else "", parameter)

    def _(page):
        # Check whether the response to the heuristic request contains a format exception string
        # FORMAT_EXCEPTION_STRINGS is also defined in settings.py
        # FORMAT_EXCEPTION_STRINGS = ("Type mismatch", "Error converting", "Please enter a", "Conversion failed", "String or binary data would be truncated", "Failed to convert", "unable to interpret text value", "Input string was not in a correct format", "System.FormatException", "java.lang.NumberFormatException", "ValueError: invalid literal", "TypeMismatchException", "CF_SQL_INTEGER", "CF_SQL_NUMERIC", " for CFSQLTYPE ", "cfqueryparam cfsqltype", "InvalidParamTypeException", "Invalid parameter type", "Attribute validation error for tag", "is not of type numeric", "<cfif Not IsNumeric(", "invalid input syntax for integer", "invalid input syntax for type", "invalid number", "character to number conversion error", "unable to interpret text value", "String was not recognized as a valid", "Convert.ToInt", "cannot be converted to a ", "InvalidDataException", "Arguments are of the wrong type")
        return any(_ in (page or "") for _ in FORMAT_EXCEPTION_STRINGS)

    casting = _(page) and not _(kb.originalPage) # True if the heuristic response contains a format exception string that the original page does not

    # If there was neither a casting error nor a DBMS error, and the parameter is dynamic and numeric, try an arithmetic payload
    if not casting and not result and kb.dynamicParameter and origValue.isdigit() and not kb.heavilyDynamic:
        # Reached only when the response to the first request showed no error
        randInt = int(randomInt())
        payload = "%s%s%s" % (prefix, "%d-%d" % (int(origValue) + randInt, randInt), suffix)
        payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE)
        result = Request.queryPage(payload, place, raise404=False) # compare the page content with the original page

        if not result:
            # If that did not work either, append a random string to the original value and try once more
            randStr = randomStr()
            payload = "%s%s%s" % (prefix, "%s.%d%s" % (origValue, random.randint(1, 9), randStr), suffix)
            payload = agent.payload(place, parameter, newValue=payload, where=PAYLOAD.WHERE.REPLACE)
            casting = Request.queryPage(payload, place, raise404=False)

    # Classify the outcome: CASTED if casting was detected, NEGATIVE if nothing was found, POSITIVE otherwise
    kb.heuristicTest = HEURISTIC_TEST.CASTED if casting else HEURISTIC_TEST.NEGATIVE if not result else HEURISTIC_TEST.POSITIVE

    if kb.heavilyDynamic:
        debugMsg = "heuristic check stopped because of heavy dynamicity"
        logger.debug(debugMsg)
        return kb.heuristicTest

    # Report the outcome
    if casting:
        errMsg = "possible %s casting detected (e.g. '" % ("integer" if origValue.isdigit() else "type")

        platform = conf.url.split('.')[-1].lower()
        if platform == WEB_PLATFORM.ASP:
            errMsg += "%s=CInt(request.querystring(\"%s\"))" % (parameter, parameter)
        elif platform == WEB_PLATFORM.ASPX:
            errMsg += "int.TryParse(Request.QueryString[\"%s\"], out %s)" % (parameter, parameter)
        elif platform == WEB_PLATFORM.JSP:
            errMsg += "%s=Integer.parseInt(request.getParameter(\"%s\"))" % (parameter, parameter)
        else:
            errMsg += "$%s=intval($_REQUEST[\"%s\"])" % (parameter, parameter)

        errMsg += "') at the back-end web application"
        logger.error(errMsg)

        if kb.ignoreCasted is None:
            message = "do you want to skip those kind of cases (and save scanning time)? %s " % ("[Y/n]" if conf.multipleTargets else "[y/N]")
            kb.ignoreCasted = readInput(message, default='Y' if conf.multipleTargets else 'N', boolean=True)

    elif result:
        infoMsg += "be injectable"
        if Backend.getErrorParsedDBMSes():
            infoMsg += " (possible DBMS: '%s')" % Format.getErrorParsedDBMSes()
        logger.info(infoMsg)

    else:
        infoMsg += "not be injectable"
        logger.warning(infoMsg)

    kb.heuristicMode = True
    # Disable HTML decoding
    kb.disableHtmlDecoding = True

    # Generate two random strings, each of length 6
    randStr1, randStr2 = randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH), randomStr(NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH)
    value = "%s%s%s" % (randStr1, DUMMY_NON_SQLI_CHECK_APPENDIX, randStr2)
    payload = "%s%s%s" % (prefix, "'%s" % value, suffix)
    payload = agent.payload(place, parameter, newValue=payload)
    page, _, _ = Request.queryPage(payload, place, content=True, raise404=False)

    # Parameter type used in the messages of the non-SQLi (XSS and file inclusion) checks below
    paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place

    # Reference: https://bugs.python.org/issue18183
    if value.upper() in (page or "").upper():
        infoMsg = "heuristic (XSS) test shows that %sparameter '%s' might be vulnerable to cross-site scripting (XSS) attacks" % ("%s " % paramType if paramType != parameter else "", parameter)
        logger.info(infoMsg)

        if conf.beep:
            beep()

    for match in re.finditer(FI_ERROR_REGEX, page or ""):
        if randStr1.lower() in match.group(0).lower():
            infoMsg = "heuristic (FI) test shows that %sparameter '%s' might be vulnerable to file inclusion (FI) attacks" % ("%s " % paramType if paramType != parameter else "", parameter)
            logger.info(infoMsg)

            if conf.beep:
                beep()

            break

    kb.disableHtmlDecoding = False
    kb.heuristicMode = False

    return kb.heuristicTest
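To summarize the heuristic, the three core steps (the quote-laden probe string, the arithmetic probe, and the final classification) can be reproduced without any HTTP requests. The sketch below is my own simplification: the alphabet and the expressions are taken from the code above, but heuristic_probe, arithmetic_probe and classify are hypothetical names, random.choice stands in for sqlmap's randomStr(), and plain strings stand in for the HEURISTIC_TEST values.

```python
import random

HEURISTIC_CHECK_ALPHABET = ('"', "'", ')', '(', ',', '.')

def heuristic_probe(length=10):
    """Random string over the alphabet with exactly one single quote and one double quote."""
    probe = ""
    while probe.count("'") != 1 or probe.count('"') != 1:
        probe = "".join(random.choice(HEURISTIC_CHECK_ALPHABET) for _ in range(length))
    return probe

def arithmetic_probe(orig_value, rand_int):
    """Expression that evaluates back to the original number if the back-end runs it as SQL."""
    return "%d-%d" % (int(orig_value) + rand_int, rand_int)

def classify(casting, result):
    """Same precedence as the chained conditional that sets kb.heuristicTest."""
    return "casted" if casting else "negative" if not result else "positive"

probe = heuristic_probe()
expression = arithmetic_probe("1", 1234)
```

The probe is meant to break the SQL syntax and provoke a DBMS error, while the arithmetic expression (here 1235-1234) should render the same page as the original value 1 only if the database actually evaluates it.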
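The main SQL injection detection function that comes next will be analyzed in detail in a later post.
If anything in this article is wrong, I hope more experienced readers will help point it out!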