package org.apache.storm.starter.trident;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.shell.Test;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.KafkaSpoutStreams;
import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
import org.apache.storm.kafka.spout.trident.KafkaManagerTridentSpout;
import org.apache.storm.kafka.spout.trident.KafkaOpaquePartitionedTridentSpout;
import org.apache.storm.starter.trident.DebugMemoryMapState;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.trident.testing.Split;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.joni.constants.AsmConstants;

/* loaded from: input_file:org/apache/storm/starter/trident/TridentKafkaClientWordCount.class */
public class TridentKafkaClientWordCount extends TridentKafkaWordCount {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/starter/trident/TridentKafkaClientWordCount$TopicTestTupleBuilder.class */
    public class TopicTestTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
        public TopicTestTupleBuilder(String... strArr) {
            super(strArr);
        }

        public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
            return new Values(new Object[]{consumerRecord.value()});
        }
    }

    public TridentKafkaClientWordCount() {
        this(null, null);
    }

    public TridentKafkaClientWordCount(String str, String str2) {
        super(str, str2);
    }

    @Override // org.apache.storm.starter.trident.TridentKafkaWordCount
    protected TridentState addTridentState(TridentTopology tridentTopology) {
        Stream parallelismHint = tridentTopology.newStream("spout1", createOpaqueKafkaSpoutNew()).parallelismHint(1);
        return parallelismHint.each(parallelismHint.getOutputFields(), new Debug(true)).each(new Fields(new String[]{AsmConstants.STR}), new Split(), new Fields(new String[]{"word"})).groupBy(new Fields(new String[]{"word"})).persistentAggregate(new DebugMemoryMapState.Factory(), new Count(), new Fields(new String[]{org.apache.hadoop.fs.shell.Count.NAME}));
    }

    private KafkaOpaquePartitionedTridentSpout<String, String> createOpaqueKafkaSpoutNew() {
        return new KafkaOpaquePartitionedTridentSpout<>(getKafkaManager());
    }

    private KafkaManagerTridentSpout<String, String> getKafkaManager() {
        return new KafkaManagerTridentSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()));
    }

    private KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
        return new KafkaSpoutConfig.Builder(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService()).setOffsetCommitPeriodMs(10000L).setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST).setMaxUncommittedOffsets(250).build();
    }

    private Map<String, Object> getKafkaConsumerProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "127.0.0.1:9092");
        hashMap.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup");
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return hashMap;
    }

    private KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
        return new KafkaSpoutTuplesBuilderNamedTopics.Builder(new KafkaSpoutTupleBuilder[]{new TopicTestTupleBuilder(Test.NAME)}).build();
    }

    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500L, TimeUnit.MICROSECONDS), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2L), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10L));
    }

    private static KafkaSpoutRetryExponentialBackoff.TimeInterval getTimeInterval(long j, TimeUnit timeUnit) {
        return new KafkaSpoutRetryExponentialBackoff.TimeInterval(j, timeUnit);
    }

    private KafkaSpoutStreams getKafkaSpoutStreams() {
        return new KafkaSpoutStreamsNamedTopics.Builder(new Fields(new String[]{AsmConstants.STR}), new String[]{Test.NAME}).build();
    }

    public static void main(String[] strArr) throws Exception {
        runMain(strArr, new TridentKafkaClientWordCount(RegistryConstants.DEFAULT_REGISTRY_ZK_QUORUM, "localhost:9092"));
    }
}
