package org.apache.storm.kafka.monitor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.shaded.api.OffsetRequest;
import kafka.shaded.api.PartitionOffsetRequestInfo;
import kafka.shaded.common.TopicAndPartition;
import kafka.shaded.javaapi.OffsetResponse;
import kafka.shaded.javaapi.consumer.SimpleConsumer;
import org.apache.commons.cli.shaded.CommandLine;
import org.apache.commons.cli.shaded.DefaultParser;
import org.apache.commons.cli.shaded.HelpFormatter;
import org.apache.commons.cli.shaded.Options;
import org.apache.curator.shaded.framework.CuratorFramework;
import org.apache.curator.shaded.framework.CuratorFrameworkFactory;
import org.apache.curator.shaded.retry.RetryOneTime;
import org.apache.curator.shaded.utils.ZKPaths;
import org.apache.kafka.shaded.clients.CommonClientConfigs;
import org.apache.kafka.shaded.clients.consumer.ConsumerConfig;
import org.apache.kafka.shaded.clients.consumer.KafkaConsumer;
import org.apache.kafka.shaded.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.shaded.common.PartitionInfo;
import org.apache.kafka.shaded.common.TopicPartition;
import org.apache.kafka.shaded.common.security.auth.KafkaPrincipal;
import org.apache.log4j.Priority;
import org.jboss.netty.handler.codec.rtsp.RtspHeaders;
import org.json.simple.shaded.JSONArray;
import org.json.simple.shaded.JSONValue;

/* loaded from: input_file:org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.class */
public class KafkaOffsetLagUtil {
    // Short and long names of the command-line options registered in buildOptions().

    // Topics to report on (required in every mode).
    private static final String OPTION_TOPIC_SHORT = "t";
    private static final String OPTION_TOPIC_LONG = "topics";
    // Flag selecting the old (ZooKeeper-committing) spout mode.
    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
    // New-spout mode only: bootstrap brokers and consumer group id.
    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
    private static final String OPTION_GROUP_ID_SHORT = "g";
    private static final String OPTION_GROUP_ID_LONG = "groupid";
    // Old-spout mode only: wildcard flag, static partition/leader lists and ZooKeeper locations.
    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
    private static final String OPTION_PARTITIONS_SHORT = "p";
    private static final String OPTION_PARTITIONS_LONG = "partitions";
    private static final String OPTION_LEADERS_SHORT = "l";
    private static final String OPTION_LEADERS_LONG = "leaders";
    private static final String OPTION_ZK_SERVERS_SHORT = "z";
    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
    // Optional in both modes: security protocol handed to the Kafka clients.
    private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
    private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol";

