怎么在Spring Boot中使用KafkaAdminClient集群管理工具?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

创新互联是少有的网站设计制作、成都网站设计、营销型企业网站、小程序开发、手机APP,开发、制作、设计、卖友情链接、推广优化一站式服务网络公司,自2013年起,坚持透明化,价格低,无套路经营理念。让网页惊喜每一位访客多年来深受用户好评
原理介绍
在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):
创建Topic:createTopics(Collection
删除Topic:deleteTopics(Collection
罗列所有Topic:listTopics()
查询Topic:describeTopics(Collection
查询集群信息:describeCluster()
查询ACL信息:describeAcls(AclBindingFilter filter)
创建ACL信息:createAcls(Collection
删除ACL信息:deleteAcls(Collection
查询配置信息:describeConfigs(Collection
修改配置信息:alterConfigs(Map
修改副本的日志目录:alterReplicaLogDirs(Map
查询节点的日志目录信息:describeLogDirs(Collection
查询副本的日志目录信息:describeReplicaLogDirs(Collection
增加分区:createPartitions(Map
其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。主要实现步骤:
客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至Kafka Broker。
Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
客户端接收相应的回执并进行解析处理。
和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。
代码如下
@Component
public class KafkaConfig{
// 配置Kafka
public Properties getProps(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
/* props.put("retries", 2); // 重试次数
props.put("batch.size", 16384); // 批量发送大小
props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置
props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送*/
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
}@RestController
public class KafkaTopicManager {
@Autowired
private KafkaConfig kafkaConfig;
@GetMapping("createTopic")
public void createTopic(){
AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
NewTopic newTopic = new NewTopic("test1",4, (short) 1);
Collection newTopicList = new ArrayList<>();
newTopicList.add(newTopic);
adminClient.createTopics(newTopicList);
adminClient.close();
}
@GetMapping("deleteTopic")
public void deleteTopic(){
AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
adminClient.deleteTopics(Arrays.asList("test1"));
adminClient.close();
}
@GetMapping("listAllTopic")
public void listAllTopic(){
AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
ListTopicsResult result = adminClient.listTopics();
KafkaFuture> names = result.names();
try {
names.get().forEach((k)->{
System.out.println(k);
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
adminClient.close();
}
@GetMapping("getTopic")
public void getTopic(){
AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test"));
Collection> values = describeTopics.values().values();
if(values.isEmpty()){
System.out.println("找不到描述信息");
}else{
for (KafkaFuture value : values) {
System.out.println(value);
}
}
adminClient.close();
}
} 看完上述内容,你们掌握怎么在Spring Boot中使用KafkaAdminClient集群管理工具的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!