Kafka monitoring: an example of getting the total message count of a specified topic
Tech · Posted by Admin, 7 years ago
Without further ado, let's go straight to the code.
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

import java.util.*;
import java.util.Map.Entry;

public class KafkaOffsetTools {

    public final static String KAFKA_TOPIC_NAME_ADAPTER = "sample";
    public final static String KAFKA_TOPIC_NAME_EXCEPTION = "exception";
    public final static String KAFKA_TOPIC_NAME_AUDIT = "audit";

    private static final String rawTopicTotal = "rawTopicTotalRecordCounter";
    private static final String avroTopicTotal = "avroTopicTotalRecordCounter";
    private static final String exceptionTopicTotal = "exceptionTopicTotalRecordCounter";

    public KafkaOffsetTools() {
    }

    // Query one partition's latest offset (its logSize) through the low-level OffsetRequest API.
    public static long getLastOffset(SimpleConsumer consumer, String topic,
                                     int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.err.println("Error fetching offset data from the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    // Ask the seed brokers for the topic's metadata and collect the leader of each partition.
    private TreeMap<Integer, PartitionMetadata> findLeader(List<String> a_seedBrokers, String a_topic) {
        TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                String[] hostAndPort = seed.split(":");
                consumer = new SimpleConsumer(hostAndPort[0], Integer.valueOf(hostAndPort[1]),
                        100000, 64 * 1024, "leaderLookup" + new Date().getTime());
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        map.put(part.partitionId(), part);
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        return map;
    }

    public static void main(String[] args) {
        String kafkaBrokerList = System.getenv("metadata.broker.list");
        if (kafkaBrokerList == null || kafkaBrokerList.length() == 0) {
            System.err.println("No config kafka metadata.broker.list, it is null.");
            // for test
            kafkaBrokerList = "localhost:9092,localhost:9093";
            System.err.println("Use this broker list for test, metadata.broker.list=" + kafkaBrokerList);
        }

        // init topics, logSize = 0
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(KAFKA_TOPIC_NAME_ADAPTER, 0);
        topics.put(KAFKA_TOPIC_NAME_EXCEPTION, 0);
        topics.put(KAFKA_TOPIC_NAME_AUDIT, 0);

        // init kafka broker list
        String[] kafkaHosts = kafkaBrokerList.split(",");
        if (kafkaHosts == null || kafkaHosts.length == 0) {
            System.err.println("No config kafka metadata.broker.list, it is null.");
            System.exit(1);
        }
        List<String> seeds = new ArrayList<String>();
        for (int i = 0; i < kafkaHosts.length; i++) {
            seeds.add(kafkaHosts[i]);
        }

        KafkaOffsetTools kot = new KafkaOffsetTools();

        // For every topic, sum the latest offsets of all partitions to get its total message count (logSize).
        for (String topicName : topics.keySet()) {
            TreeMap<Integer, PartitionMetadata> metadatas = kot.findLeader(seeds, topicName);
            int logSize = 0;
            for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
                int partition = entry.getKey();
                String leadBroker = entry.getValue().leader().host();
                String clientName = "Client_" + topicName + "_" + partition;
                SimpleConsumer consumer = new SimpleConsumer(leadBroker,
                        entry.getValue().leader().port(), 100000, 64 * 1024, clientName);
                long readOffset = getLastOffset(consumer, topicName, partition,
                        kafka.api.OffsetRequest.LatestTime(), clientName);
                logSize += readOffset;
                if (consumer != null) consumer.close();
            }
            topics.put(topicName, logSize);
        }

        System.out.println(topics.toString());
        System.out.println(rawTopicTotal + "=" + topics.get(KAFKA_TOPIC_NAME_ADAPTER) + " " + System.currentTimeMillis());
        System.out.println(avroTopicTotal + "=" + topics.get(KAFKA_TOPIC_NAME_AUDIT) + " " + System.currentTimeMillis());
        System.out.println(exceptionTopicTotal + "=" + topics.get(KAFKA_TOPIC_NAME_EXCEPTION) + " " + System.currentTimeMillis());
    }
}
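The SimpleConsumer API used above belongs to the old Scala client and has been removed from recent Kafka releases. If your cluster can use the modern Java client (kafka-clients 0.10.1 or later), the same per-topic count can be read with KafkaConsumer's partitionsFor, beginningOffsets and endOffsets. The following is only a minimal sketch for comparison, not part of the original example; the class name TopicMessageCount, the bootstrap.servers value and the default topic name are placeholders you would replace with your own.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicMessageCount {
    public static void main(String[] args) {
        // Placeholder topic and broker address; adjust to your environment.
        String topic = args.length > 0 ? args[0] : "sample";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Collect all partitions of the topic.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, p.partition()));
            }
            // Fetch the earliest and latest offsets for every partition in one call each.
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);

            long total = 0;
            for (TopicPartition tp : partitions) {
                // end - begin counts only the messages still retained on the broker.
                total += end.get(tp) - begin.get(tp);
            }
            System.out.println(topic + " total=" + total + " " + System.currentTimeMillis());
        }
    }
}

Note the difference in what is counted: end minus beginning offsets gives the messages still retained on the broker, while summing the latest offsets as in the original class also includes messages already removed by retention. Choose whichever definition your monitoring needs.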
That is everything for this example of getting the total message count of a specified Kafka topic for monitoring. I hope it serves as a useful reference, and thank you for your support.