package org.apache.twill.internal.kafka.client;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.BrokerInfo;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.kafka.client.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:temp/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.class */
public final class SimpleKafkaConsumer implements KafkaConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    private static final int FETCH_SIZE = 1048576;
    private static final int SO_TIMEOUT = 5000;
    private static final int MAX_WAIT = 1000;
    private static final long CONSUMER_EXPIRE_MINUTES = 1;
    private static final long CONSUMER_FAILURE_RETRY_INTERVAL = 2000;
    private static final long EMPTY_FETCH_WAIT = 500;
    private final BrokerService brokerService;
    private final LoadingCache<BrokerInfo, SimpleConsumer> consumers = CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.MINUTES).removalListener(createRemovalListener()).build(createConsumerLoader());
    private final BlockingQueue<Cancellable> consumerCancels = new LinkedBlockingQueue();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:temp/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer$ConsumerThread.class */
    public final class ConsumerThread extends Thread {
        private final TopicPartition topicPart;
        private final long startOffset;
        private final KafkaConsumer.MessageCallback callback;
        private final BasicFetchedMessage fetchedMessage;
        private volatile boolean running;

        /**
         * Creates a consumer thread bound to a single topic partition.
         *
         * <p>The thread is named {@code Kafka-Consumer-<topic>-<partition>} and its running flag is
         * set to {@code true}; the thread itself is not started here.
         *
         * @param topicPartition topic and partition this thread is bound to
         * @param startOffset offset value recorded as this thread's starting position
         * @param messageCallback callback that receives the fetched messages
         */
        private ConsumerThread(TopicPartition topicPartition, long startOffset, KafkaConsumer.MessageCallback messageCallback) {
            super(String.format("Kafka-Consumer-%s-%d", topicPartition.getTopic(), topicPartition.getPartition()));
            this.running = true;
            this.topicPart = topicPartition;
            this.startOffset = startOffset;
            this.callback = messageCallback;
            // One message holder per thread; it is filled in again for every delivered message.
            this.fetchedMessage = new BasicFetchedMessage(topicPartition);
        }

        /* JADX WARN: Can't wrap try/catch for region: R(11:4|(2:6|(6:42|43|44|45|47|24)(1:8))(1:51)|9|(1:11)|12|13|14|16|(5:28|29|(1:31)|32|33)(3:18|19|(3:25|26|27)(3:21|22|23))|24|2) */
        /* JADX WARN: Code restructure failed: missing block: B:34:0x00fe, code lost:
        
            r12 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:36:0x0104, code lost:
        
            if (r6.running != false) goto L32;
         */
        /* JADX WARN: Code restructure failed: missing block: B:39:0x011f, code lost:
        
            r6.this$0.consumers.refresh(r8.getKey());
            r8 = null;
         */
        /* JADX WARN: Code restructure failed: missing block: B:41:0x010f, code lost:
        
            org.apache.twill.internal.kafka.client.SimpleKafkaConsumer.LOG.info("Exception when fetching message on {}.", r6.topicPart, r12);
         */
        /**
         * Thread entry point.
         *
         * <p>NOTE(review): the decompiler could not reconstruct this method, so the body below is a
         * placeholder that unconditionally throws {@link UnsupportedOperationException}. As written,
         * a started thread fails immediately and never fetches a message. The real fetch loop has to
         * be recovered from the original source or bytecode before this class can be used.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 342
                To view this dump add '--comments-level debug' option
            */
            // Decompiler placeholder: no fetch logic is present in this body.
            throw new UnsupportedOperationException("Method not decompiled: org.apache.twill.internal.kafka.client.SimpleKafkaConsumer.ConsumerThread.run():void");
        }

        /**
         * Requests this thread to stop: logs the request, clears the running flag and interrupts the
         * thread. This call does not wait for the thread to finish.
         */
        public void terminate() {
            final String threadName = getName();
            SimpleKafkaConsumer.LOG.info("Terminate requested {}", threadName);
            // Clear the flag first so it is already false by the time the interrupt is delivered.
            this.running = false;
            this.interrupt();
        }

        /**
         * Looks up the leader broker of this thread's topic partition and pairs it with the
         * {@link SimpleConsumer} held in the consumer cache for that broker.
         *
         * @return an immutable (broker, consumer) entry, or {@code null} when the broker service
         *     reports no leader
         */
        private Map.Entry<BrokerInfo, SimpleConsumer> getConsumerEntry() {
            final String topic = this.topicPart.getTopic();
            final int partition = this.topicPart.getPartition();
            BrokerInfo leaderBroker = SimpleKafkaConsumer.this.brokerService.getLeader(topic, partition);
            if (leaderBroker != null) {
                SimpleConsumer cachedConsumer = SimpleKafkaConsumer.this.consumers.getUnchecked(leaderBroker);
                return Maps.immutableEntry(leaderBroker, cachedConsumer);
            }
            return null;
        }

        /**
         * Sends one fetch request for this thread's topic partition through the given consumer.
         *
         * <p>The request uses the consumer's own client id, a fetch size of 1048576 and a max-wait
         * value of 1000 (the values of {@code FETCH_SIZE} and {@code MAX_WAIT}).
         *
         * @param simpleConsumer consumer used to send the request
         * @param offset offset to start fetching from
         * @return the response returned by the consumer
         */
        private FetchResponse fetchMessages(SimpleConsumer simpleConsumer, long offset) {
            FetchRequestBuilder requestBuilder = new FetchRequestBuilder()
                .clientId(simpleConsumer.clientId())
                .addFetch(this.topicPart.getTopic(), this.topicPart.getPartition(), offset, 1048576)
                .maxWait(1000);
            return simpleConsumer.fetch(requestBuilder.build());
        }

        /**
         * Pauses for up to 500 ms when a fetch produced no messages.
         *
         * @param fetchedSet message set returned by the last fetch
         * @return {@code true} if the set was empty (whether the pause completed or was
         *     interrupted), {@code false} if it contained at least one message
         */
        private boolean sleepIfEmpty(ByteBufferMessageSet fetchedSet) {
            if (Iterables.isEmpty(fetchedSet)) {
                SimpleKafkaConsumer.LOG.trace("No message fetched. Sleep for {} ms before next fetch.", (Object) 500L);
                try {
                    TimeUnit.MILLISECONDS.sleep(500L);
                } catch (InterruptedException ignored) {
                    // An interrupt only cuts the pause short; the interrupt status is not restored here.
                }
                return true;
            }
            return false;
        }

        /**
         * Delivers a fetched message set to the callback and rewinds the offset if the callback fails.
         *
         * <p>The iterator handed to the callback advances {@code atomicLong} as messages are consumed.
         * If the callback throws anything, the offset is reset to the value it had on entry so the
         * same messages are delivered again on the next attempt. The failure is logged and not
         * rethrown.
         *
         * @param byteBufferMessageSet messages returned by the last fetch
         * @param atomicLong current consume offset; advanced during iteration, restored on failure
         */
        private void invokeCallback(ByteBufferMessageSet byteBufferMessageSet, AtomicLong atomicLong) {
            // Snapshot taken before iteration so a failed delivery can be replayed from the same position.
            long savedOffset = atomicLong.get();
            try {
                this.callback.onReceived(createFetchedMessages(byteBufferMessageSet, atomicLong));
            } catch (Throwable th) {
                // Log the offset that is actually restored below, not the thread's initial startOffset.
                SimpleKafkaConsumer.LOG.error("Callback throws exception. Retry from offset {} for {}", new Object[]{Long.valueOf(savedOffset), this.topicPart, th});
                atomicLong.set(savedOffset);
            }
        }

        /**
         * Wraps a fetched message set in an iterator of {@link FetchedMessage}.
         *
         * <p>Messages whose offset is below the current value of {@code atomicLong} are skipped with
         * a trace log. For every delivered message, {@code atomicLong} is first moved to that
         * message's next offset. The iterator returns this thread's single {@code fetchedMessage}
         * instance each time, overwritten with the new payload and next offset, so a returned element
         * is only valid until the iterator is advanced again.
         *
         * @param byteBufferMessageSet messages returned by a fetch
         * @param atomicLong consume offset, read for filtering and advanced as messages are returned
         * @return an iterator over the messages at or beyond the current offset
         */
        private Iterator<FetchedMessage> createFetchedMessages(ByteBufferMessageSet byteBufferMessageSet, final AtomicLong atomicLong) {
            final Iterator<MessageAndOffset> rawMessages = byteBufferMessageSet.iterator();
            return new AbstractIterator<FetchedMessage>() {
                @Override
                public FetchedMessage computeNext() {
                    while (rawMessages.hasNext()) {
                        MessageAndOffset current = rawMessages.next();
                        long currentOffset = current.offset();
                        if (currentOffset < atomicLong.get()) {
                            SimpleKafkaConsumer.LOG.trace("Received old offset {}, expecting {} on {}. Message Ignored.", new Object[]{Long.valueOf(currentOffset), Long.valueOf(atomicLong.get()), ConsumerThread.this.topicPart});
                            continue;
                        }
                        // Advance the shared offset before exposing the message to the caller.
                        atomicLong.set(current.nextOffset());
                        BasicFetchedMessage reusable = ConsumerThread.this.fetchedMessage;
                        reusable.setPayload(current.message().payload());
                        reusable.setNextOffset(atomicLong.get());
                        return reusable;
                    }
                    return endOfData();
                }
            };
        }
    }

    /* loaded from: input_file:temp/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer$SimplePreparer.class */
    private final class SimplePreparer implements KafkaConsumer.Preparer {
        private final Map<TopicPartition, Long> requests;
        private final ThreadFactory threadFactory;

        /**
         * Creates a preparer with an empty request map and a daemon thread factory whose threads
         * are named with the pattern {@code message-callback-%d}.
         */
        private SimplePreparer() {
            this.threadFactory = Threads.createDaemonThreadFactory("message-callback-%d");
            this.requests = Maps.newHashMap();
        }

        /**
         * Records a request to consume the given topic partition starting at an explicit offset.
         *
         * @param topic topic name
         * @param partition partition id
         * @param offset offset stored for this topic partition
         * @return this preparer, for chaining
         */
        @Override
        public KafkaConsumer.Preparer add(String topic, int partition, long offset) {
            TopicPartition key = new TopicPartition(topic, partition);
            this.requests.put(key, offset);
            return this;
        }

        /**
         * Records a request to consume the given topic partition, storing the value of
         * {@code OffsetRequest.EarliestTime()} as its offset.
         *
         * @param topic topic name
         * @param partition partition id
         * @return this preparer, for chaining
         */
        @Override
        public KafkaConsumer.Preparer addFromBeginning(String topic, int partition) {
            TopicPartition key = new TopicPartition(topic, partition);
            long earliestMarker = OffsetRequest.EarliestTime();
            this.requests.put(key, earliestMarker);
            return this;
        }

        /**
         * Records a request to consume the given topic partition, storing the value of
         * {@code OffsetRequest.LatestTime()} as its offset.
         *
         * @param topic topic name
         * @param partition partition id
         * @return this preparer, for chaining
         */
        @Override
        public KafkaConsumer.Preparer addLatest(String topic, int partition) {
            TopicPartition key = new TopicPartition(topic, partition);
            long latestMarker = OffsetRequest.LatestTime();
            this.requests.put(key, latestMarker);
            return this;
        }

        /**
         * Starts one daemon {@code ConsumerThread} per registered topic partition and returns a
         * handle that stops them.
         *
         * <p>The callback is wrapped so that it is invoked through a single-thread executor. The
         * returned {@link Cancellable} is registered with the enclosing consumer; its first
         * {@code cancel()} call unregisters it, asks every thread to terminate, joins each of them
         * and then shuts the executor down. Later {@code cancel()} calls do nothing.
         *
         * @param messageCallback callback that receives the fetched messages
         * @return a handle that stops all threads started by this call
         */
        @Override
        public Cancellable consume(KafkaConsumer.MessageCallback messageCallback) {
            final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor(this.threadFactory);
            final ArrayList<ConsumerThread> pollers = Lists.newArrayList();
            final AtomicBoolean cancelled = new AtomicBoolean();

            Cancellable cancellable = new Cancellable() {
                @Override
                public void cancel() {
                    // Only the first caller performs the shutdown sequence.
                    if (!cancelled.compareAndSet(false, true)) {
                        return;
                    }
                    SimpleKafkaConsumer.this.consumerCancels.remove(this);

                    SimpleKafkaConsumer.LOG.info("Requesting stop of all consumer threads.");
                    for (ConsumerThread poller : pollers) {
                        poller.terminate();
                    }

                    SimpleKafkaConsumer.LOG.info("Wait for all consumer threads to stop.");
                    for (ConsumerThread poller : pollers) {
                        try {
                            poller.join();
                        } catch (InterruptedException e) {
                            // Logged only: waiting moves on to the next thread and the interrupt
                            // status is not restored.
                            SimpleKafkaConsumer.LOG.warn("Interrupted exception while waiting for thread to complete.", e);
                        }
                    }
                    SimpleKafkaConsumer.LOG.info("All consumer threads stopped.");

                    // shutdown() rather than shutdownNow(): tasks already submitted still get to run.
                    callbackExecutor.shutdown();
                }
            };

            KafkaConsumer.MessageCallback serializedCallback = wrapCallback(messageCallback, callbackExecutor, cancellable);
            for (Map.Entry<TopicPartition, Long> request : this.requests.entrySet()) {
                ConsumerThread poller = new ConsumerThread(request.getKey(), request.getValue().longValue(), serializedCallback);
                poller.setDaemon(true);
                poller.start();
                pollers.add(poller);
            }

            SimpleKafkaConsumer.this.consumerCancels.add(cancellable);
            return cancellable;
        }

        /**
         * Wraps a callback so that every invocation runs on the given executor and nothing is
         * delivered once {@code finished()} has been called.
         *
         * <p>Both wrapper methods submit a task to the executor and block until it completes.
         * {@code onReceived} checks the stopped flag before submitting and again inside the task.
         * The first {@code finished()} call sets the flag, forwards {@code finished()} to the
         * wrapped callback and then cancels the given {@link Cancellable}; later calls are ignored.
         *
         * @param messageCallback callback to wrap
         * @param executorService executor that runs the callback invocations
         * @param cancellable handle cancelled after the wrapped callback has been told it is finished
         * @return the wrapping callback
         */
        private KafkaConsumer.MessageCallback wrapCallback(final KafkaConsumer.MessageCallback messageCallback, final ExecutorService executorService, final Cancellable cancellable) {
            final AtomicBoolean stopped = new AtomicBoolean();
            return new KafkaConsumer.MessageCallback() {
                @Override
                public void onReceived(final Iterator<FetchedMessage> messages) {
                    if (!stopped.get()) {
                        Runnable deliver = new Runnable() {
                            @Override
                            public void run() {
                                // Checked again because finished() may have run while this task was queued.
                                if (!stopped.get()) {
                                    messageCallback.onReceived(messages);
                                }
                            }
                        };
                        Futures.getUnchecked(executorService.submit(deliver));
                    }
                }

                @Override
                public void finished() {
                    if (!stopped.compareAndSet(false, true)) {
                        return;
                    }
                    Runnable finish = new Runnable() {
                        @Override
                        public void run() {
                            messageCallback.finished();
                            cancellable.cancel();
                        }
                    };
                    Futures.getUnchecked(executorService.submit(finish));
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a consumer that uses the given broker service to find partition leaders.
     *
     * @param brokerService service queried for the leader broker of a topic partition
     */
    public SimpleKafkaConsumer(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /**
     * Creates a new, empty preparer for registering topic partitions to consume.
     *
     * @return a fresh {@code SimplePreparer} tied to this consumer
     */
    @Override
    public KafkaConsumer.Preparer prepare() {
        KafkaConsumer.Preparer preparer = new SimplePreparer();
        return preparer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops this consumer: cancels every registered consumption handle, then invalidates all
     * cached broker consumers.
     */
    public void stop() {
        LOG.info("Stopping Kafka consumer");
        // Drain first so the handles are cancelled from a private list rather than from the live queue.
        LinkedList<Cancellable> pending = Lists.newLinkedList();
        this.consumerCancels.drainTo(pending);
        for (Cancellable handle : pending) {
            handle.cancel();
        }
        this.consumers.invalidateAll();
        LOG.info("Kafka Consumer stopped");
    }

    /**
     * Builds the cache loader that creates a {@link SimpleConsumer} for a broker.
     *
     * <p>Each consumer is created with the broker's host and port, the values 5000 and 1048576
     * (matching {@code SO_TIMEOUT} and {@code FETCH_SIZE}) and the client id
     * {@code simple-kafka-client}.
     *
     * @return a loader that creates one consumer per broker
     */
    private CacheLoader<BrokerInfo, SimpleConsumer> createConsumerLoader() {
        CacheLoader<BrokerInfo, SimpleConsumer> loader = new CacheLoader<BrokerInfo, SimpleConsumer>() {
            @Override
            public SimpleConsumer load(BrokerInfo brokerInfo) throws Exception {
                final String host = brokerInfo.getHost();
                final int port = brokerInfo.getPort();
                return new SimpleConsumer(host, port, 5000, 1048576, "simple-kafka-client");
            }
        };
        return loader;
    }

    /**
     * Builds the removal listener that closes a {@link SimpleConsumer} when its entry leaves the
     * consumer cache.
     *
     * @return a listener that closes the removed consumer, if the notification carries one
     */
    private RemovalListener<BrokerInfo, SimpleConsumer> createRemovalListener() {
        return new RemovalListener<BrokerInfo, SimpleConsumer>() {
            @Override
            public void onRemoval(RemovalNotification<BrokerInfo, SimpleConsumer> removalNotification) {
                SimpleConsumer evicted = removalNotification.getValue();
                if (evicted == null) {
                    return;
                }
                evicted.close();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the leader broker of a topic partition for one offset before the given timestamp.
     *
     * <p>Returns 0 in every failure case: no leader is known, the response carries an error, or
     * no offset is returned. Unless the error code is "unknown topic or partition", a failure
     * also refreshes the cached consumer for the leader and logs a warning.
     *
     * @param topicPartition topic partition to query
     * @param timestamp time value passed to the offset request
     * @return the first offset returned by the broker, or 0 on failure
     */
    public long getLastOffset(TopicPartition topicPartition, long timestamp) {
        final String topic = topicPartition.getTopic();
        final int partition = topicPartition.getPartition();

        BrokerInfo leader = this.brokerService.getLeader(topic, partition);
        SimpleConsumer consumer = null;
        if (leader != null) {
            consumer = this.consumers.getUnchecked(leader);
        }
        if (consumer == null) {
            LOG.warn("Failed to talk to any broker. Default offset to 0 for {}", topicPartition);
            return 0L;
        }

        // Request at most one offset for this partition.
        TopicAndPartition requestKey = new TopicAndPartition(topic, partition);
        PartitionOffsetRequestInfo requestInfo = new PartitionOffsetRequestInfo(timestamp, 1);
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(ImmutableMap.of(requestKey, requestInfo), OffsetRequest.CurrentVersion(), consumer.clientId());
        OffsetResponse response = consumer.getOffsetsBefore(request);

        long[] offsets = null;
        if (!response.hasError()) {
            offsets = response.offsets(topic, partition);
        }

        if (offsets == null || offsets.length == 0) {
            short errorCode = response.errorCode(topic, partition);
            // An unknown topic or partition silently yields 0; any other failure refreshes the consumer and warns.
            if (errorCode != ErrorMapping.UnknownTopicOrPartitionCode()) {
                this.consumers.refresh(leader);
                LOG.warn("Failed to fetch offset for {} with timestamp {}. Error: {}. Default offset to 0.", new Object[]{topicPartition, Long.valueOf(timestamp), Short.valueOf(errorCode)});
            }
            return 0L;
        }

        LOG.debug("Offset {} fetched for {} with timestamp {}.", new Object[]{Long.valueOf(offsets[0]), topicPartition, Long.valueOf(timestamp)});
        return offsets[0];
    }
}
