package storm.kafka;

import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:storm/kafka/ExponentialBackoffMsgRetryManager.class */
public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
    private final long retryInitialDelayMs;
    private final double retryDelayMultiplier;
    private final long retryDelayMaxMs;
    private Queue<MessageRetryRecord> waiting = new PriorityQueue(11, new RetryTimeComparator());
    private Map<Long, MessageRetryRecord> records = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:storm/kafka/ExponentialBackoffMsgRetryManager$MessageRetryRecord.class */
    public class MessageRetryRecord {
        private final long offset;
        private final int retryNum;
        private final long retryTimeUTC;

        public MessageRetryRecord(ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager, long j) {
            this(j, 1);
        }

        private MessageRetryRecord(long j, int i) {
            this.offset = j;
            this.retryNum = i;
            this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
        }

        public MessageRetryRecord createNextRetryRecord() {
            return new MessageRetryRecord(this.offset, this.retryNum + 1);
        }

        private long calculateRetryDelay() {
            double pow = ExponentialBackoffMsgRetryManager.this.retryInitialDelayMs * Math.pow(ExponentialBackoffMsgRetryManager.this.retryDelayMultiplier, this.retryNum - 1);
            Long l = Long.MAX_VALUE;
            return Math.min(pow >= l.doubleValue() ? l.longValue() : (long) pow, ExponentialBackoffMsgRetryManager.this.retryDelayMaxMs);
        }

        public boolean equals(Object obj) {
            return (obj instanceof MessageRetryRecord) && this.offset == ((MessageRetryRecord) obj).offset;
        }

        public int hashCode() {
            return Long.valueOf(this.offset).hashCode();
        }
    }

    /* loaded from: input_file:storm/kafka/ExponentialBackoffMsgRetryManager$RetryTimeComparator.class */
    class RetryTimeComparator implements Comparator<MessageRetryRecord> {
        RetryTimeComparator() {
        }

        @Override // java.util.Comparator
        public int compare(MessageRetryRecord messageRetryRecord, MessageRetryRecord messageRetryRecord2) {
            return Long.valueOf(messageRetryRecord.retryTimeUTC).compareTo(Long.valueOf(messageRetryRecord2.retryTimeUTC));
        }

        @Override // java.util.Comparator
        public boolean equals(Object obj) {
            return false;
        }
    }

    public ExponentialBackoffMsgRetryManager(long j, double d, long j2) {
        this.retryInitialDelayMs = j;
        this.retryDelayMultiplier = d;
        this.retryDelayMaxMs = j2;
    }

    @Override // storm.kafka.FailedMsgRetryManager
    public void failed(Long l) {
        MessageRetryRecord messageRetryRecord = this.records.get(l);
        MessageRetryRecord messageRetryRecord2 = messageRetryRecord == null ? new MessageRetryRecord(this, l.longValue()) : messageRetryRecord.createNextRetryRecord();
        this.records.put(l, messageRetryRecord2);
        this.waiting.add(messageRetryRecord2);
    }

    @Override // storm.kafka.FailedMsgRetryManager
    public void acked(Long l) {
        MessageRetryRecord remove = this.records.remove(l);
        if (remove != null) {
            this.waiting.remove(remove);
        }
    }

    @Override // storm.kafka.FailedMsgRetryManager
    public void retryStarted(Long l) {
        MessageRetryRecord messageRetryRecord = this.records.get(l);
        if (messageRetryRecord == null || !this.waiting.contains(messageRetryRecord)) {
            throw new IllegalStateException("cannot retry a message that has not failed");
        }
        this.waiting.remove(messageRetryRecord);
    }

    @Override // storm.kafka.FailedMsgRetryManager
    public Long nextFailedMessageToRetry() {
        if (this.waiting.size() <= 0) {
            return null;
        }
        MessageRetryRecord peek = this.waiting.peek();
        if (System.currentTimeMillis() < peek.retryTimeUTC) {
            return null;
        }
        if (this.records.containsKey(Long.valueOf(peek.offset))) {
            return Long.valueOf(peek.offset);
        }
        this.waiting.remove(peek);
        return nextFailedMessageToRetry();
    }

    @Override // storm.kafka.FailedMsgRetryManager
    public boolean shouldRetryMsg(Long l) {
        MessageRetryRecord messageRetryRecord = this.records.get(l);
        return messageRetryRecord != null && this.waiting.contains(messageRetryRecord) && System.currentTimeMillis() >= messageRetryRecord.retryTimeUTC;
    }

    @Override // storm.kafka.FailedMsgRetryManager
    public Set<Long> clearInvalidMessages(Long l) {
        MessageRetryRecord remove;
        HashSet hashSet = new HashSet();
        for (Long l2 : this.records.keySet()) {
            if (l2.longValue() < l.longValue() && (remove = this.records.remove(l2)) != null) {
                this.waiting.remove(remove);
                hashSet.add(l2);
            }
        }
        return hashSet;
    }
}