    /**
     * Command-line entry point.
     *
     * <p>Parses the arguments, validates the option combination for either the old-spout or the
     * new-spout mode, fetches the offset lags and prints them to standard output as JSON keyed by
     * topic and partition. An invalid option combination prints the usage text and exits with
     * status 1 (via {@code printUsageAndExit}). Any exception is reported on standard output
     * together with its stack trace, after which the method returns normally.
     *
     * @param strArr raw command-line arguments
     */
    public static void main(String[] strArr) {
        try {
            final Options cliOptions = buildOptions();
            final CommandLine cli = new DefaultParser().parse(cliOptions, strArr);
            if (!cli.hasOption(OPTION_TOPIC_LONG)) {
                printUsageAndExit(cliOptions, "topics is required");
            }
            final String securityProtocol = cli.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
            final boolean oldSpoutRequested = cli.hasOption(OPTION_OLD_CONSUMER_LONG);
            final List<KafkaOffsetLagResult> lagResults;

            if (!oldSpoutRequested) {
                // New spout: none of the old-spout-only options may be present.
                final String[] oldSpoutOnlyOptions = {
                    OPTION_TOPIC_WILDCARD_LONG,
                    OPTION_PARTITIONS_LONG,
                    OPTION_LEADERS_LONG,
                    OPTION_ZK_SERVERS_LONG,
                    OPTION_ZK_COMMITTED_NODE_LONG,
                    OPTION_ZK_BROKERS_ROOT_LONG
                };
                for (String optionName : oldSpoutOnlyOptions) {
                    if (cli.hasOption(optionName)) {
                        printUsageAndExit(cliOptions, optionName + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
                    }
                }
                if (!(cli.hasOption(OPTION_GROUP_ID_LONG) && cli.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG))) {
                    printUsageAndExit(cliOptions, "groupid and bootstrap-brokers are required if old-spout is not specified");
                }
                final NewKafkaSpoutOffsetQuery newQuery = new NewKafkaSpoutOffsetQuery(
                        cli.getOptionValue(OPTION_TOPIC_LONG),
                        cli.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
                        cli.getOptionValue(OPTION_GROUP_ID_LONG),
                        securityProtocol);
                lagResults = getOffsetLags(newQuery);
            } else {
                // Old spout: committed offsets live in ZooKeeper, so the new-consumer options are rejected.
                if (cli.hasOption(OPTION_GROUP_ID_LONG) || cli.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
                    printUsageAndExit(cliOptions, "groupid or bootstrap-brokers is not accepted with option old-spout");
                }
                if (!(cli.hasOption(OPTION_ZK_SERVERS_LONG) && cli.hasOption(OPTION_ZK_COMMITTED_NODE_LONG))) {
                    printUsageAndExit(cliOptions, "zk-servers and zk-node are required  with old-spout");
                }
                final String[] topicNames = cli.getOptionValue(OPTION_TOPIC_LONG).split(",");
                if (topicNames.length > 1) {
                    printUsageAndExit(cliOptions, "Multiple topics not supported with option old-spout. Either a single topic or a wildcard string for matching topics is supported");
                }

                final OldKafkaSpoutOffsetQuery oldQuery;
                if (!cli.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
                    // Static hosts: partitions and their leaders must be listed explicitly.
                    if (cli.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
                        printUsageAndExit(cliOptions, "wildcard-topic is not supported without zk-brokers-root-node");
                    }
                    if (!(cli.hasOption(OPTION_PARTITIONS_LONG) && cli.hasOption(OPTION_LEADERS_LONG))) {
                        printUsageAndExit(cliOptions, "partitions and leaders are required if zk-brokers-root-node is not provided");
                    }
                    final int partitionCount = cli.getOptionValue(OPTION_PARTITIONS_LONG).split(",").length;
                    final int leaderCount = cli.getOptionValue(OPTION_LEADERS_LONG).split(",").length;
                    if (partitionCount != leaderCount) {
                        printUsageAndExit(cliOptions, "partitions and leaders need to be of same size");
                    }
                    oldQuery = new OldKafkaSpoutOffsetQuery(
                            cli.getOptionValue(OPTION_TOPIC_LONG),
                            cli.getOptionValue(OPTION_ZK_SERVERS_LONG),
                            cli.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG),
                            cli.getOptionValue(OPTION_PARTITIONS_LONG),
                            cli.getOptionValue(OPTION_LEADERS_LONG),
                            securityProtocol);
                } else {
                    // Broker metadata is discovered from ZooKeeper, so explicit lists are rejected.
                    if (cli.hasOption(OPTION_PARTITIONS_LONG) || cli.hasOption(OPTION_LEADERS_LONG)) {
                        printUsageAndExit(cliOptions, "partitions or leaders is not accepted with zk-brokers-root-node");
                    }
                    oldQuery = new OldKafkaSpoutOffsetQuery(
                            cli.getOptionValue(OPTION_TOPIC_LONG),
                            cli.getOptionValue(OPTION_ZK_SERVERS_LONG),
                            cli.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG),
                            cli.hasOption(OPTION_TOPIC_WILDCARD_LONG),
                            cli.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG),
                            securityProtocol);
                }
                lagResults = getOffsetLags(oldQuery);
            }

            final Map<String, Map<Integer, KafkaPartitionOffsetLag>> lagsByTopic = keyByTopicAndPartition(lagResults);
            System.out.print(JSONValue.toJSONString(lagsByTopic));
        } catch (Exception e) {
            System.out.print("Unable to get offset lags for kafka. Reason: ");
            e.printStackTrace(System.out);
        }
    }

    /**
     * Re-groups a flat list of lag results into a two-level map: topic name, then partition number,
     * then a {@code KafkaPartitionOffsetLag} built from the committed offset and the log head offset.
     *
     * @param list lag results to index
     * @return the results keyed by topic and then by partition
     */
    private static Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyByTopicAndPartition(List<KafkaOffsetLagResult> list) {
        Map<String, Map<Integer, KafkaPartitionOffsetLag>> lagsByTopic =
                new HashMap<String, Map<Integer, KafkaPartitionOffsetLag>>();
        for (KafkaOffsetLagResult lagResult : list) {
            String topicName = lagResult.getTopic();
            if (!lagsByTopic.containsKey(topicName)) {
                lagsByTopic.put(topicName, new HashMap<Integer, KafkaPartitionOffsetLag>());
            }
            Map<Integer, KafkaPartitionOffsetLag> lagsByPartition = lagsByTopic.get(topicName);
            lagsByPartition.put(
                    lagResult.getPartition(),
                    new KafkaPartitionOffsetLag(lagResult.getConsumerCommittedOffset(), lagResult.getLogHeadOffset()));
        }
        return lagsByTopic;
    }

    /**
     * Prints the given message followed by the generated usage text, then terminates the JVM with
     * exit status 1.
     *
     * @param options option definitions used to render the usage text
     * @param str message explaining why the arguments were rejected
     */
    private static void printUsageAndExit(Options options, String str) {
        System.out.println(str);
        final HelpFormatter usageFormatter = new HelpFormatter();
        usageFormatter.printHelp("storm-kafka-monitor ", options);
        System.exit(1);
    }

    /**
     * Builds the command-line option definitions understood by {@code main}.
     *
     * @return a new {@code Options} instance holding every supported option
     */
    private static Options buildOptions() {
        // addOption returns the Options instance itself, which allows the calls to be chained.
        return new Options()
                .addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed offset")
                .addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout")
                .addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new consumer/spout e.g. hostname1:9092,hostname2:9092")
                .addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ")
                .addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in old spout")
                .addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to leaders for old spout with StaticHosts")
                .addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to partitions for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092")
                .addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets  and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181")
                .addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed offsets without the topic and partition nodes")
                .addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. /brokers (applicable only for old kafka spout) ")
                .addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
    }

    /**
     * Computes, for every partition of the queried topics, the offset committed by the consumer
     * group and the current log end position, using the new Kafka consumer API.
     *
     * <p>A partition with no committed offset is reported with a committed offset of {@code -1}.
     * Topics for which the broker returns no partition information are skipped.
     *
     * @param newKafkaSpoutOffsetQuery bootstrap brokers, consumer group id, comma separated topics
     *        and optional security protocol
     * @return one result per discovered topic partition
     */
    public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
        List<KafkaOffsetLagResult> lagResults = new ArrayList<KafkaOffsetLagResult>();

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, newKafkaSpoutOffsetQuery.getConsumerGroupId());
        // This tool only inspects offsets; it must never commit on behalf of the group.
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.shaded.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.shaded.common.serialization.StringDeserializer");
        if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
            consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, newKafkaSpoutOffsetQuery.getSecurityProtocol());
        }

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(consumerProps);
        // A single finally closes the consumer exactly once on both the normal and the failure path.
        try {
            List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
            for (String topic : newKafkaSpoutOffsetQuery.getTopics().split(",")) {
                List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
                if (partitionInfos != null) {
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                    }
                }
            }
            consumer.assign(topicPartitions);
            for (TopicPartition topicPartition : topicPartitions) {
                OffsetAndMetadata committed = consumer.committed(topicPartition);
                long committedOffset = committed != null ? committed.offset() : -1L;
                // Seek to the end so that position() reports the log head offset of this partition.
                consumer.seekToEnd(toArrayList(topicPartition));
                long logHeadOffset = consumer.position(topicPartition);
                lagResults.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, logHeadOffset));
            }
        } finally {
            consumer.close();
        }
        return lagResults;
    }

    /**
     * Wraps a single topic partition in a mutable one-element collection.
     *
     * @param topicPartition the partition to wrap
     * @return a new list containing only {@code topicPartition}
     */
    private static Collection<TopicPartition> toArrayList(final TopicPartition topicPartition) {
        // A plain ArrayList avoids generating an anonymous subclass just to add one element.
        List<TopicPartition> singlePartition = new ArrayList<TopicPartition>(1);
        singlePartition.add(topicPartition);
        return singlePartition;
    }

    /**
     * Computes offset lags for the old Kafka spout, whose committed offsets are stored in ZooKeeper.
     *
     * <p>Log head offsets are fetched from each partition leader and combined with the committed
     * offsets read from ZooKeeper. A partition for which no committed offset is available is
     * reported with a committed offset of {@code -1}.
     *
     * @param oldKafkaSpoutOffsetQuery topic, ZooKeeper locations and either static partition/leader
     *        lists or the brokers root node
     * @return one result per partition for which a log head offset was obtained
     * @throws Exception if ZooKeeper or a broker cannot be queried
     */
    public static List<KafkaOffsetLagResult> getOffsetLags(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
        List<KafkaOffsetLagResult> lagResults = new ArrayList<KafkaOffsetLagResult>();
        Map<String, List<TopicPartition>> leadersAndTopicPartitions = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
        if (leadersAndTopicPartitions == null) {
            return lagResults;
        }

        String securityProtocol = oldKafkaSpoutOffsetQuery.getSecurityProtocol() != null
                ? oldKafkaSpoutOffsetQuery.getSecurityProtocol()
                : CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
        Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leadersAndTopicPartitions, securityProtocol);

        // Regroup leader -> partitions into topic -> partition numbers for the ZooKeeper lookup.
        Map<String, List<Integer>> partitionsByTopic = new HashMap<String, List<Integer>>();
        for (List<TopicPartition> leaderPartitions : leadersAndTopicPartitions.values()) {
            for (TopicPartition topicPartition : leaderPartitions) {
                List<Integer> partitionNumbers = partitionsByTopic.get(topicPartition.topic());
                if (partitionNumbers == null) {
                    partitionNumbers = new ArrayList<Integer>();
                    partitionsByTopic.put(topicPartition.topic(), partitionNumbers);
                }
                partitionNumbers.add(topicPartition.partition());
            }
        }

        Map<String, Map<Integer, Long>> committedOffsets = getOldConsumerOffsetsFromZk(partitionsByTopic, oldKafkaSpoutOffsetQuery);
        for (Map.Entry<String, Map<Integer, Long>> topicEntry : logHeadOffsets.entrySet()) {
            Map<Integer, Long> committedForTopic = committedOffsets.get(topicEntry.getKey());
            for (Map.Entry<Integer, Long> partitionEntry : topicEntry.getValue().entrySet()) {
                // A missing entry and an entry mapped to null both mean "no committed offset";
                // checking the value (not just containsKey) avoids unboxing a null Long.
                Long committed = committedForTopic != null ? committedForTopic.get(partitionEntry.getKey()) : null;
                long committedOffset = committed != null ? committed.longValue() : -1L;
                lagResults.add(new KafkaOffsetLagResult(
                        topicEntry.getKey(),
                        partitionEntry.getKey().intValue(),
                        committedOffset,
                        partitionEntry.getValue().longValue()));
            }
        }
        return lagResults;
    }

    /**
     * Determines which broker leads each partition of the queried topic(s).
     *
     * <p>When the query carries explicit partitions, the partition and leader lists are paired up
     * by position. Otherwise the topic list (optionally expanded by matching the topic as a regular
     * expression against the children of the brokers' topics node), the partition state and the
     * leader's broker registration are read from ZooKeeper.
     *
     * @param oldKafkaSpoutOffsetQuery the old-spout query
     * @return a map from leader ({@code host:port}) to the topic partitions it leads
     * @throws IllegalArgumentException if the explicit partition and leader lists differ in length
     * @throws IllegalStateException if a leader's endpoint cannot be split into host and port
     * @throws Exception if ZooKeeper cannot be read
     */
    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
        Map<String, List<TopicPartition>> partitionsByLeader = new HashMap<String, List<TopicPartition>>();

        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
            // This public-API path can be reached without main()'s validation, so fail clearly
            // instead of indexing past the shorter array or silently dropping partitions.
            if (partitions.length != leaders.length) {
                throw new IllegalArgumentException("partitions and leaders need to be of same size");
            }
            for (int i = 0; i < leaders.length; i++) {
                addLeaderPartition(partitionsByLeader, leaders[i],
                        new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
            }
            return partitionsByLeader;
        }

        String brokersZkPath = oldKafkaSpoutOffsetQuery.getBrokersZkPath();
        if (!brokersZkPath.endsWith(ZKPaths.PATH_SEPARATOR)) {
            brokersZkPath = brokersZkPath + ZKPaths.PATH_SEPARATOR;
        }
        String topicsZkPath = brokersZkPath + "topics";

        // NOTE(review): Priority.INFO_INT is log4j's INFO level value, used here as the second
        // (session timeout) argument; this looks like a decompiler constant substitution - confirm
        // and replace with the intended literal.
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
                oldKafkaSpoutOffsetQuery.getZkServers(), Priority.INFO_INT, 15000, new RetryOneTime(1000));
        try {
            curatorFramework.start();

            List<String> topics = new ArrayList<String>();
            if (oldKafkaSpoutOffsetQuery.isWildCardTopic()) {
                for (String candidate : curatorFramework.getChildren().forPath(topicsZkPath)) {
                    if (candidate.matches(oldKafkaSpoutOffsetQuery.getTopic())) {
                        topics.add(candidate);
                    }
                }
            } else {
                topics.add(oldKafkaSpoutOffsetQuery.getTopic());
            }

            for (String topic : topics) {
                String partitionsZkPath = topicsZkPath + ZKPaths.PATH_SEPARATOR + topic + "/partitions";
                List<String> partitionNodes = curatorFramework.getChildren().forPath(partitionsZkPath);
                // Partitions are addressed by index 0..n-1 rather than by child node name.
                for (int partition = 0; partition < partitionNodes.size(); partition++) {
                    String statePath = partitionsZkPath + ZKPaths.PATH_SEPARATOR + partition + "/state";
                    Map<?, ?> state = (Map<?, ?>) JSONValue.parse(new String(curatorFramework.getData().forPath(statePath), "UTF-8"));
                    int leaderId = ((Number) state.get("leader")).intValue();

                    String brokerPath = brokersZkPath + "ids/" + leaderId;
                    Map<?, ?> brokerInfo = (Map<?, ?>) JSONValue.parse(new String(curatorFramework.getData().forPath(brokerPath), "UTF-8"));

                    String host;
                    int port;
                    if (brokerInfo.containsKey("endpoints")) {
                        String endpoint = (String) ((JSONArray) brokerInfo.get("endpoints")).get(0);
                        // Strip the leading "<protocol>://" and split the remainder into host and port.
                        // NOTE(review): KafkaPrincipal.SEPARATOR is used as the host/port delimiter;
                        // presumably ":" substituted by the decompiler - confirm.
                        String[] protocolAndAddress = endpoint.split(".+://");
                        String[] hostAndPort = protocolAndAddress.length > 1
                                ? protocolAndAddress[1].split(KafkaPrincipal.SEPARATOR)
                                : new String[0];
                        if (hostAndPort.length != 2) {
                            throw new IllegalStateException("Unable to parse host and port from endpoint '" + endpoint
                                    + "' of broker " + leaderId);
                        }
                        host = hostAndPort[0];
                        port = Integer.parseInt(hostAndPort[1]);
                    } else {
                        host = (String) brokerInfo.get("host");
                        // NOTE(review): RtspHeaders.Values.PORT is a Netty RTSP constant used as the
                        // JSON key; presumably "port" substituted by the decompiler - confirm.
                        port = ((Number) brokerInfo.get(RtspHeaders.Values.PORT)).intValue();
                    }
                    addLeaderPartition(partitionsByLeader, host + KafkaPrincipal.SEPARATOR + port,
                            new TopicPartition(topic, partition));
                }
            }
        } finally {
            curatorFramework.close();
        }
        return partitionsByLeader;
    }

    /** Appends {@code topicPartition} to the list kept for {@code leader}, creating the list on first use. */
    private static void addLeaderPartition(Map<String, List<TopicPartition>> partitionsByLeader, String leader, TopicPartition topicPartition) {
        List<TopicPartition> ledPartitions = partitionsByLeader.get(leader);
        if (ledPartitions == null) {
            ledPartitions = new ArrayList<TopicPartition>();
            partitionsByLeader.put(leader, ledPartitions);
        }
        ledPartitions.add(topicPartition);
    }

    /**
     * Asks each partition leader for the latest offset of the partitions it leads.
     *
     * @param map leader ({@code host:port}) to the topic partitions it leads; may be {@code null}
     * @param str security protocol passed to each {@code SimpleConsumer}
     * @return topic name to partition number to log head offset; empty when {@code map} is {@code null}
     * @throws IllegalStateException if a broker returns no offset for a requested partition
     */
    private static Map<String, Map<Integer, Long>> getLogHeadOffsets(Map<String, List<TopicPartition>> map, String str) {
        Map<String, Map<Integer, Long>> headOffsetsByTopic = new HashMap<String, Map<Integer, Long>>();
        if (map == null) {
            return headOffsetsByTopic;
        }

        // One request entry shape is shared by all partitions: the single latest offset.
        PartitionOffsetRequestInfo latestOffsetRequest = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
        for (Map.Entry<String, List<TopicPartition>> leaderEntry : map.entrySet()) {
            // NOTE(review): KafkaPrincipal.SEPARATOR is used as the host/port delimiter;
            // presumably ":" substituted by the decompiler - confirm.
            String[] hostAndPort = leaderEntry.getKey().split(KafkaPrincipal.SEPARATOR);
            // The consumer is scoped to this iteration so that a failure while handling a later
            // leader can never close an earlier, already closed consumer again.
            SimpleConsumer simpleConsumer = new SimpleConsumer(
                    hostAndPort[0], Integer.parseInt(hostAndPort[1]), 10000, 65536, "LogHeadOffsetRequest", str);
            try {
                Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
                for (TopicPartition topicPartition : leaderEntry.getValue()) {
                    requestInfo.put(new TopicAndPartition(topicPartition.topic(), topicPartition.partition()), latestOffsetRequest);
                    if (!headOffsetsByTopic.containsKey(topicPartition.topic())) {
                        headOffsetsByTopic.put(topicPartition.topic(), new HashMap<Integer, Long>());
                    }
                }

                OffsetResponse response = simpleConsumer.getOffsetsBefore(
                        new kafka.shaded.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), "LogHeadOffsetRequest"));
                for (TopicPartition topicPartition : leaderEntry.getValue()) {
                    long[] offsets = response.offsets(topicPartition.topic(), topicPartition.partition());
                    if (offsets.length == 0) {
                        throw new IllegalStateException("No log head offset returned by " + leaderEntry.getKey()
                                + " for topic " + topicPartition.topic() + " partition " + topicPartition.partition());
                    }
                    headOffsetsByTopic.get(topicPartition.topic()).put(topicPartition.partition(), offsets[0]);
                }
            } finally {
                simpleConsumer.close();
            }
        }
        return headOffsetsByTopic;
    }

    /**
     * Reads the offsets committed by the old Kafka spout from ZooKeeper.
     *
     * <p>For each requested partition the node {@code <zk-node>/[<topic>/]partition_<n>} is read
     * (the topic level is only present for wildcard topics) and its JSON {@code offset} field is
     * extracted. Partitions whose node does not exist, or whose node carries no {@code offset}
     * field, are left out of the result.
     *
     * @param map topic name to the partition numbers to look up; may be {@code null}
     * @param oldKafkaSpoutOffsetQuery supplies the ZooKeeper servers, the committed-offsets node and
     *        the wildcard flag
     * @return topic name to partition number to committed offset
     * @throws IllegalArgumentException if the committed-offsets node does not exist
     * @throws Exception if ZooKeeper cannot be read
     */
    private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk(Map<String, List<Integer>> map, OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
        Map<String, Map<Integer, Long>> committedByTopic = new HashMap<String, Map<Integer, Long>>();
        // NOTE(review): Priority.INFO_INT is log4j's INFO level value, used here as the second
        // (session timeout) argument; this looks like a decompiler constant substitution - confirm
        // and replace with the intended literal.
        CuratorFramework newClient = CuratorFrameworkFactory.newClient(
                oldKafkaSpoutOffsetQuery.getZkServers(), Priority.INFO_INT, 15000, new RetryOneTime(1000));
        // The whole client lifetime is covered by the finally, so the client is also released when
        // the node is missing, when a lookup throws, or when there is nothing to look up.
        try {
            newClient.start();
            String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
            if (zkPath.endsWith(ZKPaths.PATH_SEPARATOR)) {
                zkPath = zkPath.substring(0, zkPath.length() - 1);
            }
            if (newClient.checkExists().forPath(zkPath) == null) {
                throw new IllegalArgumentException("zk-node '" + zkPath + "' dose not exists.");
            }
            if (map == null) {
                return committedByTopic;
            }
            for (Map.Entry<String, List<Integer>> topicEntry : map.entrySet()) {
                String topicSegment = oldKafkaSpoutOffsetQuery.isWildCardTopic()
                        ? topicEntry.getKey() + ZKPaths.PATH_SEPARATOR
                        : "";
                Map<Integer, Long> committedByPartition = new HashMap<Integer, Long>();
                for (Integer partition : topicEntry.getValue()) {
                    String partitionPath = zkPath + ZKPaths.PATH_SEPARATOR + topicSegment + "partition_" + partition;
                    if (newClient.checkExists().forPath(partitionPath) != null) {
                        Map<?, ?> committedState = (Map<?, ?>) JSONValue.parse(new String(newClient.getData().forPath(partitionPath), "UTF-8"));
                        Long committedOffset = (Long) committedState.get("offset");
                        // Skip nodes without an offset so the map never holds null values.
                        if (committedOffset != null) {
                            committedByPartition.put(partition, committedOffset);
                        }
                    }
                }
                committedByTopic.put(topicEntry.getKey(), committedByPartition);
            }
        } finally {
            newClient.close();
        }
        return committedByTopic;
    }
}
